From a6797a44d53c469f97182a25ee346fd83b6a0498 Mon Sep 17 00:00:00 2001
From: Jeremy Doupe <jeremy@doupe.com>
Date: Tue, 24 Oct 2017 18:28:52 -0500
Subject: [PATCH] Add histogram and summary types to telegraf and prometheus
 plugins (#3337)

---
 accumulator.go                                |  12 +
 agent/accumulator.go                          |  22 ++
 metric.go                                     |   2 +
 plugins/inputs/prometheus/parser.go           |   8 +-
 plugins/inputs/prometheus/prometheus.go       |   4 +
 .../prometheus_client/prometheus_client.go    | 214 +++++++++++++-----
 .../prometheus_client_test.go                 | 120 +++++++++-
 testutil/accumulator.go                       |  18 ++
 8 files changed, 325 insertions(+), 75 deletions(-)

diff --git a/accumulator.go b/accumulator.go
index 13fd6e57..370f0c70 100644
--- a/accumulator.go
+++ b/accumulator.go
@@ -28,6 +28,18 @@ type Accumulator interface {
 		tags map[string]string,
 		t ...time.Time)
 
+	// AddSummary is the same as AddFields, but will add the metric as a "Summary" type
+	AddSummary(measurement string,
+		fields map[string]interface{},
+		tags map[string]string,
+		t ...time.Time)
+
+	// AddHistogram is the same as AddFields, but will add the metric as a "Histogram" type
+	AddHistogram(measurement string,
+		fields map[string]interface{},
+		tags map[string]string,
+		t ...time.Time)
+
 	SetPrecision(precision, interval time.Duration)
 
 	AddError(err error)
diff --git a/agent/accumulator.go b/agent/accumulator.go
index 1f9e2270..1fa9b13e 100644
--- a/agent/accumulator.go
+++ b/agent/accumulator.go
@@ -76,6 +76,28 @@ func (ac *accumulator) AddCounter(
 	}
 }
 
