Sfoglia il codice sorgente

Added experimental new version of dpcore

Toby Chui 11 mesi fa
parent
commit
d999140e5e

+ 64 - 55
mod/dynamicproxy/dpcore/dpcore.go

@@ -1,6 +1,7 @@
 package dpcore
 package dpcore
 
 
 import (
 import (
+	"context"
 	"errors"
 	"errors"
 	"io"
 	"io"
 	"log"
 	"log"
@@ -8,12 +9,9 @@ import (
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"strings"
 	"strings"
-	"sync"
 	"time"
 	"time"
 )
 )
 
 
-var onExitFlushLoop func()
-
 // ReverseProxy is an HTTP Handler that takes an incoming request and
 // ReverseProxy is an HTTP Handler that takes an incoming request and
 // sends it to another server, proxying the response back to the
 // sends it to another server, proxying the response back to the
 // client, support http, also support https tunnel using http.hijacker
 // client, support http, also support https tunnel using http.hijacker
@@ -68,7 +66,12 @@ type requestCanceler interface {
 	CancelRequest(req *http.Request)
 	CancelRequest(req *http.Request)
 }
 }
 
 
-func NewDynamicProxyCore(target *url.URL, prepender string, ignoreTLSVerification bool) *ReverseProxy {
+type DpcoreOptions struct {
+	IgnoreTLSVerification bool
+	FlushInterval         time.Duration
+}
+
+func NewDynamicProxyCore(target *url.URL, prepender string, dpcOptions *DpcoreOptions) *ReverseProxy {
 	targetQuery := target.RawQuery
 	targetQuery := target.RawQuery
 	director := func(req *http.Request) {
 	director := func(req *http.Request) {
 		req.URL.Scheme = target.Scheme
 		req.URL.Scheme = target.Scheme
@@ -95,16 +98,17 @@ func NewDynamicProxyCore(target *url.URL, prepender string, ignoreTLSVerificatio
 	thisTransporter.(*http.Transport).MaxConnsPerHost = optimalConcurrentConnection * 2
 	thisTransporter.(*http.Transport).MaxConnsPerHost = optimalConcurrentConnection * 2
 	thisTransporter.(*http.Transport).DisableCompression = true
 	thisTransporter.(*http.Transport).DisableCompression = true
 
 
-	if ignoreTLSVerification {
+	if dpcOptions.IgnoreTLSVerification {
 		//Ignore TLS certificate validation error
 		//Ignore TLS certificate validation error
 		thisTransporter.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true
 		thisTransporter.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true
 	}
 	}
 
 
 	return &ReverseProxy{
 	return &ReverseProxy{
-		Director:  director,
-		Prepender: prepender,
-		Verbal:    false,
-		Transport: thisTransporter,
+		Director:      director,
+		Prepender:     prepender,
+		FlushInterval: dpcOptions.FlushInterval,
+		Verbal:        false,
+		Transport:     thisTransporter,
 	}
 	}
 }
 }
 
 
@@ -178,62 +182,64 @@ var hopHeaders = []string{
 	//"Upgrade",
 	//"Upgrade",
 }
 }
 
 
-func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) {
-	if p.FlushInterval != 0 {
-		if wf, ok := dst.(writeFlusher); ok {
-			mlw := &maxLatencyWriter{
-				dst:     wf,
-				latency: p.FlushInterval,
-				done:    make(chan bool),
-			}
-
-			go mlw.flushLoop()
-			defer mlw.stop()
-			dst = mlw
+// Copy response from src to dst with given flush interval, reference from httputil.ReverseProxy
+func (p *ReverseProxy) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error {
+	var w io.Writer = dst
+	if flushInterval != 0 {
+		mlw := &maxLatencyWriter{
+			dst:     dst,
+			flush:   http.NewResponseController(dst).Flush,
+			latency: flushInterval,
 		}
 		}
-	}
 
 
-	io.Copy(dst, src)
-}
+		defer mlw.stop()
+		// set up initial timer so headers get flushed even if body writes are delayed
+		mlw.flushPending = true
+		mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush)
+		w = mlw
+	}
 
 
-type writeFlusher interface {
-	io.Writer
-	http.Flusher
-}
+	var buf []byte
+	_, err := p.copyBuffer(w, src, buf)
+	return err
 
 
-type maxLatencyWriter struct {
-	dst     writeFlusher
-	latency time.Duration
-	mu      sync.Mutex
-	done    chan bool
 }
 }
 
 
-func (m *maxLatencyWriter) Write(b []byte) (int, error) {
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	return m.dst.Write(b)
-}
+// Copy with given buffer size. Default to 64k
+func (p *ReverseProxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
+	if len(buf) == 0 {
+		buf = make([]byte, 64*1024)
+	}
 
 
-func (m *maxLatencyWriter) flushLoop() {
-	t := time.NewTicker(m.latency)
-	defer t.Stop()
+	var written int64
 	for {
 	for {
-		select {
-		case <-m.done:
-			if onExitFlushLoop != nil {
-				onExitFlushLoop()
+		nr, rerr := src.Read(buf)
+		if rerr != nil && rerr != io.EOF && rerr != context.Canceled {
+			p.logf("dpcore read error during body copy: %v", rerr)
+		}
+
+		if nr > 0 {
+			nw, werr := dst.Write(buf[:nr])
+			if nw > 0 {
+				written += int64(nw)
+			}
+
+			if werr != nil {
+				return written, werr
+			}
+
+			if nr != nw {
+				return written, io.ErrShortWrite
 			}
 			}
-			return
-		case <-t.C:
-			m.mu.Lock()
-			m.dst.Flush()
-			m.mu.Unlock()
 		}
 		}
-	}
-}
 
 
-func (m *maxLatencyWriter) stop() {
-	m.done <- true
+		if rerr != nil {
+			if rerr == io.EOF {
+				rerr = nil
+			}
+			return written, rerr
+		}
+	}
 }
 }
 
 
 func (p *ReverseProxy) logf(format string, args ...interface{}) {
 func (p *ReverseProxy) logf(format string, args ...interface{}) {
@@ -438,7 +444,10 @@ func (p *ReverseProxy) ProxyHTTP(rw http.ResponseWriter, req *http.Request, rrr
 		}
 		}
 	}
 	}
 
 
