在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:
- 直接使用Storm的Topology对数据进行实时分析处理
- 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理
实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软件包如下所示:
- zookeeper-3.4.5.tar.gz
- kafka_2.9.2-0.8.1.1.tgz
- apache-storm-0.9.2-incubating.tar.gz
- hadoop-2.2.0.tar.gz
程序配置运行所基于的操作系统为CentOS 5.11。
Kafka安装配置
我们使用3台机器搭建Kafka集群:
192.168.4.142 h1 192.168.4.143 h2 192.168.4.144 h3
在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
首先,在h1上准备Kafka安装文件,执行如下命令:
cd /usr/local/ wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz tar xvzf kafka_2.9.2-0.8.1.1.tgz ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:
broker.id=0 zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在zookeeper.connect配置项中指定:
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:
cd /usr/local/zookeeper bin/zkCli.sh
在ZooKeeper执行如下命令创建chroot路径:
create /kafka ''
这样,每次连接Kafka集群的时候(使用--zookeeper
选项),也必须使用带chroot路径的连接字符串,后面会看到。
然后,将配置好的安装文件同步到其他的h2、h3节点上:
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/ scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
最后,在h2、h3节点上配置,执行如下命令:
cd /usr/local/ ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:
broker.id=1 # 在h1修改 broker.id=2 # 在h2修改
因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
查看创建的Topic,执行如下命令:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
结果信息如下所示:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs: Topic: my-replicated-topic5 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1 Topic: my-replicated-topic5 Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,0,1 Topic: my-replicated-topic5 Partition: 3 Leader: 0 Replicas: 0,1,2 Isr: 0,2,1 Topic: my-replicated-topic5 Partition: 4 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
上面Leader、Replicas、Isr的含义如下:
Partition: 分区 Leader : 负责读写指定分区的节点 Replicas : 复制该分区log的节点列表 Isr : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
Storm安装配置
Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:
192.168.4.142 h1 192.168.4.143 h2 192.168.4.144 h3
首先,在h1节点上,执行如下命令安装:
cd /usr/local/ wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz tar xvzf apache-storm-0.9.2-incubating.tar.gz ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
然后,修改配置文件conf/storm.yaml,内容如下所示:
storm.zookeeper.servers: - "h1" - "h2" - "h3" storm.zookeeper.port: 2181 # nimbus.host: "h1" supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 storm.local.dir: "/tmp/storm"
将配置好的安装文件,分发到其他节点上:
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/ scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
最后,在h2、h3节点上配置,执行如下命令:
cd /usr/local/ ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:
bin/storm nimbus & bin/storm supervisor &
为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:
bin/storm ui &
这样可以通过访问http://h2:8080/来查看Topology的运行状况。
整合Kafka+Storm
消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.2-incubating</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>0.9.2-incubating</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency>
下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:
package org.shirdrn.storm.examples; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class MyKafkaTopology { public static class KafkaWordSplitter extends BaseRichBolt { private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class); private static final long serialVersionUID = 886149197481637894L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getString(0); LOG.info("RECV[kafka -> splitter] " + line); String[] words = line.split("\\s+"); for(String word : words) { LOG.info("EMIT[splitter -> counter] " + word); collector.emit(input, new Values(word, 1)); } collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static class WordCounter extends BaseRichBolt { private static final Log LOG = LogFactory.getLog(WordCounter.class); private static final long serialVersionUID = 886149197481637894L; private OutputCollector collector; private Map<String, AtomicInteger> counterMap; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.counterMap = new HashMap<String, AtomicInteger>(); } @Override public void execute(Tuple input) { String word = input.getString(0); int count = input.getInteger(1); LOG.info("RECV[splitter -> counter] " + word + " : " + count); AtomicInteger ai = this.counterMap.get(word); if(ai == null) { ai = new AtomicInteger(); this.counterMap.put(word, ai); } ai.addAndGet(count); collector.ack(input); LOG.info("CHECK statistics map: " + this.counterMap); } @Override public void cleanup() { LOG.info("The final result:"); Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator(); while(iter.hasNext()) { Entry<String, AtomicInteger> entry = iter.next(); LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException { String zks = "h1:2181,h2:2181,h3:2181"; String topic = "my-replicated-topic5"; String zkRoot = "/storm"; // default zookeeper root configuration for storm String id = "word"; BrokerHosts brokerHosts = new ZkHosts(zks); SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConf.forceFromStart = false; spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"}); spoutConf.zkPort = 2181; TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5 builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader"); builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word")); Config conf = new Config(); String name = MyKafkaTopology.class.getSimpleName(); if (args != null && args.length > 0) { // Nimbus host name passed from command line conf.put(Config.NIMBUS_HOST, args[0]); conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60000); cluster.shutdown(); } } }
上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/ cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
然后,就可以提交我们开发的Topology程序了:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:
spoutConf.forceFromStart = false;
该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的Topic数据不被重复处理,是在数据源的位置进行状态记录。
整合Storm+HDFS
Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:
package org.shirdrn.storm.examples; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; import java.util.Random; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.storm.hdfs.bolt.HdfsBolt; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.format.RecordFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class StormToHDFSTopology { public static class EventSpout extends BaseRichSpout { private static final Log LOG = LogFactory.getLog(EventSpout.class); private static final long serialVersionUID = 886149197481637894L; private SpoutOutputCollector collector; private Random rand; private String[] records; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; rand = new Random(); records = new String[] { "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35", "10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02", "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35" }; } @Override public void nextTuple() { Utils.sleep(1000); DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); Date d = new Date(System.currentTimeMillis()); String minute = df.format(d); String record = records[rand.nextInt(records.length)]; LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record); collector.emit(new Values(minute, record)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("minute", "record")); } } public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException { // use "|" instead of "," for field delimiter RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter(" : "); // sync the filesystem after every 1k tuples SyncPolicy syncPolicy = new CountSyncPolicy(1000); // rotate files FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/storm/").withPrefix("app_").withExtension(".log"); HdfsBolt hdfsBolt = new HdfsBolt() .withFsUrl("hdfs://h1:8020") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("event-spout", new EventSpout(), 3); builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute")); Config conf = new Config(); String name = StormToHDFSTopology.class.getSimpleName(); if (args != null && args.length > 0) { conf.put(Config.NIMBUS_HOST, args[0]); conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60000); cluster.shutdown(); } } }
上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.4</version> <configuration> <createDependencyReducedPom>true</createDependencyReducedPom> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin>
整合Kafka+Storm+HDFS
上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:
package org.shirdrn.storm.examples; import java.util.Arrays; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.storm.hdfs.bolt.HdfsBolt; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.format.RecordFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class DistributeWordTopology { public static class KafkaWordToUpperCase extends BaseRichBolt { private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class); private static final long serialVersionUID = -5207232012035109026L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getString(0).trim(); LOG.info("RECV[kafka -> splitter] " + line); if(!line.isEmpty()) { String upperLine = line.toUpperCase(); LOG.info("EMIT[splitter -> counter] " + upperLine); collector.emit(input, new Values(upperLine, upperLine.length())); } collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line", "len")); } } public static class RealtimeBolt extends BaseRichBolt { private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class); private static final long serialVersionUID = -4115132557403913367L; private OutputCollector collector; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { String line = input.getString(0).trim(); LOG.info("REALTIME: " + line); collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException { // Configure Kafka String zks = "h1:2181,h2:2181,h3:2181"; String topic = "my-replicated-topic5"; String zkRoot = "/storm"; // default zookeeper root configuration for storm String id = "word"; BrokerHosts brokerHosts = new ZkHosts(zks); SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConf.forceFromStart = false; spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"}); spoutConf.zkPort = 2181; // Configure HDFS bolt RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format HdfsBolt hdfsBolt = new HdfsBolt() .withFsUrl("hdfs://h1:8020") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); // configure & build topology TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader"); builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper"); builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper"); // submit topology Config conf = new Config(); String name = DistributeWordTopology.class.getSimpleName(); if (args != null && args.length > 0) { String nimbus = args[0]; conf.put(Config.NIMBUS_HOST, nimbus); conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(name, conf, builder.createTopology()); Thread.sleep(60000); cluster.shutdown(); } } }
上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。
参考链接
- http://kafka.apache.org/
- http://kafka.apache.org/documentation.html
- https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
- http://storm.apache.org/
- http://storm.apache.org/documentation/Tutorial.html
- http://storm.apache.org/documentation/FAQ.html
- https://github.com/ptgoetz/storm-hdfs
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
请问你有弄过flume+kafka+storm+hdfs这一套完整的日志分析系统吗?有没有相关的中文资料分享一下的
是的,目前我们正在使用这套系统做日志离线收集分析,另外,从Storm这里还做了分发,对日志进行实时分析。
你们数据量有多大,如果需要写hdfs多个分区,小文件问题怎么解决
难道各个之间的整合都得自己开发么?可是我不是搞开发的,java语言我也不懂,这样的话岂不是无法实现这样的整合了吗?或者留下你的联系方式,跟你请教一下吧!
如果你不是搞开发的,那这一套对你来说就没有太大用处了。我想你就是想实时收集日志数据,最终存储到HDFS吧,那么就没有必要搞这一套了,可以直接使用Flume+HDFS这两个就能搞定,不需要编写任何代码,只需配置即可。
请问flume+hdfs 只能实现日志收集的功能吗,现在开发不做这块,如果单从运维层面出发,又要做日志收集,又要做日志分析请问那种架构比较合适
关键看你怎么使用,结合你的业务特点,有一些还是很适合的。Flume是一个日志收集工具,它支持直接收集日志文件(如指定日志生成目录),也可以自定义Flume Agent的source组件使Flume变成一个服务(如启动一个Socket服务)。HDFS就是用来存储数据,你要是想分析数据,完全可以依赖Hadoop平台的计算能力去处理(使用Python等编写MapReduce程序)。从运维的层面来实现,可以使用Flume+HDFS+Hive,将日志收集过来写入HDFS,然后导入Hive表进行分析,我想类SQL语句写起来,应该比直接编写难度较大的程序代码要容易多了吧。
这个yunwei还是可以玩玩ELK的
ELK是什么?
数据分析方案
楼主,您能不能把您上面的程序包发我一下,特别是那个KafkaSpout怎么处理的万分感谢!
这个类是Storm发行包的external中有的,可以查看这个Module:storm-kafka,或者直接在Maven配置pom.xml中增加该依赖。
感谢楼主提供的资料,已参照部署到集群中。但要注意一个问题:
配置kafka时,如果使用zookeeper create /kafka创建了节点,kafka与storm集成时new ZkHosts(zks) 需要改成 new ZkHosts(zks,”/kafka/brokers”),不然会报java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions。
storm-kafka插件默认kafka的 zk_path如下:
public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = “/brokers”;
对,你说的没错。安装Kafka的时候如果不覆盖默认的zk路径,Kafka所使用的相关路径都会在zk的根路径下面。
哥们,能详细说说你这个问题吗?我好像遇到类似问题了。
博主,这套东西可以实现从HDFS上读取数据到STORM中实时处理么
可以啊,开发一个自己的Spout,其中包含从HDFS中读取数据的逻辑。
求这个逻辑实现,军哥
您好,博主
broker.id=1 # 在h1修改
broker.id=2 # 在h2修改
哪儿h3上是不是也要改一下,还有我的zookeeper是利用cdh,用yum的方式安装的,可以吗?
curl -LO http://archive.cloudera.com/cdh5/one-click-install/redhat/6/x86_64/cloudera-cdh-5-0.x86_64.rpm
sudo yum localinstall cloudera-cdh-5-0.x86_64.rpm -y
sudo yum clean all -y
yum repolist
sudo rpm –import http://archive.cloudera.com/cdh5/redhat/5/x86_64/cdh/RPM-GPG-KEY-cloudera
#3.Install Zookeeper
yum install zookeeper* -y
cat >/etc/zookeeper/conf/zoo.cfg <<EOF
maxClientCnxns=50
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=h1:2888:3888
server.2=h2:2888:3888
server.3=h3:2888:3888
EOF
Kafka集群你有几个broker,就需要配置几个不同的id。
ZooKeeper和你使用什么方式安装没关系,不要把ZooKeeper里面的myid和Kafka的broker id搞混淆了。
楼主,您这一套用于生产环境了吗?求教我要实现用户行为分析,推荐系统的话Kafka+Storm+HDFS合适吗?新手,感谢赐教
是的。建议使用比较新的版本,Storm最新稳定版是0.9.5,老的版本有些BUG。
多谢楼主。还望多多指教。先行谢过
生产者产生消息,消费者没有消息输出。是什么原因?麻烦给点建议吧。
不知道楼主的storm与HDFS的集成有没有试过如果hadoop做了HA,是否还可以指定一个namenode节点
这个倒是没试过。不知道最新版的storm-hdfs有没有支持配置多个NN地址,如果没有的话,可以自己重写HdfsBolt这个的实现逻辑,如果发现Active NN与Standby NN发生切换,在HdfsBolt里面进行切换处理。
恕我冒昧的说一下。。。。。我按你的思路写了一个差不多的程序。。。。结果数据。。。有重复。。。就是一个把数据从kafka里取出来再写入本地文件里。。我存到10000条数据到kafka,写到文件里却是10648条。。。。发现有648条重复数据。。。不知是什么问题。。
我猜,可能是这个原因,Storm在写入HDFS(NN1)时已经成功,这时NameNode NN1挂掉,切换到NameNode NN2,但是此时还没有给HdfsBolt的前一个组件发送ACK,所以这种情况下Storm会认为那天记录没有发送成功,记录就会重新emit,重新emit后,数据再次写入HDFS(NN2),这样,一条记录重复写了2遍。
如果你不使用Storm Trident来保证(batch transaction),这种情况下就是存在重复的问题,但是基本它不会丢失数据,这就存在一条记录至少被发送一次(可能是多次)。你要有个应对这种情况的方案,比如识别记录唯一性,进行去重。
我是刚刚开始学strom,为什么我下载maven的依赖都下不来呢
有些依赖可能需要翻墙才能下载下来,你可以找个翻墙软件试试。如果不是这个问题,尝试使用Maven多构建几次,可能会构建成功。
拜读楼主好文!我想请问楼主,有没有做过数据流从storm流向kafka的呢?我的想法是数据源在storm上做一些数据的预处理,补空值或者加字段之类,然后输出到kafka,再由kafka流向另一个处理系统,所以请楼主指点,例子或者相关文档。谢谢!
这个就很容易了,你可以在Storm程序中,使用Kafka Producer将处理后的数据,再写回到Kafka的另一个topic中,其实就是简单地使用Kafka Producer API而已。
好的,谢谢楼主指导!
楼主有QQ吗,我现在正在开发一套后台日志 收集+分析系统,也是准备用 zk+kafka+storm, 但是不熟悉这套,请楼主指教一下 QQ 442583890
楼主 我按着你的粘过来 后 报这个错误java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
curator.jar找不到。你有遇到过这个吗?
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
估计是String zkRoot = “/storm”; // default zookeeper root configuration for storm
和我的不一样。你的注释没看懂,这是个什么?
1、你用的是哪个版本的Storm?这个问题应该是Storm的external库的依赖包缺少,你查看一下storm-kafka这个external库,它应该依赖了curator这个开源库。
2、Storm依赖Zookeeper,他需要保存一些元数据,需要在Zookeeper中创建一个根path,就是你安装Storm的时候配置中指定的,不指定的话默认是/storm,可以连接到Zookeeper上查看该path。
真的是storm版本的问题。谢谢楼主
请问楼主,要是我的数据源是其他小组通过指定端口发送过来的,有没有什么办法,直接配置一下就可以存到kafka队列中去?不然是不是还得在producer里面再去端口取数据,然后再存到队列中去?这样是不是有点麻烦?有没什么简单的办法?
用flume就可以实现了
按照楼主的教程配置后,worker日志中报错…
2015-10-29T09:13:45.136+0800 o.a.z.ClientCnxn [INFO] Socket connection established to VM195/172.19.5.231:2181, initiating session
2015-10-29T09:13:45.153+0800 o.a.z.ClientCnxn [INFO] Session establishment complete on server VM195/172.19.5.231:2181, sessionid = 0x150a2bacdd203e1, negotiated timeout = 20000
2015-10-29T09:13:45.162+0800 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2015-10-29T09:13:45.208+0800 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
2015-10-29T09:13:45.211+0800 s.k.KafkaUtils [INFO] Task [1/1] assigned [Partition{host=localhost:9092, partition=0}]
2015-10-29T09:13:45.211+0800 s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: []
2015-10-29T09:13:45.211+0800 s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=localhost:9092, partition=0}]
2015-10-29T09:13:45.750+0800 s.k.PartitionManager [INFO] Read partition information from: /storm/word/partition_0 –> null
2015-10-29T09:13:45.842+0800 k.c.SimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2015-10-29T09:13:45.852+0800 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:565) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.8.2.2.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.PartitionManager.(PartitionManager.java:83) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.5.jar:0.9.5]
… 6 common frames omitted
2015-10-29T09:13:45.853+0800 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:565) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.8.2.2.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.PartitionManager.(PartitionManager.java:83) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.5.jar:0.9.5]
… 6 common frames omitted
2015-10-29T09:13:45.904+0800 b.s.util [ERROR] Halting process: (“Worker died”)
java.lang.RuntimeException: (“Worker died”)
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__7028$fn__7029.invoke(worker.clj:497) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$mk_executor_data$fn__6480$fn__6481.invoke(executor.clj:240) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:473) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
2015-10-29T09:13:45.909+0800 b.s.d.worker [INFO] Shutting down worker MyKafkaTopology-2-1446081174 618ce090-046e-43fb-bdd7-a26e98eb57ba 6700
我现在也遇到这个问题了,初步百度到时因为kafka服务端配置了host.name=localhost 。不知道你后来是怎么解决的
看看你的Kafka是不是和Storm在一个节点上,如果不是,Storm里面Kafka Spout读local怎么可能读到Kafka里面的Topic呢
您好,没太明白您这句话的意思。能否解答一下。Kafka和Storm在一个节点上指的是zookeeper的节点么?需要怎么查看呢。
哦,Kafka如果配置的多节点的集群,配置中host.name肯定是要改成实际的IP或主机名,然后在Storm中读取Kakfa数据时指定正确的ZooKeeper地址信息(参考文中配置的ZK的信息——Host:Port列表),ZooKeeper是要知道Kafka Broker实际地址的(而非Kafka默认的localhost)。(PS:是我上面回复产生误解了,忽略掉,以这个为准吧)
我的kafka集群命令行可以读到数据,而且在集群外的kafka节点也可以消费集群中的数据。现在代码报如下错误。能帮看一下么。或者我能加下您的联系方式么?困扰已久,望大神帮忙。我的qq 867015234.
64669 [Thread-8-kafka-reader] ERROR backtype.storm.util – Async loop died!
java.lang.RuntimeException: java.nio.channels.ClosedByInterruptException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.nio.channels.ClosedByInterruptException: null
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_79]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681) ~[na:1.7.0_79]
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.9.2-0.8.1.1.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.PartitionManager.(PartitionManager.java:82) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
… 6 common frames omitted
64669 [Thread-8-kafka-reader] ERROR backtype.storm.daemon.executor –
java.lang.RuntimeException: java.nio.channels.ClosedByInterruptException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.nio.channels.ClosedByInterruptException: null
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_79]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681) ~[na:1.7.0_79]
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.9.2-0.8.1.1.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.PartitionManager.(PartitionManager.java:82) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
… 6 common frames omitted
64671 [Thread-8-kafka-reader] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory – Thread Thread[Thread-8-kafka-reader,5,main] died
java.lang.RuntimeException: java.lang.InterruptedException
at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.zookeeper$exists_node_QMARK_$fn__1658.invoke(zookeeper.clj:102) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.cluster$mk_distributed_cluster_state$reify__1866.mkdirs(cluster.clj:109) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.cluster$mk_storm_cluster_state$reify__2285.report_error(cluster.clj:368) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$throttled_report_error_fn$fn__3132.invoke(executor.clj:178) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$mk_executor_data$fn__3185$fn__3186.invoke(executor.clj:237) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:441) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) ~[na:1.7.0_79]
Caused by: java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[na:1.7.0_79]
at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_79]
at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1309) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1036) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[curator-framework-2.4.0.jar:na]
at backtype.storm.zookeeper$exists_node_QMARK_$fn__1658.invoke(zookeeper.clj:101) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
… 9 common frames omitted
原因分析:程序从zookeeper上获取topic上的分区信息中的broker的host信息为localhost:9092
由于测试机器和zookeeper集群不在一个局域网,不能解析主机名为localhost的信息,导致异常。
解决方法:
1 修改测试机器上的hosts文件,将DP映射到具体的ip地址
2 或者将执行程序放到和zookeeper集群一个网段的机器上执行。
非常详细的好文
您好,我在您的教程“安装配置storm”中,storm ui显示错误如下:
org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused
at org.apache.thrift7.transport.TSocket.open(TSocket.java:183)
at org.apache.thrift7.transport.TFramedTransport.open(TFramedTransport.java:81)
at backtype.storm.thrift$nimbus_client_and_conn.invoke(thrift.clj:75)
at backtype.storm.ui.core$cluster_summary.invoke(core.clj:455)
at backtype.storm.ui.core$fn__8223.invoke(core.clj:789)
at compojure.core$make_route$fn__3365.invoke(core.clj:93)
at compojure.core$if_route$fn__3353.invoke(core.clj:39)
at compojure.core$if_method$fn__3346.invoke(core.clj:24)
at compojure.core$routing$fn__3371.invoke(core.clj:106)
at clojure.core$some.invoke(core.clj:2443)
at compojure.core$routing.doInvoke(core.clj:106)
at clojure.lang.RestFn.applyTo(RestFn.java:139)
at clojure.core$apply.invoke(core.clj:619)
我的storm.yaml如下:
storm.zookeeper.servers:
– “1.1.1.1″
– “2.2.2.2″
– “3.3.3.3″
– “4.4.4.4″
storm.zookeeper.port: 2333
#
nimbus.host: “2.2.2.2″
supervisor.slots.ports:
– 6700
– 6701
– 6702
– 6703
– 6704
storm.local.dir: “/tmp/storm”
请问怎么回事呢?如果信息提供不完善的话,请您告知一下~
查看一下启动Storm的各个进程的日志吧:nimbus/ui,貌似Storm集群没有启动成功。
对的,nimbus log有这个:
2015-11-16 02:04:33 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=202.114.10.131:2333,202.114.10.134:2333,211.69.198.234:2333,211.69.198.247:2333 sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@7296c1fc
2015-11-16 02:04:33 o.a.z.ClientCnxn [INFO] Opening socket connection to server 211.69.198.234/211.69.198.234:2333. Will not attempt to authenticate using SASL (unknown error)
2015-11-16 02:04:33 o.a.z.ClientCnxn [WARN] Session 0×0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_71]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_71]
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
2015-11-16 02:04:41 o.a.z.ClientCnxn [INFO] Opening socket connection to server 202.114.10.131/202.114.10.131:2333. Will not attempt to authenticate using SASL (unknown error)
2015-11-16 02:04:41 o.a.z.ClientCnxn [WARN] Session 0×0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
然后一直Connection refused。。。
也就是说,reconnect接下来就是connection refused
ZooKeeper的地址不是随便配置的,一般ZooKeeper集群都是奇数个节点,将storm.zookeeper.servers参数配置成有效的ZooKeeper集群地址(ZooKeeper集群节点上要把防火墙关掉)。
解决,果然是ZooKeeper的问题。。谢谢您!
博主,
org.apache.storm.hdfs.bolt.HdfsBolt
在哪个jar中
比如,使用0.9.3版本的storm,可以看到apache-storm-0.9.3\external\storm-hdfs\storm-hdfs-0.9.3.jar这个文件,就在这里的。
博主,我用storm集群从kafka读取数据,处理后写到hdfs。为什么storm运行一段时间大概半小时就变成脑残了,重新又写一次。不会是因为重复读取kafka数据的缘故吧?我的kafka里面有几万条数据,处理一次大概几分钟就处理完了。
根据你的描述,任何一个环节都有可能有问题,无法定位具体原因。
博主,请问你日志是怎么进去消息队列的。
自己程序中写入的还是,把原先生成的log文件中的内容收集进去的?
用Flume的话,1.6.0+版本自带一个Kafka Sink,只需要配置即可,其实写入kafka,就是Producer做的事情,如果你想自己写,直接熟悉一下kafka的Producer使用即可。
楼主您好!你的這一套资料、源码及文档还在吗?可不可以给我发一份呢?想学学!
代码我基本都贴出来了,其他的也就是你配置一下环境就可以了。
博主,请教你一个问题。数据采集的解析工作适合放在flume端还是放在storm端比较合适?
如果能够保证解析的逻辑通用,并且后续几乎不会随着实际业务需求变化,可以考虑将简单的解析逻辑在flume端实现。更一般来说,你最好把Flume就看成一个收集日志的工具,具体包含业务逻辑处理的内容都放到外部,比如storm中。
看了您的文章,思路得到了新的提高,现在想把hadoop里的数据推送到移动端或邮件中,kafka有这样的功能么?
或者推送到微信中等,对于微信订阅推送也还没有了解呢?
完全可以的,不过实现方式和使用其他一些MQ可能有很大不同。要知道,从Kafka消费某个Topic的时候,是PULL模式消费,需要自己实现:从Kafka拉取数据并主动推送到手机端或邮件中。
后来怎么解决的
博主,您好,
想问一下,能不能用kafka直接写入HDFS,不经过storm这一环节呢?
可以的,可以参考下LinkedIn的Camus,现在已经被Gobblin取代了,参考链接:https://github.com/linkedin/camus
你好,我遇到这么一个错误:java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8434 [Thread-17-storm-hdfs] INFO backtype.storm.daemon.executor – Preparing bolt storm-hdfs:(6)
8435 [Thread-15-storm-hdfs] ERROR backtype.storm.daemon.executor –
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8435 [Thread-17-storm-hdfs] ERROR backtype.storm.util – Async loop died!
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8436 [Thread-17-storm-hdfs] ERROR backtype.storm.daemon.executor –
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8443 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor – Got user-level KeeperException when processing sessionid:0x1534fdf653e000b type:create cxid:0×51 zxid:0×22 txntype:-1 reqpath:n/a Error Path:/storm/errors/MyKafkaTopologytoHdfs-1-1457333955 Error:KeeperErrorCode = NodeExists for /storm/errors/MyKafkaTopologytoHdfs-1-1457333955
8445 [Thread-19-storm-hdfs] INFO backtype.storm.daemon.executor – Preparing bolt storm-hdfs:(7)
8447 [Thread-19-storm-hdfs] ERROR backtype.storm.util – Async loop died!
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8447 [Thread-19-storm-hdfs] ERROR backtype.storm.daemon.executor –
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8455 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor – Got user-level KeeperException when processing sessionid:0x1534fdf653e000b type:create cxid:0×55 zxid:0×24 txntype:-1 reqpath:n/a Error Path:/storm/errors/MyKafkaTopologytoHdfs-1-1457333955/storm-hdfs Error:KeeperErrorCode = NodeExists for /storm/errors/MyKafkaTopologytoHdfs-1-1457333955/storm-hdfs
8456 [Thread-21-word-splitter] INFO backtype.storm.daemon.executor – Preparing bolt word-splitter:(8)
8460 [Thread-21-word-splitter] INFO backtype.storm.daemon.executor – Prepared bolt word-splitter:(8)
8461 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor – Got user-level KeeperException when processing sessionid:0x1534fdf653e000b type:create cxid:0×56 zxid:0×25 txntype:-1 reqpath:n/a Error Path:/storm/errors/MyKafkaTopologytoHdfs-1-1457333955/storm-hdfs Error:KeeperErrorCode = NodeExists for /storm/errors/MyKafkaTopologytoHdfs-1-1457333955/storm-hdfs
8490 [Thread-17-storm-hdfs] ERROR backtype.storm.util – Halting process: (“Worker died”)
java.lang.RuntimeException: (“Worker died”)
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__7024$fn__7025.invoke(worker.clj:493) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$mk_executor_data$fn__6480$fn__6481.invoke(executor.clj:240) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:473) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8490 [Thread-15-storm-hdfs] ERROR backtype.storm.util – Halting process: (“Worker died”)
博主你好,请问对于socket连接接收的实时数据,以实时存储为目的,可以用kafka+strom+hdfs的框架吗?
可以啊,通过Socket Server接收数据,推进Kafka里面,后面的就直接进入Storm处理,最后存储到HDFS。
HDFS应该是对一次写,多次读比较擅长,那么Strom实时存储数据到HDFS合理吗?Strom往HDFS存的方式是来一个存一个还是mini的批量呢?或者两种方式都可以的话是怎样设置选择其中一种呢?
请教个问题,我想接别人的数据所以自己部署的storm,zookeeper和别人的kafka、zookeeper不在同一个集群,比如:我的storm在cm1,cm2,cm3,cm4,cm5,zookeeper在cm1,cm2,cm3。而别人的kafka和zookeeper在sd-jp01,sd-jp02,sd-jp03 部分代码如下:
, String zks=”sd-jp01:2181,sd-jp02:2181,sd-jp03:2181″;
String topic=”hydra.conversions”;
String zkRoot=”/kafka”;
String id=”word”;
BrokerHosts brokerHosts=new ZkHosts(zks);
SpoutConfig spoutConf=new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme=new SchemeAsMultiScheme(new StringScheme());
spoutConf.zkServers=Arrays.asList(new String[]{“cm1″,”cm2″,”cm3″});
spoutConf.zkPort=2181;
其中zkRoot是kafka在zookeeper下的路径,但是为什么有这个错误:ava.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/hydra.conversions/partitions at storm.kafka.???请指点一下谢谢,或者加我qq:937038088
这种错误,很显然你是在访问对方Kafka的默认ZooKeeper路径,所以可以确定对方Kafka集群安装时所指定的不是默认路径。
代码中zkRoot指的是Storm在ZooKeeper中存储元数据的路径,也就是你自己搭建的ZooKeeper集群中的路径。
在集群sd-jp01–sd-jp03中kafka的zookeeper中我看到了/kafka路径,如果将zkRoot设置成我自己搭建storm在zookeeper集群路径,那怎么读取kafka数据呢?以我上面的例子看,该怎么写代码?可否给出这段代码的详细过程???谢谢
BrokerHosts hosts = new ZkHosts(zkConnString,”/kafka/brokers”);
默认是/brokers
楼主,你好!看到你这牌文章觉得写很好,新手非常实用。能给个联系方式吗?方便以后交流!谢谢
页面右上角有联系方式。
楼主,你好!你贴的代码里, String zkRoot = “/storm”; // default zookeeper root configuration for storm,zkRoot应该是/kafka,是笔误吧
zkRoot是为Storm配置的,不是Kafka。
楼主,我storm是0.8.2 zookeeper是3.4.6运行报错
6366 [Thread-20] ERROR backtype.storm.util – Async loop died!
java.lang.NoSuchMethodError: backtype.storm.task.TopologyContext.registerMetric(Ljava/lang/String;Lbacktype/storm/metric/api/IMetric;I)Lbacktype/storm/metric/api/IMetric;
at storm.kafka.KafkaSpout.open(KafkaSpout.java:81)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
6369 [Thread-20] ERROR backtype.storm.daemon.executor –
java.lang.NoSuchMethodError: backtype.storm.task.TopologyContext.registerMetric(Ljava/lang/String;Lbacktype/storm/metric/api/IMetric;I)Lbacktype/storm/metric/api/IMetric;
at storm.kafka.KafkaSpout.open(KafkaSpout.java:81)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
看这个错误NoSuchMethodError,明显是你使用的Storm用的JDK编译版本,与运行环境JDK版本不同导致的。
版本是相同的,都是jdk1.7,这和storm插件里pom相关配置没影响吗?因为我看插件里pom的一些引用和我实际项目里版本不同,如storm-kafka-0.9.6插件里storm版本是0.9.6
现在插件换成storm-kafka-0.9.6,报错如下:
4400 [Thread-6] INFO backtype.storm.daemon.worker – Worker 00da0547-69a2-4b73-bae6-da12a3d2bf02 for storm word-1-1461636951 on 874443c4-4b17-4dda-b65b-f56b16f21aee:1 has finished loading
4591 [Thread-16] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl – Starting
4636 [Thread-16] ERROR backtype.storm.util – Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:174)
at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94)
at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55)
at org.apache.curator.ConnectionState.reset(ConnectionState.java:219)
at org.apache.curator.ConnectionState.start(ConnectionState.java:103)
at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:264)
at storm.kafka.ZkState.(ZkState.java:62)
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
4656 [Thread-16] ERROR backtype.storm.daemon.executor –
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:174)
at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94)
at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55)
at org.apache.curator.ConnectionState.reset(ConnectionState.java:219)
at org.apache.curator.ConnectionState.start(ConnectionState.java:103)
at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:264)
at storm.kafka.ZkState.(ZkState.java:62)
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
4711 [Thread-16] INFO backtype.storm.util – Halting process: (“Worker died”)
楼主你好,我是storm-0.9.4,kafka_2.11-0.9.0.1的环境,提交拓扑没问题,但是读不到kafka里的数据,之前是0.9.6的storm,更改至此仍然报错如下,求解。
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) at backtype.storm.daemon.executor$fn__4654$fn__4669.invoke(executor.clj:522) at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:724) Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) … 5 more
你好,你这个问题解决了吗,我也遇到了提交拓扑没问题,但是读不到kafka里的数据。
楼主,你好,在storm与kafka集成中遇到一个问题:
2016-05-19 04:47:13.696 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=master:2181,worker01:
2181,worker02:2181 sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@6bfa2a58
2016-05-19 04:47:13.699 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=master:2181,worker01:
2181,worker02:2181 sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@1bcde435
2016-05-19 04:47:13.712 o.a.z.ClientCnxn [INFO] Opening socket connection to server master/192.168.216.128:2181.
Will not attempt to authenticate using SASL (unknown error)
2016-05-19 04:47:13.729 o.a.z.ClientCnxn [INFO] Socket connection established to master/192.168.216.128:2181, in
itiating session
2016-05-19 04:47:13.703 o.a.s.util [ERROR] Async loop died!
java.lang.NoSuchMethodError: org.apache.curator.utils.ZKPaths.fixForNamespace(Ljava/lang/String;Ljava/lang/Strin
g;Z)Ljava/lang/String;
at org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:104) ~[stormjar.ja
r:?]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:594)
~[stormjar.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:182) ~[s
tormjar.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38) ~[st
ormjar.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:111) ~[stormja
r.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:84) ~[stormjar.ja
r:?]
at org.apache.storm.kafka.trident.ZkBrokerReader.(ZkBrokerReader.java:44) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:58) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:77) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602) ~[storm-core-1.0.0.jar:1.
0.0]
at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.0.jar:1.0.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
2016-05-19 04:47:13.749 o.a.s.d.executor [ERROR]
找了好久,也没有找到解决方法,请帮忙看下,谢谢
可以给我看看你的代码么。我最近也在学习这块知识。874083937
Node /kafka/brokers/ids/1 does not exist
跟您的配置一样的,但是一直提示这个错误是为什么呢?
请问楼主,在maven install的结果报错:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project kafkaWordCount: Compilation failure: Compilation failure:
[ERROR] /D:/Study/eclipseWorkshop/kafkaWordCount/src/main/java/topology/MyKafkaTopology.java:[10,34] 程序包org.apache.commons.logging不存在
[ERROR] /D:/Study/eclipseWorkshop/kafkaWordCount/src/main/java/topology/MyKafkaTopology.java:[11,34] 程序包org.apache.commons.logging不存在
[ERROR] /D:/Study/eclipseWorkshop/kafkaWordCount/src/main/java/topology/MyKafkaTopology.java:[36,38] 找不到符号
[ERROR] 符号: 类 Log
[ERROR] 位置: 类 topology.MyKafkaTopology.KafkaWordSplitter
[ERROR] /D:/Study/eclipseWorkshop/kafkaWordCount/src/main/java/topology/MyKafkaTopology.java:[65,38] 找不到符号
[ERROR] 符号: 类 Log
[ERROR] 位置: 类 topology.MyKafkaTopology.WordCounter
这个是什么问题呀?谢谢
请问一下楼主,Storm和kafka的版本有版本对应吗?我现在使用的是storm0.10.0,应该使用哪一版的kafka呀?谢谢
最近公司也想搭建一个实时日志采集和分析平台,主要有两个需求:
1、实时日志分析:打算采用 flume+kafaka+storm+hdfs
2、日志要保留到HDFS中:采用flume+kafaka+hdfs
这里有问问题想请教楼主:flume可以直接把数据写入Hdfs,有必要加入kafka(flume+kafaka+hdfs)作为缓冲吗?
你要考虑到,如果不使用Kafka缓冲消息,你的HDFS的写入速度是否能满足实时数据收集的速度,如果可以保证就可以不使用kafka。
使用Kafka的好处是,即使Flume路由的消息量很大,速度很快,通过加上Kakfa层进行缓冲,能够保证不丢失消息。
感谢!
还有个问题:如果使用flume+kafka+hdfs,如何实现kafka写入hdfs呢。我在官网的文档问看到“ HDFSSinkConnector ”,但是没有找到什么具体的案例,不知道楼主有没有什么经验或者这方面的建议?
我没有试过直接从Kafka将数据导出到HDFS,如果没有开源的组件,你可以自己实现Kafka Consumer,将数据写到HDFS,这个也比较简单可控。
请问下楼主,我的拓扑运行一段时间后就会报这样的错误,而且会有很多这样的错误,会是什么原因啊,困扰好久了
2016-08-25T13:20:33.605+0800 o.a.s.h.b.HdfsBolt [WARN] write/sync failed.
org.apache.hadoop.ipc.RemoteException: 0
at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:505)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3528)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:702)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:228)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:506)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
at org.apache.hadoop.ipc.Client.call(Client.java:1347) ~[stormjar.jar:na]
at org.apache.hadoop.ipc.Client.call(Client.java:1300) ~[stormjar.jar:na]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) ~[stormjar.jar:na]
at com.sun.proxy.$Proxy7.getAdditionalDatanode(Unknown Source) ~[na:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) ~[stormjar.jar:na]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~[stormjar.jar:na]
at com.sun.proxy.$Proxy7.getAdditionalDatanode(Unknown Source) ~[na:na]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:352) ~[stormjar.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:919) ~[stormjar.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031) ~[stormjar.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823) ~[stormjar.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475) ~[stormjar.jar:na]
博主,您好,请问有监控kafka低级API消费者的工具吗?我用storm消费kafka数据,因为storm用的低级API,所以kafka eagle和manager好像都监控不到topic的消费情况。storm ui上有storm的数据,但是想看看kafka的监控记录
无法从zk中读取offset信息,每次都从开始读取,查看日志:
19534 [Thread-16-kafkaspout-executor[3 3]] INFO o.a.s.k.PartitionManager – Read partition information from: /kafkaspout/kafkaspout/partition_0 –> null
代码如下:
public class GpsMsgTopoplgy {
public static void main(String[] args)
throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
// 配置Zookeeper地址
String zk = SysConfig.getProperty(“storm.zookeeper.quorum”);
BrokerHosts brokerHosts = new ZkHosts(zk);
// 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
String topic = SysConfig.getProperty(“storm.topic”);
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, “/kafkaspout”, “kafkaspout”);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.zkPort=2181;
List servers = new ArrayList();
servers.add(“14.215.109.186″);
servers.add(“14.215.108.126″);
servers.add(“14.215.109.30″);
spoutConfig.zkServers = servers;
// 配置KafkaBolt中的kafka.broker.properties
Config conf = new Config();
Properties prop = new Properties();
// 配置Kafka broker地址
String brokerList = SysConfig.getProperty(“storm.kafka.brokerlist”);
prop.put(“bootstrap.servers”, brokerList);
// serializer.class为消息的序列化类
String serClass = SysConfig.getProperty(“storm.kafka.serializer.class”);
prop.put(“key.serializer”, serClass);
prop.put(“value.serializer”, serClass);
prop.put(“timeout.ms”, “60000″);
prop.put(“request.timeout.ms”, “60000″);
// 配置KafkaBolt生成的topic
String outputTopic = SysConfig.getProperty(“storm.kafka.output.topic”);
prop.put(“topic”, outputTopic);
conf.put(“topic”, outputTopic);
//spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“kafkaspout”, new KafkaSpout(spoutConfig), 1);// 启动线程数与kafka分区数一致
builder.setBolt(“computebolt”, new GpsMsgComputeBolt()).setNumTasks(1).shuffleGrouping(“kafkaspout”);
// builder.setBolt(“hbasebolt”, new
// GpsInfoHbaseBolt()).shuffleGrouping(“computebolt”);
// builder.setBolt(“hbasebolt”, new
// GpsInfoHbaseBolt()).fieldsGrouping(“computebolt”, new
// Fields(“company”));
/*
* builder.setBolt(“kafkabolt”, new
* KafkaBolt().withTupleToKafkaMapper(new
* GpsTupleToKafkaMapperImp())
* .withProducerProperties(prop)) .fieldsGrouping(“computebolt”, new
* Fields(“key”));
*/
//builder.setBolt(“hbasebolt”, new DistanceComputeToHbaseBolt()).setNumTasks(1).fieldsGrouping(“computebolt”, new Fields(“resmsg”,”detailmsg”));
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(“Topo”, conf, builder.createTopology());
// Utils.sleep(100000);
// cluster.killTopology(“Topo”);
// cluster.shutdown();
}
}
}
麻烦帮忙看看是什么原因,谢谢啦
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl(“hdfs://h1:8020″)
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
楼主我把“h1:8020”换成自己的hadoop集群名就不行,没法运行,报错:
java.lang.RuntimeException: Error preparing HdfsBolt: java.net.UnknownHostException: pengtai
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:109) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: pengtai
知道啥情况吗,找了好几天没找到原因为啥不能用集群名,只能用主机名
在你的Storm集群的各个节点上,增加主机名pengtai到IP的映射。
答主,我的pengtai是handoop集群的名字。用集群名主要是为了在namenode1挂掉后嫩个自动切换到namenode2上。pengtai不是一个主机名。谢谢,请指教一下。
答主,我的storm集群是可以访问hadoop集群,可以查看文件等操作(命令行方式),就是代码中的接口改用集群名就报错。拜托了,请指点一下。谢谢~
楼主你好,请问kafka+spark streaming做实时分析可以吗?
如果你的业务应用场景对实时性要求不是很高,可以用这个方案,Spark Streaming走的是micro batch的方法。
最后的hdfs相关的class,是需要在maven加入jar对吗?好像博主没有写明。
KeeperErrorCode = NoNode for /kafka/brokers/topics/test/partitions
我按上文写的,出现这个错误。该如何办呢?有qq吗?
楼主,我想问下,我用的是storm 1.0.2,spoutConf.forceFromStart这个属性该怎么办啊?结果不发他也增加
Storm 1.x版本我也没尝试过,可能有些配置改了吧
楼主,为什么我的build的setBolt方法传参hdfsBolt的时候报错呢,提示不能解析这个方法,看了下源码 他需要传的是IRichBolt的实现,hdfsBolt继承的是BaseRichBolt
遇到类似问题
楼主,我使用的版本如下
storm-core 1.0.2
storm-kafka 1.0.2
kafka_2.10 0.10.1.0
配置文件如下
kafka
server.properties
listeners=PLAINTEXT://:9092
zookeeper.connect=test-s1:2181,test-s2:2181,test-s3:2181 使用的是zookeeper跟节点
查看了各个日志
现在就内部调用的spout报错,错误信息如下
k.c.SimpleConsumer [INFO] Reconnect due to error:
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) [stormjar.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) [stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) [stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) [stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
2016-11-09 11:39:14.228 o.a.s.k.KafkaUtils [WARN] Network error when fetching messages:
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) ~[?:1.7.0_80]
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.7.0_80]
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.7.0_80]
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[stormjar.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) [stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) [stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
2016-11-09 11:39:14.229 o.a.s.k.KafkaSpout [WARN] Fetch failed
org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:199) ~[stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) ~[stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
Caused by: java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) ~[?:1.7.0_80]
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.7.0_80]
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.7.0_80]
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[stormjar.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[stormjar.jar:?]
… 7 more
报错信息如下
看看是不是ZK集群所在节点的防火墙未关闭呢?
三台内网服务器防火墙都是关闭的
iptables 0:off 1:off 2:off 3:off 4:off 5:off 6:off
您好,你的问题解决了吗,我也遇到这个问题了
大概跟kafka配置文件有关,这行要加上,your.host.name替换为ip,
listeners = PLAINTEXT://your.host.name:9092
没在右上角看到您的联系方式,没想到您回的这么及时,我的qq是 151487510 网名:七个猕猴桃 若不嫌打扰,请加下我,谢谢!!
想问一下,您使用common-logging把日志记录在哪里了,本地文件吗
2014年的博文,不知道现在马上2017年有没有什么可替换的方案
只要能用、够用就你可以了。如果对新技术比较感兴趣,这个可替代方案很多的,可以尝试下面这几个开源框架:Flink、Spark Streaming、Apache Apex、Heron、Druid等等,每个都有不同的特点。
楼主,在命令行和代码中的h1,h2,h3都要替换成实际的ip地址吗?谢谢指教
我这里用的是主机名,你用IP也可以的。
博主您好,能帮我看一下我这个错误吗?
Exception in thread “main” java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
at storm.kafka.KafkaConfig.(KafkaConfig.java:43)
at storm.kafka.SpoutConfig.(SpoutConfig.java:32)
at cn.tt.bigdata.storm_kafka.KafkaTopo.main(KafkaTopo.java:23)
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
… 3 more
异常信息不是告诉你了, 类找不到?!
在整合HDFS的时候,我的pom依赖总是用不对,代码上导入包还是会出错,前辈能给出整合HDFS时的pom依赖吗?
看这里:https://github.com/shirdrn/opensource/blob/master/opensource-storm/pom.xml
还是建议你,用比较新版本的Storm,我这个实践时还比较早一些,有些依赖资源兼容性不是特别好。
楼主感谢您的分享,受益匪浅。在使用storm toplogy时出现了一次异常,maven编译通过,但是用Java命令运行class文件时,报错:
[root@stormtestsrv01 classes]# java MyKafkaTopology
Error: A JNI error has occurred, please check your installation and try again
Exception in thread “main” java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
… 7 more
能帮我看一下问题出在哪里吗?不胜感激
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
看看是不是没有引入这个类对应的JAR文件,可能是在你打包的时候,没有将对应的依赖JAR文件都打进去。
楼主,能帮我看个错误吗,当我向storm提交toplogy时报错,但是本地运行正常:SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/storm/lib/log4j-slf4j-impl-2.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Exception in thread “main” java.lang.ExceptionInInitializerError
at org.apache.storm.config$read_storm_config.invoke(config.clj:78)
at org.apache.storm.config$fn__908.invoke(config.clj:100)
at org.apache.storm.config__init.load(Unknown Source)
at org.apache.storm.config__init.(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at clojure.lang.RT.classForName(RT.java:2154)
at clojure.lang.RT.classForName(RT.java:2163)
at clojure.lang.RT.loadClassForName(RT.java:2182)
at clojure.lang.RT.load(RT.java:436)
at clojure.lang.RT.load(RT.java:412)
at clojure.core$load$fn__5448.invoke(core.clj:5866)
at clojure.core$load.doInvoke(core.clj:5865)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at clojure.core$load_one.invoke(core.clj:5671)
at clojure.core$load_lib$fn__5397.invoke(core.clj:5711)
at clojure.core$load_lib.doInvoke(core.clj:5710)
at clojure.lang.RestFn.applyTo(RestFn.java:142)
at clojure.core$apply.invoke(core.clj:632)
at clojure.core$load_libs.doInvoke(core.clj:5753)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at clojure.core$apply.invoke(core.clj:634)
at clojure.core$use.doInvoke(core.clj:5843)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at org.apache.storm.command.config_value$loading__5340__auto____12276.invoke(config_value.clj:16)
at org.apache.storm.command.config_value__init.load(Unknown Source)
at org.apache.storm.command.config_value__init.(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at clojure.lang.RT.classForName(RT.java:2154)
at clojure.lang.RT.classForName(RT.java:2163)
at clojure.lang.RT.loadClassForName(RT.java:2182)
at clojure.lang.RT.load(RT.java:436)
at clojure.lang.RT.load(RT.java:412)
at clojure.core$load$fn__5448.invoke(core.clj:5866)
at clojure.core$load.doInvoke(core.clj:5865)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at clojure.lang.Var.invoke(Var.java:379)
at org.apache.storm.command.config_value.(Unknown Source)
Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You’re probably bundling the Storm jars with your topology jar. [jar:file:/storm/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]
at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:383)
at org.apache.storm.utils.Utils.readDefaultConfig(Utils.java:427)
at org.apache.storm.utils.Utils.readStormConfig(Utils.java:463)
at org.apache.storm.utils.Utils.(Utils.java:177)
… 39 more
Caused by: java.io.IOException: Found multiple defaults.yaml resources. You’re probably bundling the Storm jars with your topology jar. [jar:file:/storm/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]
at org.apache.storm.utils.Utils.getConfigFileInputStream(Utils.java:409)
at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:362)
… 42 more
Running: /usr/java/jdk1.8.0_101/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/storm -Dstorm.log.dir=/storm/logs -Djava.library.path= -Dstorm.conf.file= -cp /storm/lib/log4j-1.2.17.jar:/storm/lib/slf4j-api-1.7.21.jar:/storm/lib/kafka_2.10-0.10.2.0.jar:/storm/lib/disruptor-3.3.2.jar:/storm/lib/objenesis-2.1.jar:/storm/lib/storm-rename-hack-1.1.0.jar:/storm/lib/log4j-core-2.8.jar:/storm/lib/clojure-1.7.0.jar:/storm/lib/log4j-over-slf4j-1.6.6.jar:/storm/lib/asm-5.0.3.jar:/storm/lib/ring-cors-0.1.5.jar:/storm/lib/metrics-core-2.2.0.jar:/storm/lib/jopt-simple-5.0.3.jar:/storm/lib/log4j-slf4j-impl-2.8.jar:/storm/lib/snappy-java-1.1.2.6.jar:/storm/lib/kryo-3.0.3.jar:/storm/lib/reflectasm-1.10.1.jar:/storm/lib/storm-core-1.1.0.jar:/storm/lib/servlet-api-2.5.jar:/storm/lib/scala-library-2.10.6.jar:/storm/lib/log4j-api-2.8.jar:/storm/lib/kafka-clients-0.10.2.0.jar:/storm/lib/zkclient-0.10.jar:/storm/lib/minlog-1.3.0.jar:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/storm/conf:/storm/bin -Dstorm.jar=/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} MyKafkaTopology 10.123.16.200
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/storm/lib/log4j-slf4j-impl-2.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Exception in thread “main” java.lang.RuntimeException: Found multiple defaults.yaml resources. You’re probably bundling the Storm jars with your topology jar. [jar:file:/storm/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]
at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:140)
at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:167)
at backtype.storm.utils.Utils.readStormConfig(Utils.java:191)
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:92)
at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:160)
at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:142)
at MyKafkaTopology.main(MyKafkaTopology.java:134)
谢谢你
可能是你的程序打包有问题,可以仔细看下这个提示:
Caused by: java.io.IOException: Found multiple defaults.yaml resources. You’re probably bundling the Storm jars with your topology jar.
检查一下,是不是自己实现的代码中有defaults.yaml这个配置文件。
请问在kafka+storm+hdfs架构中,由于storm中bolt的重发机制,如何保证消费的每条kafka数据只会往hdfs中写入一次?
楼主能发一下你这个storm整合的源代码吗?想学习一下
我去,我看到这篇帖子竟然都18年了 楼主能分享下整合的源码吗?
可以在这里查看源码:https://github.com/shirdrn/opensource/tree/master/opensource-storm/src/main/java/org/shirdrn/storm/examples
涉及到的相关代码,一般都可以到我的github上找到。
我也是18年.跟个新技术太难了,军哥已经不知道去哪高就了
无论我在哪里,博客都永久在这里。有问题可以随时讨论哈~0_0~
这个回复暖心度爆棚了!。。。
运行KafkaSpout 时老是报错找不到Node /brokers/ids/1,楼主知道怎么解决吗?
28064 [refresh-active-timer] INFO o.a.s.d.worker – All connections are ready for worker baa33fd0-5ead-4435-811b-76417fc97fd8:1027 with id d7752aea-f43d-44e6-b1ec-dc5fc20489e2
28283 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor – Preparing bolt __system:(-1)
28300 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor – Prepared bolt __system:(-1)
28318 [Thread-17-spout-executor[3 3]] INFO o.a.s.d.executor – Opening spout spout:(3)
28331 [Thread-19-__acker-executor[1 1]] INFO o.a.s.d.executor – Preparing bolt __acker:(1)
28335 [Thread-19-__acker-executor[1 1]] INFO o.a.s.d.executor – Prepared bolt __acker:(1)
28336 [Thread-15-bolt1-executor[2 2]] INFO o.a.s.d.executor – Preparing bolt bolt1:(2)
28337 [Thread-15-bolt1-executor[2 2]] INFO o.a.s.d.executor – Prepared bolt bolt1:(2)
28619 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
28782 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
33456 [Thread-17-spout-executor[3 3]-EventThread] INFO o.a.c.f.s.ConnectionStateManager – State change: CONNECTED
33514 [Thread-17-spout-executor[3 3]-EventThread] INFO o.a.c.f.s.ConnectionStateManager – State change: CONNECTED
33580 [Thread-17-spout-executor[3 3]] ERROR o.a.s.k.DynamicBrokersReader – Node /brokers/ids/1 does not exist
33583 [Thread-17-spout-executor[3 3]] ERROR o.a.s.k.DynamicBrokersReader – Node /brokers/ids/1 does not exist
33584 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.DynamicBrokersReader – Read partition info from zookeeper: GlobalPartitionInformation{topic=topic2, partitionMap={}}
33586 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
33592 [Thread-17-spout-executor[3 3]] INFO o.a.s.d.executor – Opened spout spout:(3)
33595 [Thread-17-spout-executor[3 3]] INFO o.a.s.d.executor – Activating spout spout:(3)
33596 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator – Task [1/1] Refreshing partition manager connections
38163 [Thread-17-spout-executor[3 3]-EventThread] INFO o.a.c.f.s.ConnectionStateManager – State change: CONNECTED
38168 [Thread-17-spout-executor[3 3]] ERROR o.a.s.k.DynamicBrokersReader – Node /brokers/ids/1 does not exist
38171 [Thread-17-spout-executor[3 3]] ERROR o.a.s.k.DynamicBrokersReader – Node /brokers/ids/1 does not exist
38172 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.DynamicBrokersReader – Read partition info from zookeeper: GlobalPartitionInformation{topic=topic2, partitionMap={}}
38172 [Thread-17-spout-executor[3 3]] WARN o.a.s.k.KafkaUtils – there are more tasks than partitions (tasks: 1; partitions: 0), some tasks will be idle
38172 [Thread-17-spout-executor[3 3]] WARN o.a.s.k.KafkaUtils – Task [1/1] no partitions assigned
38172 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator – Task [1/1] Deleted partition managers: []
38172 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator – Task [1/1] New partition managers: []
38172 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator – Task [1/1] Finished refreshing
1、检查一下你的Kafka集群正常吗?
2、你使用的这个Kafka的 topic对应ZK中的元数据是否正常?
Kafka集群可以正常在虚拟机内生产消费数据,但是在zkCli.sh中查看也没有/brokers/ids,,这是哪一部分的配置没做好吗?
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[controller_epoch, storm, brokers, zookeeper, admin, consumers, config]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics]
[zk: localhost:2181(CONNECTED) 2] ls /ids
Node does not exist: /ids
[zk: localhost:2181(CONNECTED) 3] ls /topics
Node does not exist: /topics
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[tracker, topic1, topic2, wordCount, orderMq, order]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/ids
[]
[zk: localhost:2181(CONNECTED) 6]
Exception in thread “main” java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
at java.lang.Class.getMethod0(Class.java:2856)
at java.lang.Class.getMethod(Class.java:1668)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
… 6 more
上传jar包到集群,这个错误是为什么啊?大神
看下你打包时,有把storm/kafka/BrokerHosts对应的jar文件打进去吗?
调了些后,现在storm UI上出现这个错误
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/my-replicated-topic5/partition
b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/my-replicated-topic5/partitions
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.trident.ZkBrokerReader.(ZkBrokerReader.java:42) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__5624$fn__5639.invoke(executor.clj:564) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:477) [storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/my-replicated-topic5/partitions
at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:94) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:65) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
… 7 more
大神求赐教啊!
Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/client/HdfsDataOutputStream$SyncFlag
at org.shirdrn.storm.examples.DistributeWordTopology.main(DistributeWordTopology.java:120)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.client.HdfsDataOutputStream$SyncFlag
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
… 1 more
楼主这个错是为啥??求解答
java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/client/HdfsDataOutputStream$SyncFlag
找不到这个类啊,看下你的project里面的依赖,是不是少了。
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/RemoteIterator
楼主 这个是在哪个包啊?
这个类在hdfs对应的包里面。
这个怎么批处理啊
实现批处理有很多种选择:
1、可以将Flume之前收集的原始日志,从业务服务器上批量同步到HDFS,或LOAD到HIve,然后基于HDFS上数据,选择合适的批处理计算框架进行处理,如Spark、Hive、Impala等等;
2、通过Flume的Sink直接将收集的数据写入HDFS,或LOAD到Hive,然后进行批处理;
3、从Kafka集群,将数据LOAD到HDFS,或LOAD到Hive,然后进行批处理;
4、Storm处理过程中,走两个分支,一个是实时处理,另一个就是直接存储数据到HDFS,然后进行批处理。
根据你实际的情况,选择一种即可,推荐第1种选择。
storm on yarn 如何链接kafka
作为什么都不懂的小白,只是想问一下大博主,用消息中间件的话会不会造成消息处理的延时呢,如果不用的话,在spout接收消息时是不是还会产生额外处理,同样会增加延迟。谢谢~
博主,代码报错了,想要一份完整的 pom,急求回复啊!在线等,谢谢博主,感激不尽
楼主运行Kafkaspout的时候,报错,您看看这是为什么
21646 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
36729 [Thread-17-spout-executor[3 3]] ERROR o.a.c.ConnectionState – Connection timed out for connection string (192.168.52.138,192.168.52.135,192.168.52.139) and timeout (15000) / elapsed (15070)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-2.7.0.jar:?]
at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87) [curator-2.7.0.jar:?]
at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:492) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:1) [curator-2.7.0.jar:?]
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:199) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:1) [curator-2.7.0.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:111) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:84) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.trident.ZkBrokerReader.(ZkBrokerReader.java:44) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:58) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:77) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.daemon.executor$fn__7885$fn__7900.invoke(executor.clj:601) [storm-core-1.0.1.jar:1.0.1]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482) [storm-core-1.0.1.jar:1.0.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
楼主帮忙看一下运行Kafkaspout的时候出现如下错误
21646 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
36729 [Thread-17-spout-executor[3 3]] ERROR o.a.c.ConnectionState – Connection timed out for connection string (192.168.52.138,192.168.52.135,192.168.52.139) and timeout (15000) / elapsed (15070)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-2.7.0.jar:?]
at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87) [curator-2.7.0.jar:?]
at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:492) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:1) [curator-2.7.0.jar:?]
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:199) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:1) [curator-2.7.0.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:111) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:84) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.trident.ZkBrokerReader.(ZkBrokerReader.java:44) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:58) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:77) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.daemon.executor$fn__7885$fn__7900.invoke(executor.clj:601) [storm-core-1.0.1.jar:1.0.1]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482) [storm-core-1.0.1.jar:1.0.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
楼主,请问有 Strom + Druid 的例子吗