Просмотр исходного кода

Added automatic load balance online state check

Toby Chui 3 недель назад
Родитель
Сommit
85ab359141

+ 36 - 13
mod/dynamicproxy/loadbalance/loadbalance.go

@@ -3,6 +3,7 @@ package loadbalance
 import (
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/google/uuid"
 	"github.com/gorilla/sessions"
@@ -25,11 +26,12 @@ type Options struct {
 }
 
 type RouteManager struct {
-	SessionStore           *sessions.CookieStore
-	LoadBalanceMap         sync.Map  //Sync map to store the last load balance state of a given node
-	OnlineStatusMap        sync.Map  //Sync map to store the online status of a given ip address or domain name
-	onlineStatusTickerStop chan bool //Stopping channel for the online status pinger
-	Options                Options   //Options for the load balancer
+	SessionStore *sessions.CookieStore
+	OnlineStatus sync.Map //Store the online status notify by uptime monitor
+	Options      Options  //Options for the load balancer
+
+	cacheTicker     *time.Ticker //Ticker for cache cleanup
+	cacheTickerStop chan bool    //Stop the cache cleanup
 }
 
 /* Upstream or Origin Server */
@@ -55,14 +57,31 @@ func NewLoadBalancer(options *Options) *RouteManager {
 		options.SystemUUID = uuid.New().String()
 	}
 
+	//Create a ticker for cache cleanup every 12 hours
+	cacheTicker := time.NewTicker(12 * time.Hour)
+	cacheTickerStop := make(chan bool)
+	go func() {
+		options.Logger.PrintAndLog("LoadBalancer", "Upstream state cache ticker started", nil)
+		for {
+			select {
+			case <-cacheTickerStop:
+				return
+			case <-cacheTicker.C:
+				//Clean up the cache
+				options.Logger.PrintAndLog("LoadBalancer", "Cleaning up upstream state cache", nil)
+			}
+		}
+	}()
+
 	//Generate a session store for stickySession
 	store := sessions.NewCookieStore([]byte(options.SystemUUID))
 	return &RouteManager{
-		SessionStore:           store,
-		LoadBalanceMap:         sync.Map{},
-		OnlineStatusMap:        sync.Map{},
-		onlineStatusTickerStop: nil,
-		Options:                *options,
+		SessionStore: store,
+		OnlineStatus: sync.Map{},
+		Options:      *options,
+
+		cacheTicker:     cacheTicker,
+		cacheTickerStop: cacheTickerStop,
 	}
 }
 
@@ -91,10 +110,14 @@ func GetUpstreamsAsString(upstreams []*Upstream) string {
 }
 
 func (m *RouteManager) Close() {
-	if m.onlineStatusTickerStop != nil {
-		m.onlineStatusTickerStop <- true
-	}
+	//Close the session store
+	m.SessionStore.MaxAge(0)
 
+	//Stop the cache cleanup
+	if m.cacheTicker != nil {
+		m.cacheTicker.Stop()
+	}
+	close(m.cacheTickerStop)
 }
 
 // Log Println, replace all log.Println or fmt.Println with this

+ 51 - 19
mod/dynamicproxy/loadbalance/onlineStatus.go

@@ -1,39 +1,71 @@
 package loadbalance
 
 import (
-	"net/http"
+	"strconv"
+	"strings"
 	"time"
 )
 
