Skip to content
Snippets Groups Projects
Commit f62e5430 authored by DanKans's avatar DanKans Committed by Daniel Nelson
Browse files

Fix address already in use with webhooks input during reload (#3206)

parent be83c8c8
No related branches found
No related tags found
No related merge requests found
...@@ -3,6 +3,7 @@ package webhooks ...@@ -3,6 +3,7 @@ package webhooks
import ( import (
"fmt" "fmt"
"log" "log"
"net"
"net/http" "net/http"
"reflect" "reflect"
...@@ -33,6 +34,8 @@ type Webhooks struct { ...@@ -33,6 +34,8 @@ type Webhooks struct {
Mandrill *mandrill.MandrillWebhook Mandrill *mandrill.MandrillWebhook
Rollbar *rollbar.RollbarWebhook Rollbar *rollbar.RollbarWebhook
Papertrail *papertrail.PapertrailWebhook Papertrail *papertrail.PapertrailWebhook
srv *http.Server
} }
func NewWebhooks() *Webhooks { func NewWebhooks() *Webhooks {
...@@ -70,19 +73,6 @@ func (wb *Webhooks) Gather(_ telegraf.Accumulator) error { ...@@ -70,19 +73,6 @@ func (wb *Webhooks) Gather(_ telegraf.Accumulator) error {
return nil return nil
} }
func (wb *Webhooks) Listen(acc telegraf.Accumulator) {
r := mux.NewRouter()
for _, webhook := range wb.AvailableWebhooks() {
webhook.Register(r, acc)
}
err := http.ListenAndServe(fmt.Sprintf("%s", wb.ServiceAddress), r)
if err != nil {
acc.AddError(fmt.Errorf("E! Error starting server: %v", err))
}
}
// Looks for fields which implement Webhook interface // Looks for fields which implement Webhook interface
func (wb *Webhooks) AvailableWebhooks() []Webhook { func (wb *Webhooks) AvailableWebhooks() []Webhook {
webhooks := make([]Webhook, 0) webhooks := make([]Webhook, 0)
...@@ -105,11 +95,35 @@ func (wb *Webhooks) AvailableWebhooks() []Webhook { ...@@ -105,11 +95,35 @@ func (wb *Webhooks) AvailableWebhooks() []Webhook {
} }
func (wb *Webhooks) Start(acc telegraf.Accumulator) error { func (wb *Webhooks) Start(acc telegraf.Accumulator) error {
go wb.Listen(acc) r := mux.NewRouter()
for _, webhook := range wb.AvailableWebhooks() {
webhook.Register(r, acc)
}
wb.srv = &http.Server{Handler: r}
ln, err := net.Listen("tcp", fmt.Sprintf("%s", wb.ServiceAddress))
if err != nil {
log.Fatalf("E! Error starting server: %v", err)
return err
}
go func() {
if err := wb.srv.Serve(ln); err != nil {
if err != http.ErrServerClosed {
acc.AddError(fmt.Errorf("E! Error listening: %v", err))
}
}
}()
log.Printf("I! Started the webhooks service on %s\n", wb.ServiceAddress) log.Printf("I! Started the webhooks service on %s\n", wb.ServiceAddress)
return nil return nil
} }
func (rb *Webhooks) Stop() { func (rb *Webhooks) Stop() {
rb.srv.Close()
log.Println("I! Stopping the Webhooks service") log.Println("I! Stopping the Webhooks service")
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment