From b99cd14129ddbb7c3dbcf490d008b2eec525b9a9 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel@wavesofdawn.com>
Date: Thu, 29 Mar 2018 13:31:43 -0700
Subject: [PATCH] Add influx uint support as a runtime option (#3948)

---
 Makefile                                  |  5 +--
 docs/DATA_FORMATS_OUTPUT.md               | 28 +++++++++---
 internal/config/config.go                 | 13 ++++++
 metric/metric.go                          | 31 ++-----------
 plugins/outputs/influxdb/README.md        |  6 +++
 plugins/outputs/influxdb/http.go          |  3 +-
 plugins/outputs/influxdb/influxdb.go      | 10 +++++
 plugins/parsers/influx/handler.go         | 10 ++---
 plugins/parsers/influx/parser_test.go     | 52 ++++++++++++++++------
 plugins/serializers/influx/influx.go      | 31 ++++++++++---
 plugins/serializers/influx/influx_test.go | 53 ++++++++++++++---------
 plugins/serializers/registry.go           | 10 +++++
 12 files changed, 168 insertions(+), 84 deletions(-)

diff --git a/Makefile b/Makefile
index ffa3be0b..b9832fb6 100644
--- a/Makefile
+++ b/Makefile
@@ -36,7 +36,7 @@ deps:
 	gdm restore
 
 telegraf:
-	go build -i -o $(TELEGRAF) -ldflags "$(LDFLAGS)" $(BUILDFLAGS) ./cmd/telegraf/telegraf.go
+	go build -i -o $(TELEGRAF) -ldflags "$(LDFLAGS)" ./cmd/telegraf/telegraf.go
 
 go-install:
 	go install -ldflags "-w -s $(LDFLAGS)" ./cmd/telegraf
@@ -62,9 +62,6 @@ fmtcheck:
 	fi
 	@echo '[INFO] done.'
 
-uint64:
-	BUILDFLAGS="-tags uint64" $(MAKE) all
-
 lint:
 	golint ./...
 
diff --git a/docs/DATA_FORMATS_OUTPUT.md b/docs/DATA_FORMATS_OUTPUT.md
index 13678038..5716b0ae 100644
--- a/docs/DATA_FORMATS_OUTPUT.md
+++ b/docs/DATA_FORMATS_OUTPUT.md
@@ -2,12 +2,12 @@
 
 Telegraf is able to serialize metrics into the following output data formats:
 
-1. [InfluxDB Line Protocol](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#influx)
-1. [JSON](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#json)
-1. [Graphite](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md#graphite)
+1. [InfluxDB Line Protocol](#influx)
+1. [JSON](#json)
+1. [Graphite](#graphite)
 
 Telegraf metrics, like InfluxDB
-[points](https://docs.influxdata.com/influxdb/v0.10/write_protocols/line/),
+[points](https://docs.influxdata.com/influxdb/latest/concepts/glossary/#point),
 are a combination of four basic parts:
 
 1. Measurement Name
@@ -49,8 +49,10 @@ I'll go over below.
 
 # Influx:
 
-There are no additional configuration options for InfluxDB line-protocol. The
-metrics are serialized directly into InfluxDB line-protocol.
+The `influx` format outputs data as
+[InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/).
+This is the recommended format to use unless another format is required for
+interoperability.
 
 ### Influx Configuration:
 
@@ -64,6 +66,20 @@ metrics are serialized directly into InfluxDB line-protocol.
   ## more about them here:
   ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
   data_format = "influx"
+
+  ## Maximum line length in bytes.  Useful only for debugging.
+  # influx_max_line_bytes = 0
+
+  ## When true, fields will be output in ascending lexical order.  Enabling
+  ## this option will result in decreased performance and is only recommended
+  ## when you need predictable ordering while debugging.
+  # influx_sort_fields = false
+
+  ## When true, Telegraf will output unsigned integers as unsigned values,
+  ## i.e.: `42u`.  You will need a version of InfluxDB supporting unsigned
+  ## integer values.  Enabling this option will result in field type errors if
+  ## existing data has been written.
+  # influx_uint_support = false
 ```
 
 # Graphite:
diff --git a/internal/config/config.go b/internal/config/config.go
index db1e1f82..fe8cac5a 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -1391,6 +1391,18 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
 		}
 	}
 
+	if node, ok := tbl.Fields["influx_uint_support"]; ok {
+		if kv, ok := node.(*ast.KeyValue); ok {
+			if b, ok := kv.Value.(*ast.Boolean); ok {
+				var err error
+				c.InfluxUintSupport, err = b.Boolean()
+				if err != nil {
+					return nil, err
+				}
+			}
+		}
+	}
+
 	if node, ok := tbl.Fields["json_timestamp_units"]; ok {
 		if kv, ok := node.(*ast.KeyValue); ok {
 			if str, ok := kv.Value.(*ast.String); ok {
@@ -1409,6 +1421,7 @@ func buildSerializer(name string, tbl *ast.Table) (serializers.Serializer, error
 
 	delete(tbl.Fields, "influx_max_line_bytes")
 	delete(tbl.Fields, "influx_sort_fields")
+	delete(tbl.Fields, "influx_uint_support")
 	delete(tbl.Fields, "data_format")
 	delete(tbl.Fields, "prefix")
 	delete(tbl.Fields, "template")
diff --git a/metric/metric.go b/metric/metric.go
index 7c8ca113..23c48697 100644
--- a/metric/metric.go
+++ b/metric/metric.go
@@ -9,18 +9,6 @@ import (
 	"github.com/influxdata/telegraf"
 )
 
-const MaxInt = int(^uint(0) >> 1)
-
-// enableUint64Support will enable uint64 support if set to true.
-var enableUint64Support = false
-
-// EnableUintSupport manually enables uint support for convertValue.
-// This function will be removed in the future and only exists for unit tests during the
-// transition.
-func EnableUintSupport() {
-	enableUint64Support = true
-}
-
 type metric struct {
 	name   string
 	tags   []*telegraf.Tag
@@ -269,19 +257,8 @@ func convertField(v interface{}) interface{} {
 	case int:
 		return int64(v)
 	case uint:
-		if v <= uint(MaxInt) {
-			return int64(v)
-		} else {
-			return int64(MaxInt)
-		}
+		return uint64(v)
 	case uint64:
-		if enableUint64Support == false {
-			if v <= uint64(MaxInt) {
-				return int64(v)
-			} else {
-				return int64(MaxInt)
-			}
-		}
 		return uint64(v)
 	case []byte:
 		return string(v)
@@ -292,11 +269,11 @@ func convertField(v interface{}) interface{} {
 	case int8:
 		return int64(v)
 	case uint32:
-		return int64(v)
+		return uint64(v)
 	case uint16:
-		return int64(v)
+		return uint64(v)
 	case uint8:
-		return int64(v)
+		return uint64(v)
 	case float32:
 		return float64(v)
 	default:
diff --git a/plugins/outputs/influxdb/README.md b/plugins/outputs/influxdb/README.md
index aafe83da..bac25cf5 100644
--- a/plugins/outputs/influxdb/README.md
+++ b/plugins/outputs/influxdb/README.md
@@ -59,4 +59,10 @@ This InfluxDB output plugin writes metrics to the [InfluxDB](https://github.com/
   ## HTTP Content-Encoding for write request body, can be set to "gzip" to
   ## compress body or "identity" to apply no encoding.
   # content_encoding = "identity"
+
+  ## When true, Telegraf will output unsigned integers as unsigned values,
+  ## i.e.: "42u".  You will need a version of InfluxDB supporting unsigned
+  ## integer values.  Enabling this option will result in field type errors if
+  ## existing data has been written.
+  # influx_uint_support = false
 ```
diff --git a/plugins/outputs/influxdb/http.go b/plugins/outputs/influxdb/http.go
index fdbb5bd8..f32054f4 100644
--- a/plugins/outputs/influxdb/http.go
+++ b/plugins/outputs/influxdb/http.go
@@ -102,7 +102,8 @@ type HTTPConfig struct {
 	RetentionPolicy string
 	Consistency     string
 
-	Serializer *influx.Serializer
+	InfluxUintSupport bool `toml:"influx_uint_support"`
+	Serializer        *influx.Serializer
 }
 
 type httpClient struct {
diff --git a/plugins/outputs/influxdb/influxdb.go b/plugins/outputs/influxdb/influxdb.go
index 2e6e0d9c..3d1a895a 100644
--- a/plugins/outputs/influxdb/influxdb.go
+++ b/plugins/outputs/influxdb/influxdb.go
@@ -45,6 +45,7 @@ type InfluxDB struct {
 	HTTPHeaders          map[string]string `toml:"http_headers"`
 	ContentEncoding      string            `toml:"content_encoding"`
 	SkipDatabaseCreation bool              `toml:"skip_database_creation"`
+	InfluxUintSupport    bool              `toml:"influx_uint_support"`
 
 	// Path to CA file
 	SSLCA string `toml:"ssl_ca"`
@@ -119,6 +120,12 @@ var sampleConfig = `
   ## HTTP Content-Encoding for write request body, can be set to "gzip" to
   ## compress body or "identity" to apply no encoding.
   # content_encoding = "identity"
+
+  ## When true, Telegraf will output unsigned integers as unsigned values,
+  ## i.e.: "42u".  You will need a version of InfluxDB supporting unsigned
+  ## integer values.  Enabling this option will result in field type errors if
+  ## existing data has been written.
+  # influx_uint_support = false
 `
 
 func (i *InfluxDB) Connect() error {
@@ -135,6 +142,9 @@ func (i *InfluxDB) Connect() error {
 	}
 
 	i.serializer = influx.NewSerializer()
+	if i.InfluxUintSupport {
+		i.serializer.SetFieldTypeSupport(influx.UintSupport)
+	}
 
 	for _, u := range urls {
 		u, err := url.Parse(u)
diff --git a/plugins/parsers/influx/handler.go b/plugins/parsers/influx/handler.go
index 8c17cf9c..a7588379 100644
--- a/plugins/parsers/influx/handler.go
+++ b/plugins/parsers/influx/handler.go
@@ -48,7 +48,7 @@ func (h *MetricHandler) AddInt(key []byte, value []byte) {
 	fk := unescape(key)
 	fv, err := parseIntBytes(bytes.TrimSuffix(value, []byte("i")), 10, 64)
 	if err != nil {
-		log.Errorf("E! Received unparseable int value: %q", value)
+		log.Errorf("E! Received unparseable int value: %q: %v", value, err)
 		return
 	}
 	h.builder.AddField(fk, fv)
@@ -58,7 +58,7 @@ func (h *MetricHandler) AddUint(key []byte, value []byte) {
 	fk := unescape(key)
 	fv, err := parseUintBytes(bytes.TrimSuffix(value, []byte("u")), 10, 64)
 	if err != nil {
-		log.Errorf("E! Received unparseable uint value: %q", value)
+		log.Errorf("E! Received unparseable uint value: %q: %v", value, err)
 		return
 	}
 	h.builder.AddField(fk, fv)
@@ -68,7 +68,7 @@ func (h *MetricHandler) AddFloat(key []byte, value []byte) {
 	fk := unescape(key)
 	fv, err := parseFloatBytes(value, 64)
 	if err != nil {
-		log.Errorf("E! Received unparseable float value: %q", value)
+		log.Errorf("E! Received unparseable float value: %q: %v", value, err)
 		return
 	}
 	h.builder.AddField(fk, fv)
@@ -84,7 +84,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) {
 	fk := unescape(key)
 	fv, err := parseBoolBytes(value)
 	if err != nil {
-		log.Errorf("E! Received unparseable boolean value: %q", value)
+		log.Errorf("E! Received unparseable boolean value: %q: %v", value, err)
 		return
 	}
 	h.builder.AddField(fk, fv)
@@ -93,7 +93,7 @@ func (h *MetricHandler) AddBool(key []byte, value []byte) {
 func (h *MetricHandler) SetTimestamp(tm []byte) {
 	v, err := parseIntBytes(tm, 10, 64)
 	if err != nil {
-		log.Errorf("E! Received unparseable timestamp: %q", tm)
+		log.Errorf("E! Received unparseable timestamp: %q: %v", tm, err)
 		return
 	}
 	ns := v * int64(h.precision)
diff --git a/plugins/parsers/influx/parser_test.go b/plugins/parsers/influx/parser_test.go
index b57eacfa..59d4d7f1 100644
--- a/plugins/parsers/influx/parser_test.go
+++ b/plugins/parsers/influx/parser_test.go
@@ -16,12 +16,6 @@ func Metric(v telegraf.Metric, err error) telegraf.Metric {
 	return v
 }
 
-const (
-	Uint64Overflow uint64 = 9223372036854775808
-	Uint64Max      uint64 = 18446744073709551615
-	Uint64Test     uint64 = 42
-)
-
 var DefaultTime = func() time.Time {
 	return time.Unix(42, 0)
 }
@@ -263,15 +257,30 @@ var ptests = []struct {
 		err: nil,
 	},
 	{
-		name:  "field uint",
-		input: []byte("cpu value=42u"),
+		name:  "field int overflow dropped",
+		input: []byte("cpu value=9223372036854775808i"),
+		metrics: []telegraf.Metric{
+			Metric(
+				metric.New(
+					"cpu",
+					map[string]string{},
+					map[string]interface{}{},
+					time.Unix(42, 0),
+				),
+			),
+		},
+		err: nil,
+	},
+	{
+		name:  "field int max value",
+		input: []byte("cpu value=9223372036854775807i"),
 		metrics: []telegraf.Metric{
 			Metric(
 				metric.New(
 					"cpu",
 					map[string]string{},
 					map[string]interface{}{
-						"value": Uint64Test,
+						"value": 9223372036854775807,
 					},
 					time.Unix(42, 0),
 				),
@@ -280,15 +289,15 @@ var ptests = []struct {
 		err: nil,
 	},
 	{
-		name:  "field uint int overflow",
-		input: []byte("cpu value=9223372036854775808u"),
+		name:  "field uint",
+		input: []byte("cpu value=42u"),
 		metrics: []telegraf.Metric{
 			Metric(
 				metric.New(
 					"cpu",
 					map[string]string{},
 					map[string]interface{}{
-						"value": Uint64Overflow,
+						"value": uint64(42),
 					},
 					time.Unix(42, 0),
 				),
@@ -297,7 +306,22 @@ var ptests = []struct {
 		err: nil,
 	},
 	{
-		name:  "field uint maximum",
+		name:  "field uint overflow dropped",
+		input: []byte("cpu value=18446744073709551616u"),
+		metrics: []telegraf.Metric{
+			Metric(
+				metric.New(
+					"cpu",
+					map[string]string{},
+					map[string]interface{}{},
+					time.Unix(42, 0),
+				),
+			),
+		},
+		err: nil,
+	},
+	{
+		name:  "field uint max value",
 		input: []byte("cpu value=18446744073709551615u"),
 		metrics: []telegraf.Metric{
 			Metric(
@@ -305,7 +329,7 @@ var ptests = []struct {
 					"cpu",
 					map[string]string{},
 					map[string]interface{}{
-						"value": Uint64Max,
+						"value": uint64(18446744073709551615),
 					},
 					time.Unix(42, 0),
 				),
diff --git a/plugins/serializers/influx/influx.go b/plugins/serializers/influx/influx.go
index 32138001..7d68ed3d 100644
--- a/plugins/serializers/influx/influx.go
+++ b/plugins/serializers/influx/influx.go
@@ -20,6 +20,12 @@ const (
 	SortFields
 )
 
+type FieldTypeSupport int
+
+const (
+	UintSupport FieldTypeSupport = 1 << iota
+)
+
 var (
 	ErrNeedMoreSpace    = errors.New("need more space")
 	ErrInvalidName      = errors.New("invalid name")
@@ -32,9 +38,10 @@ var (
 
 // Serializer is a serializer for line protocol.
 type Serializer struct {
-	maxLineBytes   int
-	bytesWritten   int
-	fieldSortOrder FieldSortOrder
+	maxLineBytes     int
+	bytesWritten     int
+	fieldSortOrder   FieldSortOrder
+	fieldTypeSupport FieldTypeSupport
 
 	buf    bytes.Buffer
 	header []byte
@@ -61,6 +68,10 @@ func (s *Serializer) SetFieldSortOrder(order FieldSortOrder) {
 	s.fieldSortOrder = order
 }
 
+func (s *Serializer) SetFieldTypeSupport(typeSupport FieldTypeSupport) {
+	s.fieldTypeSupport = typeSupport
+}
+
 // Serialize writes the telegraf.Metric to a byte slice.  May produce multiple
 // lines of output if longer than maximum line length.  Lines are terminated
 // with a newline (LF) char.
@@ -142,7 +153,7 @@ func (s *Serializer) buildFieldPair(key string, value interface{}) error {
 
 	s.pair = append(s.pair, key...)
 	s.pair = append(s.pair, '=')
-	pair, err := appendFieldValue(s.pair, value)
+	pair, err := s.appendFieldValue(s.pair, value)
 	if err != nil {
 		return err
 	}
@@ -235,10 +246,18 @@ func (s *Serializer) writeMetric(w io.Writer, m telegraf.Metric) error {
 
 }
 
-func appendFieldValue(buf []byte, value interface{}) ([]byte, error) {
+func (s *Serializer) appendFieldValue(buf []byte, value interface{}) ([]byte, error) {
 	switch v := value.(type) {
 	case uint64:
-		return appendUintField(buf, v), nil
+		if s.fieldTypeSupport&UintSupport != 0 {
+			return appendUintField(buf, v), nil
+		} else {
+			if v <= uint64(MaxInt) {
+				return appendIntField(buf, int64(v)), nil
+			} else {
+				return appendIntField(buf, int64(MaxInt)), nil
+			}
+		}
 	case int64:
 		return appendIntField(buf, v), nil
 	case float64:
diff --git a/plugins/serializers/influx/influx_test.go b/plugins/serializers/influx/influx_test.go
index eaf55a9e..e3151646 100644
--- a/plugins/serializers/influx/influx_test.go
+++ b/plugins/serializers/influx/influx_test.go
@@ -11,26 +11,19 @@ import (
 )
 
 func MustMetric(v telegraf.Metric, err error) telegraf.Metric {
-	// Force uint support to be enabled for testing.
-	metric.EnableUintSupport()
 	if err != nil {
 		panic(err)
 	}
 	return v
 }
 
-const (
-	Uint64Overflow uint64 = 9223372036854775808
-	Uint64Max      uint64 = 18446744073709551615
-	Uint64Test     uint64 = 42
-)
-
 var tests = []struct {
-	name     string
-	maxBytes int
-	input    telegraf.Metric
-	output   []byte
-	err      error
+	name        string
+	maxBytes    int
+	typeSupport FieldTypeSupport
+	input       telegraf.Metric
+	output      []byte
+	err         error
 }{
 	{
 		name: "minimal",
@@ -143,40 +136,56 @@ var tests = []struct {
 				"cpu",
 				map[string]string{},
 				map[string]interface{}{
-					"value": Uint64Test,
+					"value": uint64(42),
+				},
+				time.Unix(0, 0),
+			),
+		),
+		output:      []byte("cpu value=42u 0\n"),
+		typeSupport: UintSupport,
+	},
+	{
+		name: "uint field max value",
+		input: MustMetric(
+			metric.New(
+				"cpu",
+				map[string]string{},
+				map[string]interface{}{
+					"value": uint64(18446744073709551615),
 				},
 				time.Unix(0, 0),
 			),
 		),
-		output: []byte("cpu value=42u 0\n"),
+		output:      []byte("cpu value=18446744073709551615u 0\n"),
+		typeSupport: UintSupport,
 	},
 	{
-		name: "uint field int64 overflow",
+		name: "uint field no uint support",
 		input: MustMetric(
 			metric.New(
 				"cpu",
 				map[string]string{},
 				map[string]interface{}{
-					"value": Uint64Overflow,
+					"value": uint64(42),
 				},
 				time.Unix(0, 0),
 			),
 		),
-		output: []byte("cpu value=9223372036854775808u 0\n"),
+		output: []byte("cpu value=42i 0\n"),
 	},
 	{
-		name: "uint field uint64 max",
+		name: "uint field no uint support overflow",
 		input: MustMetric(
 			metric.New(
 				"cpu",
 				map[string]string{},
 				map[string]interface{}{
-					"value": Uint64Max,
+					"value": uint64(18446744073709551615),
 				},
 				time.Unix(0, 0),
 			),
 		),
-		output: []byte("cpu value=18446744073709551615u 0\n"),
+		output: []byte("cpu value=9223372036854775807i 0\n"),
 	},
 	{
 		name: "bool field",
@@ -358,6 +367,7 @@ func TestSerializer(t *testing.T) {
 			serializer := NewSerializer()
 			serializer.SetMaxLineBytes(tt.maxBytes)
 			serializer.SetFieldSortOrder(SortFields)
+			serializer.SetFieldTypeSupport(tt.typeSupport)
 			output, err := serializer.Serialize(tt.input)
 			require.Equal(t, tt.err, err)
 			require.Equal(t, string(tt.output), string(output))
@@ -370,6 +380,7 @@ func BenchmarkSerializer(b *testing.B) {
 		b.Run(tt.name, func(b *testing.B) {
 			serializer := NewSerializer()
 			serializer.SetMaxLineBytes(tt.maxBytes)
+			serializer.SetFieldTypeSupport(tt.typeSupport)
 			for n := 0; n < b.N; n++ {
 				output, err := serializer.Serialize(tt.input)
 				_ = err
diff --git a/plugins/serializers/registry.go b/plugins/serializers/registry.go
index 3389ec59..431112b2 100644
--- a/plugins/serializers/registry.go
+++ b/plugins/serializers/registry.go
@@ -40,6 +40,9 @@ type Config struct {
 	// than unsorted fields; influx format only
 	InfluxSortFields bool
 
+	// Support unsigned integer output; influx format only
+	InfluxUintSupport bool
+
 	// Prefix to add to all measurements, only supports Graphite
 	Prefix string
 
@@ -77,9 +80,16 @@ func NewInfluxSerializerConfig(config *Config) (Serializer, error) {
 	if config.InfluxSortFields {
 		sort = influx.SortFields
 	}
+
+	var typeSupport influx.FieldTypeSupport
+	if config.InfluxUintSupport {
+		typeSupport = typeSupport + influx.UintSupport
+	}
+
 	s := influx.NewSerializer()
 	s.SetMaxLineBytes(config.InfluxMaxLineBytes)
 	s.SetFieldSortOrder(sort)
+	s.SetFieldTypeSupport(typeSupport)
 	return s, nil
 }
 
-- 
GitLab