KAFKA 2.2.0 bug https://issues.apache.org/jira/browse/KAFKA-8052

Symptoms

  • The Kafka producer could not deliver any messages and kept throwing a Timeout (since our design completely decouples the main business flow from MQ, this did not stop the business from running; it only introduced temporary inconsistency). A minimal sketch of the blocking-send pattern behind this trace follows the stack trace below.

java.util.concurrent.TimeoutException: Timeout after waiting for 5000 ms.
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
	at com.my.fnc.mq.FncRunner.apply(FncRunner.java:42)
	at io.goudai.starter.runner.zookeeper.AbstractMultipartRunner$1.doRun(AbstractMultipartRunner.java:76)
	at io.goudai.starter.runner.zookeeper.AbstractRunner.takeLeadership(AbstractRunner.java:94)
	at org.apache.curator.framework.recipes.leader.LeaderSelector$WrappedListener.takeLeadership(LeaderSelector.java:537)
	at org.apache.curator.framework.recipes.leader.LeaderSelector.doWork(LeaderSelector.java:399)
	at org.apache.curator.framework.recipes.leader.LeaderSelector.doWorkLoop(LeaderSelector.java:444)
	at org.apache.curator.framework.recipes.leader.LeaderSelector.access$100(LeaderSelector.java:64)
	at org.apache.curator.framework.recipes.leader.LeaderSelector$2.call(LeaderSelector.java:245)
	at org.apache.curator.framework.recipes.leader.LeaderSelector$2.call(LeaderSelector.java:239)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
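
Judging from the trace, FncRunner.apply(FncRunner.java:42) blocks on the send future with a 5-second bound, and that wait never completes while the brokers are in this state. Below is a minimal sketch of that blocking-send pattern using the plain Java producer API; the broker address, topic, and class name are placeholders for illustration, not our actual code.

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerTimeoutSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-1:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; the record waits in the accumulator until the partition leader accepts it
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("payment-fail", "key", "value"));

            // Blocking on the future with a 5s bound; while the ISR/leader state is broken,
            // the batch never completes and this throws java.util.concurrent.TimeoutException
            RecordMetadata metadata = future.get(5000, TimeUnit.MILLISECONDS);
            System.out.println("sent to partition " + metadata.partition() + " at offset " + metadata.offset());
        }
    }
}
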
  • The Kafka consumer could not consume any messages. A sketch of the poll/commit pattern, with the two settings the error message refers to, follows the stack trace below.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:786)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1242)
	at io.goudai.starter.kafka.consumer.KafkaBeanPostProcessor.handleRecord(KafkaBeanPostProcessor.java:236)
	at io.goudai.starter.kafka.consumer.KafkaBeanPostProcessor.lambda$startConsumer$2(KafkaBeanPostProcessor.java:206)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
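
The exception comes out of a manual commitSync() after the group has already rebalanced because the poll loop took longer than max.poll.interval.ms. Below is a minimal sketch of such a poll-and-commit loop, showing the two knobs the error message mentions; the broker address, group id, topic, and the values shown are illustrative placeholders, not our production configuration.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerCommitSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-1:9092");  // placeholder broker address
        props.put("group.id", "demo-group");             // placeholder group id
        props.put("enable.auto.commit", "false");        // offsets are committed manually, as in the stack trace
        props.put("max.poll.interval.ms", "300000");     // maximum time allowed between poll() calls
        props.put("max.poll.records", "100");            // smaller batches -> each poll round trip finishes sooner
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("payment-fail"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    handle(record); // if handling all records exceeds max.poll.interval.ms,
                                    // the group rebalances and the commit below fails
                }
                // synchronous offset commit; throws CommitFailedException after a rebalance
                consumer.commitSync();
            }
        }
    }

    private static void handle(ConsumerRecord<String, String> record) {
        System.out.println(record.topic() + "/" + record.partition() + " -> " + record.value());
    }
}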

Checking the Kafka logs

{"log":"[2018-10-21 06:21:35,780] INFO [Partition user-auth-confirm-failed-12 broker=1] Shrinking ISR from 1,5,4 to 1,4 (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.780359744Z"}
{"log":"[2018-10-21 06:21:35,790] INFO [Partition user-auth-confirm-failed-12 broker=1] Cached zkVersion [133] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.790651434Z"}
{"log":"[2018-10-21 06:21:35,790] INFO [Partition order-agent-offline-pay-submitted-10 broker=1] Shrinking ISR from 2,1,5 to 2,1 (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.790689985Z"}
{"log":"[2018-10-21 06:21:35,792] INFO [Partition order-agent-offline-pay-submitted-10 broker=1] Cached zkVersion [55] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.79359849Z"}
{"log":"[2018-10-21 06:21:35,793] INFO [Partition payment-fail-2 broker=1] Shrinking ISR from 1,5,4 to 1,4 (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.793623565Z"}
{"log":"[2018-10-21 06:21:35,795] INFO [Partition payment-fail-2 broker=1] Cached zkVersion [133] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.796206037Z"}
{"log":"[2018-10-21 06:21:35,795] INFO [Partition deposit-paid-26 broker=1] Shrinking ISR from 1,5,4 to 1,4 (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.796228348Z"}
{"log":"[2018-10-21 06:21:35,798] INFO [Partition deposit-paid-26 broker=1] Cached zkVersion [160] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.799150335Z"}
{"log":"[2018-10-21 06:21:35,798] INFO [Partition order-agent-stock-notify-success-20 broker=1] Shrinking ISR from 1,5,4 to 1,4 (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.799177669Z"}
{"log":"[2018-10-21 06:21:35,801] INFO [Partition order-agent-stock-notify-success-20 broker=1] Cached zkVersion [133] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)\r\n","stream":"stdout","time":"2018-10-20T22:21:35.801745244Z"}
  • The most frequently repeated log line is Cached zkVersion xx not equal to that in zookeeper, skip updating ISR.
  • I pasted Cached zkVersion xx not equal to that in zookeeper, skip updating ISR into a Google search.
  • Good luck: it turned up this issue: https://issues.apache.org/jira/browse/KAFKA-2729
  • It seems quite a few people have run into this problem. A follow-up search on stackoverflow.com found this question: https://stackoverflow.com/questions/46644764/kafka-cached-zkversion-not-equal-to-that-in-zookeeper-broker-not-recovering
  • The answer on Stack Overflow says the solution is the universal fix: restart.
  • The original answer reads: This issue is known and tracked in KAFKA-2729 but not solved till now. This happens as far as I know on networks with big delays due to max traffic or some short network outages in a small timeframe. The only solution (afaik) is to restart all brokers.

Conclusion

  • In summary, the temporary workaround is the universal fix: restart the broker on each Kafka node.
  • For a permanent fix, the official issue indicates this was fixed in Kafka 1.1.0, so all brokers need to be upgraded.

Fixing the problem and getting the business back to normal

  • Restart all brokers.
  • Since our Kafka consumers and producers are implemented with a mechanism that automatically restarts them whenever an Exception is thrown, simply restarting the brokers was enough to get the business running normally again. A rough sketch of that restart pattern follows.
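
Roughly, the restart mechanism re-creates the Kafka client whenever an exception escapes the processing loop, so once the brokers come back the clients recover on their own. The sketch below is a simplified illustration of that idea, not the actual io.goudai.starter implementation; the topic and logging are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoRestartingConsumerSketch implements Runnable {

    // props must include bootstrap.servers, group.id, deserializers and enable.auto.commit=false
    private final Properties props;
    private final String topic;

    public AutoRestartingConsumerSketch(Properties props, String topic) {
        this.props = props;
        this.topic = topic;
    }

    @Override
    public void run() {
        // Outer loop: whenever the inner poll/commit loop dies with an exception
        // (e.g. CommitFailedException, or a producer TimeoutException inside the handler),
        // close the client, back off, and start over with a fresh consumer.
        while (!Thread.currentThread().isInterrupted()) {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList(topic));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(r -> System.out.println(r.value())); // business handling goes here
                    consumer.commitSync();
                }
            } catch (Exception e) {
                System.err.println("consumer loop failed, restarting: " + e);
                try {
                    Thread.sleep(5_000); // back off before recreating the consumer
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}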