diff --git a/Godeps b/Godeps
index 5cdfecbe72c7d57b097619a9e427b3e1f7924b29..0b9a1672761dd61b6b41f5dc2b7336b16e0624ea 100644
--- a/Godeps
+++ b/Godeps
@@ -28,6 +28,7 @@ github.com/matttproud/golang_protobuf_extensions d0c3fe89de86839aecf2e0579c40ba3
 github.com/mreiferson/go-snappystream 028eae7ab5c4c9e2d1cb4c4ca1e53259bbe7e504
 github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b
 github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9
+github.com/nats-io/nats 6a83f1a633cfbfd90aa648ac99fb38c06a8b40df
 github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f
 github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988
 github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index e7329b042d99004b26ccfb0e8ea05ea1558af524..794885129f252c21228a0acb38dc1fd2aa34b0bd 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -22,6 +22,7 @@ import (
 	_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
 	_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
 	_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
+	_ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer"
 	_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
 	_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
 	_ "github.com/influxdata/telegraf/plugins/inputs/passenger"
diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..f3b67c9d54a7f00fad41943cbc3946ca6b6e43ff
--- /dev/null
+++ b/plugins/inputs/nats_consumer/README.md
@@ -0,0 +1,38 @@
+# NATS Consumer
+
+The [NATS](http://www.nats.io/about/) consumer plugin reads from 
+specified NATS subjects and adds messages to InfluxDB. The plugin expects messages
+in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md). 
+A [Queue Group](http://www.nats.io/documentation/concepts/nats-queueing/)
+is used when subscribing to subjects so multiple instances of telegraf can read
+from a NATS cluster in parallel.
+
+## Configuration
+```
+# Read metrics from NATS subject(s)
+[[inputs.nats_consumer]]
+  ### urls of NATS servers
+  servers = ["nats://localhost:4222"]
+  ### Use Transport Layer Security
+  secure = false
+  ### subject(s) to consume
+  subjects = ["telegraf"]
+  ### name a queue group
+  queue_group = "telegraf_consumers"
+  ### Maximum number of points to buffer between collection intervals
+  point_buffer = 100000
+  
+  ### Data format to consume. This can be "json", "influx" or "graphite"
+  ### Each data format has it's own unique set of configuration options, read
+  ### more about them here:
+  ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+```
+
+## Testing
+
+To run tests:
+
+```
+go test
+```
\ No newline at end of file
diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go
new file mode 100644
index 0000000000000000000000000000000000000000..4b25fa0a1941fd03e85a02607356ecbf86aeacdb
--- /dev/null
+++ b/plugins/inputs/nats_consumer/nats_consumer.go
@@ -0,0 +1,202 @@
+package natsconsumer
+
+import (
+	"fmt"
+	"log"
+	"sync"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/inputs"
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/nats-io/nats"
+)
+
+type natsError struct {
+	conn *nats.Conn
+	sub  *nats.Subscription
+	err  error
+}
+
+func (e natsError) Error() string {
+	return fmt.Sprintf("%s url:%s id:%s sub:%s queue:%s",
+		e.err.Error(), e.conn.ConnectedUrl(), e.conn.ConnectedServerId(), e.sub.Subject, e.sub.Queue)
+}
+
+type natsConsumer struct {
+	QueueGroup string
+	Subjects   []string
+	Servers    []string
+	Secure     bool
+
+	PointBuffer int
+	parser      parsers.Parser
+
+	sync.Mutex
+	Conn *nats.Conn
+	Subs []*nats.Subscription
+
+	// channel for all incoming NATS messages
+	in chan *nats.Msg
+	// channel for all NATS read errors
+	errs chan error
+	// channel for all incoming parsed points
+	metricC chan telegraf.Metric
+	done    chan struct{}
+}
+
+var sampleConfig = `
+  ### urls of NATS servers
+  servers = ["nats://localhost:4222"]
+  ### Use Transport Layer Security
+  secure = false
+  ### subject(s) to consume
+  subjects = ["telegraf"]
+  ### name a queue group
+  queue_group = "telegraf_consumers"
+  ### Maximum number of points to buffer between collection intervals
+  point_buffer = 100000
+  
+  ### Data format to consume. This can be "json", "influx" or "graphite"
+  ### Each data format has it's own unique set of configuration options, read
+  ### more about them here:
+  ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md
+  data_format = "influx"
+`
+
+func (n *natsConsumer) SampleConfig() string {
+	return sampleConfig
+}
+
+func (n *natsConsumer) Description() string {
+	return "Read metrics from NATS subject(s)"
+}
+
+func (n *natsConsumer) SetParser(parser parsers.Parser) {
+	n.parser = parser
+}
+
+func (n *natsConsumer) natsErrHandler(c *nats.Conn, s *nats.Subscription, e error) {
+	select {
+	case n.errs <- natsError{conn: c, sub: s, err: e}:
+	default:
+		return
+	}
+}
+
+// Start the nats consumer. Caller must call *natsConsumer.Stop() to clean up.
+func (n *natsConsumer) Start() error {
+	n.Lock()
+	defer n.Unlock()
+
+	var connectErr error
+
+	opts := nats.DefaultOptions
+	opts.Servers = n.Servers
+	opts.Secure = n.Secure
+
+	if n.Conn == nil || n.Conn.IsClosed() {
+		n.Conn, connectErr = opts.Connect()
+		if connectErr != nil {
+			return connectErr
+		}
+
+		// Setup message and error channels
+		n.errs = make(chan error)
+		n.Conn.SetErrorHandler(n.natsErrHandler)
+
+		n.in = make(chan *nats.Msg)
+		for _, subj := range n.Subjects {
+			sub, err := n.Conn.ChanQueueSubscribe(subj, n.QueueGroup, n.in)
+			if err != nil {
+				return err
+			}
+			n.Subs = append(n.Subs, sub)
+		}
+	}
+
+	n.done = make(chan struct{})
+	if n.PointBuffer == 0 {
+		n.PointBuffer = 100000
+	}
+
+	n.metricC = make(chan telegraf.Metric, n.PointBuffer)
+
+	// Start the message reader
+	go n.receiver()
+	log.Printf("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v\n",
+		n.Conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
+
+	return nil
+}
+
+// receiver() reads all incoming messages from NATS, and parses them into
+// influxdb metric points.
+func (n *natsConsumer) receiver() {
+	defer n.clean()
+	for {
+		select {
+		case <-n.done:
+			return
+		case err := <-n.errs:
+			log.Printf("error reading from %s\n", err.Error())
+		case msg := <-n.in:
+			metrics, err := n.parser.Parse(msg.Data)
+			if err != nil {
+				log.Printf("subject: %s, error: %s", msg.Subject, err.Error())
+			}
+
+			for _, metric := range metrics {
+				select {
+				case n.metricC <- metric:
+					continue
+				default:
+					log.Printf("NATS Consumer buffer is full, dropping a metric." +
+						" You may want to increase the point_buffer setting")
+				}
+			}
+
+		}
+	}
+}
+
+func (n *natsConsumer) clean() {
+	n.Lock()
+	defer n.Unlock()
+	close(n.in)
+	close(n.metricC)
+	close(n.errs)
+
+	for _, sub := range n.Subs {
+		if err := sub.Unsubscribe(); err != nil {
+			log.Printf("Error unsubscribing from subject %s in queue %s: %s\n",
+				sub.Subject, sub.Queue, err.Error())
+		}
+	}
+
+	if n.Conn != nil && !n.Conn.IsClosed() {
+		n.Conn.Close()
+	}
+}
+
+func (n *natsConsumer) Stop() {
+	n.Lock()
+	close(n.done)
+	n.Unlock()
+}
+
+func (n *natsConsumer) Gather(acc telegraf.Accumulator) error {
+	n.Lock()
+	defer n.Unlock()
+	npoints := len(n.metricC)
+	for i := 0; i < npoints; i++ {
+		point := <-n.metricC
+		acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
+	}
+	return nil
+}
+
+func init() {
+	inputs.Add("nats_consumer", func() telegraf.Input {
+		return &natsConsumer{}
+	})
+}
diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..50c663cb4a99b36275869e3271c0dc7100e1611f
--- /dev/null
+++ b/plugins/inputs/nats_consumer/nats_consumer_test.go
@@ -0,0 +1,152 @@
+package natsconsumer
+
+import (
+	"testing"
+	"time"
+
+	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/plugins/parsers"
+	"github.com/influxdata/telegraf/testutil"
+	"github.com/nats-io/nats"
+)
+
+const (
+	testMsg         = "cpu_load_short,host=server01 value=23422.0 1422568543702900257"
+	testMsgGraphite = "cpu.load.short.graphite 23422 1454780029"
+	testMsgJSON     = "{\"a\": 5, \"b\": {\"c\": 6}}\n"
+	invalidMsg      = "cpu_load_short,host=server01 1422568543702900257"
+	pointBuffer     = 5
+)
+
+func newTestNatsConsumer() (*natsConsumer, chan *nats.Msg) {
+	in := make(chan *nats.Msg, pointBuffer)
+	n := &natsConsumer{
+		QueueGroup:  "test",
+		Subjects:    []string{"telegraf"},
+		Servers:     []string{"nats://localhost:4222"},
+		Secure:      false,
+		PointBuffer: pointBuffer,
+		in:          in,
+		errs:        make(chan error, pointBuffer),
+		done:        make(chan struct{}),
+		metricC:     make(chan telegraf.Metric, pointBuffer),
+	}
+	return n, in
+}
+
+// Test that the parser parses NATS messages into points
+func TestRunParser(t *testing.T) {
+	n, in := newTestNatsConsumer()
+	defer close(n.done)
+
+	n.parser, _ = parsers.NewInfluxParser()
+	go n.receiver()
+	in <- natsMsg(testMsg)
+	time.Sleep(time.Millisecond)
+
+	if a := len(n.metricC); a != 1 {
+		t.Errorf("got %v, expected %v", a, 1)
+	}
+}
+
+// Test that the parser ignores invalid messages
+func TestRunParserInvalidMsg(t *testing.T) {
+	n, in := newTestNatsConsumer()
+	defer close(n.done)
+
+	n.parser, _ = parsers.NewInfluxParser()
+	go n.receiver()
+	in <- natsMsg(invalidMsg)
+	time.Sleep(time.Millisecond)
+
+	if a := len(n.metricC); a != 0 {
+		t.Errorf("got %v, expected %v", a, 0)
+	}
+}
+
+// Test that points are dropped when we hit the buffer limit
+func TestRunParserRespectsBuffer(t *testing.T) {
+	n, in := newTestNatsConsumer()
+	defer close(n.done)
+
+	n.parser, _ = parsers.NewInfluxParser()
+	go n.receiver()
+	for i := 0; i < pointBuffer+1; i++ {
+		in <- natsMsg(testMsg)
+	}
+	time.Sleep(time.Millisecond)
+
+	if a := len(n.metricC); a != pointBuffer {
+		t.Errorf("got %v, expected %v", a, pointBuffer)
+	}
+}
+
+// Test that the parser parses nats messages into points
+func TestRunParserAndGather(t *testing.T) {
+	n, in := newTestNatsConsumer()
+	defer close(n.done)
+
+	n.parser, _ = parsers.NewInfluxParser()
+	go n.receiver()
+	in <- natsMsg(testMsg)
+	time.Sleep(time.Millisecond)
+
+	acc := testutil.Accumulator{}
+	n.Gather(&acc)
+
+	if a := len(acc.Metrics); a != 1 {
+		t.Errorf("got %v, expected %v", a, 1)
+	}
+	acc.AssertContainsFields(t, "cpu_load_short",
+		map[string]interface{}{"value": float64(23422)})
+}
+
+// Test that the parser parses nats messages into points
+func TestRunParserAndGatherGraphite(t *testing.T) {
+	n, in := newTestNatsConsumer()
+	defer close(n.done)
+
+	n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil)
+	go n.receiver()
+	in <- natsMsg(testMsgGraphite)
+	time.Sleep(time.Millisecond)
+
+	acc := testutil.Accumulator{}
+	n.Gather(&acc)
+
+	if a := len(acc.Metrics); a != 1 {
+		t.Errorf("got %v, expected %v", a, 1)
+	}
+	acc.AssertContainsFields(t, "cpu_load_short_graphite",
+		map[string]interface{}{"value": float64(23422)})
+}
+
+// Test that the parser parses nats messages into points
+func TestRunParserAndGatherJSON(t *testing.T) {
+	n, in := newTestNatsConsumer()
+	defer close(n.done)
+
+	n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
+	go n.receiver()
+	in <- natsMsg(testMsgJSON)
+	time.Sleep(time.Millisecond)
+
+	acc := testutil.Accumulator{}
+	n.Gather(&acc)
+
+	if a := len(acc.Metrics); a != 1 {
+		t.Errorf("got %v, expected %v", a, 1)
+	}
+	acc.AssertContainsFields(t, "nats_json_test",
+		map[string]interface{}{
+			"a":   float64(5),
+			"b_c": float64(6),
+		})
+}
+
+func natsMsg(val string) *nats.Msg {
+	return &nats.Msg{
+		Subject: "telegraf",
+		Data:    []byte(val),
+	}
+}