1
0

originPicker.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package loadbalance
  2. import (
  3. "errors"
  4. "math/rand"
  5. "net/http"
  6. )
  7. /*
  8. Origin Picker
  9. This script contains the code to pick the best origin
  10. by this request.
  11. */
  12. // GetRequestUpstreamTarget return the upstream target where this
  13. // request should be routed
  14. func (m *RouteManager) GetRequestUpstreamTarget(w http.ResponseWriter, r *http.Request, origins []*Upstream, useStickySession bool) (*Upstream, error) {
  15. if len(origins) == 0 {
  16. return nil, errors.New("no upstream is defined for this host")
  17. }
  18. //Pick the origin
  19. if useStickySession {
  20. //Use stick session, check which origins this request previously used
  21. targetOriginId, err := m.getSessionHandler(r, origins)
  22. if err != nil {
  23. // No valid session found or origin is offline
  24. // Filter the offline origins
  25. origins = m.FilterOfflineOrigins(origins)
  26. if len(origins) == 0 {
  27. return nil, errors.New("no online upstream is available for origin: " + r.Host)
  28. }
  29. //Get a random origin
  30. targetOrigin, index, err := getRandomUpstreamByWeight(origins)
  31. if err != nil {
  32. m.println("Unable to get random upstream", err)
  33. targetOrigin = origins[0]
  34. index = 0
  35. }
  36. //fmt.Println("DEBUG: (Sticky Session) Registering session origin " + origins[index].OriginIpOrDomain)
  37. m.setSessionHandler(w, r, targetOrigin.OriginIpOrDomain, index)
  38. return targetOrigin, nil
  39. }
  40. //Valid session found and origin is online
  41. //fmt.Println("DEBUG: (Sticky Session) Picking origin " + origins[targetOriginId].OriginIpOrDomain)
  42. return origins[targetOriginId], nil
  43. }
  44. //No sticky session, get a random origin
  45. m.clearSessionHandler(w, r) //Clear the session
  46. //Filter the offline origins
  47. origins = m.FilterOfflineOrigins(origins)
  48. if len(origins) == 0 {
  49. return nil, errors.New("no online upstream is available for origin: " + r.Host)
  50. }
  51. //Get a random origin
  52. targetOrigin, _, err := getRandomUpstreamByWeight(origins)
  53. if err != nil {
  54. m.println("Failed to get next origin", err)
  55. targetOrigin = origins[0]
  56. }
  57. //fmt.Println("DEBUG: Picking origin " + targetOrigin.OriginIpOrDomain)
  58. return targetOrigin, nil
  59. }
  60. // GetUsableUpstreamCounts return the number of usable upstreams
  61. func (m *RouteManager) GetUsableUpstreamCounts(origins []*Upstream) int {
  62. origins = m.FilterOfflineOrigins(origins)
  63. return len(origins)
  64. }
  65. /* Features related to session access */
  66. //Set a new origin for this connection by session
  67. func (m *RouteManager) setSessionHandler(w http.ResponseWriter, r *http.Request, originIpOrDomain string, index int) error {
  68. session, err := m.SessionStore.Get(r, "STICKYSESSION")
  69. if err != nil {
  70. return err
  71. }
  72. session.Values["zr_sid_origin"] = originIpOrDomain
  73. session.Values["zr_sid_index"] = index
  74. session.Options.MaxAge = 86400 //1 day
  75. session.Options.Path = "/"
  76. err = session.Save(r, w)
  77. if err != nil {
  78. return err
  79. }
  80. return nil
  81. }
  82. func (m *RouteManager) clearSessionHandler(w http.ResponseWriter, r *http.Request) error {
  83. session, err := m.SessionStore.Get(r, "STICKYSESSION")
  84. if err != nil {
  85. return err
  86. }
  87. session.Options.MaxAge = -1
  88. session.Options.Path = "/"
  89. err = session.Save(r, w)
  90. if err != nil {
  91. return err
  92. }
  93. return nil
  94. }
  95. // Get the previous connected origin from session
  96. func (m *RouteManager) getSessionHandler(r *http.Request, upstreams []*Upstream) (int, error) {
  97. // Get existing session
  98. session, err := m.SessionStore.Get(r, "STICKYSESSION")
  99. if err != nil {
  100. return -1, err
  101. }
  102. // Retrieve session values for origin
  103. originDomainRaw := session.Values["zr_sid_origin"]
  104. originIDRaw := session.Values["zr_sid_index"]
  105. if originDomainRaw == nil || originIDRaw == nil {
  106. return -1, errors.New("no session has been set")
  107. }
  108. originDomain := originDomainRaw.(string)
  109. //originID := originIDRaw.(int)
  110. //Check if the upstream still exists
  111. for i, upstream := range upstreams {
  112. if upstream.OriginIpOrDomain == originDomain {
  113. if !m.IsTargetOnline(originDomain) {
  114. //Origin is offline
  115. return -1, errors.New("origin is offline")
  116. }
  117. //Ok, the origin is still online
  118. return i, nil
  119. }
  120. }
  121. return -1, errors.New("origin is no longer exists")
  122. }
  123. /* Functions related to random upstream picking */
  124. // Get a random upstream by the weights defined in Upstream struct, return the upstream, index value and any error
  125. func getRandomUpstreamByWeight(upstreams []*Upstream) (*Upstream, int, error) {
  126. // If there is only one upstream, return it
  127. if len(upstreams) == 1 {
  128. return upstreams[0], 0, nil
  129. }
  130. // Preserve the index with upstreams
  131. type upstreamWithIndex struct {
  132. Upstream *Upstream
  133. Index int
  134. }
  135. // Calculate total weight for upstreams with weight > 0
  136. totalWeight := 0
  137. fallbackUpstreams := make([]upstreamWithIndex, 0, len(upstreams))
  138. for index, upstream := range upstreams {
  139. if upstream.Weight > 0 {
  140. totalWeight += upstream.Weight
  141. } else {
  142. // Collect fallback upstreams
  143. fallbackUpstreams = append(fallbackUpstreams, upstreamWithIndex{upstream, index})
  144. }
  145. }
  146. // If there are no upstreams with weight > 0, return a fallback upstream if available
  147. if totalWeight == 0 {
  148. if len(fallbackUpstreams) > 0 {
  149. // Randomly select one of the fallback upstreams
  150. randIndex := rand.Intn(len(fallbackUpstreams))
  151. return fallbackUpstreams[randIndex].Upstream, fallbackUpstreams[randIndex].Index, nil
  152. }
  153. // No upstreams available at all
  154. return nil, -1, errors.New("no valid upstream servers available")
  155. }
  156. // Random weight between 0 and total weight
  157. randomWeight := rand.Intn(totalWeight)
  158. // Select an upstream based on the random weight
  159. for index, upstream := range upstreams {
  160. if upstream.Weight > 0 { // Only consider upstreams with weight > 0
  161. if randomWeight < upstream.Weight {
  162. // Return the selected upstream and its index
  163. return upstream, index, nil
  164. }
  165. randomWeight -= upstream.Weight
  166. }
  167. }
  168. // If we reach here, it means we should return a fallback upstream if available
  169. if len(fallbackUpstreams) > 0 {
  170. randIndex := rand.Intn(len(fallbackUpstreams))
  171. return fallbackUpstreams[randIndex].Upstream, fallbackUpstreams[randIndex].Index, nil
  172. }
  173. return nil, -1, errors.New("failed to pick an upstream origin server")
  174. }
  175. // IntRange returns a random integer in the range from min to max.
  176. /*
  177. func intRange(min, max int) (int, error) {
  178. var result int
  179. switch {
  180. case min > max:
  181. // Fail with error
  182. return result, errors.New("min is greater than max")
  183. case max == min:
  184. result = max
  185. case max > min:
  186. b := rand.Intn(max-min) + min
  187. result = min + int(b)
  188. }
  189. return result, nil
  190. }
  191. */