From 063f3f68df5ee488ac392007a51d94750b267f7c Mon Sep 17 00:00:00 2001
From: Agniva De Sarker <agnivade@yahoo.co.in>
Date: Mon, 25 Sep 2017 23:25:02 +0530
Subject: [PATCH] Improve statsd plugin perf by using a byte buffer pool
 (#3254)

---
 plugins/inputs/statsd/statsd.go      | 39 +++++++++++++++++-----------
 plugins/inputs/statsd/statsd_test.go |  7 ++---
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go
index 76dd2505..67ce29cd 100644
--- a/plugins/inputs/statsd/statsd.go
+++ b/plugins/inputs/statsd/statsd.go
@@ -2,6 +2,7 @@ package statsd
 
 import (
 	"bufio"
+	"bytes"
 	"errors"
 	"fmt"
 	"log"
@@ -89,7 +90,7 @@ type Statsd struct {
 	malformed int
 
 	// Channel for all incoming statsd packets
-	in   chan []byte
+	in   chan *bytes.Buffer
 	done chan struct{}
 
 	// Cache gauges, counters & sets so they can be aggregated as they arrive
@@ -121,6 +122,9 @@ type Statsd struct {
 	TotalConnections   selfstat.Stat
 	PacketsRecv        selfstat.Stat
 	BytesRecv          selfstat.Stat
+
+	// A pool of byte slices to handle parsing
+	bufPool sync.Pool
 }
 
 // One statsd metric, form is <bucket>:<value>|<mtype>|@<samplerate>
@@ -281,9 +285,6 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
 
 func (s *Statsd) Start(_ telegraf.Accumulator) error {
 	// Make data structures
-	s.done = make(chan struct{})
-	s.in = make(chan []byte, s.AllowedPendingMessages)
-
 	s.gauges = make(map[string]cachedgauge)
 	s.counters = make(map[string]cachedcounter)
 	s.sets = make(map[string]cachedset)
@@ -302,10 +303,15 @@ func (s *Statsd) Start(_ telegraf.Accumulator) error {
 	s.PacketsRecv = selfstat.Register("statsd", "tcp_packets_received", tags)
 	s.BytesRecv = selfstat.Register("statsd", "tcp_bytes_received", tags)
 
-	s.in = make(chan []byte, s.AllowedPendingMessages)
+	s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages)
 	s.done = make(chan struct{})
 	s.accept = make(chan bool, s.MaxTCPConnections)
 	s.conns = make(map[string]*net.TCPConn)
+	s.bufPool = sync.Pool{
+		New: func() interface{} {
+			return new(bytes.Buffer)
+		},
+	}
 	for i := 0; i < s.MaxTCPConnections; i++ {
 		s.accept <- true
 	}
@@ -394,11 +400,12 @@ func (s *Statsd) udpListen() error {
 				log.Printf("E! Error READ: %s\n", err.Error())
 				continue
 			}
-			bufCopy := make([]byte, n)
-			copy(bufCopy, buf[:n])
+			b := s.bufPool.Get().(*bytes.Buffer)
+			b.Reset()
+			b.Write(buf[:n])
 
 			select {
-			case s.in <- bufCopy:
+			case s.in <- b:
 			default:
 				s.drops++
 				if s.drops == 1 || s.AllowedPendingMessages == 0 || s.drops%s.AllowedPendingMessages == 0 {
@@ -414,19 +421,19 @@ func (s *Statsd) udpListen() error {
 // single statsd metric into a struct.
 func (s *Statsd) parser() error {
 	defer s.wg.Done()
-	var packet []byte
 	for {
 		select {
 		case <-s.done:
 			return nil
-		case packet = <-s.in:
-			lines := strings.Split(string(packet), "\n")
+		case buf := <-s.in:
+			lines := strings.Split(buf.String(), "\n")
 			for _, line := range lines {
 				line = strings.TrimSpace(line)
 				if line != "" {
 					s.parseStatsdLine(line)
 				}
 			}
+			s.bufPool.Put(buf)
 		}
 	}
 }
@@ -774,12 +781,14 @@ func (s *Statsd) handler(conn *net.TCPConn, id string) {
 			}
 			s.BytesRecv.Incr(int64(n))
 			s.PacketsRecv.Incr(1)
-			bufCopy := make([]byte, n+1)
-			copy(bufCopy, scanner.Bytes())
-			bufCopy[n] = '\n'
+
+			b := s.bufPool.Get().(*bytes.Buffer)
+			b.Reset()
+			b.Write(scanner.Bytes())
+			b.WriteByte('\n')
 
 			select {
-			case s.in <- bufCopy:
+			case s.in <- b:
 			default:
 				s.drops++
 				if s.drops == 1 || s.drops%s.AllowedPendingMessages == 0 {
diff --git a/plugins/inputs/statsd/statsd_test.go b/plugins/inputs/statsd/statsd_test.go
index 91af51d6..1331e1f4 100644
--- a/plugins/inputs/statsd/statsd_test.go
+++ b/plugins/inputs/statsd/statsd_test.go
@@ -1,6 +1,7 @@
 package statsd
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
 	"net"
@@ -16,8 +17,8 @@ const (
 	testMsg = "test.tcp.msg:100|c"
 )
 
-func newTestTcpListener() (*Statsd, chan []byte) {
-	in := make(chan []byte, 1500)
+func newTestTcpListener() (*Statsd, chan *bytes.Buffer) {
+	in := make(chan *bytes.Buffer, 1500)
 	listener := &Statsd{
 		Protocol:               "tcp",
 		ServiceAddress:         ":8125",
@@ -34,7 +35,7 @@ func NewTestStatsd() *Statsd {
 
 	// Make data structures
 	s.done = make(chan struct{})
-	s.in = make(chan []byte, s.AllowedPendingMessages)
+	s.in = make(chan *bytes.Buffer, s.AllowedPendingMessages)
 	s.gauges = make(map[string]cachedgauge)
 	s.counters = make(map[string]cachedcounter)
 	s.sets = make(map[string]cachedset)
-- 
GitLab