From 6b0e863556414609d9484f40a1e56cf36f9bc2d2 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Mon, 5 Dec 2016 13:00:37 +0000
Subject: [PATCH] Support a telegraf.Metric.Split function

---
 CHANGELOG.md                    |   1 +
 metric.go                       |   4 +
 metric/metric.go                |  75 ++++++++++++++---
 metric/metric_benchmark_test.go |  15 ++++
 metric/metric_test.go           | 144 ++++++++++++++++++++++++++++++++
 5 files changed, 229 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c945eccd..42847fd2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 - [#1807](https://github.com/influxdata/telegraf/pull/1807): Option to use device name rather than path for reporting disk stats.
 - [#1348](https://github.com/influxdata/telegraf/issues/1348): Telegraf "internal" plugin for collecting stats on itself.
 - [#2127](https://github.com/influxdata/telegraf/pull/2127): Update Go version to 1.7.4.
+- [#2126](https://github.com/influxdata/telegraf/pull/2126): Support a metric.Split function.
 
 ### Bugfixes
 
diff --git a/metric.go b/metric.go
index a29a6334..cb230512 100644
--- a/metric.go
+++ b/metric.go
@@ -22,6 +22,10 @@ type Metric interface {
 	Serialize() []byte
 	String() string // convenience function for string(Serialize())
 	Copy() Metric
+	// Split will attempt to return multiple metrics with the same timestamp
+	// whose string representations are no longer than maxSize.
+	// Metrics with a single field may exceed the requested size.
+	Split(maxSize int) []Metric
 
 	// Tag functions
 	HasTag(key string) bool
diff --git a/metric/metric.go b/metric/metric.go
index 4514d8ec..8a18c0f2 100644
--- a/metric/metric.go
+++ b/metric/metric.go
@@ -178,6 +178,57 @@ func (m *metric) Serialize() []byte {
 	return tmp
 }
 
+func (m *metric) Split(maxSize int) []telegraf.Metric {
+	if m.Len() < maxSize {
+		return []telegraf.Metric{m}
+	}
+	var out []telegraf.Metric
+
+	// constant number of bytes for each metric (in addition to field bytes)
+	constant := len(m.name) + len(m.tags) + len(m.t) + 3
+	// currently selected fields
+	fields := make([]byte, 0, maxSize)
+
+	i := 0
+	for {
+		if i >= len(m.fields) {
+			// hit the end of the field byte slice
+			if len(fields) > 0 {
+				out = append(out, copyWith(m.name, m.tags, fields, m.t))
+			}
+			break
+		}
+
+		// find the end of the next field
+		j := indexUnescapedByte(m.fields[i:], ',')
+		if j == -1 {
+			j = len(m.fields)
+		} else {
+			j += i
+		}
+
+		// if true, then we need to create a metric _not_ including the currently
+		// selected field
+		if len(m.fields[i:j])+len(fields)+constant > maxSize {
+			// if false, then we'll create a metric including the currently
+			// selected field anyways. This means that the given maxSize is too
+			// small for a single field to fit.
+			if len(fields) > 0 {
+				out = append(out, copyWith(m.name, m.tags, fields, m.t))
+			}
+
+			fields = make([]byte, 0, maxSize)
+		}
+		if len(fields) > 0 {
+			fields = append(fields, ',')
+		}
+		fields = append(fields, m.fields[i:j]...)
+
+		i = j + 1
+	}
+	return out
+}
+
 func (m *metric) Fields() map[string]interface{} {
 	fieldMap := map[string]interface{}{}
 	i := 0
@@ -380,17 +431,21 @@ func (m *metric) RemoveField(key string) error {
 }
 
 func (m *metric) Copy() telegraf.Metric {
-	mOut := metric{
-		name:   make([]byte, len(m.name)),
-		tags:   make([]byte, len(m.tags)),
-		fields: make([]byte, len(m.fields)),
-		t:      make([]byte, len(m.t)),
+	return copyWith(m.name, m.tags, m.fields, m.t)
+}
+
+func copyWith(name, tags, fields, t []byte) telegraf.Metric {
+	out := metric{
+		name:   make([]byte, len(name)),
+		tags:   make([]byte, len(tags)),
+		fields: make([]byte, len(fields)),
+		t:      make([]byte, len(t)),
 	}
-	copy(mOut.name, m.name)
-	copy(mOut.tags, m.tags)
-	copy(mOut.fields, m.fields)
-	copy(mOut.t, m.t)
-	return &mOut
+	copy(out.name, name)
+	copy(out.tags, tags)
+	copy(out.fields, fields)
+	copy(out.t, t)
+	return &out
 }
 
 func (m *metric) HashID() uint64 {
diff --git a/metric/metric_benchmark_test.go b/metric/metric_benchmark_test.go
index 86906483..9383fb0d 100644
--- a/metric/metric_benchmark_test.go
+++ b/metric/metric_benchmark_test.go
@@ -50,6 +50,21 @@ func BenchmarkAddTag(b *testing.B) {
 	s = string(mt.String())
 }
 
+func BenchmarkSplit(b *testing.B) {
+	var mt telegraf.Metric
+	mt = &metric{
+		name:   []byte("cpu"),
+		tags:   []byte(",host=localhost"),
+		fields: []byte("a=101,b=10i,c=10101,d=101010,e=42"),
+		t:      []byte("1480614053000000000"),
+	}
+	var metrics []telegraf.Metric
+	for n := 0; n < b.N; n++ {
+		metrics = mt.Split(60)
+	}
+	s = string(metrics[0].String())
+}
+
 func BenchmarkTags(b *testing.B) {
 	for n := 0; n < b.N; n++ {
 		var mt, _ = New("test_metric",
diff --git a/metric/metric_test.go b/metric/metric_test.go
index b373cabd..f209dc3e 100644
--- a/metric/metric_test.go
+++ b/metric/metric_test.go
@@ -3,6 +3,7 @@ package metric
 import (
 	"fmt"
 	"math"
+	"regexp"
 	"testing"
 	"time"
 
@@ -434,6 +435,149 @@ func TestNewCounterMetric(t *testing.T) {
 	assert.Equal(t, now.UnixNano(), m.UnixNano())
 }
 
+// test splitting metric into various max lengths
+func TestSplitMetric(t *testing.T) {
+	now := time.Unix(0, 1480940990034083306)
+	tags := map[string]string{
+		"host": "localhost",
+	}
+	fields := map[string]interface{}{
+		"float":  float64(100001),
+		"int":    int64(100001),
+		"bool":   true,
+		"false":  false,
+		"string": "test",
+	}
+	m, err := New("cpu", tags, fields, now)
+	assert.NoError(t, err)
+
+	split80 := m.Split(80)
+	assert.Len(t, split80, 2)
+
+	split70 := m.Split(70)
+	assert.Len(t, split70, 3)
+
+	split60 := m.Split(60)
+	assert.Len(t, split60, 4)
+}
+
+// test splitting metric into various max lengths
+// use a simple regex check to verify that the split metrics are valid
+func TestSplitMetric_RegexVerify(t *testing.T) {
+	now := time.Unix(0, 1480940990034083306)
+	tags := map[string]string{
+		"host": "localhost",
+	}
+	fields := map[string]interface{}{
+		"foo":     float64(98934259085),
+		"bar":     float64(19385292),
+		"number":  float64(19385292),
+		"another": float64(19385292),
+		"n":       float64(19385292),
+	}
+	m, err := New("cpu", tags, fields, now)
+	assert.NoError(t, err)
+
+	// verification regex
+	re := regexp.MustCompile(`cpu,host=localhost \w+=\d+(,\w+=\d+)* 1480940990034083306`)
+
+	split90 := m.Split(90)
+	assert.Len(t, split90, 2)
+	for _, splitM := range split90 {
+		assert.True(t, re.Match(splitM.Serialize()), splitM.String())
+	}
+
+	split70 := m.Split(70)
+	assert.Len(t, split70, 3)
+	for _, splitM := range split70 {
+		assert.True(t, re.Match(splitM.Serialize()), splitM.String())
+	}
+
+	split20 := m.Split(20)
+	assert.Len(t, split20, 5)
+	for _, splitM := range split20 {
+		assert.True(t, re.Match(splitM.Serialize()), splitM.String())
+	}
+}
+
+// test splitting metric even when given length is shorter than
+// shortest possible length
+// Split should split metric as short as possible, ie, 1 field per metric
+func TestSplitMetric_TooShort(t *testing.T) {
+	now := time.Unix(0, 1480940990034083306)
+	tags := map[string]string{
+		"host": "localhost",
+	}
+	fields := map[string]interface{}{
+		"float":  float64(100001),
+		"int":    int64(100001),
+		"bool":   true,
+		"false":  false,
+		"string": "test",
+	}
+	m, err := New("cpu", tags, fields, now)
+	assert.NoError(t, err)
+
+	split := m.Split(10)
+	assert.Len(t, split, 5)
+	strings := make([]string, 5)
+	for i, splitM := range split {
+		strings[i] = splitM.String()
+	}
+
+	assert.Contains(t, strings, "cpu,host=localhost float=100001 1480940990034083306\n")
+	assert.Contains(t, strings, "cpu,host=localhost int=100001i 1480940990034083306\n")
+	assert.Contains(t, strings, "cpu,host=localhost bool=true 1480940990034083306\n")
+	assert.Contains(t, strings, "cpu,host=localhost false=false 1480940990034083306\n")
+	assert.Contains(t, strings, "cpu,host=localhost string=\"test\" 1480940990034083306\n")
+}
+
+func TestSplitMetric_NoOp(t *testing.T) {
+	now := time.Unix(0, 1480940990034083306)
+	tags := map[string]string{
+		"host": "localhost",
+	}
+	fields := map[string]interface{}{
+		"float":  float64(100001),
+		"int":    int64(100001),
+		"bool":   true,
+		"false":  false,
+		"string": "test",
+	}
+	m, err := New("cpu", tags, fields, now)
+	assert.NoError(t, err)
+
+	split := m.Split(1000)
+	assert.Len(t, split, 1)
+	assert.Equal(t, m, split[0])
+}
+
+func TestSplitMetric_OneField(t *testing.T) {
+	now := time.Unix(0, 1480940990034083306)
+	tags := map[string]string{
+		"host": "localhost",
+	}
+	fields := map[string]interface{}{
+		"float": float64(100001),
+	}
+	m, err := New("cpu", tags, fields, now)
+	assert.NoError(t, err)
+
+	assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", m.String())
+
+	split := m.Split(1000)
+	assert.Len(t, split, 1)
+	assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
+
+	split = m.Split(1)
+	assert.Len(t, split, 1)
+	assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
+
+	split = m.Split(40)
+	assert.Len(t, split, 1)
+	assert.Equal(t, "cpu,host=localhost float=100001 1480940990034083306\n", split[0].String())
+}
+
 func TestNewMetricAggregate(t *testing.T) {
 	now := time.Now()
 
-- 
GitLab