maxLatencyWriter.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package dpcore
  2. /*
  3. Max Latency Writer
  4. This file implements an io.Writer that flushes periodically, based on a timer
  5. Mostly based on httputil.ReverseProxy
  6. */
  7. import (
  8. "io"
  9. "sync"
  10. "time"
  11. )
  12. type maxLatencyWriter struct {
  13. dst io.Writer
  14. flush func() error
  15. latency time.Duration // non-zero; negative means to flush immediately
  16. mu sync.Mutex // protects t, flushPending, and dst.Flush
  17. t *time.Timer
  18. flushPending bool
  19. }
  20. func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
  21. m.mu.Lock()
  22. defer m.mu.Unlock()
  23. n, err = m.dst.Write(p)
  24. if m.latency < 0 {
  25. //Flush immediately
  26. m.flush()
  27. return
  28. }
  29. if m.flushPending {
  30. //Flush in next tick cycle
  31. return
  32. }
  33. if m.t == nil {
  34. m.t = time.AfterFunc(m.latency, m.delayedFlush)
  35. } else {
  36. m.t.Reset(m.latency)
  37. }
  38. m.flushPending = true
  39. return
  40. }
  41. func (m *maxLatencyWriter) delayedFlush() {
  42. m.mu.Lock()
  43. defer m.mu.Unlock()
  44. if !m.flushPending {
  45. // if stop was called but AfterFunc already started this goroutine
  46. return
  47. }
  48. m.flush()
  49. m.flushPending = false
  50. }
  51. func (m *maxLatencyWriter) stop() {
  52. m.mu.Lock()
  53. defer m.mu.Unlock()
  54. m.flushPending = false
  55. if m.t != nil {
  56. m.t.Stop()
  57. }
  58. }