diff --git a/CHANGELOG.md b/CHANGELOG.md
index bed972e5c1d44d8d4883e99bac202c25921c1185..ce2a883e172efe0fcb6b088aa9e1955ed93511aa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,6 +32,7 @@ should now look like:
 - [#1247](https://github.com/influxdata/telegraf/pull/1247): rollbar webhook plugin.
 - [#1402](https://github.com/influxdata/telegraf/pull/1402): docker-machine/boot2docker no longer required for unit tests.
 - [#1350](https://github.com/influxdata/telegraf/pull/1350): cgroup input plugin.
+- [#1369](https://github.com/influxdata/telegraf/pull/1369): Add input plugin for consuming metrics from NSQD.
 
 ### Bugfixes
 
diff --git a/README.md b/README.md
index 425e7d701e57f15af25db46c65322a1ee5a43444..53e6725347515ee32588597daafeafefebf2da6f 100644
--- a/README.md
+++ b/README.md
@@ -220,6 +220,9 @@ Telegraf can also collect metrics via the following service plugins:
 * [webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks)
   * [github](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/github)
   * [rollbar](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/webhooks/rollbar)
+* [nsq_consumer](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/nsq_consumer)
+* [github_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/github_webhooks)
+* [rollbar_webhooks](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/rollbar_webhooks)
 
 We'll be adding support for many more over the coming months. Read on if you
 want to add support for another service or third-party API.
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index 512753b7aa9c3121faf076a02e2c295ec0b6bd42..529a13baedb917683f065f6de7f63d24ee7e2251 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -41,6 +41,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/net_response"
 	_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
 	_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
+	_ "github.com/influxdata/telegraf/plugins/inputs/nsq_consumer"
 	_ "github.com/influxdata/telegraf/plugins/inputs/nstat"
 	_ "github.com/influxdata/telegraf/plugins/inputs/ntpq"
 	_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
diff --git a/plugins/inputs/nsq_consumer/README.md b/plugins/inputs/nsq_consumer/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..eac494ccbd05c4f97d8440776b7ee70b56885013
--- /dev/null
+++ b/plugins/inputs/nsq_consumer/README.md
@@ -0,0 +1,25 @@
+# NSQ Consumer Input Plugin
+
+The [NSQ](http://nsq.io/) consumer plugin polls a specified NSQD
+topic and adds messages to InfluxDB. This plugin allows a message to be in any of the supported `data_format` types. 
+
+## Configuration
+
+```toml
+# Read metrics from NSQD topic(s)
+[[inputs.nsq_consumer]]
+  ## An array of NSQD HTTP API endpoints
+  server = "localhost:4150"
+  topic = "telegraf"
+  channel = "consumer"
+  max_in_flight = 100
+
+  ## Data format to consume.
+  ## Each data format has it's own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+```
+
+## Testing
+The `nsq_consumer_test` mocks out the interaction with `NSQD`. It requires no outside dependencies.
diff --git a/plugins/inputs/nsq_consumer/nsq_consumer.go b/plugins/inputs/nsq_consumer/nsq_consumer.go
new file mode 100644
index 0000000000000000000000000000000000000000..b227b7e5029d78d30794dc1f2b26aa0b430582f6
--- /dev/null
+++ b/plugins/inputs/nsq_consumer/nsq_consumer.go
@@ -0,0 +1,99 @@
+package nsq_consumer
+
+import (
+	"log"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/nsqio/go-nsq"
+)
+
+//NSQConsumer represents the configuration of the plugin
+type NSQConsumer struct {
+	Server      string
+	Topic       string
+	Channel     string
+	MaxInFlight int
+	parser      parsers.Parser
+	consumer    *nsq.Consumer
+	acc         telegraf.Accumulator
+}
+
+var sampleConfig = `
+  ## An string representing the NSQD TCP Endpoint
+  server = "localhost:4150"
+  topic = "telegraf"
+  channel = "consumer"
+  max_in_flight = 100
+
+  ## Data format to consume.
+  ## Each data format has it's own unique set of configuration options, read
+  ## more about them here:
+  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+`
+
+func init() {
+	inputs.Add("nsq_consumer", func() telegraf.Input {
+		return &NSQConsumer{}
+	})
+}
+
+// SetParser takes the data_format from the config and finds the right parser for that format
+func (n *NSQConsumer) SetParser(parser parsers.Parser) {
+	n.parser = parser
+}
+
+// SampleConfig returns config values for generating a sample configuration file
+func (n *NSQConsumer) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description prints description string
+func (n *NSQConsumer) Description() string {
+	return "Read NSQ topic for metrics."
+}
+
+// Start pulls data from nsq
+func (n *NSQConsumer) Start(acc telegraf.Accumulator) error {
+	n.acc = acc
+	n.connect()
+	n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) error {
+		metrics, err := n.parser.Parse(message.Body)
+		if err != nil {
+			log.Printf("NSQConsumer Parse Error\nmessage:%s\nerror:%s", string(message.Body), err.Error())
+			return nil
+		}
+		for _, metric := range metrics {
+			n.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
+		}
+		message.Finish()
+		return nil
+	}), n.MaxInFlight)
+	n.consumer.ConnectToNSQD(n.Server)
+	return nil
+}
+
+// Stop processing messages
+func (n *NSQConsumer) Stop() {
+	n.consumer.Stop()
+}
+
+// Gather is a noop
+func (n *NSQConsumer) Gather(acc telegraf.Accumulator) error {
+	return nil
+}
+
+func (n *NSQConsumer) connect() error {
+	if n.consumer == nil {
+		config := nsq.NewConfig()
+		config.MaxInFlight = n.MaxInFlight
+		consumer, err := nsq.NewConsumer(n.Topic, n.Channel, config)
+		if err != nil {
+			return err
+		}
+		n.consumer = consumer
+	}
+	return nil
+}
diff --git a/plugins/inputs/nsq_consumer/nsq_consumer_test.go b/plugins/inputs/nsq_consumer/nsq_consumer_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..59db675a552e10e6493de1c0b16307e6eba39f35
--- /dev/null
+++ b/plugins/inputs/nsq_consumer/nsq_consumer_test.go
@@ -0,0 +1,245 @@
+package nsq_consumer
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"io"
+	"log"
+	"net"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/nsqio/go-nsq"
+	"github.com/stretchr/testify/assert"
+)
+
+// This test is modeled after the kafka consumer integration test
+func TestReadsMetricsFromNSQ(t *testing.T) {
+	msgID := nsq.MessageID{'1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 's', 'd', 'f', 'g', 'h'}
+	msg := nsq.NewMessage(msgID, []byte("cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"))
+
+	script := []instruction{
+		// SUB
+		instruction{0, nsq.FrameTypeResponse, []byte("OK")},
+		// IDENTIFY
+		instruction{0, nsq.FrameTypeResponse, []byte("OK")},
+		instruction{20 * time.Millisecond, nsq.FrameTypeMessage, frameMessage(msg)},
+		// needed to exit test
+		instruction{100 * time.Millisecond, -1, []byte("exit")},
+	}
+
+	addr, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:4155")
+	newMockNSQD(script, addr.String())
+
+	consumer := &NSQConsumer{
+		Server:      "127.0.0.1:4155",
+		Topic:       "telegraf",
+		Channel:     "consume",
+		MaxInFlight: 1,
+	}
+
+	p, _ := parsers.NewInfluxParser()
+	consumer.SetParser(p)
+	var acc testutil.Accumulator
+	assert.Equal(t, 0, len(acc.Metrics), "There should not be any points")
+	if err := consumer.Start(&acc); err != nil {
+		t.Fatal(err.Error())
+	} else {
+		defer consumer.Stop()
+	}
+
+	waitForPoint(&acc, t)
+
+	if len(acc.Metrics) == 1 {
+		point := acc.Metrics[0]
+		assert.Equal(t, "cpu_load_short", point.Measurement)
+		assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Fields)
+		assert.Equal(t, map[string]string{
+			"host":      "server01",
+			"direction": "in",
+			"region":    "us-west",
+		}, point.Tags)
+		assert.Equal(t, time.Unix(0, 1422568543702900257).Unix(), point.Time.Unix())
+	} else {
+		t.Errorf("No points found in accumulator, expected 1")
+	}
+
+}
+
+// Waits for the metric that was sent to the kafka broker to arrive at the kafka
+// consumer
+func waitForPoint(acc *testutil.Accumulator, t *testing.T) {
+	// Give the kafka container up to 2 seconds to get the point to the consumer
+	ticker := time.NewTicker(5 * time.Millisecond)
+	defer ticker.Stop()
+	counter := 0
+	for {
+		select {
+		case <-ticker.C:
+			counter++
+			if counter > 1000 {
+				t.Fatal("Waited for 5s, point never arrived to consumer")
+			} else if acc.NFields() == 1 {
+				return
+			}
+		}
+	}
+}
+
+func newMockNSQD(script []instruction, addr string) *mockNSQD {
+	n := &mockNSQD{
+		script:   script,
+		exitChan: make(chan int),
+	}
+
+	tcpListener, err := net.Listen("tcp", addr)
+	if err != nil {
+		log.Fatalf("FATAL: listen (%s) failed - %s", n.tcpAddr.String(), err)
+	}
+	n.tcpListener = tcpListener
+	n.tcpAddr = tcpListener.Addr().(*net.TCPAddr)
+
+	go n.listen()
+
+	return n
+}
+
+// The code below allows us to mock the interactions with nsqd. This is taken from:
+// https://github.com/nsqio/go-nsq/blob/master/mock_test.go
+type instruction struct {
+	delay     time.Duration
+	frameType int32
+	body      []byte
+}
+
+type mockNSQD struct {
+	script      []instruction
+	got         [][]byte
+	tcpAddr     *net.TCPAddr
+	tcpListener net.Listener
+	exitChan    chan int
+}
+
+func (n *mockNSQD) listen() {
+	for {
+		conn, err := n.tcpListener.Accept()
+		if err != nil {
+			break
+		}
+		go n.handle(conn)
+	}
+	close(n.exitChan)
+}
+
+func (n *mockNSQD) handle(conn net.Conn) {
+	var idx int
+	buf := make([]byte, 4)
+	_, err := io.ReadFull(conn, buf)
+	if err != nil {
+		log.Fatalf("ERROR: failed to read protocol version - %s", err)
+	}
+
+	readChan := make(chan []byte)
+	readDoneChan := make(chan int)
+	scriptTime := time.After(n.script[0].delay)
+	rdr := bufio.NewReader(conn)
+
+	go func() {
+		for {
+			line, err := rdr.ReadBytes('\n')
+			if err != nil {
+				return
+			}
+			// trim the '\n'
+			line = line[:len(line)-1]
+			readChan <- line
+			<-readDoneChan
+		}
+	}()
+
+	var rdyCount int
+	for idx < len(n.script) {
+		select {
+		case line := <-readChan:
+			n.got = append(n.got, line)
+			params := bytes.Split(line, []byte(" "))
+			switch {
+			case bytes.Equal(params[0], []byte("IDENTIFY")):
+				l := make([]byte, 4)
+				_, err := io.ReadFull(rdr, l)
+				if err != nil {
+					log.Printf(err.Error())
+					goto exit
+				}
+				size := int32(binary.BigEndian.Uint32(l))
+				b := make([]byte, size)
+				_, err = io.ReadFull(rdr, b)
+				if err != nil {
+					log.Printf(err.Error())
+					goto exit
+				}
+			case bytes.Equal(params[0], []byte("RDY")):
+				rdy, _ := strconv.Atoi(string(params[1]))
+				rdyCount = rdy
+			case bytes.Equal(params[0], []byte("FIN")):
+			case bytes.Equal(params[0], []byte("REQ")):
+			}
+			readDoneChan <- 1
+		case <-scriptTime:
+			inst := n.script[idx]
+			if bytes.Equal(inst.body, []byte("exit")) {
+				goto exit
+			}
+			if inst.frameType == nsq.FrameTypeMessage {
+				if rdyCount == 0 {
+					scriptTime = time.After(n.script[idx+1].delay)
+					continue
+				}
+				rdyCount--
+			}
+			_, err := conn.Write(framedResponse(inst.frameType, inst.body))
+			if err != nil {
+				log.Printf(err.Error())
+				goto exit
+			}
+			scriptTime = time.After(n.script[idx+1].delay)
+			idx++
+		}
+	}
+
+exit:
+	n.tcpListener.Close()
+	conn.Close()
+}
+
+func framedResponse(frameType int32, data []byte) []byte {
+	var w bytes.Buffer
+
+	beBuf := make([]byte, 4)
+	size := uint32(len(data)) + 4
+
+	binary.BigEndian.PutUint32(beBuf, size)
+	_, err := w.Write(beBuf)
+	if err != nil {
+		return nil
+	}
+
+	binary.BigEndian.PutUint32(beBuf, uint32(frameType))
+	_, err = w.Write(beBuf)
+	if err != nil {
+		return nil
+	}
+
+	w.Write(data)
+	return w.Bytes()
+}
+
+func frameMessage(m *nsq.Message) []byte {
+	var b bytes.Buffer
+	m.WriteTo(&b)
+	return b.Bytes()
+}