From 6fcd05b855490908061eb4ad535d6b5ca84f44b2 Mon Sep 17 00:00:00 2001
From: Cameron Sparr <cameronsparr@gmail.com>
Date: Tue, 15 Dec 2015 11:08:13 -0600
Subject: [PATCH] 0.3.0 redis & rabbitmq

---
 outputs/influxdb/influxdb.go |  2 ++
 plugins/rabbitmq/rabbitmq.go | 57 ++++++++++++++++++++----------------
 plugins/redis/redis.go       | 12 +++++---
 3 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/outputs/influxdb/influxdb.go b/outputs/influxdb/influxdb.go
index a9fa2edc..14391884 100644
--- a/outputs/influxdb/influxdb.go
+++ b/outputs/influxdb/influxdb.go
@@ -7,6 +7,7 @@ import (
 	"math/rand"
 	"net/url"
 	"strings"
+	"time"
 
 	"github.com/influxdb/influxdb/client/v2"
 	"github.com/influxdb/telegraf/internal"
@@ -110,6 +111,7 @@ func (i *InfluxDB) Connect() error {
 	}
 
 	i.conns = conns
+	rand.Seed(time.Now().UnixNano())
 	return nil
 }
 
diff --git a/plugins/rabbitmq/rabbitmq.go b/plugins/rabbitmq/rabbitmq.go
index 27580a13..7101ab43 100644
--- a/plugins/rabbitmq/rabbitmq.go
+++ b/plugins/rabbitmq/rabbitmq.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"net/http"
 	"strconv"
+	"time"
 
 	"github.com/influxdb/telegraf/plugins"
 )
@@ -199,20 +200,20 @@ func gatherOverview(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan
 	if serv.Name != "" {
 		tags["name"] = serv.Name
 	}
-
-	acc.Add("messages", overview.QueueTotals.Messages, tags)
-	acc.Add("messages_ready", overview.QueueTotals.MessagesReady, tags)
-	acc.Add("messages_unacked", overview.QueueTotals.MessagesUnacknowledged, tags)
-
-	acc.Add("channels", overview.ObjectTotals.Channels, tags)
-	acc.Add("connections", overview.ObjectTotals.Connections, tags)
-	acc.Add("consumers", overview.ObjectTotals.Consumers, tags)
-	acc.Add("exchanges", overview.ObjectTotals.Exchanges, tags)
-	acc.Add("queues", overview.ObjectTotals.Queues, tags)
-
-	acc.Add("messages_acked", overview.MessageStats.Ack, tags)
-	acc.Add("messages_delivered", overview.MessageStats.Deliver, tags)
-	acc.Add("messages_published", overview.MessageStats.Publish, tags)
+	fields := map[string]interface{}{
+		"messages":           overview.QueueTotals.Messages,
+		"messages_ready":     overview.QueueTotals.MessagesReady,
+		"messages_unacked":   overview.QueueTotals.MessagesUnacknowledged,
+		"channels":           overview.ObjectTotals.Channels,
+		"connections":        overview.ObjectTotals.Connections,
+		"consumers":          overview.ObjectTotals.Consumers,
+		"exchanges":          overview.ObjectTotals.Exchanges,
+		"queues":             overview.ObjectTotals.Queues,
+		"messages_acked":     overview.MessageStats.Ack,
+		"messages_delivered": overview.MessageStats.Deliver,
+		"messages_published": overview.MessageStats.Publish,
+	}
+	acc.AddFields("rabbitmq_overview", fields, tags)
 
 	errChan <- nil
 }
@@ -225,6 +226,7 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha
 		errChan <- err
 		return
 	}
