websocket.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package agi
  2. import (
  3. "log"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/robertkrimen/otto"
  9. uuid "github.com/satori/go.uuid"
  10. user "imuslab.com/arozos/mod/user"
  11. )
  12. /*
  13. AJGI WebSocket Request Library
  14. This is a library for allowing AGI based connection upgrade to WebSocket
  15. Unlike the other AGI modules, this one does not use the register lib interface
  16. due to its special nature.
  17. Author: tobychui
  18. */
  19. var upgrader = websocket.Upgrader{
  20. ReadBufferSize: 1024,
  21. WriteBufferSize: 1024,
  22. CheckOrigin: func(r *http.Request) bool {
  23. return true
  24. },
  25. }
  26. var connections = sync.Map{}
  27. // This is a very special function to check if the connection has been updated or not
  28. // Return upgrade status (true for already upgraded) and connection uuid
  29. func checkWebSocketConnectionUpgradeStatus(vm *otto.Otto) (bool, string, *websocket.Conn) {
  30. if value, err := vm.Get("_websocket_conn_id"); err == nil {
  31. //Exists!
  32. //Check if this is undefined
  33. if value == otto.UndefinedValue() {
  34. //WebSocket connection has closed
  35. return false, "", nil
  36. }
  37. //Connection is still live. Try convert it to string
  38. connId, err := value.ToString()
  39. if err != nil {
  40. return false, "", nil
  41. }
  42. //Load the conenction from SyncMap
  43. if c, ok := connections.Load(connId); ok {
  44. //Return the conncetion object
  45. return true, connId, c.(*websocket.Conn)
  46. }
  47. //Connection object not found (Maybe already closed?)
  48. return false, "", nil
  49. }
  50. return false, "", nil
  51. }
  52. func (g *Gateway) injectWebSocketFunctions(vm *otto.Otto, u *user.User, w http.ResponseWriter, r *http.Request) {
  53. vm.Set("_websocket_upgrade", func(call otto.FunctionCall) otto.Value {
  54. //Check if the user specified any timeout time in seconds
  55. //Default to 5 minutes
  56. timeout, err := call.Argument(0).ToInteger()
  57. if err != nil {
  58. timeout = 300
  59. }
  60. //Check if the connection has already been updated
  61. connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm)
  62. if connState {
  63. //Already upgraded
  64. return otto.TrueValue()
  65. }
  66. //Not upgraded. Upgrade it now
  67. c, err := upgrader.Upgrade(w, r, nil)
  68. if err != nil {
  69. log.Print("*AGI WebSocket* WebSocket upgrade failed:", err)
  70. return otto.FalseValue()
  71. }
  72. //Generate a UUID for this connection
  73. connUUID := uuid.NewV4().String()
  74. vm.Set("_websocket_conn_id", connUUID)
  75. connections.Store(connUUID, c)
  76. //Record its creation time as opr time
  77. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  78. //Create a go routine to monitor the connection status and disconnect it if timeup
  79. if timeout > 0 {
  80. go func() {
  81. time.Sleep(1 * time.Second)
  82. //Check if the last edit time > timeout time
  83. connStatus, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  84. for connStatus {
  85. //For this connection exists
  86. if value, err := vm.Get("_websocket_conn_lastopr"); err == nil {
  87. lastOprTime, err := value.ToInteger()
  88. if err != nil {
  89. continue
  90. }
  91. //log.Println(time.Now().Unix(), lastOprTime)
  92. if time.Now().Unix()-lastOprTime > timeout {
  93. //Timeout! Kill this socket
  94. conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Timeout"))
  95. time.Sleep(300)
  96. conn.Close()
  97. //Clean up the connection in sync map and vm
  98. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  99. connections.Delete(connID)
  100. log.Println("*AGI WebSocket* Closing connection due to timeout")
  101. break
  102. }
  103. }
  104. time.Sleep(1 * time.Second)
  105. connStatus, _, _ = checkWebSocketConnectionUpgradeStatus(vm)
  106. }
  107. }()
  108. }
  109. return otto.TrueValue()
  110. })
  111. vm.Set("_websocket_send", func(call otto.FunctionCall) otto.Value {
  112. //Get the content to send
  113. content, err := call.Argument(0).ToString()
  114. if err != nil {
  115. g.RaiseError(err)
  116. return otto.FalseValue()
  117. }
  118. //Send it
  119. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  120. if !connState {
  121. //Already upgraded
  122. //log.Println("*AGI WebSocket* Connection id not found in VM")
  123. return otto.FalseValue()
  124. }
  125. err = conn.WriteMessage(1, []byte(content))
  126. if err != nil {
  127. //Client connection could have been closed. Close the connection
  128. conn.Close()
  129. //Clean up the connection in sync map and vm
  130. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  131. connections.Delete(connID)
  132. return otto.FalseValue()
  133. }
  134. //Write succeed
  135. //Update last opr time
  136. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  137. return otto.TrueValue()
  138. })
  139. vm.Set("_websocket_read", func(call otto.FunctionCall) otto.Value {
  140. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  141. if connState == true {
  142. _, message, err := conn.ReadMessage()
  143. if err != nil {
  144. //Client connection could have been closed. Close the connection
  145. conn.Close()
  146. //Clean up the connection in sync map and vm
  147. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  148. connections.Delete(connID)
  149. log.Println("*AGI WebSocket* Trying to read from a closed socket")
  150. return otto.FalseValue()
  151. }
  152. //Update last opr time
  153. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  154. //Parse the incoming message
  155. incomingString, err := otto.ToValue(string(message))
  156. if err != nil {
  157. log.Println(err)
  158. //Unable to parse to JavaScript. Something out of the scope of otto?
  159. return otto.NullValue()
  160. }
  161. //Return the incoming string to the AGI script
  162. return incomingString
  163. } else {
  164. //WebSocket not exists
  165. //log.Println("*AGI WebSocket* Trying to read from a closed socket")
  166. return otto.FalseValue()
  167. }
  168. })
  169. vm.Set("_websocket_close", func(call otto.FunctionCall) otto.Value {
  170. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  171. if connState == true {
  172. //Close the Websocket gracefully
  173. conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  174. time.Sleep(300)
  175. conn.Close()
  176. //Clean up the connection in sync map and vm
  177. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  178. connections.Delete(connID)
  179. //Return true value
  180. return otto.TrueValue()
  181. } else {
  182. //Connection not opened or closed already
  183. return otto.FalseValue()
  184. }
  185. })
  186. //Wrap all the native code function into an imagelib class
  187. vm.Run(`
  188. var websocket = {};
  189. websocket.upgrade = _websocket_upgrade;
  190. websocket.send = _websocket_send;
  191. websocket.read = _websocket_read;
  192. websocket.close = _websocket_close;
  193. `)
  194. }