agi.ws.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package agi
  2. import (
  3. "log"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. "github.com/robertkrimen/otto"
  9. user "imuslab.com/arozos/mod/user"
  10. )
  11. /*
  12. AJGI WebSocket Request Library
  13. This is a library for allowing AGI based connection upgrade to WebSocket
  Unlike the other AGI modules, this one does not use the register lib interface
  due to its special nature.
  16. Author: tobychui
  17. */
  18. var upgrader = websocket.Upgrader{}
  19. var connections = sync.Map{}
  20. //This is a very special function to check if the connection has been updated or not
  21. //Return upgrade status (true for already upgraded) and connection uuid
  22. func checkWebSocketConnectionUpgradeStatus(vm *otto.Otto) (bool, string, *websocket.Conn) {
  23. if value, err := vm.Get("_websocket_conn_id"); err == nil {
  24. //Exists!
  25. //Check if this is undefined
  26. if value == otto.UndefinedValue() {
  27. //WebSocket connection has closed
  28. return false, "", nil
  29. }
  30. //Connection is still live. Try convert it to string
  31. connId, err := value.ToString()
  32. if err != nil {
  33. return false, "", nil
  34. }
  35. //Load the conenction from SyncMap
  36. if c, ok := connections.Load(connId); ok {
  37. //Return the conncetion object
  38. return true, connId, c.(*websocket.Conn)
  39. }
  40. //Connection object not found (Maybe already closed?)
  41. return false, "", nil
  42. }
  43. return false, "", nil
  44. }
  45. func (g *Gateway) injectWebSocketFunctions(vm *otto.Otto, u *user.User, w http.ResponseWriter, r *http.Request) {
  46. vm.Set("_websocket_upgrade", func(call otto.FunctionCall) otto.Value {
  47. //Check if the user specified any timeout time in seconds
  48. //Default to 5 minutes
  49. timeout, err := call.Argument(0).ToInteger()
  50. if err != nil {
  51. timeout = 300
  52. }
  53. //Check if the connection has already been updated
  54. connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm)
  55. if connState {
  56. //Already upgraded
  57. return otto.TrueValue()
  58. }
  59. //Not upgraded. Upgrade it now
  60. c, err := upgrader.Upgrade(w, r, nil)
  61. if err != nil {
  62. log.Print("*AGI WebSocket* WebSocket upgrade failed:", err)
  63. return otto.FalseValue()
  64. }
  65. //Generate a UUID for this connection
  66. connUUID := newUUIDv4()
  67. vm.Set("_websocket_conn_id", connUUID)
  68. connections.Store(connUUID, c)
  69. //Record its creation time as opr time
  70. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  71. //Create a go routine to monitor the connection status and disconnect it if timeup
  72. if timeout > 0 {
  73. go func() {
  74. time.Sleep(1 * time.Second)
  75. //Check if the last edit time > timeout time
  76. connStatus, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  77. for connStatus {
  78. //For this connection exists
  79. if value, err := vm.Get("_websocket_conn_lastopr"); err == nil {
  80. lastOprTime, err := value.ToInteger()
  81. if err != nil {
  82. continue
  83. }
  84. //log.Println(time.Now().Unix(), lastOprTime)
  85. if time.Now().Unix()-lastOprTime > timeout {
  86. //Timeout! Kill this socket
  87. conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Timeout"))
  88. time.Sleep(300)
  89. conn.Close()
  90. //Clean up the connection in sync map and vm
  91. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  92. connections.Delete(connID)
  93. log.Println("*AGI WebSocket* Closing connection due to timeout")
  94. break
  95. }
  96. }
  97. time.Sleep(1 * time.Second)
  98. connStatus, _, _ = checkWebSocketConnectionUpgradeStatus(vm)
  99. }
  100. }()
  101. }
  102. return otto.TrueValue()
  103. })
  104. vm.Set("_websocket_send", func(call otto.FunctionCall) otto.Value {
  105. //Get the content to send
  106. content, err := call.Argument(0).ToString()
  107. if err != nil {
  108. g.raiseError(err)
  109. return otto.FalseValue()
  110. }
  111. //Send it
  112. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  113. if !connState {
  114. //Already upgraded
  115. //log.Println("*AGI WebSocket* Connection id not found in VM")
  116. return otto.FalseValue()
  117. }
  118. err = conn.WriteMessage(1, []byte(content))
  119. if err != nil {
  120. //Client connection could have been closed. Close the connection
  121. conn.Close()
  122. //Clean up the connection in sync map and vm
  123. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  124. connections.Delete(connID)
  125. return otto.FalseValue()
  126. }
  127. //Write succeed
  128. //Update last opr time
  129. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  130. return otto.TrueValue()
  131. })
  132. vm.Set("_websocket_read", func(call otto.FunctionCall) otto.Value {
  133. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  134. if connState == true {
  135. _, message, err := conn.ReadMessage()
  136. if err != nil {
  137. //Client connection could have been closed. Close the connection
  138. conn.Close()
  139. //Clean up the connection in sync map and vm
  140. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  141. connections.Delete(connID)
  142. log.Println("*AGI WebSocket* Trying to read from a closed socket")
  143. return otto.FalseValue()
  144. }
  145. //Update last opr time
  146. vm.Set("_websocket_conn_lastopr", time.Now().Unix())
  147. //Parse the incoming message
  148. incomingString, err := otto.ToValue(string(message))
  149. if err != nil {
  150. log.Println(err)
  151. //Unable to parse to JavaScript. Something out of the scope of otto?
  152. return otto.NullValue()
  153. }
  154. //Return the incoming string to the AGI script
  155. return incomingString
  156. } else {
  157. //WebSocket not exists
  158. //log.Println("*AGI WebSocket* Trying to read from a closed socket")
  159. return otto.FalseValue()
  160. }
  161. })
  162. vm.Set("_websocket_close", func(call otto.FunctionCall) otto.Value {
  163. connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
  164. if connState == true {
  165. //Close the Websocket gracefully
  166. conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  167. time.Sleep(300)
  168. conn.Close()
  169. //Clean up the connection in sync map and vm
  170. vm.Set("_websocket_conn_id", otto.UndefinedValue())
  171. connections.Delete(connID)
  172. //Return true value
  173. return otto.TrueValue()
  174. } else {
  175. //Connection not opened or closed already
  176. return otto.FalseValue()
  177. }
  178. })
  179. //Wrap all the native code function into an imagelib class
  180. vm.Run(`
  181. var websocket = {};
  182. websocket.upgrade = _websocket_upgrade;
  183. websocket.send = _websocket_send;
  184. websocket.read = _websocket_read;
  185. websocket.close = _websocket_close;
  186. `)
  187. }