Alexey Kim 1 жил өмнө
parent
commit
7bb11c40e5

+ 1 - 1
consumer/kafka_consumer.go → consumer/consumer.go

@@ -25,7 +25,7 @@ type _consumer struct {
 	session  *kafka.Consumer
 }
 
-func NewKafkaConsumer(cfg *Config) (service.Consumer, error) {
+func New(cfg *Config) (service.Consumer, error) {
 	var (
 		c = &_consumer{
 			config: cfg,

+ 15 - 0
producer/config.go

@@ -0,0 +1,15 @@
+package consumer
+
+import "fmt"
+
+type Config struct {
+	Hosts []string `yaml:"hosts"`
+}
+
+func (c Config) Invalidate() error {
+	if len(c.Hosts) < 1 {
+		return fmt.Errorf("at least one bootstrap server / host must be provided")
+	}
+
+	return nil
+}

+ 104 - 0
producer/producer.go

@@ -0,0 +1,104 @@
+package consumer
+
+import (
+	"fmt"
+	"git.beejay.kim/tool/service"
+	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
+	"github.com/google/uuid"
+	"github.com/rs/zerolog/log"
+	"strings"
+	"sync"
+	"time"
+)
+
+type _producer struct {
+	config  *Config
+	session *kafka.Producer
+}
+
+func New(cfg *Config) (service.Producer, error) {
+	var (
+		p   = new(_producer)
+		err error
+	)
+
+	if cfg == nil {
+		return nil, fmt.Errorf("config must be provided")
+	}
+
+	p.config = cfg
+	opts := &kafka.ConfigMap{
+		"bootstrap.servers": strings.Join(p.config.Hosts, ","),
+	}
+
+	if p.session, err = kafka.NewProducer(opts); err != nil {
+		return nil, err
+	}
+
+	go func() {
+		for e := range p.session.Events() {
+			switch ev := e.(type) {
+			case kafka.Error:
+				// Generic client instance-level errors, such as
+				// broker connection failures, authentication issues, etc.
+				//
+				// These errors should generally be considered informational
+				// as the underlying client will automatically try to
+				// recover from any errors encountered, the application
+				// does not need to take action on them.
+				log.Debug().Str("service", "producer").Err(ev).Send()
+			default:
+				log.Debug().Str("service", "producer").Msgf("ignored event: %s", ev)
+			}
+		}
+	}()
+
+	return p, nil
+}
+
+func (p *_producer) ID() uuid.UUID {
+	return uuid.NewSHA1(uuid.NameSpaceDNS, []byte("producer.kafka"))
+}
+
+func (p *_producer) Close() error {
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+
+		if p.session != nil {
+			time.Sleep(time.Second)
+			p.session.Close() //nolint:errcheck
+		}
+	}()
+
+	wg.Wait()
+	return nil
+}
+
+func (p *_producer) Produce(message *kafka.Message) error {
+	var (
+		delivery = make(chan kafka.Event)
+		err      error
+	)
+
+	defer close(delivery)
+
+	if message.TopicPartition.Topic == nil {
+		return fmt.Errorf("illegal message: at least one topic name must be provided")
+	}
+
+	if err = p.session.Produce(message, delivery); err != nil {
+		return err
+	}
+
+	e := <-delivery
+	m := e.(*kafka.Message)
+
+	if m.TopicPartition.Error != nil {
+		return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
+	}
+
+	return nil
+}