如何处理错误,并在使用Kafka Streams DSL时不要提交

对于Kafka Streams,如果我们使用低层的处理器API,我们可以控制提交与否。 所以如果在我们的代码中发生问题,我们不想提交这个消息​​。 在这种情况下,卡夫卡将多次重新传递此消息,直到问题得到解决。

但是如何控制在使用更高级别的streamDSL API时是否提交消息呢?

资源:

http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html

你的陈述并不完全正确。 你不能 “控制提交或不提交” – 至少不是直接的(既不在处理器API也不在DSL中)。 你只能使用ProcessorContext#commit()来请求额外的提交。 因此,在对#commit() Streams的调用尝试尽快提交之后,它不是立即提交。 此外,即使您从未调用#commit() ,Streams也会自动提交。 您可以通过Streams配置commit.interval.m来控制Streams提交间隔(参考http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application

如果出现“问题”,则取决于问题的类型,如何回答问题:

  • 如果你发现一个问题,你不能从中恢复,你只能抛出一个异常,并“停止世界”(参见下文)。
  • 如果你有一个可恢复的错误,你需要在你自己的代码中(例如,在Processor#process()KeyValueMapper#apply()循环,直到问题解决,并且你可以成功地处理当前的消息可能会遇到超时,即异常,使用这种策略 – 比较消费者配置heartbeat.interval.ms和0.10.1 session.timeout.ms [KIP-62] )
  • 另一种方法是将现在无法处理的记录放入StateStore并稍后处理。 然而,很难做到正确,并且打破了一些Streams假设(例如处理顺序)。 不建议使用,如果使用,你必须非常小心的影响

如果有一个未被捕获的异常StreamThread将会死亡并且不会发生任何提交(您可以注册一个异常处理程序来获得有关此通知的信息: http : //docs.confluent.io/current/streams/developer-guide.html#using-kafka如果你所有的StreamThread死掉了,你将需要创建一个新的KafkaStreams实例来重新启动你的应用程序。

在消息成功处理之前,您不能从用户代码返回,因为如果您返回,Streams会假定消息已成功处理(因此可能会提交相应的偏移量)。 关于第(3)点,将记录放入特殊的StateStore以供后期处理被认为是“成功”处理的记录。