Skip to content
Snippets Groups Projects
Commit da12c647 authored by Ildar Svetlov's avatar Ildar Svetlov Committed by Daniel Nelson
Browse files

Add ability to select which queues will be gathered to rabbitmq input (#3702)

parent de03ee3c
No related branches found
No related tags found
No related merge requests found
...@@ -44,6 +44,11 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd ...@@ -44,6 +44,11 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered. ## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"] # exchanges = ["telegraf"]
## Queues to include and exclude. Globs accepted.
## Note that an empty array for both will include all queues
# queue_name_include = []
# queue_name_exclude = []
``` ```
### Measurements & Fields: ### Measurements & Fields:
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
...@@ -52,7 +53,14 @@ type RabbitMQ struct { ...@@ -52,7 +53,14 @@ type RabbitMQ struct {
Queues []string Queues []string
Exchanges []string Exchanges []string
QueueInclude []string `toml:"queue_name_include"`
QueueExclude []string `toml:"queue_name_exclude"`
Client *http.Client Client *http.Client
filterCreated bool
excludeEveryQueue bool
queueFilter filter.Filter
} }
// OverviewResponse ... // OverviewResponse ...
...@@ -195,6 +203,11 @@ var sampleConfig = ` ...@@ -195,6 +203,11 @@ var sampleConfig = `
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered. ## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"] # exchanges = ["telegraf"]
## Queues to include and exclude. Globs accepted.
## Note that an empty array for both will include all queues
queue_name_include = []
queue_name_exclude = []
` `
// SampleConfig ... // SampleConfig ...
...@@ -225,6 +238,15 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error { ...@@ -225,6 +238,15 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
} }
} }
// Create queue filter if not already created
if !r.filterCreated {
err := r.createQueueFilter()
if err != nil {
return err
}
r.filterCreated = true
}
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(gatherFunctions)) wg.Add(len(gatherFunctions))
for _, f := range gatherFunctions { for _, f := range gatherFunctions {
...@@ -361,6 +383,9 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { ...@@ -361,6 +383,9 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
} }
func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
if r.excludeEveryQueue {
return
}
// Gather information about queues // Gather information about queues
queues := make([]Queue, 0) queues := make([]Queue, 0)
err := r.requestJSON("/api/queues", &queues) err := r.requestJSON("/api/queues", &queues)
...@@ -370,7 +395,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { ...@@ -370,7 +395,7 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
} }
for _, queue := range queues { for _, queue := range queues {
if !r.shouldGatherQueue(queue) { if !r.queueFilter.Match(queue.Name) {
continue continue
} }
tags := map[string]string{ tags := map[string]string{
...@@ -463,18 +488,25 @@ func (r *RabbitMQ) shouldGatherNode(node Node) bool { ...@@ -463,18 +488,25 @@ func (r *RabbitMQ) shouldGatherNode(node Node) bool {
return false return false
} }
func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool { func (r *RabbitMQ) createQueueFilter() error {
if len(r.Queues) == 0 { // Backwards compatibility for deprecated `queues` parameter.
return true if len(r.Queues) > 0 {
r.QueueInclude = append(r.QueueInclude, r.Queues...)
} }
for _, name := range r.Queues { filter, err := filter.NewIncludeExcludeFilter(r.QueueInclude, r.QueueExclude)
if name == queue.Name { if err != nil {
return true return err
}
r.queueFilter = filter
for _, q := range r.QueueExclude {
if q == "*" {
r.excludeEveryQueue = true
} }
} }
return false return nil
} }
func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool { func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment