agi.ws.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package agi
  2. import (
  3. "log"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/robertkrimen/otto"
  9. user "imuslab.com/arozos/mod/user"
  10. )
  11. /*
  12. AJGI WebSocket Request Library
  13. This is a library for allowing AGI based connection upgrade to WebSocket
  14. Different from other agi module, this do not use the register lib interface
  15. deal to it special nature.
  16. Author: tobychui
  17. */
  18. var upgrader = websocket.Upgrader{
  19. ReadBufferSize: 1024,
  20. WriteBufferSize: 1024,
  21. CheckOrigin: func(r *http.Request) bool {
  22. return true
  23. },
  24. }
  25. var connections = sync.Map{}
  26. //This is a very special function to check if the connection has been updated or not
  27. //Return upgrade status (true for already upgraded) and connection uuid
  28. func checkWebSocketConnectionUpgradeStatus(vm *otto.Otto) (bool, string, *websocket.Conn) {
  29. if value, err := vm.Get("_websocket_conn_id"); err == nil {
  30. //Exists!
  31. //Check if this is undefined
  32. if value == otto.UndefinedValue() {
  33. //WebSocket connection has closed
  34. return false, "", nil
  35. }
  36. //Connection is still live. Try convert it to string
  37. connId, err := value.ToString()
  38. if err != nil {
  39. return false, "", nil
  40. }
  41. //Load the conenction from SyncMap
  42. if c, ok := connections.Load(connId); ok {
  43. //Return the conncetion object
  44. return true, connId, c.(*websocket.Conn)
  45. }
  46. //Connection object not found (Maybe already closed?)
  47. return false, "", nil
  48. }
  49. return false, "", nil
  50. }
  51. func (g *Gateway) injectWebSocketFunctions(vm *otto.Otto, u *user.User, w http.ResponseWriter, r *http.Request) {
  52. vm.Set("_websocket_upgrade", func(call otto.FunctionCall) otto.Value {
  53. //Check if the user specified any timeout time in seconds
  54. //Default to 5 minutes
  55. timeout, err := call.Argument(0).ToInteger()
  56. if err != nil {
  57. timeout = 300
  58. }
  59. //Check if the connection has already been updated
  60. connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm)
  61. if connState {
  62. //Already upgraded
  63. return otto.TrueValue()
  64. }
  65. //Not upgraded. Upgrade it now
  66. c, err := upgrader.Upgrade(w, r, nil)
  67. if err != nil {
  68. log.Print("*AGI WebSocket* WebSocket upgrade failed:", err)
  69. return otto.FalseValue()
  70. }
  71. //Generate a UUID for this connection
  72. connUUID := newUUIDv4()
  73. vm.Set("_websocket_conn_id", connUUID)
  74. connections.Store(connUUID, c)
  75. //Record its creation time as opr time
  76. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  77. //Create a go routine to monitor the connection status and disconnect it if timeup
  78. if timeout > 0 {
  79. go func() {
  80. time.Sleep(1 * time.Second)
  81. //Check if the last edit time > timeout time
  82. connStatus, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  83. for connStatus {
  84. //For this connection exists
  85. if value, err := vm.Get("_websocket_conn_lastopr"); err == nil {
  86. lastOprTime, err := value.ToInteger()
  87. if err != nil {
  88. continue
  89. }
  90. //log.Println(time.Now().Unix(), lastOprTime)
  91. if time.Now().Unix()-lastOprTime > timeout {
  92. //Timeout! Kill this socket
  93. conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Timeout"))
  94. time.Sleep(300)
  95. conn.Close()
  96. //Clean up the connection in sync map and vm
  97. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  98. connections.Delete(connID)
  99. log.Println("*AGI WebSocket* Closing connection due to timeout")
  100. break
  101. }
  102. }
  103. time.Sleep(1 * time.Second)
  104. connStatus, _, _ = checkWebSocketConnectionUpgradeStatus(vm)
  105. }
  106. }()
  107. }
  108. return otto.TrueValue()
  109. })
  110. vm.Set("_websocket_send", func(call otto.FunctionCall) otto.Value {
  111. //Get the content to send
  112. content, err := call.Argument(0).ToString()
  113. if err != nil {
  114. g.raiseError(err)
  115. return otto.FalseValue()
  116. }
  117. //Send it
  118. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  119. if !connState {
  120. //Already upgraded
  121. //log.Println("*AGI WebSocket* Connection id not found in VM")
  122. return otto.FalseValue()
  123. }
  124. err = conn.WriteMessage(1, []byte(content))
  125. if err != nil {
  126. //Client connection could have been closed. Close the connection
  127. conn.Close()
  128. //Clean up the connection in sync map and vm
  129. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  130. connections.Delete(connID)
  131. return otto.FalseValue()
  132. }
  133. //Write succeed
  134. //Update last opr time
  135. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  136. return otto.TrueValue()
  137. })
  138. vm.Set("_websocket_read", func(call otto.FunctionCall) otto.Value {
  139. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  140. if connState == true {
  141. _, message, err := conn.ReadMessage()
  142. if err != nil {
  143. //Client connection could have been closed. Close the connection
  144. conn.Close()
  145. //Clean up the connection in sync map and vm
  146. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  147. connections.Delete(connID)
  148. log.Println("*AGI WebSocket* Trying to read from a closed socket")
  149. return otto.FalseValue()
  150. }
  151. //Update last opr time
  152. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  153. //Parse the incoming message
  154. incomingString, err := otto.ToValue(string(message))
  155. if err != nil {
  156. log.Println(err)
  157. //Unable to parse to JavaScript. Something out of the scope of otto?
  158. return otto.NullValue()
  159. }
  160. //Return the incoming string to the AGI script
  161. return incomingString
  162. } else {
  163. //WebSocket not exists
  164. //log.Println("*AGI WebSocket* Trying to read from a closed socket")
  165. return otto.FalseValue()
  166. }
  167. })
  168. vm.Set("_websocket_close", func(call otto.FunctionCall) otto.Value {
  169. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  170. if connState == true {
  171. //Close the Websocket gracefully
  172. conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  173. time.Sleep(300)
  174. conn.Close()
  175. //Clean up the connection in sync map and vm
  176. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  177. connections.Delete(connID)
  178. //Return true value
  179. return otto.TrueValue()
  180. } else {
  181. //Connection not opened or closed already
  182. return otto.FalseValue()
  183. }
  184. })
  185. //Wrap all the native code function into an imagelib class
  186. vm.Run(`
  187. var websocket = {};
  188. websocket.upgrade = _websocket_upgrade;
  189. websocket.send = _websocket_send;
  190. websocket.read = _websocket_read;
  191. websocket.close = _websocket_close;
  192. `)
  193. }