From 1402c158b74789132af2b885315d619648003f83 Mon Sep 17 00:00:00 2001
From: Patrick Hemmer <phemmer@users.noreply.github.com>
Date: Fri, 24 Mar 2017 15:03:36 -0400
Subject: [PATCH] remove sleep from tests (#2555)

---
 plugins/inputs/cloudwatch/cloudwatch_test.go  |  5 +-
 .../http_listener/http_listener_test.go       | 47 ++--------
 .../http_response/http_response_test.go       |  2 +-
 .../inputs/kafka_consumer/kafka_consumer.go   |  9 +-
 .../kafka_consumer/kafka_consumer_test.go     | 11 ++-
 plugins/inputs/logparser/logparser_test.go    | 10 +--
 plugins/inputs/mongodb/mongodb_server_test.go |  4 +-
 plugins/inputs/mqtt_consumer/mqtt_consumer.go | 10 +--
 .../mqtt_consumer/mqtt_consumer_test.go       | 15 ++--
 plugins/inputs/nats_consumer/nats_consumer.go |  8 +-
 .../nats_consumer/nats_consumer_test.go       | 20 ++---
 .../socket_listener/socket_listener_test.go   | 49 ++++-------
 plugins/inputs/tail/tail.go                   | 23 +++--
 plugins/inputs/tail/tail_test.go              | 19 ++--
 .../inputs/tcp_listener/tcp_listener_test.go  | 63 ++++++-------
 plugins/inputs/udp_listener/udp_listener.go   | 25 ++++--
 .../inputs/udp_listener/udp_listener_test.go  | 57 +++++++-----
 plugins/outputs/graphite/graphite_test.go     | 48 +++++-----
 plugins/outputs/influxdb/client/udp_test.go   |  1 -
 .../outputs/instrumental/instrumental_test.go | 88 +++++++++----------
 testutil/accumulator.go                       | 26 +++++-
 21 files changed, 261 insertions(+), 279 deletions(-)

diff --git a/plugins/inputs/cloudwatch/cloudwatch_test.go b/plugins/inputs/cloudwatch/cloudwatch_test.go
index f2d58a00..3aaab7d4 100644
--- a/plugins/inputs/cloudwatch/cloudwatch_test.go
+++ b/plugins/inputs/cloudwatch/cloudwatch_test.go
@@ -207,14 +207,13 @@ func TestGenerateStatisticsInputParams(t *testing.T) {
 }
 
 func TestMetricsCacheTimeout(t *testing.T) {
-	ttl, _ := time.ParseDuration("5ms")
 	cache := &MetricCache{
 		Metrics: []*cloudwatch.Metric{},
 		Fetched: time.Now(),
-		TTL:     ttl,
+		TTL:     time.Minute,
 	}
 
 	assert.True(t, cache.IsValid())
-	time.Sleep(ttl)
+	cache.Fetched = time.Now().Add(-time.Minute)
 	assert.False(t, cache.IsValid())
 }
diff --git a/plugins/inputs/http_listener/http_listener_test.go b/plugins/inputs/http_listener/http_listener_test.go
index b5f858fd..7e6fbc8a 100644
--- a/plugins/inputs/http_listener/http_listener_test.go
+++ b/plugins/inputs/http_listener/http_listener_test.go
@@ -6,7 +6,6 @@ import (
 	"net/http"
 	"sync"
 	"testing"
-	"time"
 
 	"github.com/influxdata/telegraf/testutil"
 
@@ -43,14 +42,12 @@ func TestWriteHTTP(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	// post single message to listener
 	resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsg)))
 	require.NoError(t, err)
 	require.EqualValues(t, 204, resp.StatusCode)
 
-	time.Sleep(time.Millisecond * 15)
+	acc.Wait(1)
 	acc.AssertContainsTaggedFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(12)},
 		map[string]string{"host": "server01"},
@@ -61,7 +58,7 @@ func TestWriteHTTP(t *testing.T) {
 	require.NoError(t, err)
 	require.EqualValues(t, 204, resp.StatusCode)
 
-	time.Sleep(time.Millisecond * 15)
+	acc.Wait(2)
 	hostTags := []string{"server02", "server03",
 		"server04", "server05", "server06"}
 	for _, hostTag := range hostTags {
@@ -76,7 +73,7 @@ func TestWriteHTTP(t *testing.T) {
 	require.NoError(t, err)
 	require.EqualValues(t, 400, resp.StatusCode)
 
-	time.Sleep(time.Millisecond * 15)
+	acc.Wait(3)
 	acc.AssertContainsTaggedFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(12)},
 		map[string]string{"host": "server01"},
@@ -91,14 +88,12 @@ func TestWriteHTTPNoNewline(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	// post single message to listener
 	resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(testMsgNoNewline)))
 	require.NoError(t, err)
 	require.EqualValues(t, 204, resp.StatusCode)
 
-	time.Sleep(time.Millisecond * 15)
+	acc.Wait(1)
 	acc.AssertContainsTaggedFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(12)},
 		map[string]string{"host": "server01"},
