Browse Source

Kafka

- add consumer
Alexey Kim 8 months ago
parent
commit
24b668d628
7 changed files with 201 additions and 11 deletions
  1. 4 3
      go.mod
  2. 10 6
      go.sum
  3. 11 1
      kafka/config.go
  4. 110 0
      kafka/consumer.go
  5. 11 0
      kafka/iconsumer.go
  6. 1 1
      kafka/producer.go
  7. 54 0
      kafka/rebalance.go

+ 4 - 3
go.mod

@@ -3,7 +3,7 @@ module git.beejay.kim/Craft/Api
 go 1.20
 
 require (
-	github.com/ClickHouse/clickhouse-go/v2 v2.13.3
+	github.com/ClickHouse/clickhouse-go/v2 v2.13.4
 	github.com/Nerzal/gocloak/v13 v13.8.0
 	github.com/confluentinc/confluent-kafka-go/v2 v2.2.0
 	github.com/gofrs/uuid v4.4.0+incompatible
@@ -13,6 +13,7 @@ require (
 	github.com/labstack/echo-jwt/v4 v4.2.0
 	github.com/labstack/echo/v4 v4.11.1
 	github.com/mailru/easyjson v0.7.7
+	github.com/rs/zerolog v1.30.0
 	github.com/samber/lo v1.38.1
 	github.com/urfave/cli/v2 v2.25.7
 	gopkg.in/yaml.v3 v3.0.1
@@ -43,8 +44,8 @@ require (
 	github.com/valyala/bytebufferpool v1.0.0 // indirect
 	github.com/valyala/fasttemplate v1.2.2 // indirect
 	github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
-	go.opentelemetry.io/otel v1.16.0 // indirect
-	go.opentelemetry.io/otel/trace v1.16.0 // indirect
+	go.opentelemetry.io/otel v1.17.0 // indirect
+	go.opentelemetry.io/otel/trace v1.17.0 // indirect
 	golang.org/x/crypto v0.12.0 // indirect
 	golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
 	golang.org/x/net v0.14.0 // indirect

+ 10 - 6
go.sum

@@ -607,8 +607,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/ClickHouse/ch-go v0.58.2 h1:jSm2szHbT9MCAB1rJ3WuCJqmGLi5UTjlNu+f530UTS0=
 github.com/ClickHouse/ch-go v0.58.2/go.mod h1:Ap/0bEmiLa14gYjCiRkYGbXvbe8vwdrfTYWhsuQ99aw=
-github.com/ClickHouse/clickhouse-go/v2 v2.13.3 h1:/esk41SjVLIDQs2rkOmRKXJ1FIFArIJiX6sYG0DUavE=
-github.com/ClickHouse/clickhouse-go/v2 v2.13.3/go.mod h1:yoCB//XLqbyqaYvXzdbIdmMafOSomU3erh3r06NLCZU=
+github.com/ClickHouse/clickhouse-go/v2 v2.13.4 h1:NcvYN9ONZn3vlPMfQVUBSG5LKz+1y2wk4vaaz5QZXIg=
+github.com/ClickHouse/clickhouse-go/v2 v2.13.4/go.mod h1:u1AUh8E0XqN1sU1EDzbiGLTI4KWOd+lOHimNSsdyJec=
 github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
 github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
 github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
@@ -856,6 +856,7 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
 github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
 github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
 github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
+github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
 github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
@@ -1474,6 +1475,9 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
 github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
 github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
+github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
+github.com/rs/zerolog v1.30.0 h1:SymVODrcRsaRaSInD9yQtKbtWqwsfoPcRff/oRXLj4c=
+github.com/rs/zerolog v1.30.0/go.mod h1:/tk+P47gFdPXq4QYjvCmT5/Gsug2nagsFWBWhAiSi1w=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -1639,8 +1643,8 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.2
 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4=
 go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
 go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs=
-go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
-go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
+go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM=
+go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0=
 go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM=
 go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.3.0/go.mod h1:VpP4/RMn8bv8gNo9uK7/IMY4mtWLELsS+JIP0inH0h4=
 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.3.0/go.mod h1:hO1KLR7jcKaDDKDkvI9dP/FIhpmna5lkqPUQdEjFAM8=
@@ -1654,8 +1658,8 @@ go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi
 go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE=
 go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
 go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk=
-go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
-go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
+go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ=
+go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.opentelemetry.io/proto/otlp v0.11.0/go.mod h1:QpEjXPrNQzrFDZgoTo49dgHR9RYRSrg3NAKnUGl9YpQ=
 go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=

+ 11 - 1
kafka/config.go

@@ -5,7 +5,9 @@ import (
 )
 
 type Config struct {
-	Hosts []string `yaml:"hosts"`
+	Hosts   []string `yaml:"hosts"`
+	Group   string   `yaml:"group"`
+	Timeout int      `yaml:"timeout"`
 }
 
 func (c Config) Invalidate() error {
@@ -13,5 +15,13 @@ func (c Config) Invalidate() error {
 		return fmt.Errorf("at least one bootstrap server / host must be provided")
 	}
 
+	if c.Group == "" {
+		return fmt.Errorf("group name must not be an empty string")
+	}
+
+	if c.Timeout < 1 {
+		c.Timeout = 100
+	}
+
 	return nil
 }

+ 110 - 0
kafka/consumer.go

@@ -0,0 +1,110 @@
+package kafka
+
+import (
+	"errors"
+	"fmt"
+	lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+	"github.com/rs/zerolog/log"
+	"strings"
+	"sync"
+	"time"
+)
+
+var ErrNoMessage = errors.New("no message")
+
+type _consumer struct {
+	config  *Config
+	session *lib.Consumer
+}
+
+func NewConsumer(cfg *Config) (Consumer, error) {
+	var (
+		c = &_consumer{
+			config: cfg,
+		}
+		err error
+	)
+
+	if cfg == nil {
+		return nil, fmt.Errorf("config must be provided")
+	}
+
+	opts := &lib.ConfigMap{
+		"broker.address.family":         "v4",
+		"bootstrap.servers":             strings.Join(c.config.Hosts, ","),
+		"group.id":                      c.config.Group,
+		"partition.assignment.strategy": "cooperative-sticky",
+		"auto.offset.reset":             "earliest",
+		"log_level":                     0,
+	}
+
+	if c.session, err = lib.NewConsumer(opts); err != nil {
+		return nil, err
+	}
+
+	return c, nil
+}
+
+func (c *_consumer) Subscribe(topics []string, ch chan *lib.Message) error {
+	if topics == nil {
+		return fmt.Errorf("illegal arguments: at least one topic must be provided")
+	}
+
+	if err := c.session.SubscribeTopics(topics, rebalanceCallback); err != nil {
+		return err
+	}
+
+	for {
+		message, err := c.pollMessage()
+		if err != nil {
+			// silently wait for a next message
+			if errors.Is(err, ErrNoMessage) {
+				continue
+			}
+		}
+
+		if message != nil {
+			ch <- message
+		}
+	}
+}
+
+func (c *_consumer) String() string {
+	return "kafka:consumer"
+}
+
+func (c *_consumer) Close() error {
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+
+		if c.session != nil {
+			time.Sleep(time.Second)
+			_ = c.session.Close() //nolint:errcheck
+		}
+	}()
+
+	wg.Wait()
+	return nil
+}
+
+func (c *_consumer) pollMessage() (*lib.Message, error) {
+	ev := c.session.Poll(c.config.Timeout)
+	switch e := ev.(type) {
+	case *lib.Message:
+		return e, nil
+	case lib.Error:
+		return nil, e
+	default:
+		if e != nil {
+			log.Debug().
+				Str("service", "consumer").
+				Any("event", e).
+				Send()
+		}
+
+		return nil, ErrNoMessage
+	}
+}

+ 11 - 0
kafka/iconsumer.go

@@ -0,0 +1,11 @@
+package kafka
+
+import (
+	"git.beejay.kim/Craft/Api/service"
+	lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+)
+
+type Consumer interface {
+	service.Service
+	Subscribe([]string, chan *lib.Message) error
+}

+ 1 - 1
kafka/producer.go

@@ -13,7 +13,7 @@ type _producer struct {
 	session *lib.Producer
 }
 
-func New(cfg *Config) (Producer, error) {
+func NewProducer(cfg *Config) (Producer, error) {
 	var (
 		p = &_producer{
 			config: cfg,

+ 54 - 0
kafka/rebalance.go

@@ -0,0 +1,54 @@
+package kafka
+
+import (
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+	"github.com/rs/zerolog/log"
+)
+
+// rebalanceCallback is called on each group rebalance to assign additional
+// partitions, or remove existing partitions, from the consumer's current
+// assignment.
+//
+// The application may use this optional callback to inspect the assignment,
+// alter the initial start offset (the .Offset field of each assigned partition),
+// and read/write offsets to commit to an alternative store outside of Kafka.
+func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
+	switch ev := event.(type) {
+	case kafka.AssignedPartitions:
+		log.Debug().
+			Str("service", "consumer").
+			Msgf("%s rebalance: %d new partition(s) assigned: %v",
+				c.GetRebalanceProtocol(),
+				len(ev.Partitions),
+				ev.Partitions)
+
+		// The application may update the start .Offset of each
+		// assigned partition and then call IncrementalAssign().
+		// Even though this example does not alter the offsets we
+		// provide the call to IncrementalAssign() as an example.
+		err := c.IncrementalAssign(ev.Partitions)
+		if err != nil {
+			panic(err)
+		}
+
+	case kafka.RevokedPartitions:
+		log.Debug().
+			Str("service", "consumer").
+			Msgf("%s rebalance: %d partition(s) revoked: %v",
+				c.GetRebalanceProtocol(),
+				len(ev.Partitions),
+				ev.Partitions)
+		if c.AssignmentLost() {
+			// Our consumer has been kicked out of the group and the
+			// entire assignment is thus lost.
+			log.Debug().
+				Str("service", "consumer").
+				Msg("Current assignment lost!")
+		}
+
+		// The client automatically calls IncrementalUnassign() unless
+		// the callback has already called that method.
+	}
+
+	return nil
+}