From 92ca661662fe276e5642d34e3c1f8d30bdf58fad Mon Sep 17 00:00:00 2001
From: Leandro Piccilli <lpic10@users.noreply.github.com>
Date: Tue, 21 Nov 2017 01:25:36 +0100
Subject: [PATCH] Add support for tags in the index name in elasticsearch
 output (#3470)

---
 plugins/outputs/elasticsearch/README.md       |   7 +
 .../outputs/elasticsearch/elasticsearch.go    |  74 ++++++++--
 .../elasticsearch/elasticsearch_test.go       | 130 +++++++++++++++++-
 3 files changed, 199 insertions(+), 12 deletions(-)

diff --git a/plugins/outputs/elasticsearch/README.md b/plugins/outputs/elasticsearch/README.md
index d2c84a8d..b0d2e6f9 100644
--- a/plugins/outputs/elasticsearch/README.md
+++ b/plugins/outputs/elasticsearch/README.md
@@ -173,6 +173,11 @@ This plugin will format the events in the following way:
   # %d - day of month (e.g., 01)
   # %H - hour (00..23)
   # %V - week of the year (ISO week) (01..53)
+  ## Additionally, you can specify a tag name using the notation {{tag_name}}
+  ## which will be used as part of the index name. If the tag does not exist,
+  ## the default tag value will be used.
+  # index_name = "telegraf-{{host}}-%Y.%m.%d"
+  # default_tag_value = "none"
   index_name = "telegraf-%Y.%m.%d" # required.
 
   ## Optional SSL Config
@@ -202,7 +207,9 @@ This plugin will format the events in the following way:
   %m - month (01..12)
   %d - day of month (e.g., 01)
   %H - hour (00..23)
+  %V - week of the year (ISO week) (01..53)
 ```
+Additionally, you can specify dynamic index names by using tags with the notation ```{{tag_name}}```. This will store the metrics with different tag values in different indices. If the tag does not exist in a particular metric, the `default_tag_value` will be used instead.
 
 ### Optional parameters:
 
diff --git a/plugins/outputs/elasticsearch/elasticsearch.go b/plugins/outputs/elasticsearch/elasticsearch.go
index a9fd5a49..326def1d 100644
--- a/plugins/outputs/elasticsearch/elasticsearch.go
+++ b/plugins/outputs/elasticsearch/elasticsearch.go
@@ -18,6 +18,8 @@ import (
 type Elasticsearch struct {
 	URLs                []string `toml:"urls"`
 	IndexName           string
+	DefaultTagValue     string
+	TagKeys             []string
 	Username            string
 	Password            string
 	EnableSniffer       bool
@@ -38,7 +40,7 @@ var sampleConfig = `
   ## Multiple urls can be specified as part of the same cluster,
   ## this means that only ONE of the urls will be written to each interval.
   urls = [ "http://node1.es.example.com:9200" ] # required.
-  ## Elasticsearch client timeout, defaults to "5s" if not set. 
+  ## Elasticsearch client timeout, defaults to "5s" if not set.
   timeout = "5s"
   ## Set to true to ask Elasticsearch a list of all cluster nodes,
   ## thus it is not necessary to list all nodes in the urls config option.
@@ -60,6 +62,11 @@ var sampleConfig = `
   # %d - day of month (e.g., 01)
   # %H - hour (00..23)
   # %V - week of the year (ISO week) (01..53)
+  ## Additionally, you can specify a tag name using the notation {{tag_name}}
+  ## which will be used as part of the index name. If the tag does not exist,
+  ## the default tag value will be used.
+  # index_name = "telegraf-{{host}}-%Y.%m.%d"
+  # default_tag_value = "none"
   index_name = "telegraf-%Y.%m.%d" # required.
 
   ## Optional SSL Config
@@ -152,6 +159,8 @@ func (a *Elasticsearch) Connect() error {
 		}
 	}
 
+	a.IndexName, a.TagKeys = a.GetTagKeys(a.IndexName)
+
 	return nil
 }
 
@@ -167,7 +176,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
 
 		// index name has to be re-evaluated each time for telegraf
 		// to send the metric to the correct time-based index
-		indexName := a.GetIndexName(a.IndexName, metric.Time())
+		indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags())
 
 		m := make(map[string]interface{})
 
@@ -214,13 +223,21 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
 		return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
 	}
 
-	templatePattern := a.IndexName + "*"
+	templatePattern := a.IndexName
+
+	if strings.Contains(templatePattern, "%") {
+		templatePattern = templatePattern[0:strings.Index(templatePattern, "%")]
+	}
+
+	if strings.Contains(templatePattern, "{{") {
+		templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")]
+	}
 
-	if strings.Contains(a.IndexName, "%") {
-		templatePattern = a.IndexName[0:strings.Index(a.IndexName, "%")] + "*"
+	if templatePattern == "" {
+		return fmt.Errorf("Template cannot be created for dynamic index names without an index prefix")
 	}
 
-	if (a.OverwriteTemplate) || (!templateExists) {
+	if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
 		// Create or update the template
 		tmpl := fmt.Sprintf(`
 			{
@@ -278,7 +295,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
 						]
 					}
 				}
