From fa5f1bf6d99350bd338ba4246477f2dfb0af4766 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel@wavesofdawn.com>
Date: Wed, 17 Jan 2018 14:57:46 -0800
Subject: [PATCH] Use go-redis for the redis input (#3661)

---
 Godeps                        |   1 +
 plugins/inputs/redis/redis.go | 182 +++++++++++++++++++---------------
 2 files changed, 101 insertions(+), 82 deletions(-)

diff --git a/Godeps b/Godeps
index 784c6044..c659227c 100644
--- a/Godeps
+++ b/Godeps
@@ -27,6 +27,7 @@ github.com/golang/snappy 7db9049039a047d955fe8c19b83c8ff5abd765c7
 github.com/go-ole/go-ole be49f7c07711fcb603cff39e1de7c67926dc0ba7
 github.com/google/go-cmp f94e52cad91c65a63acc1e75d4be223ea22e99bc
 github.com/gorilla/mux 392c28fe23e1c45ddba891b0320b3b5df220beea
+github.com/go-redis/redis 73b70592cdaa9e6abdfcfbf97b4a90d80728c836
 github.com/go-sql-driver/mysql 2e00b5cd70399450106cec6431c2e2ce3cae5034
 github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
 github.com/hashicorp/consul 63d2fc68239b996096a1c55a0d4b400ea4c2583f
diff --git a/plugins/inputs/redis/redis.go b/plugins/inputs/redis/redis.go
index 7f614334..c809aa43 100644
--- a/plugins/inputs/redis/redis.go
+++ b/plugins/inputs/redis/redis.go
@@ -2,21 +2,47 @@ package redis
 
 import (
 	"bufio"
-	"errors"
 	"fmt"
-	"net"
+	"io"
+	"log"
 	"net/url"
 	"strconv"
 	"strings"
 	"sync"
 	"time"
 
+	"github.com/go-redis/redis"
 	"github.com/influxdata/telegraf"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
 
 type Redis struct {
 	Servers []string
+
+	clients     []Client
+	initialized bool
+}
+
+type Client interface {
+	Info() *redis.StringCmd
+	BaseTags() map[string]string
+}
+
+type RedisClient struct {
+	client *redis.Client
+	tags   map[string]string
+}
+
+func (r *RedisClient) Info() *redis.StringCmd {
+	return r.client.Info()
+}
+
+func (r *RedisClient) BaseTags() map[string]string {
+	tags := make(map[string]string)
+	for k, v := range r.tags {
+		tags[k] = v
+	}
+	return tags
 }
 
 var sampleConfig = `
@@ -32,8 +58,6 @@ var sampleConfig = `
   servers = ["tcp://localhost:6379"]
 `
 
-var defaultTimeout = 5 * time.Second
-
 func (r *Redis) SampleConfig() string {
 	return sampleConfig
 }
@@ -48,111 +72,107 @@ var Tracking = map[string]string{
 	"role":              "replication_role",
 }
 
-var ErrProtocolError = errors.New("redis protocol error")
-
-const defaultPort = "6379"
+func (r *Redis) init(acc telegraf.Accumulator) error {
+	if r.initialized {
+		return nil
+	}
 
-// Reads stats from all configured servers accumulates stats.
-// Returns one of the errors encountered while gather stats (if any).
-func (r *Redis) Gather(acc telegraf.Accumulator) error {
 	if len(r.Servers) == 0 {
-		url := &url.URL{
-			Scheme: "tcp",
-			Host:   ":6379",
-		}
-		r.gatherServer(url, acc)
-		return nil
+		r.Servers = []string{"tcp://localhost:6379"}
 	}
 
-	var wg sync.WaitGroup
-	for _, serv := range r.Servers {
+	r.clients = make([]Client, len(r.Servers))
+
+	for i, serv := range r.Servers {
 		if !strings.HasPrefix(serv, "tcp://") && !strings.HasPrefix(serv, "unix://") {
+			log.Printf("W! [inputs.redis]: server URL found without scheme; please update your configuration file")
 			serv = "tcp://" + serv
 		}
 
 		u, err := url.Parse(serv)
 		if err != nil {
-			acc.AddError(fmt.Errorf("Unable to parse to address '%s': %s", serv, err))
-			continue
-		} else if u.Scheme == "" {
-			// fallback to simple string based address (i.e. "10.0.0.1:10000")
-			u.Scheme = "tcp"
-			u.Host = serv
-			u.Path = ""
+			return fmt.Errorf("Unable to parse to address %q: %v", serv, err)
 		}
-		if u.Scheme == "tcp" {
-			_, _, err := net.SplitHostPort(u.Host)
-			if err != nil {
-				u.Host = u.Host + ":" + defaultPort
+
+		password := ""
+		if u.User != nil {
+			pw, ok := u.User.Password()
+			if ok {
+				password = pw
 			}
 		}
 
-		wg.Add(1)
-		go func(serv string) {
-			defer wg.Done()
-			acc.AddError(r.gatherServer(u, acc))
-		}(serv)
+		var address string
+		if u.Scheme == "unix" {
+			address = u.Path
+		} else {
+			address = u.Host
+		}
+
+		client := redis.NewClient(
+			&redis.Options{
+				Addr:     address,
+				Password: password,
+				Network:  u.Scheme,
+				PoolSize: 1,
+			},
+		)
+
+		tags := map[string]string{}
+		if u.Scheme == "unix" {
+			tags["socket"] = u.Path
+		} else {
+			tags["server"] = u.Hostname()
+			tags["port"] = u.Port()
+		}
+
+		r.clients[i] = &RedisClient{
+			client: client,
+			tags:   tags,
+		}
 	}
 
-	wg.Wait()
+	r.initialized = true
 	return nil
 }
 
-func (r *Redis) gatherServer(addr *url.URL, acc telegraf.Accumulator) error {
-	var address string
-
-	if addr.Scheme == "unix" {
-		address = addr.Path
-	} else {
-		address = addr.Host
-	}
-	c, err := net.DialTimeout(addr.Scheme, address, defaultTimeout)
-	if err != nil {
-		return fmt.Errorf("Unable to connect to redis server '%s': %s", address, err)
+// Reads stats from all configured servers accumulates stats.
+// Returns one of the errors encountered while gather stats (if any).
+func (r *Redis) Gather(acc telegraf.Accumulator) error {
+	if !r.initialized {
+		err := r.init(acc)
+		if err != nil {
+			return err
+		}
 	}
-	defer c.Close()
 
-	// Extend connection
-	c.SetDeadline(time.Now().Add(defaultTimeout))
-
-	if addr.User != nil {
-		pwd, set := addr.User.Password()
-		if set && pwd != "" {
-			c.Write([]byte(fmt.Sprintf("AUTH %s\r\n", pwd)))
-
-			rdr := bufio.NewReader(c)
+	var wg sync.WaitGroup
 
-			line, err := rdr.ReadString('\n')
-			if err != nil {
-				return err
-			}
-			if line[0] != '+' {
-				return fmt.Errorf("%s", strings.TrimSpace(line)[1:])
-			}
-		}
+	for _, client := range r.clients {
+		wg.Add(1)
+		go func(client Client) {
+			defer wg.Done()
+			acc.AddError(r.gatherServer(client, acc))
+		}(client)
 	}
 
-	c.Write([]byte("INFO\r\n"))
-	c.Write([]byte("EOF\r\n"))
-	rdr := bufio.NewReader(c)
-
-	var tags map[string]string
+	wg.Wait()
+	return nil
+}
 
-	if addr.Scheme == "unix" {
-		tags = map[string]string{"socket": addr.Path}
-	} else {
-		// Setup tags for all redis metrics
-		host, port := "unknown", "unknown"
-		// If there's an error, ignore and use 'unknown' tags
-		host, port, _ = net.SplitHostPort(addr.Host)
-		tags = map[string]string{"server": host, "port": port}
+func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
+	info, err := client.Info().Result()
+	if err != nil {
+		return err
 	}
-	return gatherInfoOutput(rdr, acc, tags)
+
+	rdr := strings.NewReader(info)
+	return gatherInfoOutput(rdr, acc, client.BaseTags())
 }
 
 // gatherInfoOutput gathers
 func gatherInfoOutput(
-	rdr *bufio.Reader,
+	rdr io.Reader,
 	acc telegraf.Accumulator,
 	tags map[string]string,
 ) error {
@@ -163,13 +183,11 @@ func gatherInfoOutput(
 	fields := make(map[string]interface{})
 	for scanner.Scan() {
 		line := scanner.Text()
-		if strings.Contains(line, "ERR") {
-			break
-		}
 
 		if len(line) == 0 {
 			continue
 		}
+
 		if line[0] == '#' {
 			if len(line) > 2 {
 				section = line[2:]
-- 
GitLab