streamproxy.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package streamproxy
  2. import (
  3. "errors"
  4. "log"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/google/uuid"
  10. "imuslab.com/zoraxy/mod/database"
  11. )
/*
	TCP Proxy

	Forwards traffic from one port to another.
	Also accepts active connections and passive connections.
*/
  18. type ProxyRelayOptions struct {
  19. Name string
  20. ListeningAddr string
  21. ProxyAddr string
  22. Timeout int
  23. UseTCP bool
  24. UseUDP bool
  25. }
  26. type ProxyRelayConfig struct {
  27. UUID string //A UUIDv4 representing this config
  28. Name string //Name of the config
  29. Running bool //Status, read only
  30. AutoStart bool //If the service suppose to started automatically
  31. ListeningAddress string //Listening Address, usually 127.0.0.1:port
  32. ProxyTargetAddr string //Proxy target address
  33. UseTCP bool //Enable TCP proxy
  34. UseUDP bool //Enable UDP proxy
  35. Timeout int //Timeout for connection in sec
  36. tcpStopChan chan bool //Stop channel for TCP listener
  37. udpStopChan chan bool //Stop channel for UDP listener
  38. aTobAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from A to B
  39. bToaAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from B to A
  40. udpClientMap sync.Map //map storing the UDP client-server connections
  41. parent *Manager `json:"-"`
  42. }
  43. type Options struct {
  44. Database *database.Database
  45. DefaultTimeout int
  46. AccessControlHandler func(net.Conn) bool
  47. }
  48. type Manager struct {
  49. //Config and stores
  50. Options *Options
  51. Configs []*ProxyRelayConfig
  52. //Realtime Statistics
  53. Connections int //currently connected connect counts
  54. }
  55. func NewStreamProxy(options *Options) *Manager {
  56. options.Database.NewTable("tcprox")
  57. //Load relay configs from db
  58. previousRules := []*ProxyRelayConfig{}
  59. if options.Database.KeyExists("tcprox", "rules") {
  60. options.Database.Read("tcprox", "rules", &previousRules)
  61. }
  62. //Check if the AccessControlHandler is empty. If yes, set it to always allow access
  63. if options.AccessControlHandler == nil {
  64. options.AccessControlHandler = func(conn net.Conn) bool {
  65. //Always allow access
  66. return true
  67. }
  68. }
  69. //Create a new proxy manager for TCP
  70. thisManager := Manager{
  71. Options: options,
  72. Connections: 0,
  73. }
  74. //Inject manager into the rules
  75. for _, rule := range previousRules {
  76. rule.parent = &thisManager
  77. if rule.Running {
  78. //This was previously running. Start it again
  79. log.Println("[Stream Proxy] Resuming stream proxy rule " + rule.Name)
  80. rule.Start()
  81. }
  82. }
  83. thisManager.Configs = previousRules
  84. return &thisManager
  85. }
  86. func (m *Manager) NewConfig(config *ProxyRelayOptions) string {
  87. //Generate two zero value for atomic int64
  88. aAcc := atomic.Int64{}
  89. bAcc := atomic.Int64{}
  90. aAcc.Store(0)
  91. bAcc.Store(0)
  92. //Generate a new config from options
  93. configUUID := uuid.New().String()
  94. thisConfig := ProxyRelayConfig{
  95. UUID: configUUID,
  96. Name: config.Name,
  97. ListeningAddress: config.ListeningAddr,
  98. ProxyTargetAddr: config.ProxyAddr,
  99. UseTCP: config.UseTCP,
  100. UseUDP: config.UseUDP,
  101. Timeout: config.Timeout,
  102. tcpStopChan: nil,
  103. udpStopChan: nil,
  104. aTobAccumulatedByteTransfer: aAcc,
  105. bToaAccumulatedByteTransfer: bAcc,
  106. udpClientMap: sync.Map{},
  107. parent: m,
  108. }
  109. m.Configs = append(m.Configs, &thisConfig)
  110. m.SaveConfigToDatabase()
  111. return configUUID
  112. }
  113. func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayConfig, error) {
  114. // Find and return the config with the specified UUID
  115. for _, config := range m.Configs {
  116. if config.UUID == configUUID {
  117. return config, nil
  118. }
  119. }
  120. return nil, errors.New("config not found")
  121. }
  122. // Edit the config based on config UUID, leave empty for unchange fields
  123. func (m *Manager) EditConfig(configUUID string, newName string, newListeningAddr string, newProxyAddr string, useTCP bool, useUDP bool, newTimeout int) error {
  124. // Find the config with the specified UUID
  125. foundConfig, err := m.GetConfigByUUID(configUUID)
  126. if err != nil {
  127. return err
  128. }
  129. // Validate and update the fields
  130. if newName != "" {
  131. foundConfig.Name = newName
  132. }
  133. if newListeningAddr != "" {
  134. foundConfig.ListeningAddress = newListeningAddr
  135. }
  136. if newProxyAddr != "" {
  137. foundConfig.ProxyTargetAddr = newProxyAddr
  138. }
  139. foundConfig.UseTCP = useTCP
  140. foundConfig.UseUDP = useUDP
  141. if newTimeout != -1 {
  142. if newTimeout < 0 {
  143. return errors.New("invalid timeout value given")
  144. }
  145. foundConfig.Timeout = newTimeout
  146. }
  147. m.SaveConfigToDatabase()
  148. //Check if config is running. If yes, restart it
  149. if foundConfig.IsRunning() {
  150. foundConfig.Restart()
  151. }
  152. return nil
  153. }
  154. func (m *Manager) RemoveConfig(configUUID string) error {
  155. // Find and remove the config with the specified UUID
  156. for i, config := range m.Configs {
  157. if config.UUID == configUUID {
  158. m.Configs = append(m.Configs[:i], m.Configs[i+1:]...)
  159. m.SaveConfigToDatabase()
  160. return nil
  161. }
  162. }
  163. return errors.New("config not found")
  164. }
  165. func (m *Manager) SaveConfigToDatabase() {
  166. m.Options.Database.Write("tcprox", "rules", m.Configs)
  167. }
  168. /*
  169. Config Functions
  170. */
  171. // Start a proxy if stopped
  172. func (c *ProxyRelayConfig) Start() error {
  173. if c.IsRunning() {
  174. c.Running = true
  175. return errors.New("proxy already running")
  176. }
  177. // Create a stopChan to control the loop
  178. tcpStopChan := make(chan bool)
  179. udpStopChan := make(chan bool)
  180. //Start the proxy service
  181. if c.UseUDP {
  182. c.udpStopChan = udpStopChan
  183. go func() {
  184. err := c.ForwardUDP(c.ListeningAddress, c.ProxyTargetAddr, udpStopChan)
  185. if err != nil {
  186. if !c.UseTCP {
  187. c.Running = false
  188. c.parent.SaveConfigToDatabase()
  189. }
  190. log.Println("[TCP] Error starting stream proxy " + c.Name + "(" + c.UUID + "): " + err.Error())
  191. }
  192. }()
  193. }
  194. if c.UseTCP {
  195. c.tcpStopChan = tcpStopChan
  196. go func() {
  197. //Default to transport mode
  198. err := c.Port2host(c.ListeningAddress, c.ProxyTargetAddr, tcpStopChan)
  199. if err != nil {
  200. c.Running = false
  201. c.parent.SaveConfigToDatabase()
  202. log.Println("[TCP] Error starting stream proxy " + c.Name + "(" + c.UUID + "): " + err.Error())
  203. }
  204. }()
  205. }
  206. //Successfully spawned off the proxy routine
  207. c.Running = true
  208. c.parent.SaveConfigToDatabase()
  209. return nil
  210. }
  211. // Return if a proxy config is running
  212. func (c *ProxyRelayConfig) IsRunning() bool {
  213. return c.tcpStopChan != nil || c.udpStopChan != nil
  214. }
  215. // Restart a proxy config
  216. func (c *ProxyRelayConfig) Restart() {
  217. if c.IsRunning() {
  218. c.Stop()
  219. }
  220. time.Sleep(300 * time.Millisecond)
  221. c.Start()
  222. }
  223. // Stop a running proxy if running
  224. func (c *ProxyRelayConfig) Stop() {
  225. log.Println("[STREAM PROXY] Stopping Stream Proxy " + c.Name)
  226. if c.udpStopChan != nil {
  227. log.Println("[STREAM PROXY] Stopping UDP for " + c.Name)
  228. c.udpStopChan <- true
  229. c.udpStopChan = nil
  230. }
  231. if c.tcpStopChan != nil {
  232. log.Println("[STREAM PROXY] Stopping TCP for " + c.Name)
  233. c.tcpStopChan <- true
  234. c.tcpStopChan = nil
  235. }
  236. log.Println("[STREAM PROXY] Stopped Stream Proxy " + c.Name)
  237. c.Running = false
  238. //Update the running status
  239. c.parent.SaveConfigToDatabase()
  240. }