123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- package agi
- import (
- "log"
- "net/http"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- "github.com/robertkrimen/otto"
- user "imuslab.com/arozos/mod/user"
- )
- /*
- AJGI WebSocket Request Library
- This is a library for allowing AGI based connection upgrade to WebSocket
- Different from other agi module, this do not use the register lib interface
- deal to it special nature.
- Author: tobychui
- */
- var upgrader = websocket.Upgrader{
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- }
- var connections = sync.Map{}
- //This is a very special function to check if the connection has been updated or not
- //Return upgrade status (true for already upgraded) and connection uuid
- func checkWebSocketConnectionUpgradeStatus(vm *otto.Otto) (bool, string, *websocket.Conn) {
- if value, err := vm.Get("_websocket_conn_id"); err == nil {
- //Exists!
- //Check if this is undefined
- if value == otto.UndefinedValue() {
- //WebSocket connection has closed
- return false, "", nil
- }
- //Connection is still live. Try convert it to string
- connId, err := value.ToString()
- if err != nil {
- return false, "", nil
- }
- //Load the conenction from SyncMap
- if c, ok := connections.Load(connId); ok {
- //Return the conncetion object
- return true, connId, c.(*websocket.Conn)
- }
- //Connection object not found (Maybe already closed?)
- return false, "", nil
- }
- return false, "", nil
- }
- func (g *Gateway) injectWebSocketFunctions(vm *otto.Otto, u *user.User, w http.ResponseWriter, r *http.Request) {
- vm.Set("_websocket_upgrade", func(call otto.FunctionCall) otto.Value {
- //Check if the user specified any timeout time in seconds
- //Default to 5 minutes
- timeout, err := call.Argument(0).ToInteger()
- if err != nil {
- timeout = 300
- }
- //Check if the connection has already been updated
- connState, _, _ := checkWebSocketConnectionUpgradeStatus(vm)
- if connState {
- //Already upgraded
- return otto.TrueValue()
- }
- //Not upgraded. Upgrade it now
- c, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- log.Print("*AGI WebSocket* WebSocket upgrade failed:", err)
- return otto.FalseValue()
- }
- //Generate a UUID for this connection
- connUUID := newUUIDv4()
- vm.Set("_websocket_conn_id", connUUID)
- connections.Store(connUUID, c)
- //Record its creation time as opr time
- vm.Set("_websocket_conn_lastopr", time.Now().Unix())
- //Create a go routine to monitor the connection status and disconnect it if timeup
- if timeout > 0 {
- go func() {
- time.Sleep(1 * time.Second)
- //Check if the last edit time > timeout time
- connStatus, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
- for connStatus {
- //For this connection exists
- if value, err := vm.Get("_websocket_conn_lastopr"); err == nil {
- lastOprTime, err := value.ToInteger()
- if err != nil {
- continue
- }
- //log.Println(time.Now().Unix(), lastOprTime)
- if time.Now().Unix()-lastOprTime > timeout {
- //Timeout! Kill this socket
- conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Timeout"))
- time.Sleep(300)
- conn.Close()
- //Clean up the connection in sync map and vm
- vm.Set("_websocket_conn_id", otto.UndefinedValue())
- connections.Delete(connID)
- log.Println("*AGI WebSocket* Closing connection due to timeout")
- break
- }
- }
- time.Sleep(1 * time.Second)
- connStatus, _, _ = checkWebSocketConnectionUpgradeStatus(vm)
- }
- }()
- }
- return otto.TrueValue()
- })
- vm.Set("_websocket_send", func(call otto.FunctionCall) otto.Value {
- //Get the content to send
- content, err := call.Argument(0).ToString()
- if err != nil {
- g.raiseError(err)
- return otto.FalseValue()
- }
- //Send it
- connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
- if !connState {
- //Already upgraded
- //log.Println("*AGI WebSocket* Connection id not found in VM")
- return otto.FalseValue()
- }
- err = conn.WriteMessage(1, []byte(content))
- if err != nil {
- //Client connection could have been closed. Close the connection
- conn.Close()
- //Clean up the connection in sync map and vm
- vm.Set("_websocket_conn_id", otto.UndefinedValue())
- connections.Delete(connID)
- return otto.FalseValue()
- }
- //Write succeed
- //Update last opr time
- vm.Set("_websocket_conn_lastopr", time.Now().Unix())
- return otto.TrueValue()
- })
- vm.Set("_websocket_read", func(call otto.FunctionCall) otto.Value {
- connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
- if connState == true {
- _, message, err := conn.ReadMessage()
- if err != nil {
- //Client connection could have been closed. Close the connection
- conn.Close()
- //Clean up the connection in sync map and vm
- vm.Set("_websocket_conn_id", otto.UndefinedValue())
- connections.Delete(connID)
- log.Println("*AGI WebSocket* Trying to read from a closed socket")
- return otto.FalseValue()
- }
- //Update last opr time
- vm.Set("_websocket_conn_lastopr", time.Now().Unix())
- //Parse the incoming message
- incomingString, err := otto.ToValue(string(message))
- if err != nil {
- log.Println(err)
- //Unable to parse to JavaScript. Something out of the scope of otto?
- return otto.NullValue()
- }
- //Return the incoming string to the AGI script
- return incomingString
- } else {
- //WebSocket not exists
- //log.Println("*AGI WebSocket* Trying to read from a closed socket")
- return otto.FalseValue()
- }
- })
- vm.Set("_websocket_close", func(call otto.FunctionCall) otto.Value {
- connState, connID, conn := checkWebSocketConnectionUpgradeStatus(vm)
- if connState == true {
- //Close the Websocket gracefully
- conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
- time.Sleep(300)
- conn.Close()
- //Clean up the connection in sync map and vm
- vm.Set("_websocket_conn_id", otto.UndefinedValue())
- connections.Delete(connID)
- //Return true value
- return otto.TrueValue()
- } else {
- //Connection not opened or closed already
- return otto.FalseValue()
- }
- })
- //Wrap all the native code function into an imagelib class
- vm.Run(`
- var websocket = {};
- websocket.upgrade = _websocket_upgrade;
- websocket.send = _websocket_send;
- websocket.read = _websocket_read;
- websocket.close = _websocket_close;
-
- `)
- }
|