Ver código fonte

kafka.Consumer

- get rid of useless options
Alexey Kim 1 mês atrás
pai
commit
8e8b2995d2
2 arquivos alterados com 2 adições e 22 exclusões
  1. 1 5
      consumer/consumer.go
  2. 1 17
      kafka.go

+ 1 - 5
consumer/consumer.go

@@ -80,7 +80,7 @@ func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
 	c.handlers = append(c.handlers, handlers...)
 }
 
-func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts service.ConsumerOptions) error {
+func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
 	if c.Subscribed() {
 		return fmt.Errorf("illegal state: already subscribed")
 	}
@@ -112,10 +112,6 @@ func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts serv
 		}
 
 		if message != nil {
-			if opts.Counter != nil {
-				opts.Counter.Inc()
-			}
-
 			ch <- message
 		}
 	}

+ 1 - 17
kafka.go

@@ -2,25 +2,9 @@ package service
 
 import (
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
 )
 
 type ConsumerHandler func(*kafka.Message) error
-type ConsumerOptions struct {
-	Counter prometheus.Counter
-}
-
-func ConsumerOptionsWithCounter() ConsumerOptions {
-	return ConsumerOptions{
-		Counter: promauto.NewCounter(prometheus.CounterOpts{
-			Namespace: "kafka",
-			Subsystem: "consumer",
-			Name:      "received",
-			Help:      "The total number of received messages",
-		}),
-	}
-}
 
 type Consumer interface {
 	Service
@@ -28,7 +12,7 @@ type Consumer interface {
 	SetPause(bool) error
 	Paused() bool
 
-	Subscribe([]string, chan *kafka.Message, ConsumerOptions) error
+	Subscribe([]string, chan *kafka.Message) error
 	Subscribed() bool
 
 	RegisterHandlers(...ConsumerHandler)