浏览代码

consumer

- fix rebalance strategy
Alexey Kim 1 年之前
父节点
当前提交
5db02a7b49
共有 1 个文件被更改,包括 11 次插入61 次删除
  1. 11 61
      consumer/kafka_consumer.go

+ 11 - 61
consumer/kafka_consumer.go

@@ -138,17 +138,17 @@ func (c *_consumer) poll() (*kafka.Message, error) {
 			}
 		}
 
-		// Handle manual commit since enable.auto.commit is unset.
-		if err = maybeCommit(c.session, e.TopicPartition); err != nil {
-			return nil, err
-		}
-
 		return e, nil
 	case kafka.Error:
 		log.Debug().
 			Str("service", "consumer").
 			Err(e).
 			Send()
+
+		if e.Code() == kafka.ErrAllBrokersDown {
+			c.state.Remove(flagSubscribed)
+		}
+
 		return nil, e
 	default:
 		if e != nil {
@@ -162,46 +162,10 @@ func (c *_consumer) poll() (*kafka.Message, error) {
 	}
 }
 
-// maybeCommit is called for each message we receive from a Kafka topic.
-// This method can be used to apply some arbitary logic/processing to the
-// offsets, write the offsets into some external storage, and finally, to
-// decide when we want to commit already-stored offsets into Kafka.
-func maybeCommit(c *kafka.Consumer, topicPartition kafka.TopicPartition) error {
-	// Commit the already-stored offsets to Kafka whenever the offset is divisible
-	// by 10, otherwise return early.
-	// This logic is completely arbitrary. We can use any other internal or
-	// external variables to decide when we commit the already-stored offsets.
-	if topicPartition.Offset%10 != 0 {
-		return nil
-	}
-
-	commitedOffsets, err := c.Commit()
-
-	// ErrNoOffset occurs when there are no stored offsets to commit. This
-	// can happen if we haven't stored anything since the last commit.
-	// While this will never happen for this example since we call this method
-	// per-message, and thus, always have something to commit, the error
-	// handling is illustrative of how to handle it in cases we call Commit()
-	// in another way, for example, every N seconds.
-	if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
-		return err
-	}
-
-	log.Debug().
-		Str("service", "consumer").
-		Msgf("commited offsets to Kafka: %v", commitedOffsets)
-	return nil
-}
-
 // rebalanceCallback is called on each group rebalance to assign additional
 // partitions, or remove existing partitions, from the consumer's current
 // assignment.
 //
-// A rebalance occurs when a consumer joins or leaves a consumer group, if it
-// changes the topic(s) it's subscribed to, or if there's a change in one of
-// the topics it's subscribed to, for example, the total number of partitions
-// increases.
-//
 // The application may use this optional callback to inspect the assignment,
 // alter the initial start offset (the .Offset field of each assigned partition),
 // and read/write offsets to commit to an alternative store outside of Kafka.
@@ -215,9 +179,10 @@ func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
 				len(ev.Partitions),
 				ev.Partitions)
 
-		err := c.Assign(ev.Partitions)
-		if err != nil {
-			return err
+		// The application may update the start .Offset of each
+		// assigned partition and then call IncrementalAssign().
+		if err := c.IncrementalAssign(ev.Partitions); err != nil {
+			panic(err)
 		}
 
 	case kafka.RevokedPartitions:
@@ -245,23 +210,8 @@ func rebalanceCallback(c *kafka.Consumer, event kafka.Event) error {
 				Msg("Assignment lost involuntarily, commit may fail")
 		}
 
-		// Since enable.auto.commit is unset, we need to commit offsets manually
-		// before the partition is revoked.
-		commitedOffsets, err := c.Commit()
-		if err != nil && err.(kafka.Error).Code() != kafka.ErrNoOffset {
-			log.Debug().
-				Str("service", "consumer").
-				Err(err).
-				Msg("failed to commit offsets")
-			return err
-		}
-
-		log.Debug().
-			Str("service", "consumer").
-			Msgf("commited offsets to Kafka: %v", commitedOffsets)
-
-		// Similar to Assign, client automatically calls Unassign() unless the
-		// callback has already called that method. Here, we don't call it.
+		// The client automatically calls IncrementalUnassign() unless
+		// the callback has already called that method.
 
 	default:
 		log.Debug().