From 1a407ceaf95fecfb50b94d65bae3a917f8cda1a6 Mon Sep 17 00:00:00 2001
From: Daniel Nelson <daniel@wavesofdawn.com>
Date: Mon, 21 May 2018 11:59:39 -0700
Subject: [PATCH] Add aurora input plugin (#4158)

---
 plugins/inputs/all/all.go            |   1 +
 plugins/inputs/aurora/README.md      |  63 ++++++
 plugins/inputs/aurora/aurora.go      | 280 +++++++++++++++++++++++++++
 plugins/inputs/aurora/aurora_test.go | 259 +++++++++++++++++++++++++
 testutil/accumulator.go              |   7 +
 5 files changed, 610 insertions(+)
 create mode 100644 plugins/inputs/aurora/README.md
 create mode 100644 plugins/inputs/aurora/aurora.go
 create mode 100644 plugins/inputs/aurora/aurora_test.go

diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index bb2316c9..80db99bf 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -4,6 +4,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
 	_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
 	_ "github.com/influxdata/telegraf/plugins/inputs/apache"
+	_ "github.com/influxdata/telegraf/plugins/inputs/aurora"
 	_ "github.com/influxdata/telegraf/plugins/inputs/bcache"
 	_ "github.com/influxdata/telegraf/plugins/inputs/bond"
 	_ "github.com/influxdata/telegraf/plugins/inputs/cassandra"
diff --git a/plugins/inputs/aurora/README.md b/plugins/inputs/aurora/README.md
new file mode 100644
index 00000000..cef7ac6c
--- /dev/null
+++ b/plugins/inputs/aurora/README.md
@@ -0,0 +1,63 @@
+# Aurora Input Plugin
+
+The Aurora Input Plugin gathers metrics from [Apache Aurora](https://aurora.apache.org/) schedulers.
+
+For monitoring recommendations, see [Monitoring your Aurora cluster](https://aurora.apache.org/documentation/latest/operations/monitoring/).
+
+### Configuration:
+
+```toml
+[[inputs.aurora]]
+  ## Schedulers are the base addresses of your Aurora Schedulers
+  schedulers = ["http://127.0.0.1:8081"]
+
+  ## Set of role types to collect metrics from.
+  ##
+  ## The scheduler roles are checked each interval by contacting the
+  ## scheduler nodes; zookeeper is not contacted.
+  # roles = ["leader", "follower"]
+
+  ## Timeout is the max time for total network operations.
+  # timeout = "5s"
+
+  ## Username and password are sent using HTTP Basic Auth.
+  # username = "username"
+  # password = "pa$$word"
+
+  ## Optional TLS Config
+  # tls_ca = "/etc/telegraf/ca.pem"
+  # tls_cert = "/etc/telegraf/cert.pem"
+  # tls_key = "/etc/telegraf/key.pem"
+  ## Use TLS but skip chain & host verification
+  # insecure_skip_verify = false
+```
+
+### Metrics:
+
+- aurora
+  - tags:
+    - scheduler (URL of scheduler)
+    - role (leader or follower)
+  - fields:
+    - Numeric metrics are collected from the `/vars` endpoint; string fields
+      are not gathered.
+
+
+### Troubleshooting:
+
+Check the scheduler role; the leader will return a 200 status:
+```
+curl -v http://127.0.0.1:8081/leaderhealth
+```
+
+Get available metrics:
+```
+curl http://127.0.0.1:8081/vars
+```
+
+### Example Output:
+
+The example output below has been trimmed.
+```
+> aurora,role=leader,scheduler=http://debian-stretch-aurora-coordinator-3.virt:8081 CronBatchWorker_batch_locked_events=0i,CronBatchWorker_batch_locked_events_per_sec=0,CronBatchWorker_batch_locked_nanos_per_event=0,CronBatchWorker_batch_locked_nanos_total=0i,CronBatchWorker_batch_locked_nanos_total_per_sec=0,CronBatchWorker_batch_unlocked_events=0i,CronBatchWorker_batch_unlocked_events_per_sec=0,CronBatchWorker_batch_unlocked_nanos_per_event=0,CronBatchWorker_batch_unlocked_nanos_total=0i,CronBatchWorker_batch_unlocked_nanos_total_per_sec=0,CronBatchWorker_batches_processed=0i,CronBatchWorker_items_processed=0i,CronBatchWorker_last_processed_batch_size=0i,CronBatchWorker_queue_size=0i,TaskEventBatchWorker_batch_locked_events=0i,TaskEventBatchWorker_batch_locked_events_per_sec=0,TaskEventBatchWorker_batch_locked_nanos_per_event=0,TaskEventBatchWorker_batch_locked_nanos_total=0i,TaskEventBatchWorker_batch_locked_nanos_total_per_sec=0,TaskEventBatchWorker_batch_unlocked_events=0i,TaskEventBatchWorker_batch_unlocked_events_per_sec=0,TaskEventBatchWorker_batch_unlocked_nanos_per_event=0,TaskEventBatchWorker_batch_unlocked_nanos_total=0i,TaskEventBatchWorker_batch_unlocked_nanos_total_per_sec=0,TaskEventBatchWorker_batches_processed=0i,TaskEventBatchWorker_items_processed=0i,TaskEventBatchWorker_last_processed_batch_size=0i,TaskEventBatchWorker_queue_size=0i,TaskGroupBatchWorker_batch_locked_events=0i,TaskGroupBatchWorker_batch_locked_events_per_sec=0,TaskGroupBatchWorker_batch_locked_nanos_per_event=0,TaskGroupBatchWorker_batch_locked_nanos_total=0i,TaskGroupBatchWorker_batch_locked_nanos_total_per_sec=0,TaskGroupBatchWorker_batch_unlocked_events=0i,TaskGroupBatchWorker_batch_unlocked_events_per_sec=0,TaskGroupBatchWorker_batch_unlocked_nanos_per_event=0,TaskGroupBatchWorker_batch_unlocked_nanos_total=0i,TaskGroupBatchWorker_batch_unlocked_nanos_total_per_sec=0,TaskGroupBatchWorker_batches_processed=0i,TaskGroupBatchWorker_items_processed=0i,TaskGroupBatchWorker_last_processed_batch_size=0i,TaskGroupBatchWorker_queue_size=0i,assigner_launch_failures=0i,async_executor_uncaught_exceptions=0i,async_tasks_completed=1i,cron_job_collisions=0i,cron_job_concurrent_runs=0i,cron_job_launch_failures=0i,cron_job_misfires=0i,cron_job_parse_failures=0i,cron_job_triggers=0i,cron_jobs_loaded=1i,empty_slots_dedicated_large=0i,empty_slots_dedicated_medium=0i,empty_slots_dedicated_revocable_large=0i,empty_slots_dedicated_revocable_medium=0i,empty_slots_dedicated_revocable_small=0i,empty_slots_dedicated_revocable_xlarge=0i,empty_slots_dedicated_small=0i,empty_slots_dedicated_xlarge=0i,empty_slots_large=0i,empty_slots_medium=0i,empty_slots_revocable_large=0i,empty_slots_revocable_medium=0i,empty_slots_revocable_small=0i,empty_slots_revocable_xlarge=0i,empty_slots_small=0i,empty_slots_xlarge=0i,event_bus_dead_events=0i,event_bus_exceptions=1i,framework_registered=1i,globally_banned_offers_size=0i,http_200_responses_events=55i,http_200_responses_events_per_sec=0,http_200_responses_nanos_per_event=0,http_200_responses_nanos_total=310416694i,http_200_responses_nanos_total_per_sec=0,job_update_delete_errors=0i,job_update_recovery_errors=0i,job_update_state_change_errors=0i,job_update_store_delete_all_events=1i,job_update_store_delete_all_events_per_sec=0,job_update_store_delete_all_nanos_per_event=0,job_update_store_delete_all_nanos_total=1227254i,job_update_store_delete_all_nanos_total_per_sec=0,job_update_store_fetch_details_query_events=74i,job_update_store_fetch_details_query_events_per_sec=0,job_update_store_fetch_details_query_nanos_per_event=0,job_update_store_fetch_details_query_nanos_total=24643149i,job_update_store_fetch_details_query_nanos_total_per_sec=0,job_update_store_prune_history_events=59i,job_update_store_prune_history_events_per_sec=0,job_update_store_prune_history_nanos_per_event=0,job_update_store_prune_history_nanos_total=262868218i,job_update_store_prune_history_nanos_total_per_sec=0,job_updates_pruned=0i,jvm_available_processors=2i,jvm_class_loaded_count=6707i,jvm_class_total_loaded_count=6732i,jvm_class_unloaded_count=25i,jvm_gc_PS_MarkSweep_collection_count=2i,jvm_gc_PS_MarkSweep_collection_time_ms=223i,jvm_gc_PS_Scavenge_collection_count=27i,jvm_gc_PS_Scavenge_collection_time_ms=1691i,jvm_gc_collection_count=29i,jvm_gc_collection_time_ms=1914i,jvm_memory_free_mb=65i,jvm_memory_heap_mb_committed=157i,jvm_memory_heap_mb_max=446i,jvm_memory_heap_mb_used=91i,jvm_memory_max_mb=446i,jvm_memory_mb_total=157i,jvm_memory_non_heap_mb_committed=50i,jvm_memory_non_heap_mb_max=0i,jvm_memory_non_heap_mb_used=49i,jvm_threads_active=47i,jvm_threads_daemon=28i,jvm_threads_peak=48i,jvm_threads_started=62i,jvm_time_ms=1526530686927i,jvm_uptime_secs=79947i,log_entry_serialize_events=16i,log_entry_serialize_events_per_sec=0,log_entry_serialize_nanos_per_event=0,log_entry_serialize_nanos_total=4815321i,log_entry_serialize_nanos_total_per_sec=0,log_manager_append_events=16i,log_manager_append_events_per_sec=0,log_manager_append_nanos_per_event=0,log_manager_append_nanos_total=506453428i,log_manager_append_nanos_total_per_sec=0,log_manager_deflate_events=14i,log_manager_deflate_events_per_sec=0,log_manager_deflate_nanos_per_event=0,log_manager_deflate_nanos_total=21010565i,log_manager_deflate_nanos_total_per_sec=0 1526530687000000000
+```
diff --git a/plugins/inputs/aurora/aurora.go b/plugins/inputs/aurora/aurora.go
new file mode 100644
index 00000000..9a5cafa5
--- /dev/null
+++ b/plugins/inputs/aurora/aurora.go
@@ -0,0 +1,280 @@
+package aurora
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/internal"
+	"github.com/influxdata/telegraf/internal/tls"
+	"github.com/influxdata/telegraf/plugins/inputs"
+)
+
+type RoleType int
+
+const (
+	Unknown RoleType = iota
+	Leader
+	Follower
+)
+
+func (r RoleType) String() string {
+	switch r {
+	case Leader:
+		return "leader"
+	case Follower:
+		return "follower"
+	default:
+		return "unknown"
+	}
+}
+
+var (
+	defaultTimeout = 5 * time.Second
+	defaultRoles   = []string{"leader", "follower"}
+)
+
+type Vars map[string]interface{}
+
+type Aurora struct {
+	Schedulers []string          `toml:"schedulers"`
+	Roles      []string          `toml:"roles"`
+	Timeout    internal.Duration `toml:"timeout"`
+	Username   string            `toml:"username"`
+	Password   string            `toml:"password"`
+	tls.ClientConfig
+
+	client *http.Client
+	urls   []*url.URL
+}
+
+var sampleConfig = `
+  ## Schedulers are the base addresses of your Aurora Schedulers
+  schedulers = ["http://127.0.0.1:8081"]
+
+  ## Set of role types to collect metrics from.
+  ##
+  ## The scheduler roles are checked each interval by contacting the
+  ## scheduler nodes; zookeeper is not contacted.
+  # roles = ["leader", "follower"]
+
+  ## Timeout is the max time for total network operations.
+  # timeout = "5s"
+
+  ## Username and password are sent using HTTP Basic Auth.
+  # username = "username"
+  # password = "pa$$word"
+
+  ## Optional TLS Config
+  # tls_ca = "/etc/telegraf/ca.pem"
+  # tls_cert = "/etc/telegraf/cert.pem"
+  # tls_key = "/etc/telegraf/key.pem"
+  ## Use TLS but skip chain & host verification
+  # insecure_skip_verify = false
+`
+
+func (a *Aurora) SampleConfig() string {
+	return sampleConfig
+}
+
+func (a *Aurora) Description() string {
+	return "Gather metrics from Apache Aurora schedulers"
+}
+
+func (a *Aurora) Gather(acc telegraf.Accumulator) error {
+	if a.client == nil {
+		err := a.initialize()
+		if err != nil {
+			return err
+		}
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), a.Timeout.Duration)
+	defer cancel()
+
+	var wg sync.WaitGroup
+	for _, u := range a.urls {
+		wg.Add(1)
+		go func(u *url.URL) {
+			defer wg.Done()
+			role, err := a.gatherRole(ctx, u)
+			if err != nil {
+				acc.AddError(fmt.Errorf("%s: %v", u, err))
+				return
+			}
+
+			if !a.roleEnabled(role) {
+				return
+			}
+
+			err = a.gatherScheduler(ctx, u, role, acc)
+			if err != nil {
+				acc.AddError(fmt.Errorf("%s: %v", u, err))
+			}
+		}(u)
+	}
+	wg.Wait()
+
+	return nil
+}
+
+func (a *Aurora) initialize() error {
+	tlsCfg, err := a.ClientConfig.TLSConfig()
+	if err != nil {
+		return err
+	}
+
+	client := &http.Client{
+		Transport: &http.Transport{
+			Proxy:           http.ProxyFromEnvironment,
+			TLSClientConfig: tlsCfg,
+		},
+	}
+
+	urls := make([]*url.URL, 0, len(a.Schedulers))
+	for _, s := range a.Schedulers {
+		loc, err := url.Parse(s)
+		if err != nil {
+			return err
+		}
+
+		urls = append(urls, loc)
+	}
+
+	if a.Timeout.Duration < time.Second {
+		a.Timeout.Duration = defaultTimeout
+	}
+
+	if len(a.Roles) == 0 {
+		a.Roles = defaultRoles
+	}
+
+	a.client = client
+	a.urls = urls
+	return nil
+}
+
+func (a *Aurora) roleEnabled(role RoleType) bool {
+	if len(a.Roles) == 0 {
+		return true
+	}
+
+	for _, v := range a.Roles {
+		if role.String() == v {
+			return true
+		}
+	}
+	return false
+}
+
+func (a *Aurora) gatherRole(ctx context.Context, origin *url.URL) (RoleType, error) {
+	loc := *origin
+	loc.Path = "leaderhealth"
+	req, err := http.NewRequest("GET", loc.String(), nil)
+	if err != nil {
+		return Unknown, err
+	}
+
+	if a.Username != "" || a.Password != "" {
+		req.SetBasicAuth(a.Username, a.Password)
+	}
+	req.Header.Add("Accept", "text/plain")
+
+	resp, err := a.client.Do(req.WithContext(ctx))
+	if err != nil {
+		return Unknown, err
+	}
+	resp.Body.Close()
+
+	switch resp.StatusCode {
+	case http.StatusOK:
+		return Leader, nil
+	case http.StatusBadGateway:
+		fallthrough
+	case http.StatusServiceUnavailable:
+		return Follower, nil
+	default:
+		return Unknown, fmt.Errorf("%v", resp.Status)
+	}
+}
+
+func (a *Aurora) gatherScheduler(
+	ctx context.Context, origin *url.URL, role RoleType, acc telegraf.Accumulator,
+) error {
+	loc := *origin
+	loc.Path = "vars.json"
+	req, err := http.NewRequest("GET", loc.String(), nil)
+	if err != nil {
+		return err
+	}
+
+	if a.Username != "" || a.Password != "" {
+		req.SetBasicAuth(a.Username, a.Password)
+	}
+	req.Header.Add("Accept", "application/json")
+
+	resp, err := a.client.Do(req.WithContext(ctx))
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("%v", resp.Status)
+	}
+
+	var vars Vars
+	decoder := json.NewDecoder(resp.Body)
+	decoder.UseNumber()
+	err = decoder.Decode(&vars)
+	if err != nil {
+		return fmt.Errorf("decoding response: %v", err)
+	}
+
+	var fields = make(map[string]interface{}, len(vars))
+	for k, v := range vars {
+		switch v := v.(type) {
+		case json.Number:
+			// Aurora encodes numbers as you would specify them as a literal,
+			// use this to determine if a value is a float or int.
+			if strings.ContainsAny(v.String(), ".eE") {
+				fv, err := v.Float64()
+				if err != nil {
+					acc.AddError(err)
+					continue
+				}
+				fields[k] = fv
+			} else {
+				fi, err := v.Int64()
+				if err != nil {
+					acc.AddError(err)
+					continue
+				}
+				fields[k] = fi
+			}
+		default:
+			continue
+		}
+	}
+
+	acc.AddFields("aurora",
+		fields,
+		map[string]string{
+			"scheduler": origin.String(),
+			"role":      role.String(),
+		},
+	)
+	return nil
+}
+
+func init() {
+	inputs.Add("aurora", func() telegraf.Input {
+		return &Aurora{}
+	})
+}
diff --git a/plugins/inputs/aurora/aurora_test.go b/plugins/inputs/aurora/aurora_test.go
new file mode 100644
index 00000000..6e2c004f
--- /dev/null
+++ b/plugins/inputs/aurora/aurora_test.go
@@ -0,0 +1,259 @@
+package aurora
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"net/url"
+	"testing"
+
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+type (
+	TestHandlerFunc func(t *testing.T, w http.ResponseWriter, r *http.Request)
+	CheckFunc       func(t *testing.T, err error, acc *testutil.Accumulator)
+)
+
+func TestAurora(t *testing.T) {
+	ts := httptest.NewServer(http.NotFoundHandler())
+	defer ts.Close()
+
+	u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
+	require.NoError(t, err)
+
+	tests := []struct {
+		name         string
+		plugin       *Aurora
+		schedulers   []string
+		roles        []string
+		leaderhealth TestHandlerFunc
+		varsjson     TestHandlerFunc
+		check        CheckFunc
+	}{
+		{
+			name: "minimal",
+			leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				body := `{
+					"variable_scrape_events": 2958,
+					"variable_scrape_events_per_sec": 1.0,
+					"variable_scrape_micros_per_event": 1484.0,
+					"variable_scrape_micros_total": 4401084,
+					"variable_scrape_micros_total_per_sec": 1485.0
+				}`
+				w.WriteHeader(http.StatusOK)
+				w.Write([]byte(body))
+			},
+			check: func(t *testing.T, err error, acc *testutil.Accumulator) {
+				require.NoError(t, err)
+				require.Equal(t, 1, len(acc.Metrics))
+				acc.AssertContainsTaggedFields(t,
+					"aurora",
+					map[string]interface{}{
+						"variable_scrape_events":               int64(2958),
+						"variable_scrape_events_per_sec":       1.0,
+						"variable_scrape_micros_per_event":     1484.0,
+						"variable_scrape_micros_total":         int64(4401084),
+						"variable_scrape_micros_total_per_sec": 1485.0,
+					},
+					map[string]string{
+						"scheduler": u.String(),
+						"role":      "leader",
+					},
+				)
+			},
+		},
+		{
+			name:  "disabled role",
+			roles: []string{"leader"},
+			leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusServiceUnavailable)
+			},
+			check: func(t *testing.T, err error, acc *testutil.Accumulator) {
+				require.NoError(t, err)
+				require.NoError(t, acc.FirstError())
+				require.Equal(t, 0, len(acc.Metrics))
+			},
+		},
+		{
+			name: "no metrics available",
+			leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusOK)
+				w.Write([]byte("{}"))
+			},
+			check: func(t *testing.T, err error, acc *testutil.Accumulator) {
+				require.NoError(t, err)
+				require.NoError(t, acc.FirstError())
+				require.Equal(t, 0, len(acc.Metrics))
+			},
+		},
+		{
+			name: "string metrics skipped",
+			leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				body := `{
+					"foo": "bar"
+				}`
+				w.WriteHeader(http.StatusOK)
+				w.Write([]byte(body))
+			},
+			check: func(t *testing.T, err error, acc *testutil.Accumulator) {
+				require.NoError(t, err)
+				require.NoError(t, acc.FirstError())
+				require.Equal(t, 0, len(acc.Metrics))
+			},
+		},
+		{
+			name: "float64 unparseable",
+			leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				// too large
+				body := `{
+					"foo": 1e309
+				}`
+				w.WriteHeader(http.StatusOK)
+				w.Write([]byte(body))
+			},
+			check: func(t *testing.T, err error, acc *testutil.Accumulator) {
+				require.NoError(t, err)
+				require.Error(t, acc.FirstError())
+				require.Equal(t, 0, len(acc.Metrics))
+			},
+		},
+		{
+			name: "int64 unparseable",
+			leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				// too large
+				body := `{
+					"foo": 9223372036854775808
+				}`
+				w.WriteHeader(http.StatusOK)
+				w.Write([]byte(body))
+			},
+			check: func(t *testing.T, err error, acc *testutil.Accumulator) {
+				require.NoError(t, err)
+				require.Error(t, acc.FirstError())
+				require.Equal(t, 0, len(acc.Metrics))
+			},
+		},
+		{
+			name: "bad json",
+			leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				body := `{]`
+				w.WriteHeader(http.StatusOK)
+				w.Write([]byte(body))
+			},
+			check: func(t *testing.T, err error, acc *testutil.Accumulator) {
+				require.NoError(t, err)
+				require.Error(t, acc.FirstError())
+				require.Equal(t, 0, len(acc.Metrics))
+			},
+		},
+		{
+			name: "wrong status code",
+			leaderhealth: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				w.WriteHeader(http.StatusOK)
+			},
+			varsjson: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				body := `{
+					"value": 42
+				}`
+				w.WriteHeader(http.StatusServiceUnavailable)
+				w.Write([]byte(body))
+			},
+			check: func(t *testing.T, err error, acc *testutil.Accumulator) {
+				require.NoError(t, err)
+				require.Error(t, acc.FirstError())
+				require.Equal(t, 0, len(acc.Metrics))
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				switch r.URL.Path {
+				case "/leaderhealth":
+					tt.leaderhealth(t, w, r)
+				case "/vars.json":
+					tt.varsjson(t, w, r)
+				default:
+					w.WriteHeader(http.StatusNotFound)
+				}
+			})
+
+			var acc testutil.Accumulator
+			plugin := &Aurora{}
+			plugin.Schedulers = []string{u.String()}
+			plugin.Roles = tt.roles
+			err := plugin.Gather(&acc)
+			tt.check(t, err, &acc)
+		})
+	}
+}
+
+func TestBasicAuth(t *testing.T) {
+	ts := httptest.NewServer(http.NotFoundHandler())
+	defer ts.Close()
+
+	u, err := url.Parse(fmt.Sprintf("http://%s", ts.Listener.Addr().String()))
+	require.NoError(t, err)
+
+	tests := []struct {
+		name     string
+		username string
+		password string
+	}{
+		{
+			name: "no auth",
+		},
+		{
+			name:     "basic auth",
+			username: "username",
+			password: "pa$$word",
+		},
+		{
+			name:     "username only",
+			username: "username",
+		},
+		{
+			name:     "password only",
+			password: "pa$$word",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ts.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				username, password, _ := r.BasicAuth()
+				require.Equal(t, tt.username, username)
+				require.Equal(t, tt.password, password)
+				w.WriteHeader(http.StatusOK)
+				w.Write([]byte("{}"))
+			})
+
+			var acc testutil.Accumulator
+			plugin := &Aurora{}
+			plugin.Schedulers = []string{u.String()}
+			plugin.Username = tt.username
+			plugin.Password = tt.password
+			err := plugin.Gather(&acc)
+			require.NoError(t, err)
+		})
+	}
+}
diff --git a/testutil/accumulator.go b/testutil/accumulator.go
index 03e92538..8784cc1d 100644
--- a/testutil/accumulator.go
+++ b/testutil/accumulator.go
@@ -42,6 +42,13 @@ func (a *Accumulator) NMetrics() uint64 {
 	return atomic.LoadUint64(&a.nMetrics)
 }
 
+func (a *Accumulator) FirstError() error {
+	if len(a.Errors) == 0 {
+		return nil
+	}
+	return a.Errors[0]
+}
+
 func (a *Accumulator) ClearMetrics() {
 	a.Lock()
 	defer a.Unlock()
-- 
GitLab