|
@@ -15,7 +15,6 @@ import (
|
|
|
|
|
|
const (
|
|
const (
|
|
flagSubscribed uint32 = iota
|
|
flagSubscribed uint32 = iota
|
|
- flagPaused
|
|
|
|
)
|
|
)
|
|
|
|
|
|
//goland:noinspection ALL
|
|
//goland:noinspection ALL
|
|
@@ -62,14 +61,6 @@ func (c *_consumer) Subscribed() bool {
|
|
return c.state.Contains(flagSubscribed)
|
|
return c.state.Contains(flagSubscribed)
|
|
}
|
|
}
|
|
|
|
|
|
-func (c *_consumer) SetPause(f bool) {
|
|
|
|
- if f {
|
|
|
|
- c.state.Set(flagPaused)
|
|
|
|
- } else {
|
|
|
|
- c.state.Remove(flagPaused)
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
|
|
func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
|
|
c.handlers = append(c.handlers, handlers...)
|
|
c.handlers = append(c.handlers, handlers...)
|
|
}
|
|
}
|
|
@@ -84,7 +75,7 @@ func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts serv
|
|
}
|
|
}
|
|
|
|
|
|
c.state.Set(flagSubscribed)
|
|
c.state.Set(flagSubscribed)
|
|
- for c.Subscribed() && !c.state.Contains(flagPaused) {
|
|
|
|
|
|
+ for c.Subscribed() {
|
|
message, err := c.poll()
|
|
message, err := c.poll()
|
|
if err != nil {
|
|
if err != nil {
|
|
// silently wait for a next message
|
|
// silently wait for a next message
|