-	p.copyResponse(rw, res.Body)
+	//Get flush interval in real time and start copying the request
+	flushInterval := p.getFlushInterval(req, res)
+	p.copyResponse(rw, res.Body, flushInterval)
+
 	// close now, instead of defer, to populate res.Trailer
 	// close now, instead of defer, to populate res.Trailer
 	res.Body.Close()
 	res.Body.Close()
 	copyHeader(rw.Header(), res.Trailer)
 	copyHeader(rw.Header(), res.Trailer)

+ 38 - 0
mod/dynamicproxy/dpcore/flush.go

@@ -0,0 +1,38 @@
+package dpcore
+
+import (
+	"mime"
+	"net/http"
+	"time"
+)
+
+// Auto sniff of flush interval from header
+func (p *ReverseProxy) getFlushInterval(req *http.Request, res *http.Response) time.Duration {
+	contentType := req.Header.Get("Content-Type")
+	if actualContentType, _, _ := mime.ParseMediaType(contentType); actualContentType == "text/event-stream" {
+		return -1
+	}
+
+	if req.ContentLength == -1 || p.isBidirectionalStream(req, res) {
+		return -1
+	}
+
+	//Cannot sniff anything. Use default value
+	return p.FlushInterval
+
+}
+
+// Check for bidirectional stream, copy from Caddy :D
+func (p *ReverseProxy) isBidirectionalStream(req *http.Request, res *http.Response) bool {
+	// We have to check the encoding here; only flush headers with identity encoding.
+	// Non-identity encoding might combine with "encode" directive, and in that case,
+	// if body size larger than enc.MinLength, upper level encode handle might have
+	// Content-Encoding header to write.
+	// (see https://github.com/caddyserver/caddy/issues/3606 for use case)
+	ae := req.Header.Get("Accept-Encoding")
+
+	return req.ProtoMajor == 2 &&
+		res.ProtoMajor == 2 &&
+		res.ContentLength == -1 &&
+		(ae == "identity" || ae == "")
+}

+ 73 - 0
mod/dynamicproxy/dpcore/maxLatencyWriter.go

@@ -0,0 +1,73 @@
+package dpcore
+
+/*
+
+Max Latency Writer
+
+This script implements a io writer with periodic flushing base on a ticker
+Mostly based on httputil.ReverseProxy
+
+*/
+
+import (
+	"io"
+	"sync"
+	"time"
+)
+
+type maxLatencyWriter struct {
+	dst          io.Writer
+	flush        func() error
+	latency      time.Duration // non-zero; negative means to flush immediately
+	mu           sync.Mutex    // protects t, flushPending, and dst.Flush
+	t            *time.Timer
+	flushPending bool
+}
+
+func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	n, err = m.dst.Write(p)
+	if m.latency < 0 {
+		//Flush immediately
+		m.flush()
+		return
+	}
+
+	if m.flushPending {
+		//Flush in next tick cycle
+		return
+	}
+
+	if m.t == nil {
+		m.t = time.AfterFunc(m.latency, m.delayedFlush)
+	} else {
+		m.t.Reset(m.latency)
+	}
+
+	m.flushPending = true
+	return
+
+}
+
+func (m *maxLatencyWriter) delayedFlush() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if !m.flushPending {
+		// if stop was called but AfterFunc already started this goroutine
+		return
+	}
+
+	m.flush()
+	m.flushPending = false
+}
+
+func (m *maxLatencyWriter) stop() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	m.flushPending = false
+	if m.t != nil {
+		m.t.Stop()
+	}
+}

+ 6 - 2
mod/dynamicproxy/router.go

@@ -42,7 +42,9 @@ func (router *Router) PrepareProxyRoute(endpoint *ProxyEndpoint) (*ProxyEndpoint
 	}
 	}
 
 
 	//Create the proxy routing handler
 	//Create the proxy routing handler
-	proxy := dpcore.NewDynamicProxyCore(path, "", endpoint.SkipCertValidations)
+	proxy := dpcore.NewDynamicProxyCore(path, "", &dpcore.DpcoreOptions{
+		IgnoreTLSVerification: endpoint.SkipCertValidations,
+	})
 	endpoint.proxy = proxy
 	endpoint.proxy = proxy
 	endpoint.parent = router
 	endpoint.parent = router
 
 
@@ -69,7 +71,9 @@ func (router *Router) PrepareProxyRoute(endpoint *ProxyEndpoint) (*ProxyEndpoint
 			return nil, err
 			return nil, err
 		}
 		}
 
 
-		proxy := dpcore.NewDynamicProxyCore(path, vdir.MatchingPath, vdir.SkipCertValidations)
+		proxy := dpcore.NewDynamicProxyCore(path, vdir.MatchingPath, &dpcore.DpcoreOptions{
+			IgnoreTLSVerification: vdir.SkipCertValidations,
+		})
 		vdir.proxy = proxy
 		vdir.proxy = proxy
 		vdir.parent = endpoint
 		vdir.parent = endpoint
 	}
 	}