Ver Fonte

Consumer

- add feature `pause`
Alexey Kim há 1 ano atrás
pai
commit
a46618e1f3
2 ficheiros alterados com 11 adições e 2 exclusões
  1. 10 2
      consumer/consumer.go
  2. 1 0
      kafka.go

+ 10 - 2
consumer/consumer.go

@@ -15,6 +15,7 @@ import (
 
 const (
 	flagSubscribed uint32 = iota
+	flagPaused
 )
 
 //goland:noinspection ALL
@@ -61,6 +62,14 @@ func (c *_consumer) Subscribed() bool {
 	return c.state.Contains(flagSubscribed)
 }
 
+func (c *_consumer) SetPause(f bool) {
+	if f {
+		c.state.Set(flagPaused)
+	} else {
+		c.state.Remove(flagPaused)
+	}
+}
+
 func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
 	c.handlers = append(c.handlers, handlers...)
 }
@@ -75,7 +84,7 @@ func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts serv
 	}
 
 	c.state.Set(flagSubscribed)
-	for c.Subscribed() {
+	for c.Subscribed() && !c.state.Contains(flagPaused) {
 		message, err := c.poll()
 		if err != nil {
 			// silently wait for a next message
@@ -99,7 +108,6 @@ func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts serv
 
 			ch <- message
 		}
-
 	}
 
 	log.Debug().Msg("consumer closed gracefully")

+ 1 - 0
kafka.go

@@ -28,6 +28,7 @@ type Consumer interface {
 	Subscribe([]string, chan *kafka.Message, ConsumerOptions) error
 	Subscribed() bool
 	RegisterHandlers(...ConsumerHandler)
+	SetPause(bool)
 }
 
 type Producer interface {