1
0

streamproxy.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. package streamproxy
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "net"
  6. "os"
  7. "path/filepath"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/google/uuid"
  12. "imuslab.com/zoraxy/mod/info/logger"
  13. "imuslab.com/zoraxy/mod/utils"
  14. )
  15. /*
  16. TCP Proxy
  17. Forward port from one port to another
  18. Also accept active connection and passive
  19. connection
  20. */
  21. type ProxyRelayOptions struct {
  22. Name string
  23. ListeningAddr string
  24. ProxyAddr string
  25. Timeout int
  26. UseTCP bool
  27. UseUDP bool
  28. }
  29. type ProxyRelayConfig struct {
  30. UUID string //A UUIDv4 representing this config
  31. Name string //Name of the config
  32. Running bool //Status, read only
  33. AutoStart bool //If the service suppose to started automatically
  34. ListeningAddress string //Listening Address, usually 127.0.0.1:port
  35. ProxyTargetAddr string //Proxy target address
  36. UseTCP bool //Enable TCP proxy
  37. UseUDP bool //Enable UDP proxy
  38. Timeout int //Timeout for connection in sec
  39. tcpStopChan chan bool //Stop channel for TCP listener
  40. udpStopChan chan bool //Stop channel for UDP listener
  41. aTobAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from A to B
  42. bToaAccumulatedByteTransfer atomic.Int64 //Accumulated byte transfer from B to A
  43. udpClientMap sync.Map //map storing the UDP client-server connections
  44. parent *Manager `json:"-"`
  45. }
  46. type Options struct {
  47. DefaultTimeout int
  48. AccessControlHandler func(net.Conn) bool
  49. ConfigStore string //Folder to store the config files, will be created if not exists
  50. Logger *logger.Logger //Logger for the stream proxy
  51. }
  52. type Manager struct {
  53. //Config and stores
  54. Options *Options
  55. Configs []*ProxyRelayConfig
  56. //Realtime Statistics
  57. Connections int //currently connected connect counts
  58. }
  59. func NewStreamProxy(options *Options) (*Manager, error) {
  60. if !utils.FileExists(options.ConfigStore) {
  61. err := os.MkdirAll(options.ConfigStore, 0775)
  62. if err != nil {
  63. return nil, err
  64. }
  65. }
  66. //Load relay configs from db
  67. previousRules := []*ProxyRelayConfig{}
  68. streamProxyConfigFiles, err := filepath.Glob(options.ConfigStore + "/*.config")
  69. if err != nil {
  70. return nil, err
  71. }
  72. for _, configFile := range streamProxyConfigFiles {
  73. //Read file into bytes
  74. configBytes, err := os.ReadFile(configFile)
  75. if err != nil {
  76. options.Logger.PrintAndLog("stream-prox", "Read stream proxy config failed", err)
  77. continue
  78. }
  79. thisRelayConfig := &ProxyRelayConfig{}
  80. err = json.Unmarshal(configBytes, thisRelayConfig)
  81. if err != nil {
  82. options.Logger.PrintAndLog("stream-prox", "Unmarshal stream proxy config failed", err)
  83. continue
  84. }
  85. //Append the config to the list
  86. previousRules = append(previousRules, thisRelayConfig)
  87. }
  88. //Check if the AccessControlHandler is empty. If yes, set it to always allow access
  89. if options.AccessControlHandler == nil {
  90. options.AccessControlHandler = func(conn net.Conn) bool {
  91. //Always allow access
  92. return true
  93. }
  94. }
  95. //Create a new proxy manager for TCP
  96. thisManager := Manager{
  97. Options: options,
  98. Connections: 0,
  99. }
  100. //Inject manager into the rules
  101. for _, rule := range previousRules {
  102. rule.parent = &thisManager
  103. if rule.Running {
  104. //This was previously running. Start it again
  105. thisManager.logf("Resuming stream proxy rule "+rule.Name, nil)
  106. rule.Start()
  107. }
  108. }
  109. thisManager.Configs = previousRules
  110. return &thisManager, nil
  111. }
  112. // Wrapper function to log error
  113. func (m *Manager) logf(message string, originalError error) {
  114. if m.Options.Logger == nil {
  115. //Print to fmt
  116. if originalError != nil {
  117. message += ": " + originalError.Error()
  118. }
  119. println(message)
  120. return
  121. }
  122. m.Options.Logger.PrintAndLog("stream-prox", message, originalError)
  123. }
  124. func (m *Manager) NewConfig(config *ProxyRelayOptions) string {
  125. //Generate two zero value for atomic int64
  126. aAcc := atomic.Int64{}
  127. bAcc := atomic.Int64{}
  128. aAcc.Store(0)
  129. bAcc.Store(0)
  130. //Generate a new config from options
  131. configUUID := uuid.New().String()
  132. thisConfig := ProxyRelayConfig{
  133. UUID: configUUID,
  134. Name: config.Name,
  135. ListeningAddress: config.ListeningAddr,
  136. ProxyTargetAddr: config.ProxyAddr,
  137. UseTCP: config.UseTCP,
  138. UseUDP: config.UseUDP,
  139. Timeout: config.Timeout,
  140. tcpStopChan: nil,
  141. udpStopChan: nil,
  142. aTobAccumulatedByteTransfer: aAcc,
  143. bToaAccumulatedByteTransfer: bAcc,
  144. udpClientMap: sync.Map{},
  145. parent: m,
  146. }
  147. m.Configs = append(m.Configs, &thisConfig)
  148. m.SaveConfigToDatabase()
  149. return configUUID
  150. }
  151. func (m *Manager) GetConfigByUUID(configUUID string) (*ProxyRelayConfig, error) {
  152. // Find and return the config with the specified UUID
  153. for _, config := range m.Configs {
  154. if config.UUID == configUUID {
  155. return config, nil
  156. }
  157. }
  158. return nil, errors.New("config not found")
  159. }
  160. // Edit the config based on config UUID, leave empty for unchange fields
  161. func (m *Manager) EditConfig(configUUID string, newName string, newListeningAddr string, newProxyAddr string, useTCP bool, useUDP bool, newTimeout int) error {
  162. // Find the config with the specified UUID
  163. foundConfig, err := m.GetConfigByUUID(configUUID)
  164. if err != nil {
  165. return err
  166. }
  167. // Validate and update the fields
  168. if newName != "" {
  169. foundConfig.Name = newName
  170. }
  171. if newListeningAddr != "" {
  172. foundConfig.ListeningAddress = newListeningAddr
  173. }
  174. if newProxyAddr != "" {
  175. foundConfig.ProxyTargetAddr = newProxyAddr
  176. }
  177. foundConfig.UseTCP = useTCP
  178. foundConfig.UseUDP = useUDP
  179. if newTimeout != -1 {
  180. if newTimeout < 0 {
  181. return errors.New("invalid timeout value given")
  182. }
  183. foundConfig.Timeout = newTimeout
  184. }
  185. m.SaveConfigToDatabase()
  186. //Check if config is running. If yes, restart it
  187. if foundConfig.IsRunning() {
  188. foundConfig.Restart()
  189. }
  190. return nil
  191. }
  192. func (m *Manager) RemoveConfig(configUUID string) error {
  193. //Remove the config from file
  194. err := os.Remove(filepath.Join(m.Options.ConfigStore, configUUID+".config"))
  195. if err != nil {
  196. return err
  197. }
  198. // Find and remove the config with the specified UUID
  199. for i, config := range m.Configs {
  200. if config.UUID == configUUID {
  201. m.Configs = append(m.Configs[:i], m.Configs[i+1:]...)
  202. m.SaveConfigToDatabase()
  203. return nil
  204. }
  205. }
  206. return errors.New("config not found")
  207. }
  208. // Save all configs to ConfigStore folder
  209. func (m *Manager) SaveConfigToDatabase() {
  210. for _, config := range m.Configs {
  211. configBytes, err := json.Marshal(config)
  212. if err != nil {
  213. m.logf("Failed to marshal stream proxy config", err)
  214. continue
  215. }
  216. err = os.WriteFile(m.Options.ConfigStore+"/"+config.UUID+".config", configBytes, 0775)
  217. if err != nil {
  218. m.logf("Failed to save stream proxy config", err)
  219. }
  220. }
  221. }
  222. /*
  223. Config Functions
  224. */
  225. // Start a proxy if stopped
  226. func (c *ProxyRelayConfig) Start() error {
  227. if c.IsRunning() {
  228. c.Running = true
  229. return errors.New("proxy already running")
  230. }
  231. // Create a stopChan to control the loop
  232. tcpStopChan := make(chan bool)
  233. udpStopChan := make(chan bool)
  234. //Start the proxy service
  235. if c.UseUDP {
  236. c.udpStopChan = udpStopChan
  237. go func() {
  238. err := c.ForwardUDP(c.ListeningAddress, c.ProxyTargetAddr, udpStopChan)
  239. if err != nil {
  240. if !c.UseTCP {
  241. c.Running = false
  242. c.udpStopChan = nil
  243. c.parent.SaveConfigToDatabase()
  244. }
  245. c.parent.logf("[proto:udp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err)
  246. }
  247. }()
  248. }
  249. if c.UseTCP {
  250. c.tcpStopChan = tcpStopChan
  251. go func() {
  252. //Default to transport mode
  253. err := c.Port2host(c.ListeningAddress, c.ProxyTargetAddr, tcpStopChan)
  254. if err != nil {
  255. c.Running = false
  256. c.tcpStopChan = nil
  257. c.parent.SaveConfigToDatabase()
  258. c.parent.logf("[proto:tcp] Error starting stream proxy "+c.Name+"("+c.UUID+")", err)
  259. }
  260. }()
  261. }
  262. //Successfully spawned off the proxy routine
  263. c.Running = true
  264. c.parent.SaveConfigToDatabase()
  265. return nil
  266. }
  267. // Return if a proxy config is running
  268. func (c *ProxyRelayConfig) IsRunning() bool {
  269. return c.tcpStopChan != nil || c.udpStopChan != nil
  270. }
  271. // Restart a proxy config
  272. func (c *ProxyRelayConfig) Restart() {
  273. if c.IsRunning() {
  274. c.Stop()
  275. }
  276. time.Sleep(3000 * time.Millisecond)
  277. c.Start()
  278. }
  279. // Stop a running proxy if running
  280. func (c *ProxyRelayConfig) Stop() {
  281. c.parent.logf("Stopping Stream Proxy "+c.Name, nil)
  282. if c.udpStopChan != nil {
  283. c.parent.logf("Stopping UDP for "+c.Name, nil)
  284. c.udpStopChan <- true
  285. c.udpStopChan = nil
  286. }
  287. if c.tcpStopChan != nil {
  288. c.parent.logf("Stopping TCP for "+c.Name, nil)
  289. c.tcpStopChan <- true
  290. c.tcpStopChan = nil
  291. }
  292. c.parent.logf("Stopped Stream Proxy "+c.Name, nil)
  293. c.Running = false
  294. //Update the running status
  295. c.parent.SaveConfigToDatabase()
  296. }