+	now := time.Now()
 
 	for _, node := range nodes {
 		if !shouldGatherNode(node, serv) {
@@ -234,17 +236,20 @@ func gatherNodes(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan cha
 		tags := map[string]string{"url": serv.URL}
 		tags["node"] = node.Name
 
-		acc.Add("disk_free", node.DiskFree, tags)
-		acc.Add("disk_free_limit", node.DiskFreeLimit, tags)
-		acc.Add("fd_total", node.FdTotal, tags)
-		acc.Add("fd_used", node.FdUsed, tags)
-		acc.Add("mem_limit", node.MemLimit, tags)
-		acc.Add("mem_used", node.MemUsed, tags)
-		acc.Add("proc_total", node.ProcTotal, tags)
-		acc.Add("proc_used", node.ProcUsed, tags)
-		acc.Add("run_queue", node.RunQueue, tags)
-		acc.Add("sockets_total", node.SocketsTotal, tags)
-		acc.Add("sockets_used", node.SocketsUsed, tags)
+		fields := map[string]interface{}{
+			"disk_free":       node.DiskFree,
+			"disk_free_limit": node.DiskFreeLimit,
+			"fd_total":        node.FdTotal,
+			"fd_used":         node.FdUsed,
+			"mem_limit":       node.MemLimit,
+			"mem_used":        node.MemUsed,
+			"proc_total":      node.ProcTotal,
+			"proc_used":       node.ProcUsed,
+			"run_queue":       node.RunQueue,
+			"sockets_total":   node.SocketsTotal,
+			"sockets_used":    node.SocketsUsed,
+		}
+		acc.AddFields("rabbitmq_node", fields, tags, now)
 	}
 
 	errChan <- nil
@@ -273,7 +278,7 @@ func gatherQueues(r *RabbitMQ, serv *Server, acc plugins.Accumulator, errChan ch
 		}
 
 		acc.AddFields(
-			"queue",
+			"rabbitmq_queue",
 			map[string]interface{}{
 				// common information
 				"consumers":            queue.Consumers,
diff --git a/plugins/redis/redis.go b/plugins/redis/redis.go
index 151fb4f4..2e338ff1 100644
--- a/plugins/redis/redis.go
+++ b/plugins/redis/redis.go
@@ -164,6 +164,7 @@ func gatherInfoOutput(
 	var keyspace_hits, keyspace_misses uint64 = 0, 0
 
 	scanner := bufio.NewScanner(rdr)
+	fields := make(map[string]interface{})
 	for scanner.Scan() {
 		line := scanner.Text()
 		if strings.Contains(line, "ERR") {
@@ -199,7 +200,7 @@ func gatherInfoOutput(
 		}
 
 		if err == nil {
-			acc.Add(metric, ival, tags)
+			fields[metric] = ival
 			continue
 		}
 
@@ -208,13 +209,14 @@ func gatherInfoOutput(
 			return err
 		}
 
-		acc.Add(metric, fval, tags)
+		fields[metric] = fval
 	}
 	var keyspace_hitrate float64 = 0.0
 	if keyspace_hits != 0 || keyspace_misses != 0 {
 		keyspace_hitrate = float64(keyspace_hits) / float64(keyspace_hits+keyspace_misses)
 	}
-	acc.Add("keyspace_hitrate", keyspace_hitrate, tags)
+	fields["keyspace_hitrate"] = keyspace_hitrate
+	acc.AddFields("redis", fields, tags)
 	return nil
 }
 
@@ -229,15 +231,17 @@ func gatherKeyspaceLine(
 	tags map[string]string,
 ) {
 	if strings.Contains(line, "keys=") {
+		fields := make(map[string]interface{})
 		tags["database"] = name
 		dbparts := strings.Split(line, ",")
 		for _, dbp := range dbparts {
 			kv := strings.Split(dbp, "=")
 			ival, err := strconv.ParseUint(kv[1], 10, 64)
 			if err == nil {
-				acc.Add(kv[0], ival, tags)
+				fields[kv[0]] = ival
 			}
 		}
+		acc.AddFields("redis_keyspace", fields, tags)
 	}
 }
 
-- 
GitLab