+func (ac *accumulator) AddSummary(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+	t ...time.Time,
+) {
+	if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Summary, ac.getTime(t)); m != nil {
+		ac.metrics <- m
+	}
+}
+
+func (ac *accumulator) AddHistogram(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+	t ...time.Time,
+) {
+	if m := ac.maker.MakeMetric(measurement, fields, tags, telegraf.Histogram, ac.getTime(t)); m != nil {
+		ac.metrics <- m
+	}
+}
+
 // AddError passes a runtime error to the accumulator.
 // The error will be tagged with the plugin name and written to the log.
 func (ac *accumulator) AddError(err error) {
diff --git a/metric.go b/metric.go
index fc479b51..3fb53135 100644
--- a/metric.go
+++ b/metric.go
@@ -13,6 +13,8 @@ const (
 	Counter
 	Gauge
 	Untyped
+	Summary
+	Histogram
 )
 
 type Metric interface {
diff --git a/plugins/inputs/prometheus/parser.go b/plugins/inputs/prometheus/parser.go
index 0807d7e7..6584fbc0 100644
--- a/plugins/inputs/prometheus/parser.go
+++ b/plugins/inputs/prometheus/parser.go
@@ -103,6 +103,10 @@ func valueType(mt dto.MetricType) telegraf.ValueType {
 		return telegraf.Counter
 	case dto.MetricType_GAUGE:
 		return telegraf.Gauge
+	case dto.MetricType_SUMMARY:
+		return telegraf.Summary
+	case dto.MetricType_HISTOGRAM:
+		return telegraf.Histogram
 	default:
 		return telegraf.Untyped
 	}
@@ -145,11 +149,11 @@ func getNameAndValue(m *dto.Metric) map[string]interface{} {
 			fields["gauge"] = float64(m.GetGauge().GetValue())
 		}
 	} else if m.Counter != nil {
-		if !math.IsNaN(m.GetGauge().GetValue()) {
+		if !math.IsNaN(m.GetCounter().GetValue()) {
 			fields["counter"] = float64(m.GetCounter().GetValue())
 		}
 	} else if m.Untyped != nil {
-		if !math.IsNaN(m.GetGauge().GetValue()) {
+		if !math.IsNaN(m.GetUntyped().GetValue()) {
 			fields["value"] = float64(m.GetUntyped().GetValue())
 		}
 	}
diff --git a/plugins/inputs/prometheus/prometheus.go b/plugins/inputs/prometheus/prometheus.go
index 5445a12a..c929a5b2 100644
--- a/plugins/inputs/prometheus/prometheus.go
+++ b/plugins/inputs/prometheus/prometheus.go
@@ -224,6 +224,10 @@ func (p *Prometheus) gatherURL(url UrlAndAddress, acc telegraf.Accumulator) erro
 			acc.AddCounter(metric.Name(), metric.Fields(), tags, metric.Time())
 		case telegraf.Gauge:
 			acc.AddGauge(metric.Name(), metric.Fields(), tags, metric.Time())
+		case telegraf.Summary:
+			acc.AddSummary(metric.Name(), metric.Fields(), tags, metric.Time())
+		case telegraf.Histogram:
+			acc.AddHistogram(metric.Name(), metric.Fields(), tags, metric.Time())
 		default:
 			acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time())
 		}
diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go
index d7702a06..f0b0a767 100644
--- a/plugins/outputs/prometheus_client/prometheus_client.go
+++ b/plugins/outputs/prometheus_client/prometheus_client.go
@@ -8,6 +8,7 @@ import (
 	"os"
 	"regexp"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -28,8 +29,13 @@ type SampleID string
 type Sample struct {
 	// Labels are the Prometheus labels.
 	Labels map[string]string
-	// Value is the value in the Prometheus output.
-	Value float64
+	// Value is the value in the Prometheus output. Only one of these will be populated.
+	Value          float64
+	HistogramValue map[float64]uint64
+	SummaryValue   map[float64]float64
+	// Histograms and Summaries need a count and a sum
+	Count uint64
+	Sum   float64
 	// Expiration is the deadline that this Sample is valid until.
 	Expiration time.Time
 }
@@ -38,8 +44,9 @@ type Sample struct {
 type MetricFamily struct {
 	// Samples are the Sample belonging to this MetricFamily.
 	Samples map[SampleID]*Sample
-	// Type of the Value.
-	ValueType prometheus.ValueType
+	// TelegrafValueType is the telegraf ValueType of the family; it is kept because
+	// there isn't a Prometheus ValueType representing Histogram or Summary.
+	TelegrafValueType telegraf.ValueType
 	// LabelSet is the label counts for all Samples.
 	LabelSet map[string]int
 }
@@ -189,7 +196,16 @@ func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
 				labels = append(labels, v)
 			}
 
-			metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
+			var metric prometheus.Metric
+			var err error
+			switch family.TelegrafValueType {
+			case telegraf.Summary:
+				metric, err = prometheus.NewConstSummary(desc, sample.Count, sample.Sum, sample.SummaryValue, labels...)
+			case telegraf.Histogram:
+				metric, err = prometheus.NewConstHistogram(desc, sample.Count, sample.Sum, sample.HistogramValue, labels...)
+			default:
+				metric, err = prometheus.NewConstMetric(desc, getPromValueType(family.TelegrafValueType), sample.Value, labels...)
+			}
 			if err != nil {
 				log.Printf("E! Error creating prometheus metric, "+
 					"key: %s, labels: %v,\nerr: %s\n",
@@ -205,7 +221,7 @@ func sanitize(value string) string {
 	return invalidNameCharRE.ReplaceAllString(value, "_")
 }
 
-func valueType(tt telegraf.ValueType) prometheus.ValueType {
+func getPromValueType(tt telegraf.ValueType) prometheus.ValueType {
 	switch tt {
 	case telegraf.Counter:
 		return prometheus.CounterValue
@@ -226,6 +242,30 @@ func CreateSampleID(tags map[string]string) SampleID {
 	return SampleID(strings.Join(pairs, ","))
 }
 
+func addSample(fam *MetricFamily, sample *Sample, sampleID SampleID) {
+
+	for k, _ := range sample.Labels {
+		fam.LabelSet[k]++
+	}
+
+	fam.Samples[sampleID] = sample
+}
+
+func (p *PrometheusClient) addMetricFamily(point telegraf.Metric, sample *Sample, mname string, sampleID SampleID) {
+	var fam *MetricFamily
+	var ok bool
+	if fam, ok = p.fam[mname]; !ok {
+		fam = &MetricFamily{
+			Samples:           make(map[SampleID]*Sample),
+			TelegrafValueType: point.Type(),
+			LabelSet:          make(map[string]int),
+		}
+		p.fam[mname] = fam
+	}
+
+	addSample(fam, sample, sampleID)
+}
+
 func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
 	p.Lock()
 	defer p.Unlock()
@@ -234,7 +274,6 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
 
 	for _, point := range metrics {
 		tags := point.Tags()
-		vt := valueType(point.Type())
 		sampleID := CreateSampleID(tags)
 
 		labels := make(map[string]string)
@@ -251,77 +290,128 @@ func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
 			}
 		}
 
-		for fn, fv := range point.Fields() {
-			// Ignore string and bool fields.
-			var value float64
-			switch fv := fv.(type) {
-			case int64:
-				value = float64(fv)
-			case float64:
-				value = fv
-			default:
-				continue
-			}
+		switch point.Type() {
+		case telegraf.Summary:
+			var mname string
+			var sum float64
+			var count uint64
+			summaryvalue := make(map[float64]float64)
+			for fn, fv := range point.Fields() {
+				var value float64
+				switch fv := fv.(type) {
+				case int64:
+					value = float64(fv)
+				case float64:
+					value = fv
+				default:
+					continue
+				}
 
+				switch fn {
+				case "sum":
+					sum = value
+				case "count":
+					count = uint64(value)
+				default:
+					limit, err := strconv.ParseFloat(fn, 64)
+					if err == nil {
+						summaryvalue[limit] = value
+					}
+				}
+			}
 			sample := &Sample{
-				Labels:     labels,
-				Value:      value,
-				Expiration: now.Add(p.ExpirationInterval.Duration),
+				Labels:       labels,
+				SummaryValue: summaryvalue,
+				Count:        count,
+				Sum:          sum,
+				Expiration:   now.Add(p.ExpirationInterval.Duration),
 			}
+			mname = sanitize(point.Name())
 
-			// Special handling of value field; supports passthrough from
-			// the prometheus input.
+			p.addMetricFamily(point, sample, mname, sampleID)
+
+		case telegraf.Histogram:
 			var mname string
-			switch point.Type() {
-			case telegraf.Counter:
-				if fn == "counter" {
-					mname = sanitize(point.Name())
+			var sum float64
+			var count uint64
+			histogramvalue := make(map[float64]uint64)
+			for fn, fv := range point.Fields() {
+				var value float64
+				switch fv := fv.(type) {
+				case int64:
+					value = float64(fv)
+				case float64:
+					value = fv
+				default:
+					continue
 				}
-			case telegraf.Gauge:
-				if fn == "gauge" {
-					mname = sanitize(point.Name())
+
+				switch fn {
+				case "sum":
+					sum = value
+				case "count":
+					count = uint64(value)
+				default:
+					limit, err := strconv.ParseFloat(fn, 64)
+					if err == nil {
+						histogramvalue[limit] = uint64(value)
+					}
 				}
 			}
-			if mname == "" {
-				if fn == "value" {
-					mname = sanitize(point.Name())
-				} else {
-					mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
-				}
+			sample := &Sample{
+				Labels:         labels,
+				HistogramValue: histogramvalue,
+				Count:          count,
+				Sum:            sum,
+				Expiration:     now.Add(p.ExpirationInterval.Duration),
 			}
+			mname = sanitize(point.Name())
 
-			var fam *MetricFamily
-			var ok bool
-			if fam, ok = p.fam[mname]; !ok {
-				fam = &MetricFamily{
-					Samples:   make(map[SampleID]*Sample),
-					ValueType: vt,
-					LabelSet:  make(map[string]int),
+			p.addMetricFamily(point, sample, mname, sampleID)
+
+		default:
+			for fn, fv := range point.Fields() {
+				// Ignore string and bool fields.
+				var value float64
+				switch fv := fv.(type) {
+				case int64:
+					value = float64(fv)
+				case float64:
+					value = fv
+				default:
+					continue
 				}
-				p.fam[mname] = fam
-			} else {
-				// Metrics can be untyped even though the corresponding plugin
-				// creates them with a type.  This happens when the metric was
-				// transferred over the network in a format that does not
-				// preserve value type and received using an input such as a
-				// queue consumer.  To avoid issues we automatically upgrade
-				// value type from untyped to a typed metric.
-				if fam.ValueType == prometheus.UntypedValue {
-					fam.ValueType = vt
+
+				sample := &Sample{
+					Labels:     labels,
+					Value:      value,
+					Expiration: now.Add(p.ExpirationInterval.Duration),
 				}
 
-				if vt != prometheus.UntypedValue && fam.ValueType != vt {
-					// Don't return an error since this would be a permanent error
-					log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
-					break
+				// Special handling of value field; supports passthrough from
+				// the prometheus input.
+				var mname string
+				switch point.Type() {
+				case telegraf.Counter:
+					if fn == "counter" {
+						mname = sanitize(point.Name())
+					}
+				case telegraf.Gauge:
+					if fn == "gauge" {
+						mname = sanitize(point.Name())
+					}
+				}
+				if mname == "" {
+					if fn == "value" {
+						mname = sanitize(point.Name())
+					} else {
+						mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
+					}
 				}
-			}
 
-			for k, _ := range sample.Labels {
-				fam.LabelSet[k]++
-			}
+				p.addMetricFamily(point, sample, mname, sampleID)
 
-			fam.Samples[sampleID] = sample
+			}
 		}
 	}
 	return nil
diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go
index 1bb1cc83..69509ae1 100644
--- a/plugins/outputs/prometheus_client/prometheus_client_test.go
+++ b/plugins/outputs/prometheus_client/prometheus_client_test.go
@@ -9,7 +9,6 @@ import (
 	"github.com/influxdata/telegraf/metric"
 	prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
 	"github.com/influxdata/telegraf/testutil"
-	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 )
 
@@ -45,7 +44,7 @@ func TestWrite_Basic(t *testing.T) {
 
 	fam, ok := client.fam["foo"]
 	require.True(t, ok)
-	require.Equal(t, prometheus.UntypedValue, fam.ValueType)
+	require.Equal(t, telegraf.Untyped, fam.TelegrafValueType)
 	require.Equal(t, map[string]int{}, fam.LabelSet)
 
 	sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
@@ -119,7 +118,7 @@ func TestWrite_Counters(t *testing.T) {
 		args       args
 		err        error
 		metricName string
-		promType   prometheus.ValueType
+		valueType  telegraf.ValueType
 	}{
 		{
 			name: "field named value is not added to metric name",
@@ -129,7 +128,7 @@ func TestWrite_Counters(t *testing.T) {
 				valueType:   telegraf.Counter,
 			},
 			metricName: "foo",
-			promType:   prometheus.CounterValue,
+			valueType:  telegraf.Counter,
 		},
 		{
 			name: "field named counter is not added to metric name",
@@ -139,7 +138,7 @@ func TestWrite_Counters(t *testing.T) {
 				valueType:   telegraf.Counter,
 			},
 			metricName: "foo",
-			promType:   prometheus.CounterValue,
+			valueType:  telegraf.Counter,
 		},
 		{
 			name: "field with any other name is added to metric name",
@@ -149,7 +148,7 @@ func TestWrite_Counters(t *testing.T) {
 				valueType:   telegraf.Counter,
 			},
 			metricName: "foo_other",
-			promType:   prometheus.CounterValue,
+			valueType:  telegraf.Counter,
 		},
 	}
 	for _, tt := range tests {
@@ -167,7 +166,7 @@ func TestWrite_Counters(t *testing.T) {
 
 			fam, ok := client.fam[tt.metricName]
 			require.True(t, ok)
-			require.Equal(t, tt.promType, fam.ValueType)
+			require.Equal(t, tt.valueType, fam.TelegrafValueType)
 		})
 	}
 }
@@ -196,20 +195,119 @@ func TestWrite_Sanitize(t *testing.T) {
 }
 
 func TestWrite_Gauge(t *testing.T) {
+	type args struct {
+		measurement string
+		tags        map[string]string
+		fields      map[string]interface{}
+		valueType   telegraf.ValueType
+	}
+	var tests = []struct {
+		name       string
+		args       args
+		err        error
+		metricName string
+		valueType  telegraf.ValueType
+	}{
+		{
+			name: "field named value is not added to metric name",
+			args: args{
+				measurement: "foo",
+				fields:      map[string]interface{}{"value": 42},
+				valueType:   telegraf.Gauge,
+			},
+			metricName: "foo",
+			valueType:  telegraf.Gauge,
+		},
+		{
+			name: "field named gauge is not added to metric name",
+			args: args{
+				measurement: "foo",
+				fields:      map[string]interface{}{"gauge": 42},
+				valueType:   telegraf.Gauge,
+			},
+			metricName: "foo",
+			valueType:  telegraf.Gauge,
+		},
+		{
+			name: "field with any other name is added to metric name",
+			args: args{
+				measurement: "foo",
+				fields:      map[string]interface{}{"other": 42},
+				valueType:   telegraf.Gauge,
+			},
+			metricName: "foo_other",
+			valueType:  telegraf.Gauge,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m, err := metric.New(
+				tt.args.measurement,
+				tt.args.tags,
+				tt.args.fields,
+				time.Now(),
+				tt.args.valueType,
+			)
+			client := NewClient()
+			err = client.Write([]telegraf.Metric{m})
+			require.Equal(t, tt.err, err)
+
+			fam, ok := client.fam[tt.metricName]
+			require.True(t, ok)
+			require.Equal(t, tt.valueType, fam.TelegrafValueType)
+
+		})
+	}
+}
+
+func TestWrite_Summary(t *testing.T) {
 	client := NewClient()
 
 	p1, err := metric.New(
 		"foo",
 		make(map[string]string),
-		map[string]interface{}{"value": 42},
+		map[string]interface{}{"sum": 84, "count": 42, "0": 2, "0.5": 3, "1": 4},
 		time.Now(),
-		telegraf.Gauge)
+		telegraf.Summary)
+
 	err = client.Write([]telegraf.Metric{p1})
 	require.NoError(t, err)
 
 	fam, ok := client.fam["foo"]
 	require.True(t, ok)
-	require.Equal(t, prometheus.GaugeValue, fam.ValueType)
+	require.Equal(t, 1, len(fam.Samples))
+
+	sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
+	require.True(t, ok)
+
+	require.Equal(t, 84.0, sample1.Sum)
+	require.Equal(t, uint64(42), sample1.Count)
+	require.Equal(t, 3, len(sample1.SummaryValue))
+}
+
+func TestWrite_Histogram(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"sum": 84, "count": 42, "0": 2, "0.5": 3, "1": 4},
+		time.Now(),
+		telegraf.Histogram)
+
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	require.Equal(t, 1, len(fam.Samples))
+
+	sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
+	require.True(t, ok)
+
+	require.Equal(t, 84.0, sample1.Sum)
+	require.Equal(t, uint64(42), sample1.Count)
+	require.Equal(t, 3, len(sample1.HistogramValue))
 }
 
 func TestWrite_MixedValueType(t *testing.T) {
@@ -307,7 +405,7 @@ func TestWrite_Tags(t *testing.T) {
 
 	fam, ok := client.fam["foo"]
 	require.True(t, ok)
-	require.Equal(t, prometheus.UntypedValue, fam.ValueType)
+	require.Equal(t, telegraf.Untyped, fam.TelegrafValueType)
 
 	require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
 
diff --git a/testutil/accumulator.go b/testutil/accumulator.go
index c478400e..29c362c8 100644
--- a/testutil/accumulator.go
+++ b/testutil/accumulator.go
@@ -122,6 +122,24 @@ func (a *Accumulator) AddMetrics(metrics []telegraf.Metric) {
 	}
 }
 
+func (a *Accumulator) AddSummary(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+	timestamp ...time.Time,
+) {
+	a.AddFields(measurement, fields, tags, timestamp...)
+}
+
+func (a *Accumulator) AddHistogram(
+	measurement string,
+	fields map[string]interface{},
+	tags map[string]string,
+	timestamp ...time.Time,
+) {
+	a.AddFields(measurement, fields, tags, timestamp...)
+}
+
 // AddError appends the given error to Accumulator.Errors.
 func (a *Accumulator) AddError(err error) {
 	if err == nil {
-- 
GitLab