Skip to content
Snippets Groups Projects
Commit da99777f authored by Daniel Nelson's avatar Daniel Nelson Committed by GitHub
Browse files

Only split metrics if there is an udp output (#2799)

parent dd537b33
No related branches found
No related tags found
No related merge requests found
...@@ -2,6 +2,7 @@ package influxdb ...@@ -2,6 +2,7 @@ package influxdb
import ( import (
"fmt" "fmt"
"io"
"log" "log"
"math/rand" "math/rand"
"strings" "strings"
...@@ -41,7 +42,8 @@ type InfluxDB struct { ...@@ -41,7 +42,8 @@ type InfluxDB struct {
// Precision is only here for legacy support. It will be ignored. // Precision is only here for legacy support. It will be ignored.
Precision string Precision string
clients []client.Client clients []client.Client
splitPayload bool
} }
var sampleConfig = ` var sampleConfig = `
...@@ -109,6 +111,7 @@ func (i *InfluxDB) Connect() error { ...@@ -109,6 +111,7 @@ func (i *InfluxDB) Connect() error {
return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err) return fmt.Errorf("Error creating UDP Client [%s]: %s", u, err)
} }
i.clients = append(i.clients, c) i.clients = append(i.clients, c)
i.splitPayload = true
default: default:
// If URL doesn't start with "udp", assume HTTP client // If URL doesn't start with "udp", assume HTTP client
config := client.HTTPConfig{ config := client.HTTPConfig{
...@@ -159,17 +162,26 @@ func (i *InfluxDB) Description() string { ...@@ -159,17 +162,26 @@ func (i *InfluxDB) Description() string {
return "Configuration for influxdb server to send metrics to" return "Configuration for influxdb server to send metrics to"
} }
func (i *InfluxDB) getReader(metrics []telegraf.Metric) io.Reader {
if !i.splitPayload {
return metric.NewReader(metrics)
}
splitData := make([]telegraf.Metric, 0)
for _, m := range metrics {
splitData = append(splitData, m.Split(i.UDPPayload)...)
}
return metric.NewReader(splitData)
}
// Write will choose a random server in the cluster to write to until a successful write // Write will choose a random server in the cluster to write to until a successful write
// occurs, logging each unsuccessful. If all servers fail, return error. // occurs, logging each unsuccessful. If all servers fail, return error.
func (i *InfluxDB) Write(metrics []telegraf.Metric) error { func (i *InfluxDB) Write(metrics []telegraf.Metric) error {
bufsize := 0 bufsize := 0
splitData := make([]telegraf.Metric, 0)
for _, m := range metrics { for _, m := range metrics {
bufsize += m.Len() bufsize += m.Len()
splitData = append(splitData, m.Split(i.UDPPayload)...)
} }
r := metric.NewReader(splitData) r := i.getReader(metrics)
// This will get set to nil if a successful write occurs // This will get set to nil if a successful write occurs
err := fmt.Errorf("Could not write to any InfluxDB server in cluster") err := fmt.Errorf("Could not write to any InfluxDB server in cluster")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment