对于Kafka Streams,如果我们使用低层的处理器API,我们可以控制提交与否。 所以如果在我们的代码中发生问题,我们不想提交这个消息。 在这种情况下,卡夫卡将多次重新传递此消息,直到问题得到解决。
但是如何控制在使用更高级别的streamDSL API时是否提交消息呢?
资源:
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html
你的陈述并不完全正确。 你不能 “控制提交或不提交” – 至少不是直接的(既不在处理器API也不在DSL中)。 你只能使用ProcessorContext#commit()
来请求额外的提交。 因此,在对#commit()
Streams的调用尝试尽快提交之后,它不是立即提交。 此外,即使您从未调用#commit()
,Streams也会自动提交。 您可以通过Streams配置commit.interval.m
来控制Streams提交间隔(参考http://docs.confluent.io/current/streams/developer-guide.html#configuring-a-kafka-streams-application )
如果出现“问题”,则取决于问题的类型,如何回答问题:
Processor#process()
或KeyValueMapper#apply()
循环,直到问题解决,并且你可以成功地处理当前的消息可能会遇到超时,即异常,使用这种策略 – 比较消费者配置heartbeat.interval.ms
和0.10.1 session.timeout.ms
[KIP-62] ) StateStore
并稍后处理。 然而,很难做到正确,并且打破了一些Streams假设(例如处理顺序)。 不建议使用,如果使用,你必须非常小心的影响 如果有一个未被捕获的异常 , StreamThread
将会死亡并且不会发生任何提交(您可以注册一个异常处理程序来获得有关此通知的信息: http : //docs.confluent.io/current/streams/developer-guide.html#using-kafka如果你所有的StreamThread
死掉了,你将需要创建一个新的KafkaStreams
实例来重新启动你的应用程序。
在消息成功处理之前,您不能从用户代码返回,因为如果您返回,Streams会假定消息已成功处理(因此可能会提交相应的偏移量)。 关于第(3)点,将记录放入特殊的StateStore以供后期处理被认为是“成功”处理的记录。