From da12c647910d3bd04393a5adace8dda5cbe4b976 Mon Sep 17 00:00:00 2001
From: Ildar Svetlov <svetlov.ildar@gmail.com>
Date: Tue, 30 Jan 2018 00:14:49 +0400
Subject: [PATCH] Add ability to select which queues will be gathered to
 rabbitmq input (#3702)

---
 plugins/inputs/rabbitmq/README.md   |  5 +++
 plugins/inputs/rabbitmq/rabbitmq.go | 48 ++++++++++++++++++++++++-----
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md
index 796dfc7b..5dae5e09 100644
--- a/plugins/inputs/rabbitmq/README.md
+++ b/plugins/inputs/rabbitmq/README.md
@@ -44,6 +44,11 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
   ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
   ## specified, metrics for all exchanges are gathered.
   # exchanges = ["telegraf"]
+
+  ## Queues to include and exclude. Globs accepted.
+  ## Note that an empty array for both will include all queues
+  # queue_name_include = []
+  # queue_name_exclude = []
 ```
 
 ### Measurements & Fields:
diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go
index 64aa6408..e0d12c3d 100644
--- a/plugins/inputs/rabbitmq/rabbitmq.go
+++ b/plugins/inputs/rabbitmq/rabbitmq.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/filter"
 	"github.com/influxdata/telegraf/internal"
 	"github.com/influxdata/telegraf/plugins/inputs"
 )
@@ -52,7 +53,14 @@ type RabbitMQ struct {
 	Queues    []string
 	Exchanges []string
 
+	QueueInclude []string `toml:"queue_name_include"`
+	QueueExclude []string `toml:"queue_name_exclude"`
+
 	Client *http.Client
+
+	filterCreated     bool
+	excludeEveryQueue bool
+	queueFilter       filter.Filter
 }
 
 // OverviewResponse ...
@@ -195,6 +203,11 @@ var sampleConfig = `
   ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
   ## specified, metrics for all exchanges are gathered.
   # exchanges = ["telegraf"]
+
+  ## Queues to include and exclude. Globs accepted.
+  ## Note that an empty array for both will include all queues
+  queue_name_include = []
+  queue_name_exclude = []
 `
 
 // SampleConfig ...
@@ -225,6 +238,15 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
 		}
 	}
 
+	// Create queue filter if not already created
+	if !r.filterCreated {
+		err := r.createQueueFilter()
+		if err != nil {
+			return err
+		}
+		r.filterCreated = true
+	}
+
 	var wg sync.WaitGroup
 	wg.Add(len(gatherFunctions))
 	for _, f := range gatherFunctions {
@@ -361,6 +383,9 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
 }
 
 func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
+	if r.excludeEveryQueue {
+		return
+	}
 	// Gather information about queues
 	queues := make([]Queue, 0)
 	err := r.requestJSON("/api/queues", &queues)
@@ -370,7 +395,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
 	}
 
 	for _, queue := range queues {
-		if !r.shouldGatherQueue(queue) {
+		if !r.queueFilter.Match(queue.Name) {
 			continue
 		}
 		tags := map[string]string{
@@ -463,18 +488,25 @@ func (r *RabbitMQ) shouldGatherNode(node Node) bool {
 	return false
 }
 
-func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
-	if len(r.Queues) == 0 {
-		return true
+func (r *RabbitMQ) createQueueFilter() error {
+	// Backwards compatibility for deprecated `queues` parameter.
+	if len(r.Queues) > 0 {
+		r.QueueInclude = append(r.QueueInclude, r.Queues...)
 	}
 
-	for _, name := range r.Queues {
-		if name == queue.Name {
-			return true
+	filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
+	if err != nil {
+		return err
+	}
+	r.queueFilter = filter
+
+	for _, q := range r.QueueExclude {
+		if q == "*" {
+			r.excludeEveryQueue = true
 		}
 	}
 
-	return false
+	return nil
 }
 
 func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
-- 
GitLab