Jelajahi Sumber

[draft] Kafka consumer

- add ability to pause
Alexey Kim 1 tahun lalu
induk
melakukan
e60a7ca9c0
1 mengubah file dengan 4 tambahan dan 4 penghapusan
  1. 4 4
      consumer/consumer.go

+ 4 - 4
consumer/consumer.go

@@ -91,6 +91,10 @@ func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts serv
 
 	c.state.Set(flagSubscribed)
 	for c.Subscribed() {
+		for c.Paused() {
+			time.Sleep(100 * time.Millisecond)
+		}
+
 		message, err := c.poll()
 		if err != nil {
 			// silently wait for a next message
@@ -114,10 +118,6 @@ func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts serv
 
 			ch <- message
 		}
-
-		for c.Paused() {
-			time.Sleep(100 * time.Millisecond)
-		}
 	}
 
 	log.Debug().Msg("consumer closed gracefully")