conn_broadcast_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. // Copyright 2017 The Gorilla WebSocket Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package websocket
  5. import (
  6. "io"
  7. "io/ioutil"
  8. "sync/atomic"
  9. "testing"
  10. )
  11. // broadcastBench allows to run broadcast benchmarks.
  12. // In every broadcast benchmark we create many connections, then send the same
  13. // message into every connection and wait for all writes complete. This emulates
  14. // an application where many connections listen to the same data - i.e. PUB/SUB
  15. // scenarios with many subscribers in one channel.
  16. type broadcastBench struct {
  17. w io.Writer
  18. message *broadcastMessage
  19. closeCh chan struct{}
  20. doneCh chan struct{}
  21. count int32
  22. conns []*broadcastConn
  23. compression bool
  24. usePrepared bool
  25. }
  26. type broadcastMessage struct {
  27. payload []byte
  28. prepared *PreparedMessage
  29. }
  30. type broadcastConn struct {
  31. conn *Conn
  32. msgCh chan *broadcastMessage
  33. }
  34. func newBroadcastConn(c *Conn) *broadcastConn {
  35. return &broadcastConn{
  36. conn: c,
  37. msgCh: make(chan *broadcastMessage, 1),
  38. }
  39. }
  40. func newBroadcastBench(usePrepared, compression bool) *broadcastBench {
  41. bench := &broadcastBench{
  42. w: ioutil.Discard,
  43. doneCh: make(chan struct{}),
  44. closeCh: make(chan struct{}),
  45. usePrepared: usePrepared,
  46. compression: compression,
  47. }
  48. msg := &broadcastMessage{
  49. payload: textMessages(1)[0],
  50. }
  51. if usePrepared {
  52. pm, _ := NewPreparedMessage(TextMessage, msg.payload)
  53. msg.prepared = pm
  54. }
  55. bench.message = msg
  56. bench.makeConns(10000)
  57. return bench
  58. }
  59. func (b *broadcastBench) makeConns(numConns int) {
  60. conns := make([]*broadcastConn, numConns)
  61. for i := 0; i < numConns; i++ {
  62. c := newTestConn(nil, b.w, true)
  63. if b.compression {
  64. c.enableWriteCompression = true
  65. c.newCompressionWriter = compressNoContextTakeover
  66. }
  67. conns[i] = newBroadcastConn(c)
  68. go func(c *broadcastConn) {
  69. for {
  70. select {
  71. case msg := <-c.msgCh:
  72. if b.usePrepared {
  73. c.conn.WritePreparedMessage(msg.prepared)
  74. } else {
  75. c.conn.WriteMessage(TextMessage, msg.payload)
  76. }
  77. val := atomic.AddInt32(&b.count, 1)
  78. if val%int32(numConns) == 0 {
  79. b.doneCh <- struct{}{}
  80. }
  81. case <-b.closeCh:
  82. return
  83. }
  84. }
  85. }(conns[i])
  86. }
  87. b.conns = conns
  88. }
  89. func (b *broadcastBench) close() {
  90. close(b.closeCh)
  91. }
  92. func (b *broadcastBench) runOnce() {
  93. for _, c := range b.conns {
  94. c.msgCh <- b.message
  95. }
  96. <-b.doneCh
  97. }
  98. func BenchmarkBroadcast(b *testing.B) {
  99. benchmarks := []struct {
  100. name string
  101. usePrepared bool
  102. compression bool
  103. }{
  104. {"NoCompression", false, false},
  105. {"WithCompression", false, true},
  106. {"NoCompressionPrepared", true, false},
  107. {"WithCompressionPrepared", true, true},
  108. }
  109. for _, bm := range benchmarks {
  110. b.Run(bm.name, func(b *testing.B) {
  111. bench := newBroadcastBench(bm.usePrepared, bm.compression)
  112. defer bench.close()
  113. b.ResetTimer()
  114. for i := 0; i < b.N; i++ {
  115. bench.runOnce()
  116. }
  117. b.ReportAllocs()
  118. })
  119. }
  120. }