Prechádzať zdrojové kódy

Consumer refactoring

Alexey Kim 10 mesiacov pred
rodič
commit
31edef229e
4 zmenil súbory, kde vykonal 131 pridanie a 96 odobranie
  1. 100 64
      consumer/consumer.go
  2. 6 4
      go.mod
  3. 24 22
      go.sum
  4. 1 6
      kafka.go

+ 100 - 64
consumer/consumer.go

@@ -8,22 +8,23 @@ import (
 	"github.com/google/uuid"
 	"github.com/kelindar/bitmap"
 	"github.com/rs/zerolog/log"
+	"github.com/samber/lo"
 	"strings"
 	"sync"
 	"time"
 )
 
 const (
-	flagSubscribed uint32 = iota
-	flagPaused
+	FlagPause uint32 = iota
+	FlagStop
 )
 
 //goland:noinspection ALL
 type _consumer struct {
-	config   *Config
-	state    bitmap.Bitmap
-	handlers []service.ConsumerHandler
-	session  *kafka.Consumer
+	config        *Config
+	handlers      []service.ConsumerHandler
+	session       *kafka.Consumer
+	subscriptions []chan uint32
 }
 
 func New(cfg *Config) (service.Consumer, error) {
@@ -58,77 +59,100 @@ func (c *_consumer) ID() uuid.UUID {
 	return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("consumer.kafka"))
 }
 
-func (c *_consumer) Subscribed() bool {
-	return c.state.Contains(flagSubscribed)
-}
-
-func (c *_consumer) Paused() bool {
-	return c.state.Contains(flagPaused)
-}
-
-func (c *_consumer) SetPause(is bool) error {
-	if is {
-		c.state.Set(flagPaused)
-	} else {
-		c.state.Remove(flagPaused)
-	}
-
-	return nil
-}
-
 func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
 	c.handlers = append(c.handlers, handlers...)
 }
 
