123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- package kafka
- import (
- "github.com/confluentinc/confluent-kafka-go/v2/kafka"
- "github.com/rs/zerolog/log"
- )
- // rebalanceCallback is called on each group rebalance to assign additional
- // partitions, or remove existing partitions, from the consumer's current
- // assignment.
- //
- // The application may use this optional callback to inspect the assignment,
- // alter the initial start offset (the .Offset field of each assigned partition),
- // and read/write offsets to commit to an alternative store outside of Kafka.
- func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
- switch ev := event.(type) {
- case kafka.AssignedPartitions:
- log.Debug().
- Str("service", "consumer").
- Msgf("%s rebalance: %d new partition(s) assigned: %v",
- c.GetRebalanceProtocol(),
- len(ev.Partitions),
- ev.Partitions)
- // The application may update the start .Offset of each
- // assigned partition and then call IncrementalAssign().
- // Even though this example does not alter the offsets we
- // provide the call to IncrementalAssign() as an example.
- err := c.IncrementalAssign(ev.Partitions)
- if err != nil {
- panic(err)
- }
- case kafka.RevokedPartitions:
- log.Debug().
- Str("service", "consumer").
- Msgf("%s rebalance: %d partition(s) revoked: %v",
- c.GetRebalanceProtocol(),
- len(ev.Partitions),
- ev.Partitions)
- if c.AssignmentLost() {
- // Our consumer has been kicked out of the group and the
- // entire assignment is thus lost.
- log.Debug().
- Str("service", "consumer").
- Msg("Current assignment lost!")
- }
- // The client automatically calls IncrementalUnassign() unless
- // the callback has already called that method.
- }
- return nil
- }
|