@@ -115,8 +110,6 @@ func TestWriteHTTPMaxLineSizeIncrease(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	// Post a gigantic metric to the listener and verify that it writes OK this time:
 	resp, err := http.Post("http://localhost:8296/write?db=mydb", "", bytes.NewBuffer([]byte(hugeMetric)))
 	require.NoError(t, err)
@@ -133,8 +126,6 @@ func TestWriteHTTPVerySmallMaxBody(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	resp, err := http.Post("http://localhost:8297/write", "", bytes.NewBuffer([]byte(hugeMetric)))
 	require.NoError(t, err)
 	require.EqualValues(t, 413, resp.StatusCode)
@@ -150,15 +141,13 @@ func TestWriteHTTPVerySmallMaxLineSize(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	resp, err := http.Post("http://localhost:8298/write", "", bytes.NewBuffer([]byte(testMsgs)))
 	require.NoError(t, err)
 	require.EqualValues(t, 204, resp.StatusCode)
 
-	time.Sleep(time.Millisecond * 15)
 	hostTags := []string{"server02", "server03",
 		"server04", "server05", "server06"}
+	acc.Wait(len(hostTags))
 	for _, hostTag := range hostTags {
 		acc.AssertContainsTaggedFields(t, "cpu_load_short",
 			map[string]interface{}{"value": float64(12)},
@@ -177,15 +166,13 @@ func TestWriteHTTPLargeLinesSkipped(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	resp, err := http.Post("http://localhost:8300/write", "", bytes.NewBuffer([]byte(hugeMetric+testMsgs)))
 	require.NoError(t, err)
 	require.EqualValues(t, 400, resp.StatusCode)
 
-	time.Sleep(time.Millisecond * 15)
 	hostTags := []string{"server02", "server03",
 		"server04", "server05", "server06"}
+	acc.Wait(len(hostTags))
 	for _, hostTag := range hostTags {
 		acc.AssertContainsTaggedFields(t, "cpu_load_short",
 			map[string]interface{}{"value": float64(12)},
@@ -204,8 +191,6 @@ func TestWriteHTTPGzippedData(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	data, err := ioutil.ReadFile("./testdata/testmsgs.gz")
 	require.NoError(t, err)
 
@@ -218,9 +203,9 @@ func TestWriteHTTPGzippedData(t *testing.T) {
 	require.NoError(t, err)
 	require.EqualValues(t, 204, resp.StatusCode)
 
-	time.Sleep(time.Millisecond * 50)
 	hostTags := []string{"server02", "server03",
 		"server04", "server05", "server06"}
+	acc.Wait(len(hostTags))
 	for _, hostTag := range hostTags {
 		acc.AssertContainsTaggedFields(t, "cpu_load_short",
 			map[string]interface{}{"value": float64(12)},
@@ -237,8 +222,6 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	// post many messages to listener
 	var wg sync.WaitGroup
 	for i := 0; i < 10; i++ {
@@ -254,9 +237,9 @@ func TestWriteHTTPHighTraffic(t *testing.T) {
 	}
 
 	wg.Wait()
-	time.Sleep(time.Millisecond * 250)
 	listener.Gather(acc)
 
+	acc.Wait(25000)
 	require.Equal(t, int64(25000), int64(acc.NMetrics()))
 }
 
@@ -267,8 +250,6 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	// post single message to listener
 	resp, err := http.Post("http://localhost:8186/foobar", "", bytes.NewBuffer([]byte(testMsg)))
 	require.NoError(t, err)
@@ -276,16 +257,12 @@ func TestReceive404ForInvalidEndpoint(t *testing.T) {
 }
 
 func TestWriteHTTPInvalid(t *testing.T) {
-	time.Sleep(time.Millisecond * 250)
-
 	listener := newTestHTTPListener()
 
 	acc := &testutil.Accumulator{}
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	// post single message to listener
 	resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(badMsg)))
 	require.NoError(t, err)
@@ -293,16 +270,12 @@ func TestWriteHTTPInvalid(t *testing.T) {
 }
 
 func TestWriteHTTPEmpty(t *testing.T) {
-	time.Sleep(time.Millisecond * 250)
-
 	listener := newTestHTTPListener()
 
 	acc := &testutil.Accumulator{}
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	// post single message to listener
 	resp, err := http.Post("http://localhost:8186/write?db=mydb", "", bytes.NewBuffer([]byte(emptyMsg)))
 	require.NoError(t, err)
@@ -310,16 +283,12 @@ func TestWriteHTTPEmpty(t *testing.T) {
 }
 
 func TestQueryAndPingHTTP(t *testing.T) {
-	time.Sleep(time.Millisecond * 250)
-
 	listener := newTestHTTPListener()
 
 	acc := &testutil.Accumulator{}
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
-
 	// post query to listener
 	resp, err := http.Post("http://localhost:8186/query?db=&q=CREATE+DATABASE+IF+NOT+EXISTS+%22mydb%22", "", nil)
 	require.NoError(t, err)
diff --git a/plugins/inputs/http_response/http_response_test.go b/plugins/inputs/http_response/http_response_test.go
index 236e5d88..b65b1f95 100644
--- a/plugins/inputs/http_response/http_response_test.go
+++ b/plugins/inputs/http_response/http_response_test.go
@@ -329,7 +329,7 @@ func TestTimeout(t *testing.T) {
 		Address:         ts.URL + "/twosecondnap",
 		Body:            "{ 'test': 'data'}",
 		Method:          "GET",
-		ResponseTimeout: internal.Duration{Duration: time.Second * 1},
+		ResponseTimeout: internal.Duration{Duration: time.Millisecond},
 		Headers: map[string]string{
 			"Content-Type": "application/json",
 		},
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go
index f4176edd..6f1f4020 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -1,6 +1,7 @@
 package kafka_consumer
 
 import (
+	"fmt"
 	"log"
 	"strings"
 	"sync"
@@ -129,13 +130,13 @@ func (k *Kafka) receiver() {
 			return
 		case err := <-k.errs:
 			if err != nil {
-				log.Printf("E! Kafka Consumer Error: %s\n", err)
+				k.acc.AddError(fmt.Errorf("Kafka Consumer Error: %s\n", err))
 			}
 		case msg := <-k.in:
 			metrics, err := k.parser.Parse(msg.Value)
 			if err != nil {
-				log.Printf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
-					string(msg.Value), err.Error())
+				k.acc.AddError(fmt.Errorf("E! Kafka Message Parse Error\nmessage: %s\nerror: %s",
+					string(msg.Value), err.Error()))
 			}
 
 			for _, metric := range metrics {
@@ -158,7 +159,7 @@ func (k *Kafka) Stop() {
 	defer k.Unlock()
 	close(k.done)
 	if err := k.Consumer.Close(); err != nil {
-		log.Printf("E! Error closing kafka consumer: %s\n", err.Error())
+		k.acc.AddError(fmt.Errorf("E! Error closing kafka consumer: %s\n", err.Error()))
 	}
 }
 
diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
index c4936974..e1c24adb 100644
--- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go
+++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -2,7 +2,6 @@ package kafka_consumer
 
 import (
 	"testing"
-	"time"
 
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/testutil"
@@ -43,7 +42,7 @@ func TestRunParser(t *testing.T) {
 	k.parser, _ = parsers.NewInfluxParser()
 	go k.receiver()
 	in <- saramaMsg(testMsg)
-	time.Sleep(time.Millisecond * 5)
+	acc.Wait(1)
 
 	assert.Equal(t, acc.NFields(), 1)
 }
@@ -58,7 +57,7 @@ func TestRunParserInvalidMsg(t *testing.T) {
 	k.parser, _ = parsers.NewInfluxParser()
 	go k.receiver()
 	in <- saramaMsg(invalidMsg)
-	time.Sleep(time.Millisecond * 5)
+	acc.WaitError(1)
 
 	assert.Equal(t, acc.NFields(), 0)
 }
@@ -73,7 +72,7 @@ func TestRunParserAndGather(t *testing.T) {
 	k.parser, _ = parsers.NewInfluxParser()
 	go k.receiver()
 	in <- saramaMsg(testMsg)
-	time.Sleep(time.Millisecond * 5)
+	acc.Wait(1)
 
 	k.Gather(&acc)
 
@@ -92,7 +91,7 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
 	k.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
 	go k.receiver()
 	in <- saramaMsg(testMsgGraphite)
-	time.Sleep(time.Millisecond * 5)
+	acc.Wait(1)
 
 	k.Gather(&acc)
 
@@ -111,7 +110,7 @@ func TestRunParserAndGatherJSON(t *testing.T) {
 	k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
 	go k.receiver()
 	in <- saramaMsg(testMsgJSON)
-	time.Sleep(time.Millisecond * 5)
+	acc.Wait(1)
 
 	k.Gather(&acc)
 
diff --git a/plugins/inputs/logparser/logparser_test.go b/plugins/inputs/logparser/logparser_test.go
index 059bfd26..db9795f2 100644
--- a/plugins/inputs/logparser/logparser_test.go
+++ b/plugins/inputs/logparser/logparser_test.go
@@ -6,7 +6,6 @@ import (
 	"runtime"
 	"strings"
 	"testing"
-	"time"
 
 	"github.com/influxdata/telegraf/testutil"
 
@@ -41,7 +40,6 @@ func TestGrokParseLogFilesNonExistPattern(t *testing.T) {
 	acc := testutil.Accumulator{}
 	assert.Error(t, logparser.Start(&acc))
 
-	time.Sleep(time.Millisecond * 500)
 	logparser.Stop()
 }
 
@@ -61,7 +59,8 @@ func TestGrokParseLogFiles(t *testing.T) {
 	acc := testutil.Accumulator{}
 	assert.NoError(t, logparser.Start(&acc))
 
-	time.Sleep(time.Millisecond * 500)
+	acc.Wait(2)
+
 	logparser.Stop()
 
 	acc.AssertContainsTaggedFields(t, "logparser_grok",
@@ -102,14 +101,13 @@ func TestGrokParseLogFilesAppearLater(t *testing.T) {
 	acc := testutil.Accumulator{}
 	assert.NoError(t, logparser.Start(&acc))
 
-	time.Sleep(time.Millisecond * 500)
 	assert.Equal(t, acc.NFields(), 0)
 
 	os.Symlink(
 		thisdir+"grok/testdata/test_a.log",
 		emptydir+"/test_a.log")
 	assert.NoError(t, logparser.Gather(&acc))
-	time.Sleep(time.Millisecond * 500)
+	acc.Wait(1)
 
 	logparser.Stop()
 
@@ -143,7 +141,7 @@ func TestGrokParseLogFilesOneBad(t *testing.T) {
 	acc.SetDebug(true)
 	assert.NoError(t, logparser.Start(&acc))
 
-	time.Sleep(time.Millisecond * 500)
+	acc.Wait(1)
 	logparser.Stop()
 
 	acc.AssertContainsTaggedFields(t, "logparser_grok",
diff --git a/plugins/inputs/mongodb/mongodb_server_test.go b/plugins/inputs/mongodb/mongodb_server_test.go
index 7ad0f38a..e9d1bae9 100644
--- a/plugins/inputs/mongodb/mongodb_server_test.go
+++ b/plugins/inputs/mongodb/mongodb_server_test.go
@@ -4,7 +4,6 @@ package mongodb
 
 import (
 	"testing"
-	"time"
 
 	"github.com/influxdata/telegraf/testutil"
 	"github.com/stretchr/testify/assert"
@@ -32,12 +31,11 @@ func TestAddDefaultStats(t *testing.T) {
 	err := server.gatherData(&acc, false)
 	require.NoError(t, err)
 
-	time.Sleep(time.Duration(1) * time.Second)
 	// need to call this twice so it can perform the diff
 	err = server.gatherData(&acc, false)
 	require.NoError(t, err)
 
 	for key, _ := range DefaultStats {
-		assert.True(t, acc.HasIntValue(key))
+		assert.True(t, acc.HasIntField("mongodb", key))
 	}
 }
diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
index cfade294..3ea0480b 100644
--- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go
+++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
@@ -142,8 +142,8 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
 		subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
 		subscribeToken.Wait()
 		if subscribeToken.Error() != nil {
-			log.Printf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
-				strings.Join(m.Topics[:], ","), subscribeToken.Error())
+			m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
+				strings.Join(m.Topics[:], ","), subscribeToken.Error()))
 		}
 		m.started = true
 	}
@@ -151,7 +151,7 @@ func (m *MQTTConsumer) onConnect(c mqtt.Client) {
 }
 
 func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
-	log.Printf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error())
+	m.acc.AddError(fmt.Errorf("E! MQTT Connection lost\nerror: %s\nMQTT Client will try to reconnect", err.Error()))
 	return
 }
 
@@ -166,8 +166,8 @@ func (m *MQTTConsumer) receiver() {
 			topic := msg.Topic()
 			metrics, err := m.parser.Parse(msg.Payload())
 			if err != nil {
-				log.Printf("E! MQTT Parse Error\nmessage: %s\nerror: %s",
-					string(msg.Payload()), err.Error())
+				m.acc.AddError(fmt.Errorf("E! MQTT Parse Error\nmessage: %s\nerror: %s",
+					string(msg.Payload()), err.Error()))
 			}
 
 			for _, metric := range metrics {
diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
index 2f527619..027e4818 100644
--- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
+++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
@@ -2,7 +2,6 @@ package mqtt_consumer
 
 import (
 	"testing"
-	"time"
 
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/testutil"
@@ -86,7 +85,7 @@ func TestRunParser(t *testing.T) {
 	n.parser, _ = parsers.NewInfluxParser()
 	go n.receiver()
 	in <- mqttMsg(testMsgNeg)
-	time.Sleep(time.Millisecond * 250)
+	acc.Wait(1)
 
 	if a := acc.NFields(); a != 1 {
 		t.Errorf("got %v, expected %v", a, 1)
@@ -102,7 +101,7 @@ func TestRunParserNegativeNumber(t *testing.T) {
 	n.parser, _ = parsers.NewInfluxParser()
 	go n.receiver()
 	in <- mqttMsg(testMsg)
-	time.Sleep(time.Millisecond * 25)
+	acc.Wait(1)
 
 	if a := acc.NFields(); a != 1 {
 		t.Errorf("got %v, expected %v", a, 1)
@@ -119,11 +118,12 @@ func TestRunParserInvalidMsg(t *testing.T) {
 	n.parser, _ = parsers.NewInfluxParser()
 	go n.receiver()
 	in <- mqttMsg(invalidMsg)
-	time.Sleep(time.Millisecond * 25)
+	acc.WaitError(1)
 
 	if a := acc.NFields(); a != 0 {
 		t.Errorf("got %v, expected %v", a, 0)
 	}
+	assert.Contains(t, acc.Errors[0].Error(), "MQTT Parse Error")
 }
 
 // Test that the parser parses line format messages into metrics
@@ -136,7 +136,7 @@ func TestRunParserAndGather(t *testing.T) {
 	n.parser, _ = parsers.NewInfluxParser()
 	go n.receiver()
 	in <- mqttMsg(testMsg)
-	time.Sleep(time.Millisecond * 25)
+	acc.Wait(1)
 
 	n.Gather(&acc)
 
@@ -154,9 +154,9 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
 	n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
 	go n.receiver()
 	in <- mqttMsg(testMsgGraphite)
-	time.Sleep(time.Millisecond * 25)
 
 	n.Gather(&acc)
+	acc.Wait(1)
 
 	acc.AssertContainsFields(t, "cpu_load_short_graphite",
 		map[string]interface{}{"value": float64(23422)})
@@ -172,10 +172,11 @@ func TestRunParserAndGatherJSON(t *testing.T) {
 	n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
 	go n.receiver()
 	in <- mqttMsg(testMsgJSON)
-	time.Sleep(time.Millisecond * 25)
 
 	n.Gather(&acc)
 
+	acc.Wait(1)
+
 	acc.AssertContainsFields(t, "nats_json_test",
 		map[string]interface{}{
 			"a":   float64(5),
diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go
index cbb85e01..7c9f5394 100644
--- a/plugins/inputs/nats_consumer/nats_consumer.go
+++ b/plugins/inputs/nats_consumer/nats_consumer.go
@@ -162,11 +162,11 @@ func (n *natsConsumer) receiver() {
 		case <-n.done:
 			return
 		case err := <-n.errs:
-			log.Printf("E! error reading from %s\n", err.Error())
+			n.acc.AddError(fmt.Errorf("E! error reading from %s\n", err.Error()))
 		case msg := <-n.in:
 			metrics, err := n.parser.Parse(msg.Data)
 			if err != nil {
-				log.Printf("E! subject: %s, error: %s", msg.Subject, err.Error())
+				n.acc.AddError(fmt.Errorf("E! subject: %s, error: %s", msg.Subject, err.Error()))
 			}
 
 			for _, metric := range metrics {
@@ -179,8 +179,8 @@ func (n *natsConsumer) receiver() {
 func (n *natsConsumer) clean() {
 	for _, sub := range n.Subs {
 		if err := sub.Unsubscribe(); err != nil {
-			log.Printf("E! Error unsubscribing from subject %s in queue %s: %s\n",
-				sub.Subject, sub.Queue, err.Error())
+			n.acc.AddError(fmt.Errorf("E! Error unsubscribing from subject %s in queue %s: %s\n",
+				sub.Subject, sub.Queue, err.Error()))
 		}
 	}
 
diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go
index 2f4d14d7..30ba0d2a 100644
--- a/plugins/inputs/nats_consumer/nats_consumer_test.go
+++ b/plugins/inputs/nats_consumer/nats_consumer_test.go
@@ -2,11 +2,11 @@ package natsconsumer
 
 import (
 	"testing"
-	"time"
 
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/testutil"
 	"github.com/nats-io/nats"
+	"github.com/stretchr/testify/assert"
 )
 
 const (
@@ -42,11 +42,8 @@ func TestRunParser(t *testing.T) {
 	n.wg.Add(1)
 	go n.receiver()
 	in <- natsMsg(testMsg)
-	time.Sleep(time.Millisecond * 25)
 
-	if acc.NFields() != 1 {
-		t.Errorf("got %v, expected %v", acc.NFields(), 1)
-	}
+	acc.Wait(1)
 }
 
 // Test that the parser ignores invalid messages
@@ -60,11 +57,10 @@ func TestRunParserInvalidMsg(t *testing.T) {
 	n.wg.Add(1)
 	go n.receiver()
 	in <- natsMsg(invalidMsg)
-	time.Sleep(time.Millisecond * 25)
 
-	if acc.NFields() != 0 {
-		t.Errorf("got %v, expected %v", acc.NFields(), 0)
-	}
+	acc.WaitError(1)
+	assert.Contains(t, acc.Errors[0].Error(), "E! subject: telegraf, error:  metric parsing error")
+	assert.EqualValues(t, 0, acc.NMetrics())
 }
 
 // Test that the parser parses line format messages into metrics
@@ -78,10 +74,10 @@ func TestRunParserAndGather(t *testing.T) {
 	n.wg.Add(1)
 	go n.receiver()
 	in <- natsMsg(testMsg)
-	time.Sleep(time.Millisecond * 25)
 
 	n.Gather(&acc)
 
+	acc.Wait(1)
 	acc.AssertContainsFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(23422)})
 }
@@ -97,10 +93,10 @@ func TestRunParserAndGatherGraphite(t *testing.T) {
 	n.wg.Add(1)
 	go n.receiver()
 	in <- natsMsg(testMsgGraphite)
-	time.Sleep(time.Millisecond * 25)
 
 	n.Gather(&acc)
 
+	acc.Wait(1)
 	acc.AssertContainsFields(t, "cpu_load_short_graphite",
 		map[string]interface{}{"value": float64(23422)})
 }
@@ -116,10 +112,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
 	n.wg.Add(1)
 	go n.receiver()
 	in <- natsMsg(testMsgJSON)
-	time.Sleep(time.Millisecond * 25)
 
 	n.Gather(&acc)
 
+	acc.Wait(1)
 	acc.AssertContainsFields(t, "nats_json_test",
 		map[string]interface{}{
 			"a":   float64(5),
diff --git a/plugins/inputs/socket_listener/socket_listener_test.go b/plugins/inputs/socket_listener/socket_listener_test.go
index 6764b6d2..9fa47280 100644
--- a/plugins/inputs/socket_listener/socket_listener_test.go
+++ b/plugins/inputs/socket_listener/socket_listener_test.go
@@ -81,42 +81,25 @@ func testSocketListener(t *testing.T, sl *SocketListener, client net.Conn) {
 
 	acc := sl.Accumulator.(*testutil.Accumulator)
 
+	acc.Wait(3)
 	acc.Lock()
-	if len(acc.Metrics) < 1 {
-		acc.Wait()
-	}
-	require.True(t, len(acc.Metrics) >= 1)
-	m := acc.Metrics[0]
+	m1 := acc.Metrics[0]
+	m2 := acc.Metrics[1]
+	m3 := acc.Metrics[2]
 	acc.Unlock()
 
-	assert.Equal(t, "test", m.Measurement)
-	assert.Equal(t, map[string]string{"foo": "bar"}, m.Tags)
-	assert.Equal(t, map[string]interface{}{"v": int64(1)}, m.Fields)
-	assert.True(t, time.Unix(0, 123456789).Equal(m.Time))
-
-	acc.Lock()
-	if len(acc.Metrics) < 2 {
-		acc.Wait()
-	}
-	require.True(t, len(acc.Metrics) >= 2)
-	m = acc.Metrics[1]
-	acc.Unlock()
+	assert.Equal(t, "test", m1.Measurement)
+	assert.Equal(t, map[string]string{"foo": "bar"}, m1.Tags)
+	assert.Equal(t, map[string]interface{}{"v": int64(1)}, m1.Fields)
+	assert.True(t, time.Unix(0, 123456789).Equal(m1.Time))
 
-	assert.Equal(t, "test", m.Measurement)
-	assert.Equal(t, map[string]string{"foo": "baz"}, m.Tags)
-	assert.Equal(t, map[string]interface{}{"v": int64(2)}, m.Fields)
-	assert.True(t, time.Unix(0, 123456790).Equal(m.Time))
-
-	acc.Lock()
-	if len(acc.Metrics) < 3 {
-		acc.Wait()
-	}
-	require.True(t, len(acc.Metrics) >= 3)
-	m = acc.Metrics[2]
-	acc.Unlock()
+	assert.Equal(t, "test", m2.Measurement)
+	assert.Equal(t, map[string]string{"foo": "baz"}, m2.Tags)
+	assert.Equal(t, map[string]interface{}{"v": int64(2)}, m2.Fields)
+	assert.True(t, time.Unix(0, 123456790).Equal(m2.Time))
 
-	assert.Equal(t, "test", m.Measurement)
-	assert.Equal(t, map[string]string{"foo": "zab"}, m.Tags)
-	assert.Equal(t, map[string]interface{}{"v": int64(3)}, m.Fields)
-	assert.True(t, time.Unix(0, 123456791).Equal(m.Time))
+	assert.Equal(t, "test", m3.Measurement)
+	assert.Equal(t, map[string]string{"foo": "zab"}, m3.Tags)
+	assert.Equal(t, map[string]interface{}{"v": int64(3)}, m3.Fields)
+	assert.True(t, time.Unix(0, 123456791).Equal(m3.Time))
 }
diff --git a/plugins/inputs/tail/tail.go b/plugins/inputs/tail/tail.go
index 508c1e32..0c19f911 100644
--- a/plugins/inputs/tail/tail.go
+++ b/plugins/inputs/tail/tail.go
@@ -2,7 +2,6 @@ package tail
 
 import (
 	"fmt"
-	"log"
 	"sync"
 
 	"github.com/hpcloud/tail"
@@ -86,7 +85,7 @@ func (t *Tail) Start(acc telegraf.Accumulator) error {
 	for _, filepath := range t.Files {
 		g, err := globpath.Compile(filepath)
 		if err != nil {
-			log.Printf("E! Error Glob %s failed to compile, %s", filepath, err)
+			t.acc.AddError(fmt.Errorf("E! Error Glob %s failed to compile, %s", filepath, err))
 		}
 		for file, _ := range g.Match() {
 			tailer, err := tail.TailFile(file,
@@ -124,21 +123,21 @@ func (t *Tail) receiver(tailer *tail.Tail) {
 	var line *tail.Line
 	for line = range tailer.Lines {
 		if line.Err != nil {
-			log.Printf("E! Error tailing file %s, Error: %s\n",
-				tailer.Filename, err)
+			t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
+				tailer.Filename, err))
 			continue
 		}
 		m, err = t.parser.ParseLine(line.Text)
 		if err == nil {
 			t.acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time())
 		} else {
-			log.Printf("E! Malformed log line in %s: [%s], Error: %s\n",
-				tailer.Filename, line.Text, err)
+			t.acc.AddError(fmt.Errorf("E! Malformed log line in %s: [%s], Error: %s\n",
+				tailer.Filename, line.Text, err))
 		}
 	}
 	if err := tailer.Err(); err != nil {
-		log.Printf("E! Error tailing file %s, Error: %s\n",
-			tailer.Filename, err)
+		t.acc.AddError(fmt.Errorf("E! Error tailing file %s, Error: %s\n",
+			tailer.Filename, err))
 	}
 }
 
@@ -146,12 +145,12 @@ func (t *Tail) Stop() {
 	t.Lock()
 	defer t.Unlock()
 
-	for _, t := range t.tailers {
-		err := t.Stop()
+	for _, tailer := range t.tailers {
+		err := tailer.Stop()
 		if err != nil {
-			log.Printf("E! Error stopping tail on file %s\n", t.Filename)
+			t.acc.AddError(fmt.Errorf("E! Error stopping tail on file %s\n", tailer.Filename))
 		}
-		t.Cleanup()
+		tailer.Cleanup()
 	}
 	t.wg.Wait()
 }
diff --git a/plugins/inputs/tail/tail_test.go b/plugins/inputs/tail/tail_test.go
index 31ecfbf3..b927d160 100644
--- a/plugins/inputs/tail/tail_test.go
+++ b/plugins/inputs/tail/tail_test.go
@@ -3,6 +3,7 @@ package tail
 import (
 	"io/ioutil"
 	"os"
+	"runtime"
 	"testing"
 	"time"
 
@@ -30,11 +31,9 @@ func TestTailFromBeginning(t *testing.T) {
 
 	acc := testutil.Accumulator{}
 	require.NoError(t, tt.Start(&acc))
-	time.Sleep(time.Millisecond * 100)
 	require.NoError(t, tt.Gather(&acc))
-	// arbitrary sleep to wait for message to show up
-	time.Sleep(time.Millisecond * 150)
 
+	acc.Wait(1)
 	acc.AssertContainsTaggedFields(t, "cpu",
 		map[string]interface{}{
 			"usage_idle": float64(100),
@@ -60,13 +59,19 @@ func TestTailFromEnd(t *testing.T) {
 
 	acc := testutil.Accumulator{}
 	require.NoError(t, tt.Start(&acc))
-	time.Sleep(time.Millisecond * 100)
+	time.Sleep(time.Millisecond * 200) //TODO remove once https://github.com/hpcloud/tail/pull/114 is merged & added to Godeps
+	for _, tailer := range tt.tailers {
+		for n, err := tailer.Tell(); err == nil && n == 0; n, err = tailer.Tell() {
+			// wait for tailer to jump to end
+			runtime.Gosched()
+		}
+	}
 
 	_, err = tmpfile.WriteString("cpu,othertag=foo usage_idle=100\n")
 	require.NoError(t, err)
 	require.NoError(t, tt.Gather(&acc))
-	time.Sleep(time.Millisecond * 50)
 
+	acc.Wait(1)
 	acc.AssertContainsTaggedFields(t, "cpu",
 		map[string]interface{}{
 			"usage_idle": float64(100),
@@ -96,7 +101,7 @@ func TestTailBadLine(t *testing.T) {
 	_, err = tmpfile.WriteString("cpu mytag= foo usage_idle= 100\n")
 	require.NoError(t, err)
 	require.NoError(t, tt.Gather(&acc))
-	time.Sleep(time.Millisecond * 50)
 
-	assert.Len(t, acc.Metrics, 0)
+	acc.WaitError(1)
+	assert.Contains(t, acc.Errors[0].Error(), "E! Malformed log line")
 }
diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go
index f7e5784d..27ced791 100644
--- a/plugins/inputs/tcp_listener/tcp_listener_test.go
+++ b/plugins/inputs/tcp_listener/tcp_listener_test.go
@@ -1,10 +1,15 @@
 package tcp_listener
 
 import (
+	"bufio"
+	"bytes"
 	"fmt"
+	"io"
+	"log"
 	"net"
+	"os"
+	"strings"
 	"testing"
-	"time"
 
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/testutil"
@@ -54,7 +59,6 @@ func BenchmarkTCP(b *testing.B) {
 			panic(err)
 		}
 
-		time.Sleep(time.Millisecond * 25)
 		conn, err := net.Dial("tcp", "127.0.0.1:8198")
 		if err != nil {
 			panic(err)
@@ -62,8 +66,10 @@ func BenchmarkTCP(b *testing.B) {
 		for i := 0; i < 100000; i++ {
 			fmt.Fprintf(conn, testMsg)
 		}
-		// wait for 100,000 metrics to get added to accumulator
-		time.Sleep(time.Millisecond)
+		conn.(*net.TCPConn).CloseWrite()
+		// wait for all 100,000 metrics to be processed
+		buf := []byte{0}
+		conn.Read(buf) // will EOF when completed
 		listener.Stop()
 	}
 }
@@ -81,16 +87,18 @@ func TestHighTrafficTCP(t *testing.T) {
 	err := listener.Start(acc)
 	require.NoError(t, err)
 
-	time.Sleep(time.Millisecond * 25)
 	conn, err := net.Dial("tcp", "127.0.0.1:8199")
 	require.NoError(t, err)
 	for i := 0; i < 100000; i++ {
 		fmt.Fprintf(conn, testMsg)
 	}
-	time.Sleep(time.Millisecond)
+	conn.(*net.TCPConn).CloseWrite()
+	buf := []byte{0}
+	_, err = conn.Read(buf)
+	assert.Equal(t, err, io.EOF)
 	listener.Stop()
 
-	assert.Equal(t, 100000, len(acc.Metrics))
+	assert.Equal(t, 100000, int(acc.NMetrics()))
 }
 
 func TestConnectTCP(t *testing.T) {
@@ -105,13 +113,12 @@ func TestConnectTCP(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
 	conn, err := net.Dial("tcp", "127.0.0.1:8194")
 	require.NoError(t, err)
 
 	// send single message to socket
 	fmt.Fprintf(conn, testMsg)
-	time.Sleep(time.Millisecond * 15)
+	acc.Wait(1)
 	acc.AssertContainsTaggedFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(12)},
 		map[string]string{"host": "server01"},
@@ -119,7 +126,7 @@ func TestConnectTCP(t *testing.T) {
 
 	// send multiple messages to socket
 	fmt.Fprintf(conn, testMsgs)
-	time.Sleep(time.Millisecond * 15)
+	acc.Wait(6)
 	hostTags := []string{"server02", "server03",
 		"server04", "server05", "server06"}
 	for _, hostTag := range hostTags {
@@ -143,7 +150,6 @@ func TestConcurrentConns(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
 	_, err := net.Dial("tcp", "127.0.0.1:8195")
 	assert.NoError(t, err)
 	_, err = net.Dial("tcp", "127.0.0.1:8195")
@@ -162,10 +168,8 @@ func TestConcurrentConns(t *testing.T) {
 			" the Telegraf tcp listener configuration.\n",
 		string(buf[:n]))
 
-	_, err = conn.Write([]byte(testMsg))
-	assert.NoError(t, err)
-	time.Sleep(time.Millisecond * 10)
-	assert.Zero(t, acc.NFields())
+	_, err = conn.Read(buf)
+	assert.Equal(t, io.EOF, err)
 }
 
 // Test that MaxTCPConections is respected when max==1
@@ -181,7 +185,6 @@ func TestConcurrentConns1(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
 	_, err := net.Dial("tcp", "127.0.0.1:8196")
 	assert.NoError(t, err)
 
@@ -198,10 +201,8 @@ func TestConcurrentConns1(t *testing.T) {
 			" the Telegraf tcp listener configuration.\n",
 		string(buf[:n]))
 
-	_, err = conn.Write([]byte(testMsg))
-	assert.NoError(t, err)
-	time.Sleep(time.Millisecond * 10)
-	assert.Zero(t, acc.NFields())
+	_, err = conn.Read(buf)
+	assert.Equal(t, io.EOF, err)
 }
 
 // Test that MaxTCPConections is respected
@@ -216,7 +217,6 @@ func TestCloseConcurrentConns(t *testing.T) {
 	acc := &testutil.Accumulator{}
 	require.NoError(t, listener.Start(acc))
 
-	time.Sleep(time.Millisecond * 25)
 	_, err := net.Dial("tcp", "127.0.0.1:8195")
 	assert.NoError(t, err)
 	_, err = net.Dial("tcp", "127.0.0.1:8195")
@@ -238,13 +238,9 @@ func TestRunParser(t *testing.T) {
 	go listener.tcpParser()
 
 	in <- testmsg
-	time.Sleep(time.Millisecond * 25)
 	listener.Gather(&acc)
 
-	if a := acc.NFields(); a != 1 {
-		t.Errorf("got %v, expected %v", a, 1)
-	}
-
+	acc.Wait(1)
 	acc.AssertContainsTaggedFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(12)},
 		map[string]string{"host": "server01"},
@@ -263,11 +259,16 @@ func TestRunParserInvalidMsg(t *testing.T) {
 	listener.wg.Add(1)
 	go listener.tcpParser()
 
+	buf := bytes.NewBuffer(nil)
+	log.SetOutput(buf)
+	defer log.SetOutput(os.Stderr)
 	in <- testmsg
-	time.Sleep(time.Millisecond * 25)
 
-	if a := acc.NFields(); a != 0 {
-		t.Errorf("got %v, expected %v", a, 0)
+	scnr := bufio.NewScanner(buf)
+	for scnr.Scan() {
+		if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) {
+			break
+		}
 	}
 }
 
@@ -284,9 +285,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
 	go listener.tcpParser()
 
 	in <- testmsg
-	time.Sleep(time.Millisecond * 25)
 	listener.Gather(&acc)
 
+	acc.Wait(1)
 	acc.AssertContainsFields(t, "cpu_load_graphite",
 		map[string]interface{}{"value": float64(12)})
 }
@@ -304,9 +305,9 @@ func TestRunParserJSONMsg(t *testing.T) {
 	go listener.tcpParser()
 
 	in <- testmsg
-	time.Sleep(time.Millisecond * 25)
 	listener.Gather(&acc)
 
+	acc.Wait(1)
 	acc.AssertContainsFields(t, "udp_json_test",
 		map[string]interface{}{
 			"a":   float64(5),
diff --git a/plugins/inputs/udp_listener/udp_listener.go b/plugins/inputs/udp_listener/udp_listener.go
index 53c6a72f..d0a728b3 100644
--- a/plugins/inputs/udp_listener/udp_listener.go
+++ b/plugins/inputs/udp_listener/udp_listener.go
@@ -1,6 +1,7 @@
 package udp_listener
 
 import (
+	"fmt"
 	"log"
 	"net"
 	"sync"
@@ -107,8 +108,9 @@ func (u *UdpListener) Start(acc telegraf.Accumulator) error {
 	u.in = make(chan []byte, u.AllowedPendingMessages)
 	u.done = make(chan struct{})
 
-	u.wg.Add(2)
-	go u.udpListen()
+	u.udpListen()
+
+	u.wg.Add(1)
 	go u.udpParser()
 
 	log.Printf("I! Started UDP listener service on %s (ReadBuffer: %d)\n", u.ServiceAddress, u.UDPBufferSize)
@@ -126,32 +128,37 @@ func (u *UdpListener) Stop() {
 }
 
 func (u *UdpListener) udpListen() error {
-	defer u.wg.Done()
 	var err error
 
 	address, _ := net.ResolveUDPAddr("udp", u.ServiceAddress)
 	u.listener, err = net.ListenUDP("udp", address)
 
 	if err != nil {
-		log.Fatalf("E! Error: ListenUDP - %s", err)
+		return fmt.Errorf("E! Error: ListenUDP - %s", err)
 	}
 
 	log.Println("I! UDP server listening on: ", u.listener.LocalAddr().String())
 
-	buf := make([]byte, UDP_MAX_PACKET_SIZE)
-
 	if u.UDPBufferSize > 0 {
 		err = u.listener.SetReadBuffer(u.UDPBufferSize) // if we want to move away from OS default
 		if err != nil {
-			log.Printf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err)
-			return err
+			return fmt.Errorf("E! Failed to set UDP read buffer to %d: %s", u.UDPBufferSize, err)
 		}
 	}
 
+	u.wg.Add(1)
+	go u.udpListenLoop()
+	return nil
+}
+
+func (u *UdpListener) udpListenLoop() {
+	defer u.wg.Done()
+
+	buf := make([]byte, UDP_MAX_PACKET_SIZE)
 	for {
 		select {
 		case <-u.done:
-			return nil
+			return
 		default:
 			u.listener.SetReadDeadline(time.Now().Add(time.Second))
 
diff --git a/plugins/inputs/udp_listener/udp_listener_test.go b/plugins/inputs/udp_listener/udp_listener_test.go
index eefdd593..4d78a1a4 100644
--- a/plugins/inputs/udp_listener/udp_listener_test.go
+++ b/plugins/inputs/udp_listener/udp_listener_test.go
@@ -1,12 +1,16 @@
 package udp_listener
 
 import (
+	"bufio"
+	"bytes"
 	"fmt"
 	"io/ioutil"
 	"log"
 	"net"
+	"os"
+	"runtime"
+	"strings"
 	"testing"
-	"time"
 
 	"github.com/influxdata/telegraf/plugins/parsers"
 	"github.com/influxdata/telegraf/testutil"
@@ -50,22 +54,27 @@ func TestHighTrafficUDP(t *testing.T) {
 	err := listener.Start(acc)
 	require.NoError(t, err)
 
-	time.Sleep(time.Millisecond * 25)
 	conn, err := net.Dial("udp", "127.0.0.1:8126")
 	require.NoError(t, err)
+	mlen := int64(len(testMsgs))
+	var sent int64
 	for i := 0; i < 20000; i++ {
-		// arbitrary, just to give the OS buffer some slack handling the
-		// packet storm.
-		time.Sleep(time.Microsecond)
-		fmt.Fprintf(conn, testMsgs)
+		for sent > listener.BytesRecv.Get()+32000 {
+			// more than 32kb sitting in OS buffer, let it drain
+			runtime.Gosched()
+		}
+		conn.Write([]byte(testMsgs))
+		sent += mlen
+	}
+	for sent > listener.BytesRecv.Get() {
+		runtime.Gosched()
+	}
+	for len(listener.in) > 0 {
+		runtime.Gosched()
 	}
-	time.Sleep(time.Millisecond)
 	listener.Stop()
 
-	// this is not an exact science, since UDP packets can easily get lost or
-	// dropped, but assume that the OS will be able to
-	// handle at least 90% of the sent UDP packets.
-	assert.InDelta(t, 100000, len(acc.Metrics), 10000)
+	assert.Equal(t, uint64(100000), acc.NMetrics())
 }
 
 func TestConnectUDP(t *testing.T) {
@@ -79,13 +88,12 @@ func TestConnectUDP(t *testing.T) {
 	require.NoError(t, listener.Start(acc))
 	defer listener.Stop()
 
-	time.Sleep(time.Millisecond * 25)
 	conn, err := net.Dial("udp", "127.0.0.1:8127")
 	require.NoError(t, err)
 
 	// send single message to socket
 	fmt.Fprintf(conn, testMsg)
-	time.Sleep(time.Millisecond * 15)
+	acc.Wait(1)
 	acc.AssertContainsTaggedFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(12)},
 		map[string]string{"host": "server01"},
@@ -93,7 +101,7 @@ func TestConnectUDP(t *testing.T) {
 
 	// send multiple messages to socket
 	fmt.Fprintf(conn, testMsgs)
-	time.Sleep(time.Millisecond * 15)
+	acc.Wait(6)
 	hostTags := []string{"server02", "server03",
 		"server04", "server05", "server06"}
 	for _, hostTag := range hostTags {
@@ -118,13 +126,9 @@ func TestRunParser(t *testing.T) {
 	go listener.udpParser()
 
 	in <- testmsg
-	time.Sleep(time.Millisecond * 25)
 	listener.Gather(&acc)
 
-	if a := acc.NFields(); a != 1 {
-		t.Errorf("got %v, expected %v", a, 1)
-	}
-
+	acc.Wait(1)
 	acc.AssertContainsTaggedFields(t, "cpu_load_short",
 		map[string]interface{}{"value": float64(12)},
 		map[string]string{"host": "server01"},
@@ -144,11 +148,16 @@ func TestRunParserInvalidMsg(t *testing.T) {
 	listener.wg.Add(1)
 	go listener.udpParser()
 
+	buf := bytes.NewBuffer(nil)
+	log.SetOutput(buf)
+	defer log.SetOutput(os.Stderr)
 	in <- testmsg
-	time.Sleep(time.Millisecond * 25)
 
-	if a := acc.NFields(); a != 0 {
-		t.Errorf("got %v, expected %v", a, 0)
+	scnr := bufio.NewScanner(buf)
+	for scnr.Scan() {
+		if strings.Contains(scnr.Text(), fmt.Sprintf(malformedwarn, 1)) {
+			break
+		}
 	}
 }
 
@@ -166,9 +175,9 @@ func TestRunParserGraphiteMsg(t *testing.T) {
 	go listener.udpParser()
 
 	in <- testmsg
-	time.Sleep(time.Millisecond * 25)
 	listener.Gather(&acc)
 
+	acc.Wait(1)
 	acc.AssertContainsFields(t, "cpu_load_graphite",
 		map[string]interface{}{"value": float64(12)})
 }
@@ -187,9 +196,9 @@ func TestRunParserJSONMsg(t *testing.T) {
 	go listener.udpParser()
 
 	in <- testmsg
-	time.Sleep(time.Millisecond * 25)
 	listener.Gather(&acc)
 
+	acc.Wait(1)
 	acc.AssertContainsFields(t, "udp_json_test",
 		map[string]interface{}{
 			"a":   float64(5),
diff --git a/plugins/outputs/graphite/graphite_test.go b/plugins/outputs/graphite/graphite_test.go
index 4f1f2fef..3984728a 100644
--- a/plugins/outputs/graphite/graphite_test.go
+++ b/plugins/outputs/graphite/graphite_test.go
@@ -44,9 +44,7 @@ func TestGraphiteOK(t *testing.T) {
 	// Start TCP server
 	wg.Add(1)
 	t.Log("Starting server")
-	go TCPServer1(t, &wg)
-	// Give the fake graphite TCP server some time to start:
-	time.Sleep(time.Millisecond * 100)
+	TCPServer1(t, &wg)
 
 	// Init plugin
 	g := Graphite{
@@ -88,10 +86,8 @@ func TestGraphiteOK(t *testing.T) {
 	t.Log("Finished Waiting for first data")
 	var wg2 sync.WaitGroup
 	// Start TCP server
-	time.Sleep(time.Millisecond * 100)
 	wg2.Add(1)
-	go TCPServer2(t, &wg2)
-	time.Sleep(time.Millisecond * 100)
+	TCPServer2(t, &wg2)
 	//Write but expect an error, but reconnect
 	g.Write(metrics2)
 	err3 := g.Write(metrics2)
@@ -105,27 +101,31 @@ func TestGraphiteOK(t *testing.T) {
 }
 
 func TCPServer1(t *testing.T, wg *sync.WaitGroup) {
-	defer wg.Done()
 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
-	conn, _ := (tcpServer).Accept()
-	reader := bufio.NewReader(conn)
-	tp := textproto.NewReader(reader)
-	data1, _ := tp.ReadLine()
-	assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
-	conn.Close()
-	tcpServer.Close()
+	go func() {
+		defer wg.Done()
+		conn, _ := (tcpServer).Accept()
+		reader := bufio.NewReader(conn)
+		tp := textproto.NewReader(reader)
+		data1, _ := tp.ReadLine()
+		assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
+		conn.Close()
+		tcpServer.Close()
+	}()
 }
 
 func TCPServer2(t *testing.T, wg *sync.WaitGroup) {
-	defer wg.Done()
 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:2003")
-	conn2, _ := (tcpServer).Accept()
-	reader := bufio.NewReader(conn2)
-	tp := textproto.NewReader(reader)
-	data2, _ := tp.ReadLine()
-	assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
-	data3, _ := tp.ReadLine()
-	assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
-	conn2.Close()
-	tcpServer.Close()
+	go func() {
+		defer wg.Done()
+		conn2, _ := (tcpServer).Accept()
+		reader := bufio.NewReader(conn2)
+		tp := textproto.NewReader(reader)
+		data2, _ := tp.ReadLine()
+		assert.Equal(t, "my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
+		data3, _ := tp.ReadLine()
+		assert.Equal(t, "my.prefix.192_168_0_1.my_measurement 3.14 1289430000", data3)
+		conn2.Close()
+		tcpServer.Close()
+	}()
 }
diff --git a/plugins/outputs/influxdb/client/udp_test.go b/plugins/outputs/influxdb/client/udp_test.go
index 31196ddc..84efe0b2 100644
--- a/plugins/outputs/influxdb/client/udp_test.go
+++ b/plugins/outputs/influxdb/client/udp_test.go
@@ -66,7 +66,6 @@ func TestUDPClient_Write(t *testing.T) {
 	}()
 
 	// test sending simple metric
-	time.Sleep(time.Second)
 	n, err := client.Write([]byte("cpu value=99\n"))
 	assert.Equal(t, n, 13)
 	assert.NoError(t, err)
diff --git a/plugins/outputs/instrumental/instrumental_test.go b/plugins/outputs/instrumental/instrumental_test.go
index d77d8eb0..0d3ce904 100644
--- a/plugins/outputs/instrumental/instrumental_test.go
+++ b/plugins/outputs/instrumental/instrumental_test.go
@@ -16,9 +16,7 @@ import (
 func TestWrite(t *testing.T) {
 	var wg sync.WaitGroup
 	wg.Add(1)
-	go TCPServer(t, &wg)
-	// Give the fake TCP server some time to start:
-	time.Sleep(time.Millisecond * 100)
+	TCPServer(t, &wg)
 
 	i := Instrumental{
 		Host:     "127.0.0.1",
@@ -79,45 +77,47 @@ func TestWrite(t *testing.T) {
 
 func TCPServer(t *testing.T, wg *sync.WaitGroup) {
 	tcpServer, _ := net.Listen("tcp", "127.0.0.1:8000")
-	defer wg.Done()
-	conn, _ := tcpServer.Accept()
-	conn.SetDeadline(time.Now().Add(1 * time.Second))
-	reader := bufio.NewReader(conn)
-	tp := textproto.NewReader(reader)
-
-	hello, _ := tp.ReadLine()
-	assert.Equal(t, "hello version go/telegraf/1.1", hello)
-	auth, _ := tp.ReadLine()
-	assert.Equal(t, "authenticate abc123token", auth)
-	conn.Write([]byte("ok\nok\n"))
-
-	data1, _ := tp.ReadLine()
-	assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
-	data2, _ := tp.ReadLine()
-	assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
-
-	conn, _ = tcpServer.Accept()
-	conn.SetDeadline(time.Now().Add(1 * time.Second))
-	reader = bufio.NewReader(conn)
-	tp = textproto.NewReader(reader)
-
-	hello, _ = tp.ReadLine()
-	assert.Equal(t, "hello version go/telegraf/1.1", hello)
-	auth, _ = tp.ReadLine()
-	assert.Equal(t, "authenticate abc123token", auth)
-	conn.Write([]byte("ok\nok\n"))
-
-	data3, _ := tp.ReadLine()
-	assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3)
-
-	data4, _ := tp.ReadLine()
-	assert.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4)
-
-	data5, _ := tp.ReadLine()
-	assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5)
-
-	data6, _ := tp.ReadLine()
-	assert.Equal(t, "", data6)
-
-	conn.Close()
+	go func() {
+		defer wg.Done()
+		conn, _ := tcpServer.Accept()
+		conn.SetDeadline(time.Now().Add(1 * time.Second))
+		reader := bufio.NewReader(conn)
+		tp := textproto.NewReader(reader)
+
+		hello, _ := tp.ReadLine()
+		assert.Equal(t, "hello version go/telegraf/1.1", hello)
+		auth, _ := tp.ReadLine()
+		assert.Equal(t, "authenticate abc123token", auth)
+		conn.Write([]byte("ok\nok\n"))
+
+		data1, _ := tp.ReadLine()
+		assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement.myfield 3.14 1289430000", data1)
+		data2, _ := tp.ReadLine()
+		assert.Equal(t, "gauge my.prefix.192_168_0_1.mymeasurement 3.14 1289430000", data2)
+
+		conn, _ = tcpServer.Accept()
+		conn.SetDeadline(time.Now().Add(1 * time.Second))
+		reader = bufio.NewReader(conn)
+		tp = textproto.NewReader(reader)
+
+		hello, _ = tp.ReadLine()
+		assert.Equal(t, "hello version go/telegraf/1.1", hello)
+		auth, _ = tp.ReadLine()
+		assert.Equal(t, "authenticate abc123token", auth)
+		conn.Write([]byte("ok\nok\n"))
+
+		data3, _ := tp.ReadLine()
+		assert.Equal(t, "increment my.prefix.192_168_0_1.my_histogram 3.14 1289430000", data3)
+
+		data4, _ := tp.ReadLine()
+		assert.Equal(t, "increment my.prefix.192_168_0_1_8888_123.bad_metric_name 1 1289430000", data4)
+
+		data5, _ := tp.ReadLine()
+		assert.Equal(t, "increment my.prefix.192_168_0_1.my_counter 3.14 1289430000", data5)
+
+		data6, _ := tp.ReadLine()
+		assert.Equal(t, "", data6)
+
+		conn.Close()
+	}()
 }
diff --git a/testutil/accumulator.go b/testutil/accumulator.go
index 63dfddd7..02bebf9c 100644
--- a/testutil/accumulator.go
+++ b/testutil/accumulator.go
@@ -129,6 +129,9 @@ func (a *Accumulator) AddError(err error) {
 	}
 	a.Lock()
 	a.Errors = append(a.Errors, err)
+	if a.Cond != nil {
+		a.Cond.Broadcast()
+	}
 	a.Unlock()
 }
 
@@ -198,13 +201,28 @@ func (a *Accumulator) NFields() int {
 	return counter
 }
 
-// Wait waits for a metric to be added to the accumulator.
-// Accumulator must already be locked.
-func (a *Accumulator) Wait() {
+// Wait waits for the given number of metrics to be added to the accumulator.
+func (a *Accumulator) Wait(n int) {
+	a.Lock()
+	if a.Cond == nil {
+		a.Cond = sync.NewCond(&a.Mutex)
+	}
+	for int(a.NMetrics()) < n {
+		a.Cond.Wait()
+	}
+	a.Unlock()
+}
+
+// WaitError waits for the given number of errors to be added to the accumulator.
+func (a *Accumulator) WaitError(n int) {
+	a.Lock()
 	if a.Cond == nil {
 		a.Cond = sync.NewCond(&a.Mutex)
 	}
-	a.Cond.Wait()
+	for len(a.Errors) < n {
+		a.Cond.Wait()
+	}
+	a.Unlock()
 }
 
 func (a *Accumulator) AssertContainsTaggedFields(
-- 
GitLab