Skip to content
Snippets Groups Projects
Commit 312116c1 authored by Mike Gent's avatar Mike Gent Committed by Daniel Nelson
Browse files

Add passive mode exchange declaration option to amqp consumer input (#3995)

parent 2cc2913d
No related branches found
No related tags found
No related merge requests found
...@@ -3036,6 +3036,8 @@ ...@@ -3036,6 +3036,8 @@
# url = "amqp://localhost:5672/influxdb" # url = "amqp://localhost:5672/influxdb"
# ## AMQP exchange # ## AMQP exchange
# exchange = "telegraf" # exchange = "telegraf"
# ## Exchange passive mode
# exchange_passive = false
# ## AMQP queue name # ## AMQP queue name
# queue = "telegraf" # queue = "telegraf"
# ## Binding Key # ## Binding Key
......
...@@ -19,6 +19,8 @@ The following defaults are known to work with RabbitMQ: ...@@ -19,6 +19,8 @@ The following defaults are known to work with RabbitMQ:
url = "amqp://localhost:5672/influxdb" url = "amqp://localhost:5672/influxdb"
## AMQP exchange ## AMQP exchange
exchange = "telegraf" exchange = "telegraf"
## Exchange passive mode
exchange_passive = false
## AMQP queue name ## AMQP queue name
queue = "telegraf" queue = "telegraf"
## Binding Key ## Binding Key
......
...@@ -20,6 +20,8 @@ type AMQPConsumer struct { ...@@ -20,6 +20,8 @@ type AMQPConsumer struct {
URL string URL string
// AMQP exchange // AMQP exchange
Exchange string Exchange string
// Exchange passive mode
ExchangePassive bool
// Queue Name // Queue Name
Queue string Queue string
// Binding Key // Binding Key
...@@ -58,6 +60,8 @@ func (a *AMQPConsumer) SampleConfig() string { ...@@ -58,6 +60,8 @@ func (a *AMQPConsumer) SampleConfig() string {
url = "amqp://localhost:5672/influxdb" url = "amqp://localhost:5672/influxdb"
## AMQP exchange ## AMQP exchange
exchange = "telegraf" exchange = "telegraf"
## Exchange passive mode
exchange_passive = false
## AMQP queue name ## AMQP queue name
queue = "telegraf" queue = "telegraf"
## Binding Key ## Binding Key
...@@ -174,15 +178,27 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err ...@@ -174,15 +178,27 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, fmt.Errorf("Failed to open a channel: %s", err) return nil, fmt.Errorf("Failed to open a channel: %s", err)
} }
err = ch.ExchangeDeclare( if a.ExchangePassive == true {
a.Exchange, // name err = ch.ExchangeDeclarePassive(
"topic", // type a.Exchange, // name
true, // durable "topic", // type
false, // auto-deleted true, // durable
false, // internal false, // auto-deleted
false, // no-wait false, // internal
nil, // arguments false, // no-wait
) nil, // arguments
)
} else {
err = ch.ExchangeDeclare(
a.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
}
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to declare an exchange: %s", err) return nil, fmt.Errorf("Failed to declare an exchange: %s", err)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment