Skip to content
Snippets Groups Projects
Commit 92ca6616 authored by Leandro Piccilli's avatar Leandro Piccilli Committed by Daniel Nelson
Browse files

Add support for tags in the index name in elasticsearch output (#3470)

parent 54b0b9e7
No related branches found
No related tags found
No related merge requests found
......@@ -173,6 +173,11 @@ This plugin will format the events in the following way:
# %d - day of month (e.g., 01)
# %H - hour (00..23)
# %V - week of the year (ISO week) (01..53)
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the index name. If the tag does not exist,
## the default tag value will be used.
# index_name = "telegraf-{{host}}-%Y.%m.%d"
# default_tag_value = "none"
index_name = "telegraf-%Y.%m.%d" # required.
## Optional SSL Config
......@@ -202,7 +207,9 @@ This plugin will format the events in the following way:
%m - month (01..12)
%d - day of month (e.g., 01)
%H - hour (00..23)
%V - week of the year (ISO week) (01..53)
```
Additionally, you can specify dynamic index names by using tags with the notation ```{{tag_name}}```. This will store the metrics with different tag values in different indices. If the tag does not exist in a particular metric, the `default_tag_value` will be used instead.
### Optional parameters:
......
......@@ -18,6 +18,8 @@ import (
type Elasticsearch struct {
URLs []string `toml:"urls"`
IndexName string
DefaultTagValue string
TagKeys []string
Username string
Password string
EnableSniffer bool
......@@ -38,7 +40,7 @@ var sampleConfig = `
## Multiple urls can be specified as part of the same cluster,
## this means that only ONE of the urls will be written to each interval.
urls = [ "http://node1.es.example.com:9200" ] # required.
## Elasticsearch client timeout, defaults to "5s" if not set.
## Elasticsearch client timeout, defaults to "5s" if not set.
timeout = "5s"
## Set to true to ask Elasticsearch a list of all cluster nodes,
## thus it is not necessary to list all nodes in the urls config option.
......@@ -60,6 +62,11 @@ var sampleConfig = `
# %d - day of month (e.g., 01)
# %H - hour (00..23)
# %V - week of the year (ISO week) (01..53)
## Additionally, you can specify a tag name using the notation {{tag_name}}
## which will be used as part of the index name. If the tag does not exist,
## the default tag value will be used.
# index_name = "telegraf-{{host}}-%Y.%m.%d"
# default_tag_value = "none"
index_name = "telegraf-%Y.%m.%d" # required.
## Optional SSL Config
......@@ -152,6 +159,8 @@ func (a *Elasticsearch) Connect() error {
}
}
a.IndexName, a.TagKeys = a.GetTagKeys(a.IndexName)
return nil
}
......@@ -167,7 +176,7 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time())
indexName := a.GetIndexName(a.IndexName, metric.Time(), a.TagKeys, metric.Tags())
m := make(map[string]interface{})
......@@ -214,13 +223,21 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
}
templatePattern := a.IndexName + "*"
templatePattern := a.IndexName
if strings.Contains(templatePattern, "%") {
templatePattern = templatePattern[0:strings.Index(templatePattern, "%")]
}
if strings.Contains(templatePattern, "{{") {
templatePattern = templatePattern[0:strings.Index(templatePattern, "{{")]
}
if strings.Contains(a.IndexName, "%") {
templatePattern = a.IndexName[0:strings.Index(a.IndexName, "%")] + "*"
if templatePattern == "" {
return fmt.Errorf("Template cannot be created for dynamic index names without an index prefix")
}
if (a.OverwriteTemplate) || (!templateExists) {
if (a.OverwriteTemplate) || (!templateExists) || (templatePattern != "") {
// Create or update the template
tmpl := fmt.Sprintf(`
{
......@@ -278,7 +295,7 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
]
}
}
}`, templatePattern)
}`, templatePattern+"*")
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx)
if errCreateTemplate != nil {
......@@ -295,7 +312,35 @@ func (a *Elasticsearch) manageTemplate(ctx context.Context) error {
return nil
}
func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string {
func (a *Elasticsearch) GetTagKeys(indexName string) (string, []string) {
tagKeys := []string{}
startTag := strings.Index(indexName, "{{")
for startTag >= 0 {
endTag := strings.Index(indexName, "}}")
if endTag < 0 {
startTag = -1
} else {
tagName := indexName[startTag+2 : endTag]
var tagReplacer = strings.NewReplacer(
"{{"+tagName+"}}", "%s",
)
indexName = tagReplacer.Replace(indexName)
tagKeys = append(tagKeys, (strings.TrimSpace(tagName)))
startTag = strings.Index(indexName, "{{")
}
}
return indexName, tagKeys
}
func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time, tagKeys []string, metricTags map[string]string) string {
if strings.Contains(indexName, "%") {
var dateReplacer = strings.NewReplacer(
"%Y", eventTime.UTC().Format("2006"),
......@@ -309,7 +354,18 @@ func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) stri
indexName = dateReplacer.Replace(indexName)
}
return indexName
tagValues := []interface{}{}
for _, key := range tagKeys {
if value, ok := metricTags[key]; ok {
tagValues = append(tagValues, value)
} else {
log.Printf("D! Tag '%s' not found, using '%s' on index name instead\n", key, a.DefaultTagValue)
tagValues = append(tagValues, a.DefaultTagValue)
}
}
return fmt.Sprintf(indexName, tagValues...)
}
......
......@@ -2,6 +2,7 @@ package elasticsearch
import (
"context"
"reflect"
"testing"
"time"
......@@ -38,6 +39,10 @@ func TestConnectAndWrite(t *testing.T) {
}
func TestTemplateManagementEmptyTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
ctx := context.Background()
......@@ -82,54 +87,173 @@ func TestTemplateManagement(t *testing.T) {
require.NoError(t, err)
}
func TestTemplateInvalidIndexPattern(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}
urls := []string{"http://" + testutil.GetLocalHost() + ":9200"}
e := &Elasticsearch{
URLs: urls,
IndexName: "{{host}}-%Y.%m.%d",
Timeout: internal.Duration{Duration: time.Second * 5},
ManageTemplate: true,
TemplateName: "telegraf",
OverwriteTemplate: true,
}
err := e.Connect()
require.Error(t, err)
}
func TestGetTagKeys(t *testing.T) {
e := &Elasticsearch{
DefaultTagValue: "none",
}
var tests = []struct {
IndexName string
ExpectedIndexName string
ExpectedTagKeys []string
}{
{
"indexname",
"indexname",
[]string{},
}, {
"indexname-%Y",
"indexname-%Y",
[]string{},
}, {
"indexname-%Y-%m",
"indexname-%Y-%m",
[]string{},
}, {
"indexname-%Y-%m-%d",
"indexname-%Y-%m-%d",
[]string{},
}, {
"indexname-%Y-%m-%d-%H",
"indexname-%Y-%m-%d-%H",
[]string{},
}, {
"indexname-%y-%m",
"indexname-%y-%m",
[]string{},
}, {
"indexname-{{tag1}}-%y-%m",
"indexname-%s-%y-%m",
[]string{"tag1"},
}, {
"indexname-{{tag1}}-{{tag2}}-%y-%m",
"indexname-%s-%s-%y-%m",
[]string{"tag1", "tag2"},
}, {
"indexname-{{tag1}}-{{tag2}}-{{tag3}}-%y-%m",
"indexname-%s-%s-%s-%y-%m",
[]string{"tag1", "tag2", "tag3"},
},
}
for _, test := range tests {
indexName, tagKeys := e.GetTagKeys(test.IndexName)
if indexName != test.ExpectedIndexName {
t.Errorf("Expected indexname %s, got %s\n", test.ExpectedIndexName, indexName)
}
if !reflect.DeepEqual(tagKeys, test.ExpectedTagKeys) {
t.Errorf("Expected tagKeys %s, got %s\n", test.ExpectedTagKeys, tagKeys)
}
}
}
func TestGetIndexName(t *testing.T) {
e := &Elasticsearch{}
e := &Elasticsearch{
DefaultTagValue: "none",
}
var tests = []struct {
EventTime time.Time
Tags map[string]string
TagKeys []string
IndexName string
Expected string
}{
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname",
"indexname",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y",
"indexname-2014",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y-%m",
"indexname-2014-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y-%m-%d",
"indexname-2014-12-01",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y-%m-%d-%H",
"indexname-2014-12-01-23",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%y-%m",
"indexname-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{},
"indexname-%Y-%V",
"indexname-2014-49",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{"tag1"},
"indexname-%s-%y-%m",
"indexname-value1-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{"tag1", "tag2"},
"indexname-%s-%s-%y-%m",
"indexname-value1-value2-14-12",
},
{
time.Date(2014, 12, 01, 23, 30, 00, 00, time.UTC),
map[string]string{"tag1": "value1", "tag2": "value2"},
[]string{"tag1", "tag2", "tag3"},
"indexname-%s-%s-%s-%y-%m",
"indexname-value1-value2-none-14-12",
},
}
for _, test := range tests {
indexName := e.GetIndexName(test.IndexName, test.EventTime)
indexName := e.GetIndexName(test.IndexName, test.EventTime, test.TagKeys, test.Tags)
if indexName != test.Expected {
t.Errorf("Expected indexname %s, got %s\n", indexName, test.Expected)
t.Errorf("Expected indexname %s, got %s\n", test.Expected, indexName)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment