From d5b9e003fee44ff5276e7177e048cdc007ea95f8 Mon Sep 17 00:00:00 2001
From: Josh Hardy <josh@glympse.com>
Date: Fri, 25 Mar 2016 15:16:23 -0700
Subject: [PATCH] Add CloudWatch input plugin

Rebased commit of previously reviewed branch.
Added cloudwatch client Mock and more rich unit tests.

closes #935
closes #936
---
 CHANGELOG.md                                 |   1 +
 plugins/inputs/all/all.go                    |   1 +
 plugins/inputs/cloudwatch/README.md          |  86 ++++++
 plugins/inputs/cloudwatch/cloudwatch.go      | 305 +++++++++++++++++++
 plugins/inputs/cloudwatch/cloudwatch_test.go | 131 ++++++++
 5 files changed, 524 insertions(+)
 create mode 100644 plugins/inputs/cloudwatch/README.md
 create mode 100644 plugins/inputs/cloudwatch/cloudwatch.go
 create mode 100644 plugins/inputs/cloudwatch/cloudwatch_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b5a2c9f7..09a00f06 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### Features
 - [#976](https://github.com/influxdata/telegraf/pull/976): Reduce allocations in the UDP and statsd inputs.
 - [#979](https://github.com/influxdata/telegraf/pull/979): Reduce allocations in the TCP listener.
+- [#935](https://github.com/influxdata/telegraf/pull/935): AWS Cloudwatch input plugin. Thanks @joshhardy & @ljosa!
 
 ### Bugfixes
 - [#968](https://github.com/influxdata/telegraf/issues/968): Processes plugin gets unknown state when spaces are in (command name)
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 4f7d45f6..52ee6c13 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -4,6 +4,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
 	_ "github.com/influxdata/telegraf/plugins/inputs/apache"
 	_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
+	_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
 	_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
 	_ "github.com/influxdata/telegraf/plugins/inputs/couchdb"
 	_ "github.com/influxdata/telegraf/plugins/inputs/disque"
diff --git a/plugins/inputs/cloudwatch/README.md b/plugins/inputs/cloudwatch/README.md
new file mode 100644
index 00000000..04501161
--- /dev/null
+++ b/plugins/inputs/cloudwatch/README.md
@@ -0,0 +1,86 @@
+# Amazon CloudWatch Statistics Input
+
+This plugin will pull Metric Statistics from Amazon CloudWatch.
+
+### Amazon Authentication
+
+This plugin uses a credential chain for Authentication with the CloudWatch
+API endpoint. In the following order the plugin will attempt to authenticate.
+1. [IAM Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
+2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#environment-variables)
+3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk#shared-credentials-file)
+
+### Configuration:
+
+```toml
+[[inputs.cloudwatch]]
+  ## Amazon Region (required)
+  region = 'us-east-1'
+
+  ## Requested CloudWatch aggregation Period (required - must be a multiple of 60s)
+  period = '1m'
+
+  ## Collection Delay (required - must account for metrics availability via CloudWatch API)
+  delay = '1m'
+
+  ## Override global run interval (optional - defaults to global interval)
+  ## Recommended: use metric 'interval' that is a multiple of 'period' to avoid
+  ## gaps or overlap in pulled data
+  interval = '1m'
+
+  ## Metric Statistic Namespace (required)
+  namespace = 'AWS/ELB'
+
+  ## Metrics to Pull (optional)
+  ## Defaults to all Metrics in Namespace if nothing is provided
+  ## Refreshes Namespace available metrics every 1h
+  [[inputs.cloudwatch.metrics]]
+    names = ['Latency', 'RequestCount']
+	
+    ## Dimension filters for Metric (optional)
+    [[inputs.cloudwatch.metrics.dimensions]]
+      name = 'LoadBalancerName'
+      value = 'p-example'
+```
+#### Requirements and Terminology
+
+Plugin Configuration utilizes [CloudWatch concepts](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_concepts.html) and access pattern to allow monitoring of any CloudWatch Metric.
+
+- `region` must be a valid AWS [Region](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_concepts.html#CloudWatchRegions) value
+- `period` must be a valid CloudWatch [Period](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_concepts.html#CloudWatchPeriods) value
+- `namespace` must be a valid CloudWatch [Namespace](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_concepts.html#Namespace) value
+- `names` must be valid CloudWatch [Metric](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_concepts.html#Metric) names
+- `dimensions` must be valid CloudWatch [Dimension](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_concepts.html#Dimension) name/value pairs
+
+#### Restrictions and Limitations
+- CloudWatch metrics are not available instantly via the CloudWatch API. You should adjust your collection `delay` to account for this lag in metrics availability based on your [monitoring subscription level](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-cloudwatch-new.html)
+- CloudWatch API usage incurs cost - see [GetMetricStatistics Pricing](https://aws.amazon.com/cloudwatch/pricing/)
+
+### Measurements & Fields:
+
+Each monitored CloudWatch Namespace records a measurement with fields for each available Metric Statistic.
+Namespace and Metric names are represented in [snake case](https://en.wikipedia.org/wiki/Snake_case).
+
+- cloudwatch_{namespace}
+  - {metric}_sum         (metric Sum value)
+  - {metric}_average     (metric Average value)
+  - {metric}_minimum     (metric Minimum value)
+  - {metric}_maximum     (metric Maximum value)
+  - {metric}_sample_count (metric SampleCount value)
+
+
+### Tags:
+Each measurement is tagged with the following identifiers to uniquely identify the associated metric.
+Tag names derived from Dimension names are represented in [snake case](https://en.wikipedia.org/wiki/Snake_case).
+
+- All measurements have the following tags:
+  - region           (CloudWatch Region)
+  - unit             (CloudWatch Metric Unit)
+  - {dimension-name} (CloudWatch Dimension value - one for each metric dimension)
+
+### Example Output:
+
+```
+$ ./telegraf -config telegraf.conf -input-filter cloudwatch -test
+> cloudwatch_aws_elb,load_balancer_name=p-example,region=us-east-1,unit=seconds latency_average=0.004810798017284538,latency_maximum=0.1100282669067383,latency_minimum=0.0006084442138671875,latency_sample_count=4029,latency_sum=19.382705211639404 1459542420000000000
+```
diff --git a/plugins/inputs/cloudwatch/cloudwatch.go b/plugins/inputs/cloudwatch/cloudwatch.go
new file mode 100644
index 00000000..e3fa74ba
--- /dev/null
+++ b/plugins/inputs/cloudwatch/cloudwatch.go
@@ -0,0 +1,305 @@
+package cloudwatch
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/credentials"
+	"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
+	"github.com/aws/aws-sdk-go/aws/ec2metadata"
+	"github.com/aws/aws-sdk-go/aws/session"
+
+	"github.com/aws/aws-sdk-go/service/cloudwatch"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/plugins/inputs"
+)
+
+type (
+	CloudWatch struct { // plugin state: settings read from TOML plus the API client and the namespace metric cache
+		Region      string            `toml:"region"`
+		Period      internal.Duration `toml:"period"`
+		Delay       internal.Duration `toml:"delay"`
+		Namespace   string            `toml:"namespace"`
+		Metrics     []*Metric         `toml:"metrics"`
+		client      cloudwatchClient
+		metricCache *MetricCache
+	}
+
+	Metric struct { // one configured filter: a list of metric names sharing the same dimensions
+		MetricNames []string     `toml:"names"`
+		Dimensions  []*Dimension `toml:"dimensions"`
+	}
+
+	Dimension struct { // a name/value pair used as a CloudWatch dimension
+		Name  string `toml:"name"`
+		Value string `toml:"value"`
+	}
+
+	MetricCache struct { // a fetched metric list together with its fetch time and time-to-live
+		TTL     time.Duration
+		Fetched time.Time
+		Metrics []*cloudwatch.Metric
+	}
+
+	cloudwatchClient interface { // the two CloudWatch API calls the plugin uses; lets tests substitute a mock
+		ListMetrics(*cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error)
+		GetMetricStatistics(*cloudwatch.GetMetricStatisticsInput) (*cloudwatch.GetMetricStatisticsOutput, error)
+	}
+)
+
+func (c *CloudWatch) SampleConfig() string { // SampleConfig returns an example TOML configuration for this input.
+	return `
+  ## Amazon Region
+  region = 'us-east-1'
+
+  ## Requested CloudWatch aggregation Period (required - must be a multiple of 60s)
+  period = '1m'
+
+  ## Collection Delay (required - must account for metrics availability via CloudWatch API)
+  delay = '1m'
+
+  ## Recomended: use metric 'interval' that is a multiple of 'period' to avoid 
+  ## gaps or overlap in pulled data
+  interval = '1m'
+
+  ## Metric Statistic Namespace (required)
+  namespace = 'AWS/ELB'
+
+  ## Metrics to Pull (optional)
+  ## Defaults to all Metrics in Namespace if nothing is provided
+  ## Refreshes Namespace available metrics every 1h
+  #[[inputs.cloudwatch.metrics]]
+  #  names = ['Latency', 'RequestCount']
+  #	
+  #  ## Dimension filters for Metric (optional)
+  #  [[inputs.cloudwatch.metrics.dimensions]]
+  #    name = 'LoadBalancerName'
+  #    value = 'p-example'
+`
+}
+
+func (c *CloudWatch) Description() string { // Description returns a one-line summary of what this input does.
+	return "Pull Metric Statistics from Amazon CloudWatch"
+}
+
+// Gather requests the latest Period of statistics for every configured metric,
+// or every metric in Namespace when none are configured, and adds them to acc.
+func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
+	if c.client == nil {
+		if err := c.initializeCloudWatch(); err != nil {
+			return err
+		}
+	}
+
+	var metrics []*cloudwatch.Metric
+	// check for provided metric filter
+	if c.Metrics != nil {
+		for _, m := range c.Metrics {
+			dimensions := make([]*cloudwatch.Dimension, len(m.Dimensions))
+			for k, d := range m.Dimensions {
+				dimensions[k] = &cloudwatch.Dimension{
+					Name:  aws.String(d.Name),
+					Value: aws.String(d.Value),
+				}
+			}
+			for _, name := range m.MetricNames {
+				metrics = append(metrics, &cloudwatch.Metric{
+					Namespace:  aws.String(c.Namespace),
+					MetricName: aws.String(name),
+					Dimensions: dimensions,
+				})
+			}
+		}
+	} else {
+		var err error
+		metrics, err = c.fetchNamespaceMetrics()
+		if err != nil {
+			return err
+		}
+	}
+
+	errChan := make(chan error, len(metrics))
+	now := time.Now()
+
+	// limit concurrency or we can easily exhaust user connection limit
+	semaphore := make(chan byte, 64)
+	for _, m := range metrics {
+		semaphore <- 0x1
+		go c.gatherMetric(acc, m, now, semaphore, errChan)
+	}
+
+	// Drain every result so no worker is still writing to acc after we return.
+	var firstErr error
+	for range metrics {
+		if err := <-errChan; err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	return firstErr
+}
+
+func init() {
+	// register a constructor for this input with inputs.Add under "cloudwatch"
+	factory := func() telegraf.Input { return &CloudWatch{} }
+	inputs.Add("cloudwatch", factory)
+}
+
+// initializeCloudWatch creates the CloudWatch API client for c.Region and
+// stores it in c.client. Credentials come from a provider chain listed in this
+// order: EC2 role, environment variables, shared credentials file.
+func (c *CloudWatch) initializeCloudWatch() error {
+	providers := []credentials.Provider{
+		&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
+		&credentials.EnvProvider{},
+		&credentials.SharedCredentialsProvider{},
+	}
+	config := &aws.Config{
+		Region:      aws.String(c.Region),
+		Credentials: credentials.NewChainCredentials(providers),
+	}
+
+	c.client = cloudwatch.New(session.New(config))
+	return nil
+}
+
+// fetchNamespaceMetrics returns every metric CloudWatch lists for c.Namespace,
+// following NextToken until the listing is exhausted. The result is cached for
+// one hour so that ListMetrics is not called on every collection interval.
+func (c *CloudWatch) fetchNamespaceMetrics() (metrics []*cloudwatch.Metric, err error) {
+	if c.metricCache != nil && c.metricCache.IsValid() {
+		return c.metricCache.Metrics, nil
+	}
+
+	metrics = []*cloudwatch.Metric{}
+
+	var token *string
+	for more := true; more; {
+		params := &cloudwatch.ListMetricsInput{
+			Namespace:  aws.String(c.Namespace),
+			Dimensions: []*cloudwatch.DimensionFilter{},
+			NextToken:  token,
+			MetricName: nil,
+		}
+
+		resp, err := c.client.ListMetrics(params)
+		if err != nil {
+			return nil, err
+		}
+
+		metrics = append(metrics, resp.Metrics...)
+
+		token = resp.NextToken
+		more = token != nil
+	}
+
+	// Use the time.Hour constant: a string such as "1hr" is not parseable by
+	// time.ParseDuration ("hr" is not a unit) and would leave the TTL at zero.
+	c.metricCache = &MetricCache{
+		Metrics: metrics,
+		Fetched: time.Now(),
+		TTL:     time.Hour,
+	}
+
+	return metrics, nil
+}
+
+// gatherMetric requests statistics for one metric and records a point for
+// each datapoint returned. The outcome (nil on success) is sent on errChan,
+// and the semaphore slot taken by the caller is released once that send
+// has completed.
+func (c *CloudWatch) gatherMetric(acc telegraf.Accumulator, metric *cloudwatch.Metric, now time.Time, semaphore chan byte, errChan chan error) {
+	// deferred so the slot is freed after the errChan send on every path
+	defer func() { <-semaphore }()
+
+	params := c.getStatisticsInput(metric, now)
+	resp, err := c.client.GetMetricStatistics(params)
+	if err != nil {
+		errChan <- err
+		return
+	}
+
+	for _, dp := range resp.Datapoints {
+		// tags: region, unit, and one tag per metric dimension
+		tags := map[string]string{
+			"region": c.Region,
+			"unit":   snakeCase(*dp.Unit),
+		}
+
+		for _, dim := range metric.Dimensions {
+			tags[snakeCase(*dim.Name)] = *dim.Value
+		}
+
+		// one field per statistic; statistics missing from the datapoint are skipped
+		values := map[string]*float64{
+			cloudwatch.StatisticAverage:     dp.Average,
+			cloudwatch.StatisticMaximum:     dp.Maximum,
+			cloudwatch.StatisticMinimum:     dp.Minimum,
+			cloudwatch.StatisticSampleCount: dp.SampleCount,
+			cloudwatch.StatisticSum:         dp.Sum,
+		}
+		fields := make(map[string]interface{}, len(values))
+		for statistic, value := range values {
+			if value == nil {
+				continue
+			}
+			fields[formatField(*metric.MetricName, statistic)] = *value
+		}
+
+		acc.AddFields(formatMeasurement(c.Namespace), fields, tags, *dp.Timestamp)
+	}
+
+	errChan <- nil
+}
+
+// Formatting helpers.
+//
+// formatField builds a field key of the form "<metric>_<statistic>" in snake case.
+func formatField(metricName string, statistic string) string {
+	return snakeCase(metricName) + "_" + snakeCase(statistic)
+}
+
+// formatMeasurement returns "cloudwatch_" plus the snake-cased namespace, with each "/" replaced by "_".
+func formatMeasurement(namespace string) string {
+	flattened := strings.Replace(namespace, "/", "_", -1)
+	return fmt.Sprintf("cloudwatch_%s", snakeCase(flattened))
+}
+
+// snakeCase applies internal.SnakeCase to s and then replaces each "__" with "_".
+func snakeCase(s string) string {
+	converted := internal.SnakeCase(s)
+	return strings.Replace(converted, "__", "_", -1)
+}
+
+// getStatisticsInput builds the GetMetricStatistics request for metric. The
+// window is one Period long and ends Delay before now; five statistics are requested.
+func (c *CloudWatch) getStatisticsInput(metric *cloudwatch.Metric, now time.Time) *cloudwatch.GetMetricStatisticsInput {
+	windowEnd := now.Add(-c.Delay.Duration)
+	windowStart := windowEnd.Add(-c.Period.Duration)
+	statistics := []*string{
+		aws.String(cloudwatch.StatisticAverage),
+		aws.String(cloudwatch.StatisticMaximum),
+		aws.String(cloudwatch.StatisticMinimum),
+		aws.String(cloudwatch.StatisticSum),
+		aws.String(cloudwatch.StatisticSampleCount),
+	}
+	return &cloudwatch.GetMetricStatisticsInput{
+		StartTime:  aws.Time(windowStart),
+		EndTime:    aws.Time(windowEnd),
+		MetricName: metric.MetricName,
+		Namespace:  metric.Namespace,
+		Period:     aws.Int64(int64(c.Period.Duration.Seconds())),
+		Dimensions: metric.Dimensions,
+		Statistics: statistics,
+	}
+}
+
+// IsValid reports whether the cache holds a metric list (Metrics is non-nil)
+// that was fetched less than TTL ago.
+func (c *MetricCache) IsValid() bool {
+	age := time.Since(c.Fetched)
+	return age < c.TTL && c.Metrics != nil
+}
diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go
new file mode 100644
index 00000000..8f8a3ad0
--- /dev/null
+++ b/plugins/inputs/cloudwatch/cloudwatch_test.go
@@ -0,0 +1,131 @@
+package cloudwatch
+
+import (
+	"testing"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/cloudwatch"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/assert"
+)
+
+type mockCloudWatchClient struct{} // stand-in for the CloudWatch client that returns fixed responses
+
+// ListMetrics reports a single "Latency" metric in the requested namespace,
+// carrying one LoadBalancerName dimension.
+func (m *mockCloudWatchClient) ListMetrics(params *cloudwatch.ListMetricsInput) (*cloudwatch.ListMetricsOutput, error) {
+	dimension := &cloudwatch.Dimension{
+		Name:  aws.String("LoadBalancerName"),
+		Value: aws.String("p-example"),
+	}
+	latency := &cloudwatch.Metric{
+		Namespace:  params.Namespace,
+		MetricName: aws.String("Latency"),
+		Dimensions: []*cloudwatch.Dimension{dimension},
+	}
+
+	return &cloudwatch.ListMetricsOutput{
+		Metrics: []*cloudwatch.Metric{latency},
+	}, nil
+}
+
+// GetMetricStatistics returns one fixed datapoint stamped with the request's EndTime.
+func (m *mockCloudWatchClient) GetMetricStatistics(params *cloudwatch.GetMetricStatisticsInput) (*cloudwatch.GetMetricStatisticsOutput, error) {
+	point := &cloudwatch.Datapoint{
+		Timestamp:   params.EndTime,
+		Unit:        aws.String("Seconds"),
+		Average:     aws.Float64(0.2),
+		Maximum:     aws.Float64(0.3),
+		Minimum:     aws.Float64(0.1),
+		SampleCount: aws.Float64(100),
+		Sum:         aws.Float64(123),
+	}
+	return &cloudwatch.GetMetricStatisticsOutput{
+		Label:      aws.String("Latency"),
+		Datapoints: []*cloudwatch.Datapoint{point},
+	}, nil
+}
+
+// TestGather runs Gather against the mock client, with no metric filter so the
+// ListMetrics path is used, and checks the emitted measurement, fields and tags.
+func TestGather(t *testing.T) {
+	internalDuration := internal.Duration{Duration: time.Minute}
+	c := &CloudWatch{
+		Region:    "us-east-1",
+		Namespace: "AWS/ELB",
+		Delay:     internalDuration,
+		Period:    internalDuration,
+		client:    &mockCloudWatchClient{},
+	}
+
+	var acc testutil.Accumulator
+	// a failed Gather must fail the test directly rather than show up as missing fields
+	assert.NoError(t, c.Gather(&acc))
+
+	fields := map[string]interface{}{
+		"latency_minimum":      0.1,
+		"latency_maximum":      0.3,
+		"latency_average":      0.2,
+		"latency_sum":          123.0,
+		"latency_sample_count": 100.0,
+	}
+
+	tags := map[string]string{
+		"unit":               "seconds",
+		"region":             "us-east-1",
+		"load_balancer_name": "p-example",
+	}
+
+	assert.True(t, acc.HasMeasurement("cloudwatch_aws_elb"))
+	acc.AssertContainsTaggedFields(t, "cloudwatch_aws_elb", fields, tags)
+}
+
+// TestGenerateStatisticsInputParams checks the time window, period, dimensions
+// and statistics that getStatisticsInput derives from Delay and Period.
+func TestGenerateStatisticsInputParams(t *testing.T) {
+	d := &cloudwatch.Dimension{
+		Name:  aws.String("LoadBalancerName"),
+		Value: aws.String("p-example"),
+	}
+
+	m := &cloudwatch.Metric{
+		MetricName: aws.String("Latency"),
+		Dimensions: []*cloudwatch.Dimension{d},
+	}
+
+	internalDuration := internal.Duration{Duration: time.Minute}
+
+	c := &CloudWatch{
+		Namespace: "AWS/ELB",
+		Delay:     internalDuration,
+		Period:    internalDuration,
+	}
+
+	assert.NoError(t, c.initializeCloudWatch())
+
+	now := time.Now()
+
+	params := c.getStatisticsInput(m, now)
+
+	// expected value first, as testify's (expected, actual) signature requires
+	assert.EqualValues(t, now.Add(-c.Delay.Duration), *params.EndTime)
+	assert.EqualValues(t, now.Add(-c.Period.Duration).Add(-c.Delay.Duration), *params.StartTime)
+	assert.Len(t, params.Dimensions, 1)
+	assert.Len(t, params.Statistics, 5)
+	assert.EqualValues(t, 60, *params.Period)
+}
+
+// TestMetricsCacheTimeout checks that a cache is valid when fresh and invalid once its TTL has elapsed.
+func TestMetricsCacheTimeout(t *testing.T) {
+	ttl := 5 * time.Millisecond
+	cache := &MetricCache{
+		TTL:     ttl,
+		Fetched: time.Now(),
+		Metrics: []*cloudwatch.Metric{},
+	}
+	assert.True(t, cache.IsValid())
+	time.Sleep(ttl)
+	assert.False(t, cache.IsValid())
+}
-- 
GitLab