-// Return the last ping status to see if the target is online
-func (m *RouteManager) IsTargetOnline(matchingDomainOrIp string) bool {
-	value, ok := m.LoadBalanceMap.Load(matchingDomainOrIp)
+// Return if the target host is online
+func (m *RouteManager) IsTargetOnline(upstreamIP string) bool {
+	value, ok := m.OnlineStatus.Load(upstreamIP)
 	if !ok {
-		return false
+		// Assume online if not found, also update the map
+		m.OnlineStatus.Store(upstreamIP, true)
+		return true
 	}
 
 	isOnline, ok := value.(bool)
 	return ok && isOnline
 }
 
-// Ping a target to see if it is online
-func PingTarget(targetMatchingDomainOrIp string, requireTLS bool) bool {
-	client := &http.Client{
-		Timeout: 10 * time.Second,
+// Notify the host online state, should be called from uptime monitor
+func (m *RouteManager) NotifyHostOnlineState(upstreamIP string, isOnline bool) {
+	//if the upstream IP contains http or https, strip it
+	upstreamIP = strings.TrimPrefix(upstreamIP, "http://")
+	upstreamIP = strings.TrimPrefix(upstreamIP, "https://")
+
+	//Check previous state and update
+	if m.IsTargetOnline(upstreamIP) == isOnline {
+		return
 	}
 
-	url := targetMatchingDomainOrIp
-	if requireTLS {
-		url = "https://" + url
-	} else {
-		url = "http://" + url
+	m.OnlineStatus.Store(upstreamIP, isOnline)
+	m.println("Updating upstream "+upstreamIP+" online state to "+strconv.FormatBool(isOnline), nil)
+}
+
+// Set this host unreachable for a given amount of time defined in timeout
+// this shall be used in passive fallback. The uptime monitor should call to NotifyHostOnlineState() instead
+func (m *RouteManager) NotifyHostUnreachableWithTimeout(upstreamIp string, timeout int64) {
+	//if the upstream IP contains http or https, strip it
+	upstreamIp = strings.TrimPrefix(upstreamIp, "http://")
+	upstreamIp = strings.TrimPrefix(upstreamIp, "https://")
+	if timeout <= 0 {
+		//Set to the default timeout
+		timeout = 60
 	}
 
-	resp, err := client.Get(url)
-	if err != nil {
-		return false
+	if !m.IsTargetOnline(upstreamIp) {
+		//Already offline
+		return
 	}
-	defer resp.Body.Close()
 
-	return resp.StatusCode >= 200 && resp.StatusCode <= 600
+	m.OnlineStatus.Store(upstreamIp, false)
+	m.println("Setting upstream "+upstreamIp+" unreachable for "+strconv.FormatInt(timeout, 10)+"s", nil)
+	go func() {
+		//Set the upstream back to online after the timeout
+		<-time.After(time.Duration(timeout) * time.Second)
+		m.NotifyHostOnlineState(upstreamIp, true)
+	}()
+}
+
+// FilterOfflineOrigins return only online origins from a list of origins
+func (m *RouteManager) FilterOfflineOrigins(origins []*Upstream) []*Upstream {
+	var onlineOrigins []*Upstream
+	for _, origin := range origins {
+		if m.IsTargetOnline(origin.OriginIpOrDomain) {
+			onlineOrigins = append(onlineOrigins, origin)
+		}
+	}
+	return onlineOrigins
 }

+ 31 - 12
mod/dynamicproxy/loadbalance/originPicker.go

@@ -19,12 +19,20 @@ func (m *RouteManager) GetRequestUpstreamTarget(w http.ResponseWriter, r *http.R
 	if len(origins) == 0 {
 		return nil, errors.New("no upstream is defined for this host")
 	}
-	var targetOrigin = origins[0]
+
+	//Pick the origin
 	if useStickySession {
 		//Use stick session, check which origins this request previously used
 		targetOriginId, err := m.getSessionHandler(r, origins)
-		if err != nil {
-			//No valid session found. Assign a new upstream
+		if err != nil || !m.IsTargetOnline(origins[targetOriginId].OriginIpOrDomain) {
+			// No valid session found or origin is offline
+			// Filter the offline origins
+			origins = m.FilterOfflineOrigins(origins)
+			if len(origins) == 0 {
+				return nil, errors.New("no online upstream is available for origin: " + r.Host)
+			}
+
+			//Get a random origin
 			targetOrigin, index, err := getRandomUpstreamByWeight(origins)
 			if err != nil {
 				m.println("Unable to get random upstream", err)
@@ -35,23 +43,34 @@ func (m *RouteManager) GetRequestUpstreamTarget(w http.ResponseWriter, r *http.R
 			return targetOrigin, nil
 		}
 
-		//Valid session found. Resume the previous session
+		//Valid session found and origin is online
 		return origins[targetOriginId], nil
-	} else {
-		//Do not use stick session. Get a random one
-		var err error
-		targetOrigin, _, err = getRandomUpstreamByWeight(origins)
-		if err != nil {
-			m.println("Failed to get next origin", err)
-			targetOrigin = origins[0]
-		}
+	}
+	//No sticky session, get a random origin
 
+	//Filter the offline origins
+	origins = m.FilterOfflineOrigins(origins)
+	if len(origins) == 0 {
+		return nil, errors.New("no online upstream is available for origin: " + r.Host)
+	}
+
+	//Get a random origin
+	targetOrigin, _, err := getRandomUpstreamByWeight(origins)
+	if err != nil {
+		m.println("Failed to get next origin", err)
+		targetOrigin = origins[0]
 	}
 
 	//fmt.Println("DEBUG: Picking origin " + targetOrigin.OriginIpOrDomain)
 	return targetOrigin, nil
 }
 
+// GetUsableUpstreamCounts return the number of usable upstreams
+func (m *RouteManager) GetUsableUpstreamCounts(origins []*Upstream) int {
+	origins = m.FilterOfflineOrigins(origins)
+	return len(origins)
+}
+
 /* Features related to session access */
 //Set a new origin for this connection by session
 func (m *RouteManager) setSessionHandler(w http.ResponseWriter, r *http.Request, originIpOrDomain string, index int) error {

+ 10 - 1
mod/dynamicproxy/proxyRequestHandler.go

@@ -1,7 +1,9 @@
 package dynamicproxy
 
 import (
+	"context"
 	"errors"
+	"fmt"
 	"log"
 	"net"
 	"net/http"
@@ -198,14 +200,21 @@ func (h *ProxyHandler) hostRequest(w http.ResponseWriter, r *http.Request, targe
 		Version:             target.parent.Option.HostVersion,
 	})
 
+	//validate the error
 	var dnsError *net.DNSError
 	if err != nil {
 		if errors.As(err, &dnsError) {
 			http.ServeFile(w, r, "./web/hosterror.html")
 			h.Parent.logRequest(r, false, 404, "host-http", r.URL.Hostname())
+		} else if errors.Is(err, context.Canceled) {
+			//Request canceled by client, usually due to manual refresh before page load
+			http.Error(w, "Request canceled", http.StatusRequestTimeout)
+			h.Parent.logRequest(r, false, http.StatusRequestTimeout, "host-http", r.URL.Hostname())
 		} else {
+			//Notify the load balancer that the host is unreachable
+			fmt.Println(err.Error())
+			h.Parent.loadBalancer.NotifyHostUnreachableWithTimeout(selectedUpstream.OriginIpOrDomain, PassiveLoadBalanceNotifyTimeout)
 			http.ServeFile(w, r, "./web/rperror.html")
-			//TODO: Take this upstream offline automatically
 			h.Parent.logRequest(r, false, 521, "host-http", r.URL.Hostname())
 		}
 	}

+ 1 - 1
mod/dynamicproxy/templates/hosterror.html

@@ -123,7 +123,7 @@
             <div class="ui container">
                 <div class="ui stackable grid">
                     <div class="eight wide column">
-                        <h1>What happend?</h1>
+                        <h1>What happened?</h1>
                         <p>The reverse proxy target domain is not found.<br>For more information, see the error message on the reverse proxy terminal.</p>
                     </div>
                     <div class="eight wide column">

+ 2 - 1
mod/dynamicproxy/typedef.go

@@ -28,6 +28,7 @@ import (
 
 type ProxyType int
 
+const PassiveLoadBalanceNotifyTimeout = 60 //Time to assume a passive load balance is unreachable, in seconds
 const (
 	ProxyTypeRoot ProxyType = iota //Root Proxy, everything not matching will be routed here
 	ProxyTypeHost                  //Host Proxy, match by host (domain) name
@@ -193,7 +194,7 @@ type ProxyEndpoint struct {
 	DefaultSiteValue  string //Fallback routing target, optional
 
 	//Internal Logic Elements
-	parent *Router `json:"-"`
+	parent *Router  `json:"-"`
 	Tags   []string // Tags for the proxy endpoint
 }
 

+ 58 - 0
mod/uptime/typedef.go

@@ -0,0 +1,58 @@
+package uptime
+
+import "imuslab.com/zoraxy/mod/info/logger"
+
+const (
+	logModuleName = "uptime-monitor"
+)
+
+type Record struct {
+	Timestamp  int64
+	ID         string
+	Name       string
+	URL        string
+	Protocol   string
+	Online     bool
+	StatusCode int
+	Latency    int64
+}
+
+type ProxyType string
+
+const (
+	ProxyType_Host ProxyType = "Origin Server"
+	ProxyType_Vdir ProxyType = "Virtual Directory"
+)
+
+type Target struct {
+	ID        string
+	Name      string
+	URL       string
+	Protocol  string
+	ProxyType ProxyType
+}
+
+type Config struct {
+	Targets           []*Target
+	Interval          int
+	MaxRecordsStore   int
+	OnlineStateNotify func(upstreamIP string, isOnline bool)
+	Logger            *logger.Logger
+}
+
+type Monitor struct {
+	Config          *Config
+	OnlineStatusLog map[string][]*Record
+}
+
+// Default configs
+var exampleTarget = Target{
+	ID:       "example",
+	Name:     "Example",
+	URL:      "example.com",
+	Protocol: "https",
+}
+
+func defaultNotify(upstreamIP string, isOnline bool) {
+	// Do nothing
+}

+ 7 - 51
mod/uptime/uptime.go

@@ -14,56 +14,6 @@ import (
 	"imuslab.com/zoraxy/mod/utils"
 )
 
-const (
-	logModuleName = "uptime-monitor"
-)
-
-type Record struct {
-	Timestamp  int64
-	ID         string
-	Name       string
-	URL        string
-	Protocol   string
-	Online     bool
-	StatusCode int
-	Latency    int64
-}
-
-type ProxyType string
-
-const (
-	ProxyType_Host ProxyType = "Origin Server"
-	ProxyType_Vdir ProxyType = "Virtual Directory"
-)
-
-type Target struct {
-	ID        string
-	Name      string
-	URL       string
-	Protocol  string
-	ProxyType ProxyType
-}
-
-type Config struct {
-	Targets         []*Target
-	Interval        int
-	MaxRecordsStore int
-	Logger          *logger.Logger
-}
-
-type Monitor struct {
-	Config          *Config
-	OnlineStatusLog map[string][]*Record
-}
-
-// Default configs
-var exampleTarget = Target{
-	ID:       "example",
-	Name:     "Example",
-	URL:      "example.com",
-	Protocol: "https",
-}
-
 // Create a new uptime monitor
 func NewUptimeMonitor(config *Config) (*Monitor, error) {
 	//Create new monitor object
@@ -77,6 +27,11 @@ func NewUptimeMonitor(config *Config) (*Monitor, error) {
 		config.Logger, _ = logger.NewFmtLogger()
 	}
 
+	if config.OnlineStateNotify == nil {
+		//Use default notify function if not provided
+		config.OnlineStateNotify = defaultNotify
+	}
+
 	//Start the endpoint listener
 	ticker := time.NewTicker(time.Duration(config.Interval) * time.Second)
 	done := make(chan bool)
@@ -218,6 +173,7 @@ func (m *Monitor) getWebsiteStatusWithLatency(url string) (bool, int64, int) {
 	end := time.Now().UnixNano() / int64(time.Millisecond)
 	if err != nil {
 		m.Config.Logger.PrintAndLog(logModuleName, "Ping upstream timeout. Assume offline", err)
+		m.Config.OnlineStateNotify(url, false)
 		return false, 0, 0
 	} else {
 		diff := end - start
@@ -231,7 +187,7 @@ func (m *Monitor) getWebsiteStatusWithLatency(url string) (bool, int64, int) {
 		} else {
 			succ = false
 		}
-
+		m.Config.OnlineStateNotify(url, true)
 		return succ, diff, statusCode
 	}
 

+ 5 - 4
reverseproxy.go

@@ -163,10 +163,11 @@ func ReverseProxtInit() {
 	go func() {
 		//This must be done in go routine to prevent blocking on system startup
 		uptimeMonitor, _ = uptime.NewUptimeMonitor(&uptime.Config{
-			Targets:         GetUptimeTargetsFromReverseProxyRules(dynamicProxyRouter),
-			Interval:        300,              //5 minutes
-			MaxRecordsStore: 288,              //1 day
-			Logger:          SystemWideLogger, //Logger
+			Targets:           GetUptimeTargetsFromReverseProxyRules(dynamicProxyRouter),
+			Interval:          300,                                //5 minutes
+			MaxRecordsStore:   288,                                //1 day
+			OnlineStateNotify: loadBalancer.NotifyHostOnlineState, //Notify the load balancer for online state
+			Logger:            SystemWideLogger,                   //Logger
 		})
 
 		SystemWideLogger.Println("Uptime Monitor background service started")

+ 1 - 1
web/hosterror.html

@@ -123,7 +123,7 @@
             <div class="ui container">
                 <div class="ui stackable grid">
                     <div class="eight wide column">
-                        <h1>What happend?</h1>
+                        <h1>What happened?</h1>
                         <p>The reverse proxy target domain is not found.<br>For more information, see the error message on the reverse proxy terminal.</p>
                     </div>
                     <div class="eight wide column">

+ 1 - 1
web/rperror.html

@@ -124,7 +124,7 @@
             <div class="ui container">
                 <div class="ui stackable grid">
                     <div class="eight wide column">
-                        <h1>What happend?</h1>
+                        <h1>What happened?</h1>
                         <p>The web server reported a bad gateway error.<br>For more information, see the error message on the reverse proxy terminal.</p>
                     </div>
                     <div class="eight wide column">

+ 1 - 1
www/templates/notfound.html

@@ -123,7 +123,7 @@
             <div class="ui container">
                 <div class="ui stackable grid">
                     <div class="eight wide column">
-                        <h1>What happend?</h1>
+                        <h1>What happened?</h1>
                         <p>The reverse proxy target domain is not found.<br>For more information, see the error message on the reverse proxy terminal.</p>
                     </div>
                     <div class="eight wide column">