From 949072e8dc9f000f77db70bf1daed5408d33f035 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel@wavesofdawn.com>
Date: Tue, 13 Jun 2017 18:04:26 -0700
Subject: [PATCH] Ensure prometheus metrics have same set of labels (#2857)

---
 .../prometheus_client/prometheus_client.go    | 215 ++++++----
 .../prometheus_client_test.go                 | 369 +++++++++++++++---
 2 files changed, 455 insertions(+), 129 deletions(-)

diff --git a/plugins/outputs/prometheus_client/prometheus_client.go b/plugins/outputs/prometheus_client/prometheus_client.go
index 8c52b335..137bf309 100644
--- a/plugins/outputs/prometheus_client/prometheus_client.go
+++ b/plugins/outputs/prometheus_client/prometheus_client.go
@@ -6,6 +6,8 @@ import (
 	"log"
 	"net/http"
 	"regexp"
+	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -17,19 +19,40 @@ import (
 
 var invalidNameCharRE = regexp.MustCompile(`[^a-zA-Z0-9_]`)
 
-type MetricWithExpiration struct {
-	Metric     prometheus.Metric
+// SampleID uniquely identifies a Sample
+type SampleID string
+
+// Sample represents the current value of a series.
+type Sample struct {
+	// Labels are the Prometheus labels.
+	Labels map[string]string
+	// Value is the value in the Prometheus output.
+	Value float64
+	// Expiration is the deadline that this Sample is valid until.
 	Expiration time.Time
 }
 
+// MetricFamily contains the data required to build valid prometheus Metrics.
+type MetricFamily struct {
+	// Samples are the Sample belonging to this MetricFamily.
+	Samples map[SampleID]*Sample
+	// Type of the Value.
+	ValueType prometheus.ValueType
+	// LabelSet is the label counts for all Samples.
+	LabelSet map[string]int
+}
+
 type PrometheusClient struct {
 	Listen             string
 	ExpirationInterval internal.Duration `toml:"expiration_interval"`
-	server             *http.Server
 
-	metrics map[string]*MetricWithExpiration
+	server *http.Server
 
 	sync.Mutex
+	// fam is the non-expired MetricFamily by Prometheus metric name.
+	fam map[string]*MetricFamily
+	// now returns the current time.
+	now func() time.Time
 }
 
 var sampleConfig = `
@@ -41,7 +64,6 @@ var sampleConfig = `
 `
 
 func (p *PrometheusClient) Start() error {
-	p.metrics = make(map[string]*MetricWithExpiration)
 	prometheus.Register(p)
 
 	if p.Listen == "" {
@@ -88,96 +110,153 @@ func (p *PrometheusClient) Describe(ch chan<- *prometheus.Desc) {
 	prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
 }
 
-// Implements prometheus.Collector
+// Expire removes Samples that have expired.
+func (p *PrometheusClient) Expire() {
+	now := p.now()
+	for name, family := range p.fam {
+		for key, sample := range family.Samples {
+			if p.ExpirationInterval.Duration != 0 && now.After(sample.Expiration) {
+				for k, _ := range sample.Labels {
+					family.LabelSet[k]--
+				}
+				delete(family.Samples, key)
+
+				if len(family.Samples) == 0 {
+					delete(p.fam, name)
+				}
+			}
+		}
+	}
+}
+
+// Collect implements prometheus.Collector
 func (p *PrometheusClient) Collect(ch chan<- prometheus.Metric) {
 	p.Lock()
 	defer p.Unlock()
 
-	for key, m := range p.metrics {
-		if p.ExpirationInterval.Duration != 0 && time.Now().After(m.Expiration) {
-			delete(p.metrics, key)
-		} else {
-			ch <- m.Metric
+	p.Expire()
+
+	for name, family := range p.fam {
+		// Get list of all labels on MetricFamily
+		var labelNames []string
+		for k, v := range family.LabelSet {
+			if v > 0 {
+				labelNames = append(labelNames, k)
+			}
+		}
+		desc := prometheus.NewDesc(name, "Telegraf collected metric", labelNames, nil)
+
+		for _, sample := range family.Samples {
+			// Get labels for this sample; unset labels will be set to the
+			// empty string
+			var labels []string
+			for _, label := range labelNames {
+				v := sample.Labels[label]
+				labels = append(labels, v)
+			}
+
+			metric, err := prometheus.NewConstMetric(desc, family.ValueType, sample.Value, labels...)
+			if err != nil {
+				log.Printf("E! Error creating prometheus metric, "+
+					"key: %s, labels: %v,\nerr: %s\n",
+					name, labels, err.Error())
+			}
+
+			ch <- metric
 		}
 	}
 }
 
+func sanitize(value string) string {
+	return invalidNameCharRE.ReplaceAllString(value, "_")
+}
+
+func valueType(tt telegraf.ValueType) prometheus.ValueType {
+	switch tt {
+	case telegraf.Counter:
+		return prometheus.CounterValue
+	case telegraf.Gauge:
+		return prometheus.GaugeValue
+	default:
+		return prometheus.UntypedValue
+	}
+}
+
+// CreateSampleID creates a SampleID based on the tags of a telegraf.Metric.
+func CreateSampleID(tags map[string]string) SampleID {
+	pairs := make([]string, 0, len(tags))
+	for k, v := range tags {
+		pairs = append(pairs, fmt.Sprintf("%s=%s", k, v))
+	}
+	sort.Strings(pairs)
+	return SampleID(strings.Join(pairs, ","))
+}
+
 func (p *PrometheusClient) Write(metrics []telegraf.Metric) error {
 	p.Lock()
 	defer p.Unlock()
 
-	if len(metrics) == 0 {
-		return nil
-	}
+	now := p.now()
 
 	for _, point := range metrics {
-		key := point.Name()
-		key = invalidNameCharRE.ReplaceAllString(key, "_")
-
-		// convert tags into prometheus labels
-		var labels []string
-		l := prometheus.Labels{}
-		for k, v := range point.Tags() {
-			k = invalidNameCharRE.ReplaceAllString(k, "_")
-			if len(k) == 0 {
-				continue
-			}
-			labels = append(labels, k)
-			l[k] = v
-		}
+		tags := point.Tags()
+		vt := valueType(point.Type())
+		sampleID := CreateSampleID(tags)
 
-		// Get a type if it's available, defaulting to Untyped
-		var mType prometheus.ValueType
-		switch point.Type() {
-		case telegraf.Counter:
-			mType = prometheus.CounterValue
-		case telegraf.Gauge:
-			mType = prometheus.GaugeValue
-		default:
-			mType = prometheus.UntypedValue
+		labels := make(map[string]string)
+		for k, v := range tags {
+			labels[sanitize(k)] = sanitize(v)
 		}
 
-		for n, val := range point.Fields() {
+		for fn, fv := range point.Fields() {
 			// Ignore string and bool fields.
-			switch val.(type) {
-			case string:
-				continue
-			case bool:
+			var value float64
+			switch fv := fv.(type) {
+			case int64:
+				value = float64(fv)
+			case float64:
+				value = fv
+			default:
 				continue
 			}
 
-			// sanitize the measurement name
-			n = invalidNameCharRE.ReplaceAllString(n, "_")
+			sample := &Sample{
+				Labels:     labels,
+				Value:      value,
+				Expiration: now.Add(p.ExpirationInterval.Duration),
+			}
+
+			// Special handling of value field; supports passthrough from
+			// the prometheus input.
 			var mname string
-			if n == "value" {
-				mname = key
+			if fn == "value" {
+				mname = sanitize(point.Name())
 			} else {
-				mname = fmt.Sprintf("%s_%s", key, n)
+				mname = sanitize(fmt.Sprintf("%s_%s", point.Name(), fn))
 			}
 
-			desc := prometheus.NewDesc(mname, "Telegraf collected metric", nil, l)
-			var metric prometheus.Metric
-			var err error
-
-			// switch for field type
-			switch val := val.(type) {
-			case int64:
-				metric, err = prometheus.NewConstMetric(desc, mType, float64(val))
-			case float64:
-				metric, err = prometheus.NewConstMetric(desc, mType, val)
-			default:
-				continue
-			}
-			if err != nil {
-				log.Printf("E! Error creating prometheus metric, "+
-					"key: %s, labels: %v,\nerr: %s\n",
-					mname, l, err.Error())
+			var fam *MetricFamily
+			var ok bool
+			if fam, ok = p.fam[mname]; !ok {
+				fam = &MetricFamily{
+					Samples:   make(map[SampleID]*Sample),
+					ValueType: vt,
+					LabelSet:  make(map[string]int),
+				}
+				p.fam[mname] = fam
+			} else {
+				if fam.ValueType != vt {
+					// Don't return an error since this would be a permanent error
+					log.Printf("Mixed ValueType for measurement %q; dropping point", point.Name())
+					break
+				}
 			}
 
-			p.metrics[desc.String()] = &MetricWithExpiration{
-				Metric:     metric,
-				Expiration: time.Now().Add(p.ExpirationInterval.Duration),
+			for k, _ := range sample.Labels {
+				fam.LabelSet[k]++
 			}
+
+			fam.Samples[sampleID] = sample
 		}
 	}
 	return nil
@@ -187,6 +266,8 @@ func init() {
 	outputs.Add("prometheus_client", func() telegraf.Output {
 		return &PrometheusClient{
 			ExpirationInterval: internal.Duration{Duration: time.Second * 60},
+			fam:                make(map[string]*MetricFamily),
+			now:                time.Now,
 		}
 	})
 }
diff --git a/plugins/outputs/prometheus_client/prometheus_client_test.go b/plugins/outputs/prometheus_client/prometheus_client_test.go
index 0251de78..3ce211be 100644
--- a/plugins/outputs/prometheus_client/prometheus_client_test.go
+++ b/plugins/outputs/prometheus_client/prometheus_client_test.go
@@ -4,16 +4,314 @@ import (
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/metric"
-	"github.com/influxdata/telegraf/plugins/inputs/prometheus"
+	prometheus_input "github.com/influxdata/telegraf/plugins/inputs/prometheus"
 	"github.com/influxdata/telegraf/testutil"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
 )
 
+func setUnixTime(client *PrometheusClient, sec int64) {
+	client.now = func() time.Time {
+		return time.Unix(sec, 0)
+	}
+}
+
+// NewClient initializes a PrometheusClient.
+func NewClient() *PrometheusClient {
+	return &PrometheusClient{
+		ExpirationInterval: internal.Duration{Duration: time.Second * 60},
+		fam:                make(map[string]*MetricFamily),
+		now:                time.Now,
+	}
+}
+
+func TestWrite_Basic(t *testing.T) {
+	now := time.Now()
+	pt1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 0.0},
+		now)
+	var metrics = []telegraf.Metric{
+		pt1,
+	}
+
+	client := NewClient()
+	err = client.Write(metrics)
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	require.Equal(t, prometheus.UntypedValue, fam.ValueType)
+	require.Equal(t, map[string]int{}, fam.LabelSet)
+
+	sample, ok := fam.Samples[CreateSampleID(pt1.Tags())]
+	require.True(t, ok)
+
+	require.Equal(t, 0.0, sample.Value)
+	require.True(t, now.Before(sample.Expiration))
+}
+
+func TestWrite_IntField(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 42},
+		time.Now())
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	for _, v := range fam.Samples {
+		require.Equal(t, 42.0, v.Value)
+	}
+
+}
+
+func TestWrite_FieldNotValue(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"howdy": 0.0},
+		time.Now())
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo_howdy"]
+	require.True(t, ok)
+	for _, v := range fam.Samples {
+		require.Equal(t, 0.0, v.Value)
+	}
+}
+
+func TestWrite_SkipNonNumberField(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": "howdy"},
+		time.Now())
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	_, ok := client.fam["foo"]
+	require.False(t, ok)
+}
+
+func TestWrite_Counter(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 42},
+		time.Now(),
+		telegraf.Counter)
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	require.Equal(t, prometheus.CounterValue, fam.ValueType)
+}
+
+func TestWrite_Sanitize(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo.bar",
+		map[string]string{"tag-with-dash": "localhost.local"},
+		map[string]interface{}{"field-with-dash": 42},
+		time.Now(),
+		telegraf.Counter)
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo_bar_field_with_dash"]
+	require.True(t, ok)
+	require.Equal(t, map[string]int{"tag_with_dash": 1}, fam.LabelSet)
+
+	sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
+	require.True(t, ok)
+
+	require.Equal(t, map[string]string{
+		"tag_with_dash": "localhost_local"}, sample1.Labels)
+}
+
+func TestWrite_Gauge(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 42},
+		time.Now(),
+		telegraf.Gauge)
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	require.Equal(t, prometheus.GaugeValue, fam.ValueType)
+}
+
+func TestWrite_MixedValueType(t *testing.T) {
+	now := time.Now()
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 1.0},
+		now,
+		telegraf.Counter)
+	p2, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 2.0},
+		now,
+		telegraf.Gauge)
+	var metrics = []telegraf.Metric{p1, p2}
+
+	client := NewClient()
+	err = client.Write(metrics)
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	require.Equal(t, 1, len(fam.Samples))
+}
+
+func TestWrite_Tags(t *testing.T) {
+	now := time.Now()
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 1.0},
+		now)
+	p2, err := metric.New(
+		"foo",
+		map[string]string{"host": "localhost"},
+		map[string]interface{}{"value": 2.0},
+		now)
+	var metrics = []telegraf.Metric{p1, p2}
+
+	client := NewClient()
+	err = client.Write(metrics)
+	require.NoError(t, err)
+
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	require.Equal(t, prometheus.UntypedValue, fam.ValueType)
+
+	require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
+
+	sample1, ok := fam.Samples[CreateSampleID(p1.Tags())]
+	require.True(t, ok)
+
+	require.Equal(t, 1.0, sample1.Value)
+	require.True(t, now.Before(sample1.Expiration))
+
+	sample2, ok := fam.Samples[CreateSampleID(p2.Tags())]
+	require.True(t, ok)
+
+	require.Equal(t, 2.0, sample2.Value)
+	require.True(t, now.Before(sample2.Expiration))
+}
+
+func TestExpire(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 1.0},
+		time.Now())
+	setUnixTime(client, 0)
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	p2, err := metric.New(
+		"bar",
+		make(map[string]string),
+		map[string]interface{}{"value": 2.0},
+		time.Now())
+	setUnixTime(client, 1)
+	err = client.Write([]telegraf.Metric{p2})
+
+	setUnixTime(client, 61)
+	require.Equal(t, 2, len(client.fam))
+	client.Expire()
+	require.Equal(t, 1, len(client.fam))
+}
+
+func TestExpire_TagsNoDecrement(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 1.0},
+		time.Now())
+	setUnixTime(client, 0)
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	p2, err := metric.New(
+		"foo",
+		map[string]string{"host": "localhost"},
+		map[string]interface{}{"value": 2.0},
+		time.Now())
+	setUnixTime(client, 1)
+	err = client.Write([]telegraf.Metric{p2})
+
+	setUnixTime(client, 61)
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	require.Equal(t, 2, len(fam.Samples))
+	client.Expire()
+	require.Equal(t, 1, len(fam.Samples))
+
+	require.Equal(t, map[string]int{"host": 1}, fam.LabelSet)
+}
+
+func TestExpire_TagsWithDecrement(t *testing.T) {
+	client := NewClient()
+
+	p1, err := metric.New(
+		"foo",
+		map[string]string{"host": "localhost"},
+		map[string]interface{}{"value": 1.0},
+		time.Now())
+	setUnixTime(client, 0)
+	err = client.Write([]telegraf.Metric{p1})
+	require.NoError(t, err)
+
+	p2, err := metric.New(
+		"foo",
+		make(map[string]string),
+		map[string]interface{}{"value": 2.0},
+		time.Now())
+	setUnixTime(client, 1)
+	err = client.Write([]telegraf.Metric{p2})
+
+	setUnixTime(client, 61)
+	fam, ok := client.fam["foo"]
+	require.True(t, ok)
+	require.Equal(t, 2, len(fam.Samples))
+	client.Expire()
+	require.Equal(t, 1, len(fam.Samples))
+
+	require.Equal(t, map[string]int{"host": 0}, fam.LabelSet)
+}
+
 var pTesting *PrometheusClient
 
 func TestPrometheusWritePointEmptyTag(t *testing.T) {
@@ -93,74 +391,21 @@ func TestPrometheusWritePointEmptyTag(t *testing.T) {
 	}
 }
 
-func TestPrometheusExpireOldMetrics(t *testing.T) {
-	if testing.Short() {
-		t.Skip("Skipping integration test in short mode")
-	}
-
-	pClient, p, err := setupPrometheus()
-	pClient.ExpirationInterval = internal.Duration{Duration: time.Second * 10}
-	require.NoError(t, err)
-	defer pClient.Stop()
-
-	now := time.Now()
-	tags := make(map[string]string)
-	pt1, _ := metric.New(
-		"test_point_1",
-		tags,
-		map[string]interface{}{"value": 0.0},
-		now)
-	var metrics = []telegraf.Metric{pt1}
-	require.NoError(t, pClient.Write(metrics))
-
-	for _, m := range pClient.metrics {
-		m.Expiration = now.Add(time.Duration(-15) * time.Second)
-	}
-
-	pt2, _ := metric.New(
-		"test_point_2",
-		tags,
-		map[string]interface{}{"value": 1.0},
-		now)
-	var metrics2 = []telegraf.Metric{pt2}
-	require.NoError(t, pClient.Write(metrics2))
-
-	expected := []struct {
-		name  string
-		value float64
-		tags  map[string]string
-	}{
-		{"test_point_2", 1.0, tags},
-	}
-
-	var acc testutil.Accumulator
-
-	require.NoError(t, p.Gather(&acc))
-	for _, e := range expected {
-		acc.AssertContainsFields(t, e.name,
-			map[string]interface{}{"value": e.value})
-	}
-
-	acc.AssertDoesNotContainMeasurement(t, "test_point_1")
-
-	// Confirm that it's not in the PrometheusClient map anymore
-	assert.Equal(t, 1, len(pClient.metrics))
-}
-
-func setupPrometheus() (*PrometheusClient, *prometheus.Prometheus, error) {
+func setupPrometheus() (*PrometheusClient, *prometheus_input.Prometheus, error) {
 	if pTesting == nil {
-		pTesting = &PrometheusClient{Listen: "localhost:9127"}
+		pTesting = NewClient()
+		pTesting.Listen = "localhost:9127"
 		err := pTesting.Start()
 		if err != nil {
 			return nil, nil, err
 		}
 	} else {
-		pTesting.metrics = make(map[string]*MetricWithExpiration)
+		pTesting.fam = make(map[string]*MetricFamily)
 	}
 
 	time.Sleep(time.Millisecond * 200)
 
-	p := &prometheus.Prometheus{
+	p := &prometheus_input.Prometheus{
 		Urls: []string{"http://localhost:9127/metrics"},
 	}
 
-- 
GitLab