|
@@ -0,0 +1,83 @@
|
|
|
|
+package kafka
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "fmt"
|
|
|
|
+ lib "github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
|
|
|
+ "strings"
|
|
|
|
+ "sync"
|
|
|
|
+ "time"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+type _producer struct {
|
|
|
|
+ config *Config
|
|
|
|
+ session *lib.Producer
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func New(cfg *Config) (Producer, error) {
|
|
|
|
+ var (
|
|
|
|
+ p = &_producer{
|
|
|
|
+ config: cfg,
|
|
|
|
+ }
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if cfg == nil {
|
|
|
|
+ return nil, fmt.Errorf("config must be provided")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ opts := &lib.ConfigMap{
|
|
|
|
+ "bootstrap.servers": strings.Join(p.config.Hosts, ","),
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if p.session, err = lib.NewProducer(opts); err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return p, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (p *_producer) Produce(message *lib.Message) error {
|
|
|
|
+ var (
|
|
|
|
+ delivery = make(chan lib.Event)
|
|
|
|
+ err error
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ if message.TopicPartition.Topic == nil {
|
|
|
|
+ return fmt.Errorf("illegal message: at least one topic must be provided for topic partition")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err = p.session.Produce(message, delivery); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ e := <-delivery
|
|
|
|
+ m := e.(*lib.Message)
|
|
|
|
+
|
|
|
|
+ if m.TopicPartition.Error != nil {
|
|
|
|
+ return fmt.Errorf("delivery failed: %v", m.TopicPartition.Error)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ close(delivery)
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (p *_producer) String() string {
|
|
|
|
+ return "kafka:producer"
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (p *_producer) Close() error {
|
|
|
|
+ var wg sync.WaitGroup
|
|
|
|
+ wg.Add(1)
|
|
|
|
+
|
|
|
|
+ go func() {
|
|
|
|
+ defer wg.Done()
|
|
|
|
+
|
|
|
|
+ if p.session != nil {
|
|
|
|
+ time.Sleep(time.Second)
|
|
|
|
+ p.session.Close() //nolint:errcheck
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+
|
|
|
|
+ wg.Wait()
|
|
|
|
+ return nil
|
|
|
|
+}
|