From be379f3dac6bcc2d807d6553449a4aa4df37aba5 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Thu, 7 Apr 2016 12:06:56 -0600
Subject: [PATCH] Refactor UDP & TCP input buffers

closes #991
---
 plugins/inputs/statsd/statsd.go             | 15 +++++++++------
 plugins/inputs/tcp_listener/tcp_listener.go | 12 ++++++++++--
 plugins/inputs/udp_listener/udp_listener.go | 20 ++++++++++++--------
 3 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/plugins/inputs/statsd/statsd.go b/plugins/inputs/statsd/statsd.go
index 84687511..69638af0 100644
--- a/plugins/inputs/statsd/statsd.go
+++ b/plugins/inputs/statsd/statsd.go
@@ -20,7 +20,7 @@ import (
 const (
 	// UDP packet limit, see
 	// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
-	UDP_PACKET_SIZE int = 65507
+	UDP_MAX_PACKET_SIZE int = 64 * 1024
 
 	defaultFieldName = "value"
 
@@ -57,8 +57,10 @@ type Statsd struct {
 	// statsd protocol (http://docs.datadoghq.com/guides/dogstatsd/)
 	ParseDataDogTags bool
 
-	// UDPPacketSize is the size of the read packets for the server listening
-	// for statsd UDP packets. This will default to 1500 bytes.
+	// UDPPacketSize is deprecated, it's only here for legacy support
+	// we now always create 1 max size buffer and then copy only what we need
+	// into the in channel
+	// see https://github.com/influxdata/telegraf/pull/992
 	UDPPacketSize int `toml:"udp_packet_size"`
 
 	sync.Mutex
@@ -272,7 +274,7 @@ func (s *Statsd) udpListen() error {
 	}
 	log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())
 
-	buf := make([]byte, s.UDPPacketSize)
+	buf := make([]byte, UDP_MAX_PACKET_SIZE)
 	for {
 		select {
 		case <-s.done:
@@ -283,9 +285,11 @@ func (s *Statsd) udpListen() error {
 				log.Printf("ERROR READ: %s\n", err.Error())
 				continue
 			}
+			bufCopy := make([]byte, n)
+			copy(bufCopy, buf[:n])
 
 			select {
-			case s.in <- buf[:n]:
+			case s.in <- bufCopy:
 			default:
 				log.Printf(dropwarn, string(buf[:n]))
 			}
@@ -631,7 +635,6 @@ func init() {
 	inputs.Add("statsd", func() telegraf.Input {
 		return &Statsd{
 			MetricSeparator: "_",
-			UDPPacketSize:   UDP_PACKET_SIZE,
 		}
 	})
 }
diff --git a/plugins/inputs/tcp_listener/tcp_listener.go b/plugins/inputs/tcp_listener/tcp_listener.go
index 4559a3bf..6f316645 100644
--- a/plugins/inputs/tcp_listener/tcp_listener.go
+++ b/plugins/inputs/tcp_listener/tcp_listener.go
@@ -39,7 +39,7 @@ type TcpListener struct {
 	acc    telegraf.Accumulator
 }
 
-var dropwarn = "ERROR: Message queue full. Discarding metric. " +
+var dropwarn = "ERROR: Message queue full. Discarding metric [%s], " +
 	"You may want to increase allowed_pending_messages in the config\n"
 
 const sampleConfig = `
@@ -193,6 +193,7 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
 		t.forget(id)
 	}()
 
+	var buf []byte
 	scanner := bufio.NewScanner(conn)
 	for {
 		select {
@@ -202,8 +203,15 @@ func (t *TcpListener) handler(conn *net.TCPConn, id string) {
 			if !scanner.Scan() {
 				return
 			}
+			buf = scanner.Bytes()
+			if len(buf) == 0 {
+				continue
+			}
+			bufCopy := make([]byte, len(buf))
+			copy(bufCopy, buf)
+
 			select {
-			case t.in <- scanner.Bytes():
+			case t.in <- bufCopy:
 			default:
 				log.Printf(dropwarn)
 			}
diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go
index 442cf98b..39249de3 100644
--- a/plugins/inputs/udp_listener/udp_listener.go
+++ b/plugins/inputs/udp_listener/udp_listener.go
@@ -12,7 +12,11 @@ import (
 )
 
 type UdpListener struct {
-	ServiceAddress         string
+	ServiceAddress string
+	// UDPPacketSize is deprecated, it's only here for legacy support
+	// we now always create 1 max size buffer and then copy only what we need
+	// into the in channel
+	// see https://github.com/influxdata/telegraf/pull/992
 	UDPPacketSize          int `toml:"udp_packet_size"`
 	AllowedPendingMessages int
 
@@ -32,7 +36,7 @@ type UdpListener struct {
 
 // UDP packet limit, see
 // https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
-const UDP_PACKET_SIZE int = 65507
+const UDP_MAX_PACKET_SIZE int = 64 * 1024
 
 var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
 	"You may want to increase allowed_pending_messages in the config\n"
@@ -104,7 +108,7 @@ func (u *UdpListener) udpListen() error {
 	}
 	log.Println("UDP server listening on: ", u.listener.LocalAddr().String())
 
-	buf := make([]byte, u.UDPPacketSize)
+	buf := make([]byte, UDP_MAX_PACKET_SIZE)
 	for {
 		select {
 		case <-u.done:
@@ -115,11 +119,13 @@ func (u *UdpListener) udpListen() error {
 				log.Printf("ERROR: %s\n", err.Error())
 				continue
 			}
+			bufCopy := make([]byte, n)
+			copy(bufCopy, buf[:n])
 
 			select {
-			case u.in <- buf[:n]:
+			case u.in <- bufCopy:
 			default:
-				log.Printf(dropwarn, string(buf[:n]))
+				log.Printf(dropwarn, string(bufCopy))
 			}
 		}
 	}
@@ -155,8 +161,6 @@ func (u *UdpListener) storeMetrics(metrics []telegraf.Metric) error {
 
 func init() {
 	inputs.Add("udp_listener", func() telegraf.Input {
-		return &UdpListener{
-			UDPPacketSize: UDP_PACKET_SIZE,
-		}
+		return &UdpListener{}
 	})
 }
-- 
GitLab