rebalance.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package kafka
  2. import (
  3. "github.com/confluentinc/confluent-kafka-go/v2/kafka"
  4. "github.com/rs/zerolog/log"
  5. )
  6. // rebalanceCallback is called on each group rebalance to assign additional
  7. // partitions, or remove existing partitions, from the consumer's current
  8. // assignment.
  9. //
  10. // The application may use this optional callback to inspect the assignment,
  11. // alter the initial start offset (the .Offset field of each assigned partition),
  12. // and read/write offsets to commit to an alternative store outside of Kafka.
  13. func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
  14. switch ev := event.(type) {
  15. case kafka.AssignedPartitions:
  16. log.Debug().
  17. Str("service", "consumer").
  18. Msgf("%s rebalance: %d new partition(s) assigned: %v",
  19. c.GetRebalanceProtocol(),
  20. len(ev.Partitions),
  21. ev.Partitions)
  22. // The application may update the start .Offset of each
  23. // assigned partition and then call IncrementalAssign().
  24. // Even though this example does not alter the offsets we
  25. // provide the call to IncrementalAssign() as an example.
  26. err := c.IncrementalAssign(ev.Partitions)
  27. if err != nil {
  28. panic(err)
  29. }
  30. case kafka.RevokedPartitions:
  31. log.Debug().
  32. Str("service", "consumer").
  33. Msgf("%s rebalance: %d partition(s) revoked: %v",
  34. c.GetRebalanceProtocol(),
  35. len(ev.Partitions),
  36. ev.Partitions)
  37. if c.AssignmentLost() {
  38. // Our consumer has been kicked out of the group and the
  39. // entire assignment is thus lost.
  40. log.Debug().
  41. Str("service", "consumer").
  42. Msg("Current assignment lost!")
  43. }
  44. // The client automatically calls IncrementalUnassign() unless
  45. // the callback has already called that method.
  46. }
  47. return nil
  48. }