Pārlūkot izejas kodu

[draft] Kafka consumer

- add ability to pause
Alexey Kim 1 gadu atpakaļ
vecāks
revīzija
8e19da86fc
2 mainītis faili ar 23 papildinājumiem un 0 dzēšanām
  1. 19 0
      consumer/consumer.go
  2. 4 0
      kafka.go

+ 19 - 0
consumer/consumer.go

@@ -15,6 +15,7 @@ import (
 
 const (
 	flagSubscribed uint32 = iota
+	flagPaused
 )
 
 //goland:noinspection ALL
@@ -61,6 +62,20 @@ func (c *_consumer) Subscribed() bool {
 	return c.state.Contains(flagSubscribed)
 }
 
+func (c *_consumer) Paused() bool {
+	return c.state.Contains(flagPaused)
+}
+
+func (c *_consumer) SetPause(is bool) error {
+	if is {
+		c.state.Set(flagPaused)
+	} else {
+		c.state.Remove(flagPaused)
+	}
+
+	return nil
+}
+
 func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
 	c.handlers = append(c.handlers, handlers...)
 }
@@ -99,6 +114,10 @@ func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts serv
 
 			ch <- message
 		}
+
+		for c.Paused() {
+			time.Sleep(100 * time.Millisecond)
+		}
 	}
 
 	log.Debug().Msg("consumer closed gracefully")

+ 4 - 0
kafka.go

@@ -25,8 +25,12 @@ func ConsumerOptionsWithCounter() ConsumerOptions {
 type Consumer interface {
 	Service
 
+	SetPause(bool) error
+	Paused() bool
+
 	Subscribe([]string, chan *kafka.Message, ConsumerOptions) error
 	Subscribed() bool
+
 	RegisterHandlers(...ConsumerHandler)
 }