Skip to content
Snippets Groups Projects
Commit 30464396 authored by Cameron Sparr's avatar Cameron Sparr
Browse files

Make the UDP input buffer only once

parent 64066c4e
No related branches found
No related tags found
No related merge requests found
## v0.12.1 [unreleased]
### Features
- [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.
### Bugfixes
- [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name)
......
......@@ -18,7 +18,9 @@ import (
)
const (
UDP_PACKET_SIZE int = 1500
// UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
UDP_PACKET_SIZE int = 65507
defaultFieldName = "value"
......@@ -157,10 +159,6 @@ const sampleConfig = `
## calculation of percentiles. Raising this limit increases the accuracy
## of percentiles but also increases the memory usage and cpu time.
percentile_limit = 1000
## UDP packet size for the server to listen for. This will depend on the size
## of the packets that the client is sending, which is usually 1500 bytes.
udp_packet_size = 1500
`
func (_ *Statsd) SampleConfig() string {
......@@ -274,12 +272,12 @@ func (s *Statsd) udpListen() error {
}
log.Println("Statsd listener listening on: ", s.listener.LocalAddr().String())
buf := make([]byte, s.UDPPacketSize)
for {
select {
case <-s.done:
return nil
default:
buf := make([]byte, s.UDPPacketSize)
n, _, err := s.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR READ: %s\n", err.Error())
......@@ -300,11 +298,12 @@ func (s *Statsd) udpListen() error {
// single statsd metric into a struct.
func (s *Statsd) parser() error {
defer s.wg.Done()
var packet []byte
for {
select {
case <-s.done:
return nil
case packet := <-s.in:
case packet = <-s.in:
lines := strings.Split(string(packet), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
......@@ -631,8 +630,8 @@ func (s *Statsd) Stop() {
func init() {
inputs.Add("statsd", func() telegraf.Input {
return &Statsd{
ConvertNames: true,
UDPPacketSize: UDP_PACKET_SIZE,
MetricSeparator: "_",
UDPPacketSize: UDP_PACKET_SIZE,
}
})
}
......@@ -30,7 +30,9 @@ type UdpListener struct {
listener *net.UDPConn
}
const UDP_PACKET_SIZE int = 1500
// UDP packet limit, see
// https://en.wikipedia.org/wiki/User_Datagram_Protocol#Packet_structure
const UDP_PACKET_SIZE int = 65507
var dropwarn = "ERROR: Message queue full. Discarding line [%s] " +
"You may want to increase allowed_pending_messages in the config\n"
......@@ -43,11 +45,6 @@ const sampleConfig = `
## UDP listener will start dropping packets.
allowed_pending_messages = 10000
## UDP packet size for the server to listen for. This will depend
## on the size of the packets that the client is sending, which is
## usually 1500 bytes, but can be as large as 65,535 bytes.
udp_packet_size = 1500
## Data format to consume.
## Each data format has it's own unique set of configuration options, read
## more about them here:
......@@ -107,12 +104,12 @@ func (u *UdpListener) udpListen() error {
}
log.Println("UDP server listening on: ", u.listener.LocalAddr().String())
buf := make([]byte, u.UDPPacketSize)
for {
select {
case <-u.done:
return nil
default:
buf := make([]byte, u.UDPPacketSize)
n, _, err := u.listener.ReadFromUDP(buf)
if err != nil && !strings.Contains(err.Error(), "closed network") {
log.Printf("ERROR: %s\n", err.Error())
......@@ -130,11 +127,13 @@ func (u *UdpListener) udpListen() error {
func (u *UdpListener) udpParser() error {
defer u.wg.Done()
var packet []byte
for {
select {
case <-u.done:
return nil
case packet := <-u.in:
case packet = <-u.in:
metrics, err := u.parser.Parse(packet)
if err == nil {
u.storeMetrics(metrics)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment