1
0

loadbalance.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package loadbalance
  2. import (
  3. "strings"
  4. "sync"
  5. "time"
  6. "github.com/google/uuid"
  7. "github.com/gorilla/sessions"
  8. "imuslab.com/zoraxy/mod/dynamicproxy/dpcore"
  9. "imuslab.com/zoraxy/mod/geodb"
  10. "imuslab.com/zoraxy/mod/info/logger"
  11. )
  12. /*
  13. Load Balancer
  14. Handleing load balance request for upstream destinations
  15. */
  16. type Options struct {
  17. SystemUUID string //Use for the session store
  18. UseActiveHealthCheck bool //Use active health check, default to false
  19. Geodb *geodb.Store //GeoIP resolver for checking incoming request origin country
  20. Logger *logger.Logger
  21. }
  22. type RouteManager struct {
  23. SessionStore *sessions.CookieStore
  24. OnlineStatus sync.Map //Store the online status notify by uptime monitor
  25. Options Options //Options for the load balancer
  26. cacheTicker *time.Ticker //Ticker for cache cleanup
  27. cacheTickerStop chan bool //Stop the cache cleanup
  28. }
  29. /* Upstream or Origin Server */
  30. type Upstream struct {
  31. //Upstream Proxy Configs
  32. OriginIpOrDomain string //Target IP address or domain name with port
  33. RequireTLS bool //Require TLS connection
  34. SkipCertValidations bool //Set to true to accept self signed certs
  35. SkipWebSocketOriginCheck bool //Skip origin check on websocket upgrade connections
  36. //Load balancing configs
  37. Weight int //Random weight for round robin, 0 for fallback only
  38. //HTTP Transport Config
  39. MaxConn int //Maxmium concurrent requests to this upstream dpcore instance
  40. RespTimeout int64 //Response header timeout in milliseconds
  41. IdleTimeout int64 //Idle connection timeout in milliseconds
  42. //currentConnectionCounts atomic.Uint64 //Counter for number of client currently connected
  43. proxy *dpcore.ReverseProxy
  44. }
  45. // Create a new load balancer
  46. func NewLoadBalancer(options *Options) *RouteManager {
  47. if options.SystemUUID == "" {
  48. //System UUID not passed in. Use random key
  49. options.SystemUUID = uuid.New().String()
  50. }
  51. //Create a ticker for cache cleanup every 12 hours
  52. cacheTicker := time.NewTicker(12 * time.Hour)
  53. cacheTickerStop := make(chan bool)
  54. go func() {
  55. options.Logger.PrintAndLog("LoadBalancer", "Upstream state cache ticker started", nil)
  56. for {
  57. select {
  58. case <-cacheTickerStop:
  59. return
  60. case <-cacheTicker.C:
  61. //Clean up the cache
  62. options.Logger.PrintAndLog("LoadBalancer", "Cleaning up upstream state cache", nil)
  63. }
  64. }
  65. }()
  66. //Generate a session store for stickySession
  67. store := sessions.NewCookieStore([]byte(options.SystemUUID))
  68. return &RouteManager{
  69. SessionStore: store,
  70. OnlineStatus: sync.Map{},
  71. Options: *options,
  72. cacheTicker: cacheTicker,
  73. cacheTickerStop: cacheTickerStop,
  74. }
  75. }
  76. // UpstreamsReady checks if the group of upstreams contains at least one
  77. // origin server that is ready
  78. func (m *RouteManager) UpstreamsReady(upstreams []*Upstream) bool {
  79. for _, upstream := range upstreams {
  80. if upstream.IsReady() {
  81. return true
  82. }
  83. }
  84. return false
  85. }
  86. // String format and convert a list of upstream into a string representations
  87. func GetUpstreamsAsString(upstreams []*Upstream) string {
  88. targets := []string{}
  89. for _, upstream := range upstreams {
  90. targets = append(targets, upstream.String())
  91. }
  92. if len(targets) == 0 {
  93. //No upstream
  94. return "(no upstream config)"
  95. }
  96. return strings.Join(targets, ", ")
  97. }
  98. // Reset the current session store and clear all previous sessions
  99. func (m *RouteManager) ResetSessions() {
  100. m.SessionStore = sessions.NewCookieStore([]byte(m.Options.SystemUUID))
  101. }
  102. func (m *RouteManager) Close() {
  103. //Close the session store
  104. m.SessionStore.MaxAge(0)
  105. //Stop the cache cleanup
  106. if m.cacheTicker != nil {
  107. m.cacheTicker.Stop()
  108. }
  109. close(m.cacheTickerStop)
  110. }
  111. // Log Println, replace all log.Println or fmt.Println with this
  112. func (m *RouteManager) println(message string, err error) {
  113. m.Options.Logger.PrintAndLog("LoadBalancer", message, err)
  114. }