package kafka import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/rs/zerolog/log" ) // rebalanceCallback is called on each group rebalance to assign additional // partitions, or remove existing partitions, from the consumer's current // assignment. // // The application may use this optional callback to inspect the assignment, // alter the initial start offset (the .Offset field of each assigned partition), // and read/write offsets to commit to an alternative store outside of Kafka. func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error { switch ev := event.(type) { case kafka.AssignedPartitions: log.Debug(). Str("service", "consumer"). Msgf("%s rebalance: %d new partition(s) assigned: %v", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions) // The application may update the start .Offset of each // assigned partition and then call IncrementalAssign(). // Even though this example does not alter the offsets we // provide the call to IncrementalAssign() as an example. err := c.IncrementalAssign(ev.Partitions) if err != nil { panic(err) } case kafka.RevokedPartitions: log.Debug(). Str("service", "consumer"). Msgf("%s rebalance: %d partition(s) revoked: %v", c.GetRebalanceProtocol(), len(ev.Partitions), ev.Partitions) if c.AssignmentLost() { // Our consumer has been kicked out of the group and the // entire assignment is thus lost. log.Debug(). Str("service", "consumer"). Msg("Current assignment lost!") } // The client automatically calls IncrementalUnassign() unless // the callback has already called that method. } return nil }