KafkaSpout没有收到来自Kafka的任何东西

我正在试图搭build一个Kafka-Storm“Hello World”系统。 我已经安装并运行了Kafka,当我向Kafka制作人发送数据时,我可以通过Kafka控制台消费者阅读。

我从“风暴入门”O'Reilly的书中拿出了第二章的例子,并修改它来使用KafkaSpout而不是常规的喷口。

当我运行应用程序时,数据已经在kafka中挂起,KafkaSpout的nextTuple没有得到任何消息 – 它进入,试图迭代协调器下的空pipe理器列表,然后退出。

我的环境是一个相当古老的Cloudera虚拟机,Storm 0.9和Kafka-Storm-0.9(最新),Kafka 2.9.2-0.7.0。

这是我如何定义SpoutConfig和拓扑:

String zookeepers = "localhost:2181"; SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"), "gtest", "/kafka", // zookeeper root path for offset storing "KafkaSpout"); spoutConfig.forceStartOffsetTime(-1); KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig); //Topology definition TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader", kafkaSpout, 1); builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(),1) .fieldsGrouping("word-normalizer", new Fields("word")); //Configuration Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(false); //Topology run conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); 

有人可以帮我找出为什么我没有收到任何东西?

谢谢,G.

如果你已经消费了这个消息,除非你的生产者产生新的消息,否则它不应该再被读取。 这是因为您的代码中使用了-1forceStartOffsetTime调用。

形成storm-contrib文档:

喷口中另一个非常有用的配置是能够强制喷口倒回到先前的偏移量。 您在spout config上执行forceStartOffsetTime,如下所示:

  spoutConfig.forceStartOffsetTime(-2); 

它将选择围绕该时间戳记录的最新偏移量开始消费。 您可以通过传入-1强制喷口总是从最新的偏移量开始,并且可以通过传入-2来强制它从最早的偏移量开始。

你是怎样生产的? 有一个片段是有用的。 你可以用-2替换-1,看看你是否收到任何东西,如果你的制作者没问题,那么你应该可以使用。

 SpoutConfig spoutConf = new SpoutConfig(...) spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); 
 SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"), "gtest", // name of topic used by producer & consumer "/kafka", // zookeeper root path for offset storing "KafkaSpout"); 

您正在使用“gtest”主题接收数据。 确保你是由制作者从这个主题发送数据。

然后在螺栓上打印那个元组

 public void execute(Tuple tuple, BasicOutputCollector collector) { System.out.println(tuple); } 

它应该在kafka中打印待处理的数据。

我经历了一场风波和Kafka的融合。 这些都是快速发展和相对年轻的项目,所以很难得到工作实例来开始你的发展。

为了帮助其他开发人员(希望能让其他人提供可以使用的有用示例),我开始了一个github项目,以存放与Storm / Kafka(和Esper)开发相关的代码片段。

欢迎您在这里查看> https://github.com/buildlackey/cep

(点击storm + kafka目录下的示例程序,可以启动并运行)。