-			}`, templatePattern)
+			}`, templatePattern+"*")
 		_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx)
 
 		if errCreateTemplate != nil {
@@ -295,7 +312,35 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
 	return nil
 }
 
-func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string {
+func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
+
+	tagKeys := []string{}
+	startTag := strings.Index(indexName, "{{")
+
+	for startTag >= 0 {
+		endTag := strings.Index(indexName, "}}")
+
+		if endTag < 0 {
+			startTag = -1
+
+		} else {
+			tagName := indexName[startTag+2 : endTag]
+
+			var tagReplacer = strings.NewReplacer(
+				"{{"+tagName+"}}", "%s",
+			)
+
+			indexName = tagReplacer.Replace(indexName)
+			tagKeys = append(tagKeys, (strings.TrimSpace(tagName)))
+
+			startTag = strings.Index(indexName, "{{")
+		}
+	}
+
+	return indexName, tagKeys
+}
+
+func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagKeys []string, metricTags map[string]string) string {
 	if strings.Contains(indexName, "%") {
 		var dateReplacer = strings.NewReplacer(
 			"%Y", eventTime.UTC().Format("2006"),
@@ -309,7 +354,18 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) stri
 		indexName = dateReplacer.Replace(indexName)
 	}
 
-	return indexName
+	tagValues := []interface{}{}
+
+	for _, key := range tagKeys {
+		if value, ok := metricTags[key]; ok {
+			tagValues = append(tagValues, value)
+		} else {
+			log.Printf("D! Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue)
+			tagValues = append(tagValues, a.DefaultTagValue)
+		}
+	}
+
+	return fmt.Sprintf(indexName, tagValues...)
 
 }
 
diff --git a/plugins/outputs/elasticsearch/elasticsearch_test.go b/plugins/outputs/elasticsearch/elasticsearch_test.go
index dadd94da..e2a58340 100644
--- a/plugins/outputs/elasticsearch/elasticsearch_test.go
+++ b/plugins/outputs/elasticsearch/elasticsearch_test.go
@@ -2,6 +2,7 @@ package elasticsearch
 
 import (
 	"context"
+	"reflect"
 	"testing"
 	"time"
 
@@ -38,6 +39,10 @@ func TestConnectAndWrite(t *testing.T) {
 }
 
 func TestTemplateManagementEmptyTemplate(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
 	urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
 
 	ctx := context.Background()
@@ -82,54 +87,173 @@ func TestTemplateManagement(t *testing.T) {
 	require.NoError(t, err)
 }
 
+func TestTemplateInvalidIndexPattern(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping integration test in short mode")
+	}
+
+	urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
+
+	e := &Elasticsearch{
+		URLs:              urls,
+		IndexName:         "{{host}}-%Y.%m.%d",
+		Timeout:           internal.Duration{Duration: time.Second * 5},
+		ManageTemplate:    true,
+		TemplateName:      "telegraf",
+		OverwriteTemplate: true,
+	}
+
+	err := e.Connect()
+	require.Error(t, err)
+}
+
+func TestGetTagKeys(t *testing.T) {
+	e := &Elasticsearch{
+		DefaultTagValue: "none",
+	}
+
+	var tests = []struct {
+		IndexName         string
+		ExpectedIndexName string
+		ExpectedTagKeys   []string
+	}{
+		{
+			"indexname",
+			"indexname",
+			[]string{},
+		}, {
+			"indexname-%Y",
+			"indexname-%Y",
+			[]string{},
+		}, {
+			"indexname-%Y-%m",
+			"indexname-%Y-%m",
+			[]string{},
+		}, {
+			"indexname-%Y-%m-%d",
+			"indexname-%Y-%m-%d",
+			[]string{},
+		}, {
+			"indexname-%Y-%m-%d-%H",
+			"indexname-%Y-%m-%d-%H",
+			[]string{},
+		}, {
+			"indexname-%y-%m",
+			"indexname-%y-%m",
+			[]string{},
+		}, {
+			"indexname-{{tag1}}-%y-%m",
+			"indexname-%s-%y-%m",
+			[]string{"tag1"},
+		}, {
+			"indexname-{{tag1}}-{{tag2}}-%y-%m",
+			"indexname-%s-%s-%y-%m",
+			[]string{"tag1", "tag2"},
+		}, {
+			"indexname-{{tag1}}-{{tag2}}-{{tag3}}-%y-%m",
+			"indexname-%s-%s-%s-%y-%m",
+			[]string{"tag1", "tag2", "tag3"},
+		},
+	}
+	for _, test := range tests {
+		indexName, tagKeys := e.GetTagKeys(test.IndexName)
+		if indexName != test.ExpectedIndexName {
+			t.Errorf("Expected indexname %s, got %s\n", test.ExpectedIndexName, indexName)
+		}
+		if !reflect.DeepEqual(tagKeys, test.ExpectedTagKeys) {
+			t.Errorf("Expected tagKeys %s, got %s\n", test.ExpectedTagKeys, tagKeys)
+		}
+	}
+
+}
+
 func TestGetIndexName(t *testing.T) {
-	e := &Elasticsearch{}
+	e := &Elasticsearch{
+		DefaultTagValue: "none",
+	}
 
 	var tests = []struct {
 		EventTime time.Time
+		Tags      map[string]string
+		TagKeys   []string
 		IndexName string
 		Expected  string
 	}{
 		{
 			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{},
 			"indexname",
 			"indexname",
 		},
 		{
 			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{},
 			"indexname-%Y",
 			"indexname-2014",
 		},
 		{
 			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{},
 			"indexname-%Y-%m",
 			"indexname-2014-12",
 		},
 		{
 			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{},
 			"indexname-%Y-%m-%d",
 			"indexname-2014-12-01",
 		},
 		{
 			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{},
 			"indexname-%Y-%m-%d-%H",
 			"indexname-2014-12-01-23",
 		},
 		{
 			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{},
 			"indexname-%y-%m",
 			"indexname-14-12",
 		},
 		{
 			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{},
 			"indexname-%Y-%V",
 			"indexname-2014-49",
 		},
+		{
+			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{"tag1"},
+			"indexname-%s-%y-%m",
+			"indexname-value1-14-12",
+		},
+		{
+			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{"tag1", "tag2"},
+			"indexname-%s-%s-%y-%m",
+			"indexname-value1-value2-14-12",
+		},
+		{
+			time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
+			map[string]string{"tag1": "value1", "tag2": "value2"},
+			[]string{"tag1", "tag2", "tag3"},
+			"indexname-%s-%s-%s-%y-%m",
+			"indexname-value1-value2-none-14-12",
+		},
 	}
 	for _, test := range tests {
-		indexName := e.GetIndexName(test.IndexName, test.EventTime)
+		indexName := e.GetIndexName(test.IndexName, test.EventTime, test.TagKeys, test.Tags)
 		if indexName != test.Expected {
-			t.Errorf("Expected indexname %s, got %s\n", indexName, test.Expected)
+			t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName)
 		}
 	}
 }
-- 
GitLab