1
0

originPicker.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package loadbalance
  2. import (
  3. "errors"
  4. "math/rand"
  5. "net/http"
  6. )
/*
	Origin Picker

	This file contains the logic that picks the best upstream
	origin for an incoming request.
*/
  12. const (
  13. STICKY_SESSION_NAME = "zr_sticky_session"
  14. )
  15. // GetRequestUpstreamTarget return the upstream target where this
  16. // request should be routed
  17. func (m *RouteManager) GetRequestUpstreamTarget(w http.ResponseWriter, r *http.Request, origins []*Upstream, useStickySession bool) (*Upstream, error) {
  18. if len(origins) == 0 {
  19. return nil, errors.New("no upstream is defined for this host")
  20. }
  21. //Pick the origin
  22. if useStickySession {
  23. //Use stick session, check which origins this request previously used
  24. targetOriginId, err := m.getSessionHandler(r, origins)
  25. if err != nil {
  26. // No valid session found or origin is offline
  27. // Filter the offline origins
  28. origins = m.FilterOfflineOrigins(origins)
  29. if len(origins) == 0 {
  30. return nil, errors.New("no online upstream is available for origin: " + r.Host)
  31. }
  32. //Get a random origin
  33. targetOrigin, index, err := getRandomUpstreamByWeight(origins)
  34. if err != nil {
  35. m.println("Unable to get random upstream", err)
  36. targetOrigin = origins[0]
  37. index = 0
  38. }
  39. //fmt.Println("DEBUG: (Sticky Session) Registering session origin " + origins[index].OriginIpOrDomain)
  40. m.setSessionHandler(w, r, targetOrigin.OriginIpOrDomain, index)
  41. return targetOrigin, nil
  42. }
  43. //Valid session found and origin is online
  44. //fmt.Println("DEBUG: (Sticky Session) Picking origin " + origins[targetOriginId].OriginIpOrDomain)
  45. return origins[targetOriginId], nil
  46. }
  47. //No sticky session, get a random origin
  48. m.clearSessionHandler(w, r)
  49. //Filter the offline origins
  50. origins = m.FilterOfflineOrigins(origins)
  51. if len(origins) == 0 {
  52. return nil, errors.New("no online upstream is available for origin: " + r.Host)
  53. }
  54. //Get a random origin
  55. targetOrigin, _, err := getRandomUpstreamByWeight(origins)
  56. if err != nil {
  57. m.println("Failed to get next origin", err)
  58. targetOrigin = origins[0]
  59. }
  60. //fmt.Println("DEBUG: Picking origin " + targetOrigin.OriginIpOrDomain)
  61. return targetOrigin, nil
  62. }
  63. // GetUsableUpstreamCounts return the number of usable upstreams
  64. func (m *RouteManager) GetUsableUpstreamCounts(origins []*Upstream) int {
  65. origins = m.FilterOfflineOrigins(origins)
  66. return len(origins)
  67. }
  68. /* Features related to session access */
  69. //Set a new origin for this connection by session
  70. func (m *RouteManager) setSessionHandler(w http.ResponseWriter, r *http.Request, originIpOrDomain string, index int) error {
  71. session, err := m.SessionStore.Get(r, STICKY_SESSION_NAME)
  72. if err != nil {
  73. return err
  74. }
  75. session.Values["zr_sid_origin"] = originIpOrDomain
  76. session.Values["zr_sid_index"] = index
  77. session.Options.MaxAge = 86400 //1 day
  78. session.Options.Path = "/"
  79. err = session.Save(r, w)
  80. if err != nil {
  81. return err
  82. }
  83. return nil
  84. }
  85. // Clear the zoraxy only session values
  86. func (m *RouteManager) clearSessionHandler(w http.ResponseWriter, r *http.Request) error {
  87. session, err := m.SessionStore.Get(r, STICKY_SESSION_NAME)
  88. if err != nil {
  89. return err
  90. }
  91. session.Values["zr_sid_origin"] = ""
  92. session.Values["zr_sid_index"] = -1
  93. err = session.Save(r, w)
  94. if err != nil {
  95. return err
  96. }
  97. return nil
  98. }
  99. // Get the previous connected origin from session
  100. func (m *RouteManager) getSessionHandler(r *http.Request, upstreams []*Upstream) (int, error) {
  101. // Get existing session
  102. session, err := m.SessionStore.Get(r, STICKY_SESSION_NAME)
  103. if err != nil {
  104. return -1, err
  105. }
  106. // Retrieve session values for origin
  107. originDomainRaw := session.Values["zr_sid_origin"]
  108. originIDRaw := session.Values["zr_sid_index"]
  109. if originDomainRaw == nil || originIDRaw == nil || originIDRaw == -1 {
  110. return -1, errors.New("no session has been set")
  111. }
  112. originDomain := originDomainRaw.(string)
  113. //originID := originIDRaw.(int)
  114. //Check if the upstream still exists
  115. for i, upstream := range upstreams {
  116. if upstream.OriginIpOrDomain == originDomain {
  117. if !m.IsTargetOnline(originDomain) {
  118. //Origin is offline
  119. return -1, errors.New("origin is offline")
  120. }
  121. //Ok, the origin is still online
  122. return i, nil
  123. }
  124. }
  125. return -1, errors.New("origin is no longer exists")
  126. }
  127. /* Functions related to random upstream picking */
  128. // Get a random upstream by the weights defined in Upstream struct, return the upstream, index value and any error
  129. func getRandomUpstreamByWeight(upstreams []*Upstream) (*Upstream, int, error) {
  130. // If there is only one upstream, return it
  131. if len(upstreams) == 1 {
  132. return upstreams[0], 0, nil
  133. }
  134. // Preserve the index with upstreams
  135. type upstreamWithIndex struct {
  136. Upstream *Upstream
  137. Index int
  138. }
  139. // Calculate total weight for upstreams with weight > 0
  140. totalWeight := 0
  141. fallbackUpstreams := make([]upstreamWithIndex, 0, len(upstreams))
  142. for index, upstream := range upstreams {
  143. if upstream.Weight > 0 {
  144. totalWeight += upstream.Weight
  145. } else {
  146. // Collect fallback upstreams
  147. fallbackUpstreams = append(fallbackUpstreams, upstreamWithIndex{upstream, index})
  148. }
  149. }
  150. // If there are no upstreams with weight > 0, return a fallback upstream if available
  151. if totalWeight == 0 {
  152. if len(fallbackUpstreams) > 0 {
  153. // Randomly select one of the fallback upstreams
  154. randIndex := rand.Intn(len(fallbackUpstreams))
  155. return fallbackUpstreams[randIndex].Upstream, fallbackUpstreams[randIndex].Index, nil
  156. }
  157. // No upstreams available at all
  158. return nil, -1, errors.New("no valid upstream servers available")
  159. }
  160. // Random weight between 0 and total weight
  161. randomWeight := rand.Intn(totalWeight)
  162. // Select an upstream based on the random weight
  163. for index, upstream := range upstreams {
  164. if upstream.Weight > 0 { // Only consider upstreams with weight > 0
  165. if randomWeight < upstream.Weight {
  166. // Return the selected upstream and its index
  167. return upstream, index, nil
  168. }
  169. randomWeight -= upstream.Weight
  170. }
  171. }
  172. // If we reach here, it means we should return a fallback upstream if available
  173. if len(fallbackUpstreams) > 0 {
  174. randIndex := rand.Intn(len(fallbackUpstreams))
  175. return fallbackUpstreams[randIndex].Upstream, fallbackUpstreams[randIndex].Index, nil
  176. }
  177. return nil, -1, errors.New("failed to pick an upstream origin server")
  178. }