diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
index 0cba92eb6050652f41aae9738c72ce91c82d378e..72e0d3c19f3a7e899193ec5f8153615f260d3d01 100644
--- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go
+++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
@@ -46,6 +46,8 @@ type MQTTConsumer struct {
 
 	// keep the accumulator internally:
 	acc telegraf.Accumulator
+	
+	started bool false
 }
 
 var sampleConfig = `
@@ -134,16 +136,19 @@ func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
 	return nil
 }
  func onConnect(c *MQTT.Client) {
-	 topics := make(map[string]byte)
-	 for _, topic := range m.Topics {
-		 topics[topic] = byte(m.QoS)
-	 }
-	 subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
-	 subscribeToken.Wait()
-	 if subscribeToken.Error() != nil {
-		 log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s",
-			 string(m.Topics), err.Error())
-	 }
+ 	if (!m.PersistentSession || !m.started) {
+		topics := make(map[string]byte)
+		for _, topic := range m.Topics {
+			topics[topic] = byte(m.QoS)
+		}
+		subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
+		subscribeToken.Wait()
+		if subscribeToken.Error() != nil {
+			log.Printf("MQTT SUBSCRIBE ERROR\ntopics: %s\nerror: %s",
+				string(m.Topics), err.Error())
+		}
+		m.started = true;
+ 	}
  }
 // receiver() reads all incoming messages from the consumer, and parses them into
 // influxdb metric points.
@@ -178,6 +183,7 @@ func (m *MQTTConsumer) Stop() {
 	defer m.Unlock()
 	close(m.done)
 	m.client.Disconnect(200)
+	m.started = false
 }
 
 func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error {