Browse Source

Kafka Consumer

- add ConsumerOptions
Alexey Kim 1 year ago
parent
commit
d581688a1c
4 changed files with 47 additions and 5 deletions
  1. 5 1
      consumer/consumer.go
  2. 9 1
      go.mod
  3. 16 2
      go.sum
  4. 17 1
      kafka.go

+ 5 - 1
consumer/consumer.go

@@ -65,7 +65,7 @@ func (c *_consumer) RegisterHandlers(handlers ...service.ConsumerHandler) {
 	c.handlers = append(c.handlers, handlers...)
 }
 
-func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
+func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message, opts service.ConsumerOptions) error {
 	if c.Subscribed() {
 		return fmt.Errorf("illegal state: already subscribed")
 	}
@@ -93,6 +93,10 @@ func (c *_consumer) Subscribe(topics []string, ch chan *kafka.Message) error {
 		}
 
 		if message != nil {
+			if opts.Counter != nil {
+				opts.Counter.Inc()
+			}
+
 			ch <- message
 		}
 

+ 9 - 1
go.mod

@@ -7,7 +7,8 @@ require (
 	github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
 	github.com/google/uuid v1.6.0
 	github.com/kelindar/bitmap v1.5.2
-	github.com/rs/zerolog v1.31.0
+	github.com/prometheus/client_golang v1.18.0
+	github.com/rs/zerolog v1.32.0
 	github.com/urfave/cli/v2 v2.27.1
 	gopkg.in/yaml.v3 v3.0.1
 )
@@ -15,6 +16,8 @@ require (
 require (
 	github.com/ClickHouse/ch-go v0.58.2 // indirect
 	github.com/andybalholm/brotli v1.1.0 // indirect
+	github.com/beorn7/perks v1.0.1 // indirect
+	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
 	github.com/go-faster/city v1.0.1 // indirect
 	github.com/go-faster/errors v0.6.1 // indirect
@@ -23,9 +26,13 @@ require (
 	github.com/klauspost/cpuid/v2 v2.2.4 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.19 // indirect
+	github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
 	github.com/paulmach/orb v0.11.1 // indirect
 	github.com/pierrec/lz4/v4 v4.1.18 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
+	github.com/prometheus/client_model v0.5.0 // indirect
+	github.com/prometheus/common v0.45.0 // indirect
+	github.com/prometheus/procfs v0.12.0 // indirect
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/segmentio/asm v1.2.0 // indirect
 	github.com/shopspring/decimal v1.3.1 // indirect
@@ -35,4 +42,5 @@ require (
 	golang.org/x/sys v0.16.0 // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240125205218-1f4bbc51befe // indirect
 	google.golang.org/grpc v1.60.1 // indirect
+	google.golang.org/protobuf v1.32.0 // indirect
 )

+ 16 - 2
go.sum

@@ -12,8 +12,12 @@ github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7
 github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w=
 github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
 github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
 github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
 github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
 github.com/containerd/containerd v1.7.11 h1:lfGKw3eU35sjV0aG2eYZTiwFEY1pCzxdzicHP3SZILw=
@@ -87,6 +91,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
 github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
 github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
 github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
+github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
 github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk=
 github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc=
 github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc=
@@ -113,11 +119,19 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
 github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
+github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
+github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
+github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
+github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
+github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
+github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
+github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
+github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
 github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
-github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A=
-github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
+github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
+github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=

+ 17 - 1
kafka.go

@@ -2,14 +2,30 @@ package service
 
 import (
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 )
 
 type ConsumerHandler func(*kafka.Message) error
+type ConsumerOptions struct {
+	Counter prometheus.Counter
+}
+
+func ConsumerOptionsWithCounter() ConsumerOptions {
+	return ConsumerOptions{
+		Counter: promauto.NewCounter(prometheus.CounterOpts{
+			Namespace: "kafka",
+			Subsystem: "consumer",
+			Name:      "received",
+			Help:      "The total number of received messages",
+		}),
+	}
+}
 
 type Consumer interface {
 	Service
 
-	Subscribe([]string, chan *kafka.Message) error
+	Subscribe([]string, chan *kafka.Message, ConsumerOptions) error
 	Subscribed() bool
 	RegisterHandlers(...ConsumerHandler)
 }