streamproxy.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package streamproxy
  2. import (
  3. "errors"
  4. "log"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "github.com/google/uuid"
  9. "imuslab.com/zoraxy/mod/database"
  10. )
/*
TCP Proxy

Forwards traffic from one port to another.
Also accepts both active and passive connections.
*/
  17. type ProxyRelayOptions struct {
  18. Name string
  19. ListeningAddr string
  20. ProxyAddr string
  21. Timeout int
  22. UseTCP bool
  23. UseUDP bool
  24. }
  25. type ProxyRelayConfig struct {
  26. UUID string //A UUIDv4 representing this config
  27. Name string //Name of the config
  28. Running bool //Status, read only
  29. AutoStart bool //If the service suppose to started automatically
  30. ListeningAddress string //Listening Address, usually 127.0.0.1:port
  31. ProxyTargetAddr string //Proxy target address
  32. UseTCP bool //Enable TCP proxy
  33. UseUDP bool //Enable UDP proxy
  34. Timeout int //Timeout for connection in sec
  35. tcpStopChan chan bool //Stop channel for TCP listener
  36. udpStopChan chan bool //Stop channel for UDP listener
  37. aTobAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from A to B
  38. bToaAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from B to A
  39. udpClientMap sync.Map //map storing the UDP client-server connections
  40. parent *Manager `json:"-"`
  41. }
  42. type Options struct {
  43. Database *database.Database
  44. DefaultTimeout int
  45. AccessControlHandler func(net.Conn) bool
  46. }
  47. type Manager struct {
  48. //Config and stores
  49. Options *Options
  50. Configs []*ProxyRelayConfig
  51. //Realtime Statistics
  52. Connections int //currently connected connect counts
  53. }
  54. func NewStreamProxy(options *Options) *Manager {
  55. options.Database.NewTable("tcprox")
  56. //Load relay configs from db
  57. previousRules := []*ProxyRelayConfig{}
  58. if options.Database.KeyExists("tcprox", "rules") {
  59. options.Database.Read("tcprox", "rules", &previousRules)
  60. }
  61. //Check if the AccessControlHandler is empty. If yes, set it to always allow access
  62. if options.AccessControlHandler == nil {
  63. options.AccessControlHandler = func(conn net.Conn) bool {
  64. //Always allow access
  65. return true
  66. }
  67. }
  68. //Create a new proxy manager for TCP
  69. thisManager := Manager{
  70. Options: options,
  71. Connections: 0,
  72. }
  73. //Inject manager into the rules
  74. for _, rule := range previousRules {
  75. rule.parent = &thisManager
  76. }
  77. thisManager.Configs = previousRules
  78. return &thisManager
  79. }
  80. func (m *Manager) NewConfig(config *ProxyRelayOptions) string {
  81. //Generate two zero value for atomic int64
  82. aAcc := atomic.Int64{}
  83. bAcc := atomic.Int64{}
  84. aAcc.Store(0)
  85. bAcc.Store(0)
  86. //Generate a new config from options
  87. configUUID := uuid.New().String()
  88. thisConfig := ProxyRelayConfig{
  89. UUID: configUUID,
  90. Name: config.Name,
  91. ListeningAddress: config.ListeningAddr,
  92. ProxyTargetAddr: config.ProxyAddr,
  93. UseTCP: config.UseTCP,
  94. UseUDP: config.UseUDP,
  95. Timeout: config.Timeout,
  96. tcpStopChan: nil,
  97. udpStopChan: nil,
  98. aTobAccumulatedByteTransfer: aAcc,
  99. bToaAccumulatedByteTransfer: bAcc,
  100. udpClientMap: sync.Map{},
  101. parent: m,
  102. }
  103. m.Configs = append(m.Configs, &thisConfig)
  104. m.SaveConfigToDatabase()
  105. return configUUID
  106. }
  107. func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayConfig, error) {
  108. // Find and return the config with the specified UUID
  109. for _, config := range m.Configs {
  110. if config.UUID == configUUID {
  111. return config, nil
  112. }
  113. }
  114. return nil, errors.New("config not found")
  115. }
  116. // Edit the config based on config UUID, leave empty for unchange fields
  117. func (m *Manager) EditConfig(configUUID string, newName string, newListeningAddr string, newProxyAddr string, useTCP bool, useUDP bool, newTimeout int) error {
  118. // Find the config with the specified UUID
  119. foundConfig, err := m.GetConfigByUUID(configUUID)
  120. if err != nil {
  121. return err
  122. }
  123. // Validate and update the fields
  124. if newName != "" {
  125. foundConfig.Name = newName
  126. }
  127. if newListeningAddr != "" {
  128. foundConfig.ListeningAddress = newListeningAddr
  129. }
  130. if newProxyAddr != "" {
  131. foundConfig.ProxyTargetAddr = newProxyAddr
  132. }
  133. foundConfig.UseTCP = useTCP
  134. foundConfig.UseUDP = useUDP
  135. if newTimeout != -1 {
  136. if newTimeout < 0 {
  137. return errors.New("invalid timeout value given")
  138. }
  139. foundConfig.Timeout = newTimeout
  140. }
  141. m.SaveConfigToDatabase()
  142. return nil
  143. }
  144. func (m *Manager) RemoveConfig(configUUID string) error {
  145. // Find and remove the config with the specified UUID
  146. for i, config := range m.Configs {
  147. if config.UUID == configUUID {
  148. m.Configs = append(m.Configs[:i], m.Configs[i+1:]...)
  149. m.SaveConfigToDatabase()
  150. return nil
  151. }
  152. }
  153. return errors.New("config not found")
  154. }
  155. func (m *Manager) SaveConfigToDatabase() {
  156. m.Options.Database.Write("tcprox", "rules", m.Configs)
  157. }
  158. /*
  159. Config Functions
  160. */
  161. // Start a proxy if stopped
  162. func (c *ProxyRelayConfig) Start() error {
  163. if c.IsRunning() {
  164. c.Running = true
  165. return errors.New("proxy already running")
  166. }
  167. // Create a stopChan to control the loop
  168. tcpStopChan := make(chan bool)
  169. c.tcpStopChan = tcpStopChan
  170. udpStopChan := make(chan bool)
  171. c.udpStopChan = udpStopChan
  172. //Start the proxy service
  173. if c.UseUDP {
  174. go func() {
  175. if !c.UseTCP {
  176. //By default running state shows TCP proxy. If TCP is not in use, UDP is shown instead
  177. c.Running = true
  178. }
  179. err := c.ForwardUDP(c.ListeningAddress, c.ProxyTargetAddr, udpStopChan)
  180. if err != nil {
  181. if !c.UseTCP {
  182. c.Running = false
  183. }
  184. log.Println("[TCP] Error starting stream proxy " + c.Name + "(" + c.UUID + "): " + err.Error())
  185. }
  186. }()
  187. }
  188. if c.UseTCP {
  189. go func() {
  190. //Default to transport mode
  191. c.Running = true
  192. err := c.Port2host(c.ListeningAddress, c.ProxyTargetAddr, tcpStopChan)
  193. if err != nil {
  194. c.Running = false
  195. log.Println("[TCP] Error starting stream proxy " + c.Name + "(" + c.UUID + "): " + err.Error())
  196. }
  197. }()
  198. }
  199. //Successfully spawned off the proxy routine
  200. return nil
  201. }
  202. // Stop a running proxy if running
  203. func (c *ProxyRelayConfig) IsRunning() bool {
  204. return c.tcpStopChan != nil || c.udpStopChan != nil
  205. }
  206. // Stop a running proxy if running
  207. func (c *ProxyRelayConfig) Stop() {
  208. log.Println("[PROXY] Stopping Stream Proxy " + c.Name)
  209. if c.udpStopChan != nil {
  210. c.udpStopChan <- true
  211. c.udpStopChan = nil
  212. }
  213. if c.tcpStopChan != nil {
  214. c.tcpStopChan <- true
  215. c.tcpStopChan = nil
  216. }
  217. log.Println("[PROXY] Stopped Stream Proxy " + c.Name)
  218. c.Running = false
  219. }