-func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
-	if c.Subscribed() {
-		return fmt.Errorf("illegal state: already subscribed")
+func (c *_consumer) Subscribe(topics []string, io chan uint32) (chan *kafka.Message, error) {
+	if topics == nil || len(topics) == 0 {
+		return nil, errors.New("illegal arguments: at least one topic must be provided to subscribe")
 	}
 
-	if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
-		return err
+	var (
+		messages = make(chan *kafka.Message)
+		err      error
+	)
+
+	if err = c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
+		return nil, err
 	}
 
-	c.state.Set(flagSubscribed)
-	for c.Subscribed() {
-		for c.Paused() {
-			time.Sleep(100 * time.Millisecond)
-		}
+	go func() {
+		defer func() {
+			c.subscriptions = lo.Filter(c.subscriptions, func(ch chan uint32, _ int) bool {
+				return io != ch
+			})
+		}()
+
+		var (
+			state   bitmap.Bitmap
+			message *kafka.Message
+		)
+
+		for {
+			select {
+			case s := <-io:
+				switch s {
+				case FlagStop:
+					log.Debug().
+						Str("service", "consumer").
+						Msg("consumer gracefully closed")
+					return
+				case FlagPause:
+					if state.Contains(FlagPause) {
+						state.Remove(FlagPause)
+					} else {
+						state.Set(FlagPause)
+					}
+				}
+
+			default:
+			}
 
-		message, err := c.poll()
-		if err != nil {
-			// silently wait for a next message
-			if errors.Is(err, ErrNoMessage) {
+			if state.Contains(FlagPause) {
+				<-time.After(100 * time.Millisecond)
 				continue
 			}
 
-			log.Debug().
-				Str("service", "consumer").
-				Err(err).
-				Send()
+			message, err = c.poll()
+			if err != nil {
+				// silently wait for a next message
+				if errors.Is(err, ErrNoMessage) {
+					continue
+				}
 
-			c.state.Remove(flagSubscribed)
-			return err
-		}
+				log.Debug().
+					Str("service", "consumer").
+					Err(err).
+					Send()
+
+				messages <- nil
+				return
+			}
 
-		if message != nil {
-			ch <- message
+			if message != nil {
+				messages <- message
+			}
 		}
-	}
+	}()
 
-	log.Debug().
-		Str("service", "consumer").
-		Msg("consumer gracefully closed")
-	return nil
+	c.subscriptions = append(c.subscriptions, io)
+	return messages, nil
 }
 
 func (c *_consumer) Close() error {
-	if c.Subscribed() {
-		c.state.Remove(flagSubscribed)
-	}
-
 	var wg sync.WaitGroup
-	wg.Add(1)
+	wg.Add(2)
+
+	go func() {
+		defer wg.Done()
+
+		for i := range c.subscriptions {
+			if c.subscriptions[i] != nil {
+				c.subscriptions[i] <- FlagStop
+			}
+		}
+	}()
 
 	go func() {
 		defer wg.Done()
@@ -150,6 +174,7 @@ func (c *_consumer) poll() (*kafka.Message, error) {
 	)
 
 	switch e := ev.(type) {
+
 	case *kafka.Message:
 		for i := range c.handlers {
 			if err = c.handlers[i](e); err != nil {
@@ -158,17 +183,28 @@ func (c *_consumer) poll() (*kafka.Message, error) {
 		}
 
 		return e, nil
+
 	case kafka.Error:
-		log.Debug().
-			Str("service", "consumer").
-			Err(e).
-			Send()
+		return nil, e
 
-		if e.Code() == kafka.ErrAllBrokersDown {
-			c.state.Remove(flagSubscribed)
+	case kafka.OffsetsCommitted:
+		if e.Error != nil {
+			return nil, err
 		}
 
-		return nil, e
+		l := log.Debug().Str("service", "consumer")
+		for _, offset := range e.Offsets {
+			if offset.Topic == nil {
+				continue
+			}
+
+			l = l.Str(*offset.Topic, fmt.Sprintf("partition: %d; offset: %s",
+				offset.Partition,
+				offset.Offset.String()))
+		}
+		l.Send()
+		return nil, ErrNoMessage
+
 	default:
 		if e != nil {
 			log.Debug().

+ 6 - 4
go.mod

@@ -3,19 +3,21 @@ module git.beejay.kim/tool/service
 go 1.22.2
 
 require (
-	github.com/ClickHouse/clickhouse-go/v2 v2.27.1
+	github.com/ClickHouse/clickhouse-go/v2 v2.28.1
 	github.com/confluentinc/confluent-kafka-go/v2 v2.5.0
 	github.com/google/uuid v1.6.0
 	github.com/kelindar/bitmap v1.5.2
 	github.com/rs/zerolog v1.33.0
-	github.com/samber/lo v1.46.0
+	github.com/samber/lo v1.47.0
 	github.com/urfave/cli/v2 v2.27.4
+	golang.org/x/net v0.28.0
 	gopkg.in/yaml.v3 v3.0.1
 )
 
 require (
 	github.com/ClickHouse/ch-go v0.61.5 // indirect
 	github.com/andybalholm/brotli v1.1.0 // indirect
+	github.com/containerd/errdefs v0.1.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
 	github.com/go-faster/city v1.0.1 // indirect
 	github.com/go-faster/errors v0.7.1 // indirect
@@ -33,6 +35,6 @@ require (
 	github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
 	go.opentelemetry.io/otel v1.26.0 // indirect
 	go.opentelemetry.io/otel/trace v1.26.0 // indirect
-	golang.org/x/sys v0.22.0 // indirect
-	golang.org/x/text v0.16.0 // indirect
+	golang.org/x/sys v0.23.0 // indirect
+	golang.org/x/text v0.17.0 // indirect
 )

+ 24 - 22
go.sum

@@ -8,14 +8,14 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
 github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
 github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4=
 github.com/ClickHouse/ch-go v0.61.5/go.mod h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg=
-github.com/ClickHouse/clickhouse-go/v2 v2.27.1 h1:cSUewKnQ2XWvCNpCV0WRAQGvShElJ1Qyb6nDq8GId/I=
-github.com/ClickHouse/clickhouse-go/v2 v2.27.1/go.mod h1:XvcaX7ai9T9si83rZ0cB3y2upq9AYMwdj16Trqm+sPg=
+github.com/ClickHouse/clickhouse-go/v2 v2.28.1 h1:tpdOxWZlZ4IYiFWpIteU57JVdWVbSI5OwflofAdhFno=
+github.com/ClickHouse/clickhouse-go/v2 v2.28.1/go.mod h1:0U915l9qynE508ehh3ea9+UMGc7gZlAV+9W6pUZd7kk=
 github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
 github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
 github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
 github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
-github.com/Microsoft/hcsshim v0.11.5 h1:haEcLNpj9Ka1gd3B3tAEs9CpE0c+1IhoL59w/exYU38=
-github.com/Microsoft/hcsshim v0.11.5/go.mod h1:MV8xMfmECjl5HdO7U/3/hFVnkmSBjAjmA09d4bExKcU=
+github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
+github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w=
 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
 github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
@@ -68,6 +68,8 @@ github.com/containerd/errdefs v0.1.0 h1:m0wCRBiu1WJT/Fr+iOoQHMQS/eP5myQ8lCv4Dz5Z
 github.com/containerd/errdefs v0.1.0/go.mod h1:YgWiiHtLmSeBrvpw+UfPijzbLaB77mEG1WwJTDETIV0=
 github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
 github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
+github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A=
+github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw=
 github.com/containerd/ttrpc v1.2.3 h1:4jlhbXIGvijRtNC8F/5CpuJZ7yKOBFGFOOXg1bkISz0=
 github.com/containerd/ttrpc v1.2.3/go.mod h1:ieWsXucbb8Mj9PH0rXCw1i8IunRbbAiDkpXkbfflWBM=
 github.com/containerd/typeurl/v2 v2.1.1 h1:3Q4Pt7i8nYwy2KmQWIw2+1hTvwTE/6w9FqcttATPO/4=
@@ -90,8 +92,8 @@ github.com/docker/compose/v2 v2.27.0 h1:FKyClQdErCxUZULC2zo6Jn5ve+epFPe/Y0HaxjmU
 github.com/docker/compose/v2 v2.27.0/go.mod h1:uaqwmY6haO8wXWHk+LAsqqDapX6boH4izRKqj/E7+Bo=
 github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk=
 github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
-github.com/docker/docker v27.0.3+incompatible h1:aBGI9TeQ4MPlhquTQKq9XbK79rKFVwXNUAYz9aXyEBE=
-github.com/docker/docker v27.0.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
+github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
+github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
 github.com/docker/docker-credential-helpers v0.8.0 h1:YQFtbBQb4VrpoPxhFuzEBPQ9E16qz5SpHLS+uswaCp8=
 github.com/docker/docker-credential-helpers v0.8.0/go.mod h1:UGFXcuoQ5TxPiB54nHOZ32AWRqQdECoh/Mg0AlEYb40=
 github.com/docker/go v1.5.1-1.0.20160303222718-d30aec9fd63c h1:lzqkGL9b3znc+ZUgi7FlLnqjQhcXxkNM/quxIjBVMD0=
@@ -295,8 +297,8 @@ github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
 github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
-github.com/samber/lo v1.46.0 h1:w8G+oaCPgz1PoCJztqymCFaKwXt+5cCXn51uPxExFfQ=
-github.com/samber/lo v1.46.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
+github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc=
+github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
 github.com/secure-systems-lab/go-securesystemslib v0.4.0 h1:b23VGrQhTA8cN2CbBw7/FulN9fTtqYUdS5+Oxzt+DUE=
 github.com/secure-systems-lab/go-securesystemslib v0.4.0/go.mod h1:FGBZgq2tXWICsxWQW1msNf49F0Pf2Op5Htayx335Qbs=
 github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
@@ -324,8 +326,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
 github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
-github.com/testcontainers/testcontainers-go v0.32.0 h1:ug1aK08L3gCHdhknlTTwWjPHPS+/alvLJU/DRxTD/ME=
-github.com/testcontainers/testcontainers-go v0.32.0/go.mod h1:CRHrzHLQhlXUsa5gXjTOfqIEJcrK5+xMDmBr/WMI88E=
+github.com/testcontainers/testcontainers-go v0.33.0 h1:zJS9PfXYT5O0ZFXM2xxXfk4J5UMw/kRiISng037Gxdw=
+github.com/testcontainers/testcontainers-go v0.33.0/go.mod h1:W80YpTa8D5C3Yy16icheD01UTDu+LmXIA2Keo+jWtT8=
 github.com/testcontainers/testcontainers-go/modules/compose v0.31.0 h1:H74o3HisnApIDQx7sWibGzOl/Oo0By8DjyVeUf3qd6I=
 github.com/testcontainers/testcontainers-go/modules/compose v0.31.0/go.mod h1:z1JAsvL2/pNFy40yJX0VX9Yk+hzOCIO5DydxBJHBbCY=
 github.com/theupdateframework/notary v0.7.0 h1:QyagRZ7wlSpjT5N2qQAh/pN+DVqgekv4DzbAiAiEL3c=
@@ -400,8 +402,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
-golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
+golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
+golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
 golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o=
 golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@@ -411,16 +413,16 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
-golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
+golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
+golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
 golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ=
 golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
-golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
+golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
+golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -431,17 +433,17 @@ golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
-golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
+golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
-golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
+golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
+golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
-golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
+golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
+golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
 golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
 golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

+ 1 - 6
kafka.go

@@ -9,12 +9,7 @@ type ConsumerHandler func(*kafka.Message) error
 type Consumer interface {
 	Service
 
-	SetPause(bool) error
-	Paused() bool
-
-	Subscribe([]string, chan *kafka.Message) error
-	Subscribed() bool
-
+	Subscribe([]string, chan uint32) (chan *kafka.Message, error)
 	RegisterHandlers(...ConsumerHandler)
 }