In many applications on the Hadoop platform, we need both offline and real-time analysis of our data. Offline analysis is easy to implement with Hive, but Hive is not suitable for real-time requirements. For real-time scenarios we can use Storm, a real-time processing system that provides a computation model for streaming applications and is straightforward to program against. To unify offline and real-time computation, we usually want a single, shared data source feeding both pipelines: the data then flows separately into the real-time system and the offline analysis system, each processing it in its own way. One way to achieve this is to connect the data source (for example, logs collected with Flume) directly to a message broker such as Kafka. Integrating Flume + Kafka, Flume acts as the message Producer, publishing the produced data (log data, business request data, and so on) to Kafka; a Storm Topology then consumes these messages by subscription, and the Storm cluster handles the following two scenarios:
- Use a Storm Topology to analyze and process the data in real time
- Integrate Storm + HDFS, writing the processed messages to HDFS for offline analysis
For real-time processing, you only need to develop a Topology that meets your business requirements, so we will not dwell on it. Here we focus on installing and configuring Kafka and Storm, and then on integrating Kafka + Storm, Storm + HDFS, and finally Kafka + Storm + HDFS, to satisfy the requirements above. The software packages used in this walkthrough are:
- zookeeper-3.4.5.tar.gz
- kafka_2.9.2-0.8.1.1.tgz
- apache-storm-0.9.2-incubating.tar.gz
- hadoop-2.2.0.tar.gz
The operating system used throughout is CentOS 5.11.
Installing and Configuring Kafka
We build the Kafka cluster on 3 machines:
192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3
Before installing the Kafka cluster, note that we do not use the ZooKeeper bundled with Kafka; instead, we installed a standalone ZooKeeper cluster on the same 3 machines. Make sure that ZooKeeper cluster is up and running.
First, prepare the Kafka installation files on h1 by running:
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
Edit the configuration file /usr/local/kafka/config/server.properties and change the following entries:
broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
Note that by default Kafka uses ZooKeeper's root path /, which scatters Kafka's ZooKeeper data directly under the root. If other applications share the ZooKeeper cluster, browsing the ZooKeeper data becomes confusing, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting:
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
You also need to create the /kafka path in ZooKeeper manually. Connect to any ZooKeeper server with:
cd /usr/local/zookeeper
bin/zkCli.sh
Then create the chroot path in the ZooKeeper shell:
create /kafka ''
From then on, every time you connect to the Kafka cluster (via the --zookeeper option) you must use a connection string that includes the chroot path, as we will see below.
Next, copy the configured installation to the h2 and h3 nodes:
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
Finally, configure h2 and h3 by running:
cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
and change /usr/local/kafka/config/server.properties as follows:
broker.id=1   # on h2
broker.id=2   # on h3
Kafka requires each Broker's id to be unique across the cluster, so this value must be adjusted on every node. (On a single machine you can simulate a distributed Kafka cluster by running multiple Broker processes; their ids must still be unique, and the data and log directories must also be changed.)
Start Kafka on each of the three nodes h1, h2, and h3 by running:
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
Check the logs or the process status to confirm the Kafka cluster started successfully.
Now create a Topic named my-replicated-topic5 with 5 partitions and a replication factor of 3:
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
Inspect the Topic you just created:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
The output looks like this:
Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
     Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
     Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
     Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1
The meanings of Leader, Replicas, and Isr above are:
Partition: the partition id
Leader:    the node responsible for all reads and writes for the given partition
Replicas:  the list of nodes that replicate the log for this partition
Isr:       the "in-sync" replicas, the subset of Replicas that is currently alive and caught up, any of which may become the Leader
We can use the bundled bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts to demonstrate how to publish and consume messages.
In one terminal, start a Producer that publishes messages to the my-replicated-topic5 Topic created above:
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
In another terminal, start a Consumer that subscribes to the messages produced to my-replicated-topic5:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
Type a line of text in the Producer terminal and press Enter; the line appears in the Consumer terminal as it is consumed.
You can also implement the producing and consuming logic in code using Kafka's Java Producer and Consumer APIs.
Installing and Configuring Storm
A Storm cluster also depends on a running ZooKeeper cluster. Storm itself is simple to install and configure; we again use the same 3 machines:
192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3
First, install on the h1 node:
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
Then edit the configuration file conf/storm.yaml as follows:
storm.zookeeper.servers:
    - "h1"
    - "h2"
    - "h3"
storm.zookeeper.port: 2181

nimbus.host: "h1"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

storm.local.dir: "/tmp/storm"
Distribute the configured installation to the other nodes:
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
Finally, configure h2 and h3 by running:
cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
Storm's master node runs Nimbus and its worker nodes run Supervisors. Start Nimbus on h1, and start a Supervisor on each of h2 and h3:
bin/storm nimbus &
bin/storm supervisor &
For easier monitoring, you can start the Storm UI, which shows Topology status on a web page; for example, on h2:
bin/storm ui &
You can then visit http://h2:8080/ to check on running Topologies.
Integrating Kafka + Storm
Messages enter Kafka in various ways; for example, log data collected with Flume can be routed through and buffered in Kafka, and then analyzed in real time by Storm. In that case we need a Storm Spout that reads the messages from Kafka and hands them to concrete Bolt components for processing. In fact, the apache-storm-0.9.2-incubating release already ships an external Kafka integration module, storm-kafka, which can be used directly. My Maven dependency configuration looks like this:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Below we develop a simple WordCount example that reads subscribed message lines from Kafka, splits each line into words on whitespace, and then computes word frequencies. The Topology is implemented as follows:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka -> splitter] " + line);
            String[] words = line.split("\\s+");
            for (String word : words) {
                LOG.info("EMIT[splitter -> counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter -> counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if (ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        @Override
        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while (iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException, InterruptedException {
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // the Topic has 5 partitions, so use a parallelism of 5
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();
        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
The program above needs no arguments when debugging locally (with LocalCluster); when submitting to a real cluster it takes one argument, the Nimbus host name.
Build a single jar with dependencies via Maven (do not bundle the Storm dependencies), e.g. storm-examples-0.0.1-SNAPSHOT.jar. Before submitting the Topology to the Storm cluster, because it uses Kafka, copy the following dependency jars into the lib directory on each Storm node:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
Then submit the Topology:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
Monitor the Topology through the log files (under logs/) or the Storm UI. If everything works, produce some messages with the Kafka console Producer used earlier, and you will see the Topology receive and process them in real time.
In the Topology code above, there is one key setting on the SpoutConfig object:
spoutConf.forceFromStart = false;
This setting controls where the Spout resumes reading from the subscribed Kafka Topic after the Topology restarts following a failure. With forceFromStart=true, previously processed Tuples are reprocessed from the beginning of the Topic; with false, processing continues from the last recorded position, so Topic data is not processed twice. The read position is checkpointed on the data-source side.
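The difference between the two restart behaviors can be illustrated with a small, self-contained sketch (a hypothetical simulation in plain Java; OffsetResumeDemo and offsetsToProcess are illustrative names, not part of the storm-kafka API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of how a restarted consumer chooses its start offset,
// mirroring the semantics of spoutConf.forceFromStart.
public class OffsetResumeDemo {

    // committed: last offset recorded in ZooKeeper; logEnd: current end of the partition log
    static List<Long> offsetsToProcess(long committed, long logEnd, boolean forceFromStart) {
        long start = forceFromStart ? 0L : committed; // true => re-read the topic from the beginning
        List<Long> offsets = new ArrayList<Long>();
        for (long o = start; o < logEnd; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // 10 messages in the partition; 7 were already processed before the restart
        System.out.println(offsetsToProcess(7, 10, false)); // prints [7, 8, 9]
        System.out.println(offsetsToProcess(7, 10, true));  // reprocesses all 10 offsets, duplicates included
    }
}
```

With forceFromStart=false, only the three unprocessed offsets are read after the restart; with true, all ten are read again, which is why the earlier Tuples get reprocessed.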
Integrating Storm + HDFS
The Storm cluster consumes messages from Kafka; requirements that need real-time handling go through real-time processing, while data needed for offline analysis can be written to HDFS. The following Topology implements the latter:
package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

    public static class EventSpout extends BaseRichSpout {

        private static final Log LOG = LogFactory.getLog(EventSpout.class);
        private static final long serialVersionUID = 886149197481637894L;
        private SpoutOutputCollector collector;
        private Random rand;
        private String[] records;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            records = new String[] {
                "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
                "10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
                "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
            };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            Date d = new Date(System.currentTimeMillis());
            String minute = df.format(d);
            String record = records[rand.nextInt(records.length)];
            LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
            collector.emit(new Values(minute, record));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("minute", "record"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException, InterruptedException {
        // use " : " as the field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter(" : ");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files every minute
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log");

        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("event-spout", new EventSpout(), 3);
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

        Config conf = new Config();
        String name = StormToHDFSTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
In the logic above, the HdfsBolt can be configured in more detail via FileNameFormat, SyncPolicy, and FileRotationPolicy (which decides when to roll over to a new log file, e.g. after a fixed time interval, or once a file reaches a given size). See the storm-hdfs project for more options.
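For example, besides the time-based policy used above, storm-hdfs also ships a size-based rotation policy; a sketch (assuming the same storm-hdfs version, with the additional imports org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy and its nested Units enum):

```java
// Rotate the output file once it reaches 128 MB, instead of rotating every minute:
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(128.0f, Units.MB);
```

The resulting policy is passed to HdfsBolt.withRotationPolicy() exactly as in the code above.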
When packaging this code, note that the Maven packaging configuration that ships with storm-starter may cause errors when you deploy and run the Topology; use the maven-shade-plugin instead, configured as follows:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
Integrating Kafka + Storm + HDFS
Above we integrated Kafka + Storm and Storm + HDFS separately. We can replace the latter's Spout with the former's: consume messages from Kafka, do some simple processing in Storm, then write the data to HDFS for later offline analysis on the Hadoop platform. Here is a small example that consumes from Kafka, processes in Storm, and stores to HDFS:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

    public static class KafkaWordToUpperCase extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -5207232012035109026L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("RECV[kafka -> splitter] " + line);
            if (!line.isEmpty()) {
                String upperLine = line.toUpperCase();
                LOG.info("EMIT[splitter -> counter] " + upperLine);
                collector.emit(input, new Values(upperLine, upperLine.length()));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line", "len"));
        }
    }

    public static class RealtimeBolt extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(RealtimeBolt.class);
        private static final long serialVersionUID = -4115132557403913367L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("REALTIME: " + line);
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException, InterruptedException {
        // Configure Kafka
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        // Configure HDFS bolt
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("\t"); // use "\t" as the field delimiter
        SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // configure & build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
        builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

        // submit topology
        Config conf = new Config();
        String name = DistributeWordTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            String nimbus = args[0];
            conf.put(Config.NIMBUS_HOST, nimbus);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
In the code above, the Bolt named to-upper converts each received line to upper case and then sends a copy of the processed data to both downstream Bolts, hdfs-bolt and realtime, which handle it independently according to their needs (offline vs. real time).
After packaging, deploy and run the Topology on the Storm cluster:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
You can watch the Topology in the Storm UI and inspect the data files generated on HDFS.
References
- http://kafka.apache.org/
- http://kafka.apache.org/documentation.html
- https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
- http://storm.apache.org/
- http://storm.apache.org/documentation/Tutorial.html
- http://storm.apache.org/documentation/FAQ.html
- https://github.com/ptgoetz/storm-hdfs

This article is published under the Attribution-NonCommercial-ShareAlike 4.0 license. You are welcome to repost, use, and republish it, but you must keep the author attribution Yanjun Shi (including the link http://shiyanjun.cn), must not use it for commercial purposes, and must release works based on it under the same license. Contact me with any questions.
Have you built the complete Flume + Kafka + Storm + HDFS log analysis pipeline? Is there any Chinese-language material you could share?
Yes, we are currently using this stack for offline log collection and analysis; in addition, the stream is fanned out from Storm for real-time analysis as well.
How large is your data volume? If you need to write many HDFS partitions, how do you deal with the small-files problem?
Does every integration step really require custom development? I am not a developer and don't know Java, so does that mean I can't build this? Could you leave your contact details so I can ask you directly?
If you are not a developer, this stack won't be of much use to you. I suspect you just want to collect logs in real time and store them in HDFS; in that case you don't need all of this. Flume + HDFS alone will do it, with no code to write, only configuration.
Can Flume + HDFS only do log collection? Our development team doesn't cover this area. If, purely from an operations standpoint, I need both log collection and log analysis, what architecture would you suggest?
It depends on how you use it, combined with your business characteristics; some options fit quite well. Flume is a log collection tool: it can collect log files directly (e.g. from a configured log directory), or you can customize a Flume Agent source component to turn Flume into a service (e.g. listening on a socket). HDFS simply stores the data; for analysis you can rely on the Hadoop platform's computing power (e.g. MapReduce programs written in Python or another language). From the operations side, Flume + HDFS + Hive works: collect the logs into HDFS, then load them into Hive tables for analysis. I imagine writing SQL-like queries is much easier than writing comparatively difficult program code.
For ops work you could also play with ELK.
What is ELK?
A data analysis stack.
Could you send me your program packages, especially how that KafkaSpout is handled? Many thanks!
That class ships in the external modules of the Storm distribution; look at the storm-kafka module, or simply add that dependency to your Maven pom.xml.
Thanks for the material; I have deployed it to a cluster following this guide. One caveat to watch for:
When configuring Kafka, if you created the node with the ZooKeeper command create /kafka, then when integrating Kafka with Storm, new ZkHosts(zks) must be changed to new ZkHosts(zks, "/kafka/brokers"); otherwise you get java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions.
The storm-kafka plugin's default Kafka zk path is:
public class ZkHosts implements BrokerHosts {
    private static final String DEFAULT_ZK_PATH = "/brokers";
Right, exactly. If you don't override the default zk path when installing Kafka, all of Kafka's paths live directly under the zk root.
Could you elaborate on that problem? I seem to have hit something similar.
Can this stack also read data from HDFS into Storm for real-time processing?
Sure: develop your own Spout containing the logic to read data from HDFS.
Could you share an implementation of that logic?
Hello,
broker.id=1   # change on h1
broker.id=2   # change on h2
Doesn't h3 also need changing? Also, my ZooKeeper is the CDH one, installed via yum; is that OK?
curl -LO http://archive.cloudera.com/cdh5/one-click-install/redhat/6/x86_64/cloudera-cdh-5-0.x86_64.rpm
sudo yum localinstall cloudera-cdh-5-0.x86_64.rpm -y
sudo yum clean all -y
yum repolist
sudo rpm --import http://archive.cloudera.com/cdh5/redhat/5/x86_64/cdh/RPM-GPG-KEY-cloudera
# 3. Install ZooKeeper
yum install zookeeper* -y
cat >/etc/zookeeper/conf/zoo.cfg <<EOF
maxClientCnxns=50
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=h1:2888:3888
server.2=h2:2888:3888
server.3=h3:2888:3888
EOF
You need one distinct broker.id per broker in the Kafka cluster, however many you have.
ZooKeeper works regardless of how it was installed; just don't confuse ZooKeeper's myid with Kafka's broker id.
Is this stack running in production for you? I want to build user-behavior analysis and a recommender system; is Kafka + Storm + HDFS a good fit? I'm new to this, thanks in advance.
Yes, it is. I recommend using fairly recent versions; the latest stable Storm is 0.9.5, and the older versions have some bugs.
Thanks a lot, looking forward to more pointers.
The producer generates messages, but the consumer shows no output. What could be the reason? Any suggestions are appreciated.
Have you tried the Storm + HDFS integration with Hadoop HA enabled? Can you still point at a single NameNode?
I haven't tried that. I don't know whether the latest storm-hdfs supports configuring multiple NameNode addresses; if not, you could rewrite the HdfsBolt logic yourself so that, when an Active/Standby NameNode switch is detected, the bolt fails over accordingly.
Forgive my intrusion... I wrote a similar program following your approach, pulling data from Kafka and writing it to a local file, and the output contains duplicates: I stored 10000 records into Kafka, but 10648 came out of the file, 648 of them duplicates. What could be wrong?
My guess is this: the write to HDFS (NN1) succeeded, then NameNode NN1 went down and NN2 took over, but the ACK had not yet been sent to the component upstream of HdfsBolt. Storm therefore considers the record not successfully sent and re-emits it; after the re-emit, the data is written to HDFS (NN2) again, so one record gets written twice.
Unless you use Storm Trident to guarantee batch transactions, duplicates are possible in this situation, although data is essentially never lost: each record is delivered at least once (possibly more). You need a strategy for this case, for example giving each record a unique identity and de-duplicating on it.
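That de-duplication-by-unique-id idea can be sketched in a few lines of plain Java (a hypothetical illustration; DedupDemo is an invented name, and a real deployment would need to bound the seen-set or keep it in an external store such as HBase or Redis):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical at-least-once de-duplication: drop records whose unique id was already seen.
public class DedupDemo {

    private final Set<String> seenIds = new HashSet<String>();

    // Returns true if the record is new and should be written; false if it is a redelivery.
    public boolean accept(String recordId) {
        return seenIds.add(recordId); // Set.add returns false for duplicates
    }

    public static void main(String[] args) {
        DedupDemo dedup = new DedupDemo();
        // simulate at-least-once delivery: r1 and r2 are redelivered
        String[] delivered = {"r1", "r2", "r2", "r3", "r1"};
        for (String id : delivered) {
            if (dedup.accept(id)) {
                System.out.println("write " + id); // only r1, r2, r3 are written once each
            }
        }
    }
}
```

The same check could sit in the bolt right before the file write, turning at-least-once delivery into effectively-once output for the records it covers.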
I've just started learning Storm; why can't I download any of the Maven dependencies?
Some dependencies may only be reachable through a proxy; try one. If that isn't the issue, rerun the Maven build a few times; it may eventually succeed.
Great read! Have you ever made data flow from Storm into Kafka? My idea is to pre-process the data in Storm (filling null values, adding fields, and so on), output it to Kafka, and let Kafka feed another processing system. Could you point me to examples or documentation? Thanks!
That one is easy: inside your Storm program, use a Kafka Producer to write the processed data back to another Kafka topic. It is simply a plain use of the Kafka Producer API.
Great, thanks for the guidance!
Do you have QQ? I'm building a backend log collection + analysis system and also plan to use zk + kafka + storm, but I'm not familiar with the stack; please advise. QQ 442583890
After copying your code, I get this error: java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
curator.jar cannot be found. Have you seen this?
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
Maybe it's because of String zkRoot = "/storm"; // default zookeeper root configuration for storm
which differs from mine. I don't understand that comment; what is it for?
1. Which Storm version are you using? This looks like a missing dependency of one of Storm's external libraries; check the storm-kafka external module, which should depend on the open-source curator library.
2. Storm depends on ZooKeeper to store some metadata and needs a root path there, the one specified in the configuration when you installed Storm; if unspecified, it defaults to /storm. You can connect to ZooKeeper and inspect that path.
It really was the Storm version. Thanks!
If my data source is another team sending data to a given port, is there a way to get it into a Kafka queue with configuration alone? Otherwise, I'd have to read the port in a producer and then push into the queue myself, which seems cumbersome. Is there a simpler way?
Flume can do that.
After following this guide, the worker log reports errors...
2015-10-29T09:13:45.136+0800 o.a.z.ClientCnxn [INFO] Socket connection established to VM195/172.19.5.231:2181, initiating session
2015-10-29T09:13:45.153+0800 o.a.z.ClientCnxn [INFO] Session establishment complete on server VM195/172.19.5.231:2181, sessionid = 0x150a2bacdd203e1, negotiated timeout = 20000
2015-10-29T09:13:45.162+0800 o.a.c.f.s.ConnectionStateManager [INFO] State change: CONNECTED
2015-10-29T09:13:45.208+0800 s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092}}
2015-10-29T09:13:45.211+0800 s.k.KafkaUtils [INFO] Task [1/1] assigned [Partition{host=localhost:9092, partition=0}]
2015-10-29T09:13:45.211+0800 s.k.ZkCoordinator [INFO] Task [1/1] Deleted partition managers: []
2015-10-29T09:13:45.211+0800 s.k.ZkCoordinator [INFO] Task [1/1] New partition managers: [Partition{host=localhost:9092, partition=0}]
2015-10-29T09:13:45.750+0800 s.k.PartitionManager [INFO] Read partition information from: /storm/word/partition_0 –> null
2015-10-29T09:13:45.842+0800 k.c.SimpleConsumer [INFO] Reconnect due to socket error: java.nio.channels.ClosedChannelException
2015-10-29T09:13:45.852+0800 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:565) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.8.2.2.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.PartitionManager.&lt;init&gt;(PartitionManager.java:83) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.5.jar:0.9.5]
… 6 common frames omitted
2015-10-29T09:13:45.853+0800 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.nio.channels.ClosedChannelException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:565) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
Caused by: java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[kafka_2.11-0.8.2.2.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.8.2.2.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.PartitionManager.&lt;init&gt;(PartitionManager.java:83) ~[storm-kafka-0.9.5.jar:0.9.5]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.5.jar:0.9.5]
… 6 common frames omitted
2015-10-29T09:13:45.904+0800 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__7028$fn__7029.invoke(worker.clj:497) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$mk_executor_data$fn__6480$fn__6481.invoke(executor.clj:240) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:473) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65]
2015-10-29T09:13:45.909+0800 b.s.d.worker [INFO] Shutting down worker MyKafkaTopology-2-1446081174 618ce090-046e-43fb-bdd7-a26e98eb57ba 6700
I'm running into this problem now too. From some initial searching, it seems to be because the Kafka server is configured with host.name=localhost. How did you end up solving it?
Check whether your Kafka is on the same node as Storm. If it isn't, how could the Kafka spout in Storm read the topic from Kafka by connecting to localhost?
Hello, I don't quite understand what you mean. Could you explain? Does "Kafka and Storm on the same node" refer to a ZooKeeper node? How do I check that?
Ah, if Kafka is deployed as a multi-node cluster, host.name in the configuration must be changed to the actual IP or hostname. Then, when reading Kafka data in Storm, specify the correct ZooKeeper address information (see the ZK host:port list configured in the article); ZooKeeper has to know the Kafka brokers' actual addresses (not Kafka's default localhost). (PS: my reply above was misleading, ignore it and go by this one.)
I can read data from my Kafka cluster on the command line, and a Kafka node outside the cluster can also consume data from it. But my code now throws the error below. Could you take a look? Or could I get your contact info? This has been bothering me for a long time. My QQ is 867015234.
64669 [Thread-8-kafka-reader] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.nio.channels.ClosedByInterruptException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.nio.channels.ClosedByInterruptException: null
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_79]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681) ~[na:1.7.0_79]
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.9.2-0.8.1.1.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.PartitionManager.&lt;init&gt;(PartitionManager.java:82) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
… 6 common frames omitted
64669 [Thread-8-kafka-reader] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.nio.channels.ClosedByInterruptException
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__3284$fn__3299$fn__3328.invoke(executor.clj:563) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
Caused by: java.nio.channels.ClosedByInterruptException: null
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202) ~[na:1.7.0_79]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681) ~[na:1.7.0_79]
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124) ~[kafka_2.9.2-0.8.1.1.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.9.2-0.8.1.1.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.PartitionManager.&lt;init&gt;(PartitionManager.java:82) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
… 6 common frames omitted
64671 [Thread-8-kafka-reader] ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[Thread-8-kafka-reader,5,main] died
java.lang.RuntimeException: java.lang.InterruptedException
at backtype.storm.util$wrap_in_runtime.invoke(util.clj:44) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.zookeeper$exists_node_QMARK_$fn__1658.invoke(zookeeper.clj:102) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:98) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:114) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.cluster$mk_distributed_cluster_state$reify__1866.mkdirs(cluster.clj:109) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.cluster$mk_storm_cluster_state$reify__2285.report_error(cluster.clj:368) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$throttled_report_error_fn$fn__3132.invoke(executor.clj:178) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$mk_executor_data$fn__3185$fn__3186.invoke(executor.clj:237) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__452.invoke(util.clj:441) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) ~[na:1.7.0_79]
Caused by: java.lang.InterruptedException: null
at java.lang.Object.wait(Native Method) ~[na:1.7.0_79]
at java.lang.Object.wait(Object.java:503) ~[na:1.7.0_79]
at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1309) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1036) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[curator-framework-2.4.0.jar:na]
at backtype.storm.zookeeper$exists_node_QMARK_$fn__1658.invoke(zookeeper.clj:101) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
… 9 common frames omitted
Root cause: the program reads the topic's partition information from ZooKeeper, and the broker host recorded there is localhost:9092.
Because the test machine and the ZooKeeper cluster are not on the same LAN, the hostname localhost cannot be resolved to the broker, which causes the exception.
Fix:
1. Edit the hosts file on the test machine and map DP to the actual IP address, or
2. Run the program on a machine in the same network segment as the ZooKeeper cluster.
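For reference, on Kafka 0.8.x the broker registers the address it advertises in ZooKeeper, and that is what consumers resolve. A hedged server.properties sketch that avoids the localhost registration (the hostname h1 here is illustrative, taken from the article's cluster layout):

```properties
# Bind and register a resolvable address instead of localhost,
# so consumers on other machines can reach the broker.
host.name=h1
# Optionally, the address published to ZooKeeper if it differs from host.name
# (e.g. behind NAT):
advertised.host.name=h1
advertised.port=9092
```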
A very detailed and useful article.
Hello, following your tutorial on installing and configuring Storm, the Storm UI shows this error:
org.apache.thrift7.transport.TTransportException: java.net.ConnectException: Connection refused
at org.apache.thrift7.transport.TSocket.open(TSocket.java:183)
at org.apache.thrift7.transport.TFramedTransport.open(TFramedTransport.java:81)
at backtype.storm.thrift$nimbus_client_and_conn.invoke(thrift.clj:75)
at backtype.storm.ui.core$cluster_summary.invoke(core.clj:455)
at backtype.storm.ui.core$fn__8223.invoke(core.clj:789)
at compojure.core$make_route$fn__3365.invoke(core.clj:93)
at compojure.core$if_route$fn__3353.invoke(core.clj:39)
at compojure.core$if_method$fn__3346.invoke(core.clj:24)
at compojure.core$routing$fn__3371.invoke(core.clj:106)
at clojure.core$some.invoke(core.clj:2443)
at compojure.core$routing.doInvoke(core.clj:106)
at clojure.lang.RestFn.applyTo(RestFn.java:139)
at clojure.core$apply.invoke(core.clj:619)
My storm.yaml is as follows:
storm.zookeeper.servers:
    - "1.1.1.1"
    - "2.2.2.2"
    - "3.3.3.3"
    - "4.4.4.4"
storm.zookeeper.port: 2333
#
nimbus.host: "2.2.2.2"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
    - 6704
storm.local.dir: "/tmp/storm"
What could be wrong? If the information I provided is incomplete, please let me know~
Check the logs of the Storm daemons (nimbus/ui); it looks like the Storm cluster did not start successfully.
You're right, the nimbus log has this:
2015-11-16 02:04:33 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=202.114.10.131:2333,202.114.10.134:2333,211.69.198.234:2333,211.69.198.247:2333 sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@7296c1fc
2015-11-16 02:04:33 o.a.z.ClientCnxn [INFO] Opening socket connection to server 211.69.198.234/211.69.198.234:2333. Will not attempt to authenticate using SASL (unknown error)
2015-11-16 02:04:33 o.a.z.ClientCnxn [WARN] Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_71]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_71]
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1068) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
2015-11-16 02:04:41 o.a.z.ClientCnxn [INFO] Opening socket connection to server 202.114.10.131/202.114.10.131:2333. Will not attempt to authenticate using SASL (unknown error)
2015-11-16 02:04:41 o.a.z.ClientCnxn [WARN] Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
And then it keeps reporting Connection refused...
That is, each reconnect attempt is immediately followed by another connection refused.
The ZooKeeper addresses can't be arbitrary. A ZooKeeper ensemble normally has an odd number of nodes; set storm.zookeeper.servers to the addresses of a working ZooKeeper cluster (and turn off the firewall on the ZooKeeper nodes).
Solved. It was indeed a ZooKeeper problem. Thank you!
Hi, which jar contains org.apache.storm.hdfs.bolt.HdfsBolt?
For example, with Storm 0.9.3 you can find the file apache-storm-0.9.3\external\storm-hdfs\storm-hdfs-0.9.3.jar; the class is in there.
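If the project is built with Maven, the equivalent dependency would be the following fragment (coordinates assumed from the 0.9.3 release layout; adjust the version to match your Storm distribution):

```xml
<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-hdfs</artifactId>
  <version>0.9.3</version>
</dependency>
```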
Hi, I use a Storm cluster to read data from Kafka, process it, and write it to HDFS. Why does Storm go haywire after running for about half an hour and write everything all over again? Could it be re-reading the Kafka data? My Kafka has tens of thousands of records, and one processing pass takes only a few minutes.
Based on your description, the problem could be in any part of the pipeline; I can't pinpoint the exact cause.
Hi, how do your logs get into the message queue?
Are they written into it by your own program, or collected from previously generated log files?
With Flume, versions 1.6.0+ ship with a Kafka Sink that only needs configuration. Writing into Kafka is simply what a Producer does; if you want to write it yourself, just get familiar with Kafka's Producer API.
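As the reply above notes, Flume 1.6.0+ ships a Kafka sink that only needs configuration. A minimal sketch of such an agent, assuming the article's broker hostnames; the agent name, log path, and topic are illustrative:

```properties
# Tail an application log and publish each line to a Kafka topic.
agent.sources = tail-src
agent.channels = mem-ch
agent.sinks = kafka-sink

agent.sources.tail-src.type = exec
agent.sources.tail-src.command = tail -F /var/log/app/app.log
agent.sources.tail-src.channels = mem-ch

agent.channels.mem-ch.type = memory
agent.channels.mem-ch.capacity = 10000

agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.topic = app_logs
agent.sinks.kafka-sink.brokerList = h1:9092,h2:9092,h3:9092
agent.sinks.kafka-sink.channel = mem-ch
```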
Hello! Do you still have this set of materials, source code, and documentation? Could you send me a copy? I'd like to learn from it!
The code is basically all posted here; beyond that you just need to set up the environment.
Hi, a question: is it more appropriate to do the parsing of collected data on the Flume side or on the Storm side?
If the parsing logic is generic and will almost never change with actual business requirements, you can consider implementing simple parsing on the Flume side. More generally, though, it's best to treat Flume purely as a log collection tool and keep anything involving business logic outside it, for example in Storm.
After reading your article my thinking has moved forward. Now I want to push data from Hadoop to mobile clients or email; does Kafka support that?
Or push it to WeChat and the like; I haven't looked into WeChat subscription push yet.
Absolutely, though the implementation may differ a lot from some other MQs. Keep in mind that consuming a topic from Kafka uses a PULL model, so you have to implement it yourself: pull the data from Kafka and actively push it to the phone or to email.
How did you solve it in the end?
Hello,
I'd like to ask: can Kafka write directly into HDFS, without going through Storm?
Yes. Take a look at LinkedIn's Camus, which has since been superseded by Gobblin: https://github.com/linkedin/camus
Hello, I ran into this error: java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8434 [Thread-17-storm-hdfs] INFO backtype.storm.daemon.executor - Preparing bolt storm-hdfs:(6)
8435 [Thread-15-storm-hdfs] ERROR backtype.storm.daemon.executor -
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8435 [Thread-17-storm-hdfs] ERROR backtype.storm.util - Async loop died!
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8436 [Thread-17-storm-hdfs] ERROR backtype.storm.daemon.executor -
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8443 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1534fdf653e000b type:create cxid:0x51 zxid:0x22 txntype:-1 reqpath:n/a Error Path:/storm/errors/MyKafkaTopologytoHdfs-1-1457333955 Error:KeeperErrorCode = NodeExists for /storm/errors/MyKafkaTopologytoHdfs-1-1457333955
8445 [Thread-19-storm-hdfs] INFO backtype.storm.daemon.executor - Preparing bolt storm-hdfs:(7)
8447 [Thread-19-storm-hdfs] ERROR backtype.storm.util - Async loop died!
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8447 [Thread-19-storm-hdfs] ERROR backtype.storm.daemon.executor -
java.lang.IllegalStateException: SyncPolicy must be specified.
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:86) ~[stormwordcount.jar:na]
at backtype.storm.daemon.executor$fn__6647$fn__6659.invoke(executor.clj:692) ~[storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:461) ~[storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8455 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1534fdf653e000b type:create cxid:0x55 zxid:0x24 txntype:-1 reqpath:n/a Error Path:/storm/errors/MyKafkaTopologytoHdfs-1-1457333955/storm-hdfs Error:KeeperErrorCode = NodeExists for /storm/errors/MyKafkaTopologytoHdfs-1-1457333955/storm-hdfs
8456 [Thread-21-word-splitter] INFO backtype.storm.daemon.executor - Preparing bolt word-splitter:(8)
8460 [Thread-21-word-splitter] INFO backtype.storm.daemon.executor - Prepared bolt word-splitter:(8)
8461 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1534fdf653e000b type:create cxid:0x56 zxid:0x25 txntype:-1 reqpath:n/a Error Path:/storm/errors/MyKafkaTopologytoHdfs-1-1457333955/storm-hdfs Error:KeeperErrorCode = NodeExists for /storm/errors/MyKafkaTopologytoHdfs-1-1457333955/storm-hdfs
8490 [Thread-17-storm-hdfs] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__7024$fn__7025.invoke(worker.clj:493) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.daemon.executor$mk_executor_data$fn__6480$fn__6481.invoke(executor.clj:240) [storm-core-0.9.5.jar:0.9.5]
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:473) [storm-core-0.9.5.jar:0.9.5]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
8490 [Thread-15-storm-hdfs] ERROR backtype.storm.util - Halting process: ("Worker died")
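The exception above is thrown from AbstractHdfsBolt.prepare when the bolt is built without a sync policy. A hedged sketch of wiring up an HdfsBolt with the required policies, using names from the storm-hdfs 0.9.x API (the HDFS URL and output path are assumptions):

```java
// Every HdfsBolt needs a SyncPolicy (when to hsync) and a rotation policy.
SyncPolicy syncPolicy = new CountSyncPolicy(100);            // hsync every 100 tuples
FileRotationPolicy rotationPolicy =
        new FileSizeRotationPolicy(5.0f, Units.MB);          // rotate files at 5 MB
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("\t");
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://h1:8020")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);   // omitting this line triggers the exception above
```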
Hi, for real-time data received over a socket connection, with real-time storage as the goal, can I use the kafka+storm+hdfs stack?
Yes. Receive the data with a socket server, push it into Kafka, feed it straight into Storm for processing, and finally store it in HDFS.
HDFS should be better suited to write-once, read-many. Is it reasonable for Storm to store data into HDFS in real time? Does Storm write to HDFS record-by-record as data arrives, or in mini batches? If both are possible, how do you configure which one is used?
A question, please: I want to consume someone else's data, so I deployed my own Storm; my ZooKeeper and their Kafka/ZooKeeper are not in the same cluster. For example, my Storm is on cm1, cm2, cm3, cm4, cm5, with ZooKeeper on cm1, cm2, cm3, while their Kafka and ZooKeeper are on sd-jp01, sd-jp02, sd-jp03. Part of the code is as follows:
String zks = "sd-jp01:2181,sd-jp02:2181,sd-jp03:2181";
String topic = "hydra.conversions";
String zkRoot = "/kafka";
String id = "word";
BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.zkServers = Arrays.asList(new String[]{"cm1", "cm2", "cm3"});
spoutConf.zkPort = 2181;
Here zkRoot is Kafka's path under ZooKeeper, but why do I get this error: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/hydra.conversions/partitions at storm.kafka.??? Please advise, thanks, or add my QQ: 937038088
This error clearly means you are reading the remote Kafka's default ZooKeeper path, so we can conclude that their Kafka cluster was installed with a non-default path.
In the code, zkRoot is the path where Storm stores its own metadata in ZooKeeper, i.e. a path in the ZooKeeper cluster you built yourself.
In the sd-jp01..sd-jp03 cluster I can see the /kafka path in Kafka's ZooKeeper. If I set zkRoot to Storm's path in my own ZooKeeper cluster, then how do I read the Kafka data? Given my example above, what should the code look like? Could you show the full code for this part? Thanks.
BrokerHosts hosts = new ZkHosts(zkConnString, "/kafka/brokers");
The default is /brokers.
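Putting the replies above together, a minimal sketch of the two distinct ZooKeeper paths involved, assuming the storm-kafka 0.9.x API and the hostnames from the question:

```java
// THEIR ZooKeeper: where Kafka registers brokers. Their Kafka was installed
// with chroot /kafka, so broker metadata lives under /kafka/brokers, not /brokers.
BrokerHosts brokerHosts = new ZkHosts(
        "sd-jp01:2181,sd-jp02:2181,sd-jp03:2181", "/kafka/brokers");

// zkRoot is where the spout stores ITS OWN offsets -- a path in YOUR ZooKeeper.
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "hydra.conversions",
        "/storm", "word");
spoutConf.zkServers = Arrays.asList("cm1", "cm2", "cm3"); // your ZooKeeper cluster
spoutConf.zkPort = 2181;
```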
Hello! I found this article very well written and very practical for a beginner. Could you share your contact info so we can keep in touch? Thanks!
The contact info is at the top right of the page.
Hi, in the code you posted, String zkRoot = "/storm"; // default zookeeper root configuration for storm — shouldn't zkRoot be /kafka? A typo perhaps?
zkRoot is configured for Storm, not for Kafka.
Hi, my Storm is 0.8.2 and ZooKeeper is 3.4.6, and I get this error at runtime:
6366 [Thread-20] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: backtype.storm.task.TopologyContext.registerMetric(Ljava/lang/String;Lbacktype/storm/metric/api/IMetric;I)Lbacktype/storm/metric/api/IMetric;
at storm.kafka.KafkaSpout.open(KafkaSpout.java:81)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
6369 [Thread-20] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: backtype.storm.task.TopologyContext.registerMetric(Ljava/lang/String;Lbacktype/storm/metric/api/IMetric;I)Lbacktype/storm/metric/api/IMetric;
at storm.kafka.KafkaSpout.open(KafkaSpout.java:81)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
Looking at this NoSuchMethodError, it clearly indicates that the JDK version used to compile the Storm artifacts you are using differs from the JDK version of your runtime environment.
The versions are the same, both JDK 1.7. Could the pom configuration of the storm plugin have an effect? Some dependency versions in the plugin's pom differ from my actual project's; for example, the Storm version in the storm-kafka-0.9.6 plugin is 0.9.6.
After switching the plugin to storm-kafka-0.9.6, the error is now:
4400 [Thread-6] INFO backtype.storm.daemon.worker – Worker 00da0547-69a2-4b73-bae6-da12a3d2bf02 for storm word-1-1461636951 on 874443c4-4b17-4dda-b65b-f56b16f21aee:1 has finished loading
4591 [Thread-16] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
4636 [Thread-16] ERROR backtype.storm.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.&lt;init&gt;(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:174)
at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94)
at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55)
at org.apache.curator.ConnectionState.reset(ConnectionState.java:219)
at org.apache.curator.ConnectionState.start(ConnectionState.java:103)
at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:264)
at storm.kafka.ZkState.&lt;init&gt;(ZkState.java:62)
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
4656 [Thread-16] ERROR backtype.storm.daemon.executor -
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.&lt;init&gt;(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V
at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29)
at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:174)
at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94)
at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55)
at org.apache.curator.ConnectionState.reset(ConnectionState.java:219)
at org.apache.curator.ConnectionState.start(ConnectionState.java:103)
at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:190)
at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:264)
at storm.kafka.ZkState.&lt;init&gt;(ZkState.java:62)
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
4711 [Thread-16] INFO backtype.storm.util - Halting process: ("Worker died")
Hello, my environment is storm-0.9.4 with kafka_2.11-0.9.0.1. Submitting the topology works, but it reads no data from Kafka. I was on Storm 0.9.6 before and changed to this, but I still get the error below. Please help.
java.lang.NoClassDefFoundError: org/apache/curator/RetryPolicy
at storm.kafka.KafkaSpout.open(KafkaSpout.java:85)
at backtype.storm.daemon.executor$fn__4654$fn__4669.invoke(executor.clj:522)
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:724)
Caused by: java.lang.ClassNotFoundException: org.apache.curator.RetryPolicy
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
… 5 more
Hello, did you ever solve this? I also hit the case where the topology submits fine but reads no data from Kafka.
Hello, I ran into a problem integrating Storm with Kafka:
2016-05-19 04:47:13.696 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=master:2181,worker01:2181,worker02:2181 sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@6bfa2a58
2016-05-19 04:47:13.699 o.a.z.ZooKeeper [INFO] Initiating client connection, connectString=master:2181,worker01:2181,worker02:2181 sessionTimeout=20000 watcher=org.apache.curator.ConnectionState@1bcde435
2016-05-19 04:47:13.712 o.a.z.ClientCnxn [INFO] Opening socket connection to server master/192.168.216.128:2181. Will not attempt to authenticate using SASL (unknown error)
2016-05-19 04:47:13.729 o.a.z.ClientCnxn [INFO] Socket connection established to master/192.168.216.128:2181, initiating session
2016-05-19 04:47:13.703 o.a.s.util [ERROR] Async loop died!
java.lang.NoSuchMethodError: org.apache.curator.utils.ZKPaths.fixForNamespace(Ljava/lang/String;Ljava/lang/String;Z)Ljava/lang/String;
at org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:104) ~[stormjar.jar:?]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:594) ~[stormjar.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:182) ~[stormjar.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38) ~[stormjar.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:111) ~[stormjar.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:84) ~[stormjar.jar:?]
at org.apache.storm.kafka.trident.ZkBrokerReader.&lt;init&gt;(ZkBrokerReader.java:44) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:58) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:77) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__8158$fn__8173.invoke(executor.clj:602) ~[storm-core-1.0.0.jar:1.0.0]
at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:482) [storm-core-1.0.0.jar:1.0.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
2016-05-19 04:47:13.749 o.a.s.d.executor [ERROR]
I've searched for a long time without finding a fix; please take a look, thanks.
Could you show me your code? I'm also learning this area recently. 874083937
Node /kafka/brokers/ids/1 does not exist
My configuration is the same as yours, but I keep getting this error. Why?
Hi, mvn install fails with the following errors:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project kafkaWordCount: Compilation failure: Compilation failure:
[ERROR] /D:/Study/eclipseWorkshop/kafkaWordCount/src/main/java/topology/MyKafkaTopology.java:[10,34] package org.apache.commons.logging does not exist
[ERROR] /D:/Study/eclipseWorkshop/kafkaWordCount/src/main/java/topology/MyKafkaTopology.java:[11,34] package org.apache.commons.logging does not exist
[ERROR] /D:/Study/eclipseWorkshop/kafkaWordCount/src/main/java/topology/MyKafkaTopology.java:[36,38] cannot find symbol
[ERROR] symbol:   class Log
[ERROR] location: class topology.MyKafkaTopology.KafkaWordSplitter
[ERROR] /D:/Study/eclipseWorkshop/kafkaWordCount/src/main/java/topology/MyKafkaTopology.java:[65,38] cannot find symbol
[ERROR] symbol:   class Log
[ERROR] location: class topology.MyKafkaTopology.WordCounter
What is the problem here? Thanks.
Hi, is there a version correspondence between Storm and Kafka? I'm currently using Storm 0.10.0; which Kafka version should I use? Thanks.
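The compiler messages above say the org.apache.commons.logging classes are missing from the build classpath; adding the dependency to the project pom should resolve them (the version shown is a common one and is only an assumption, adjust as needed):

```xml
<dependency>
  <groupId>commons-logging</groupId>
  <artifactId>commons-logging</artifactId>
  <version>1.1.3</version>
</dependency>
```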
Our company also wants to build a real-time log collection and analysis platform, with two main requirements:
1. Real-time log analysis: planning to use flume+kafka+storm+hdfs
2. Logs retained in HDFS: using flume+kafka+hdfs
A question for you: Flume can write data directly into HDFS; is it necessary to insert Kafka (flume+kafka+hdfs) as a buffer?
You have to consider whether, without Kafka buffering the messages, your HDFS write speed can keep up with the real-time data collection rate; if that can be guaranteed, you can skip Kafka.
The benefit of Kafka is that even when Flume routes a very large volume of messages at high speed, the Kafka buffering layer ensures no messages are lost.
Thanks!
One more question: with flume+kafka+hdfs, how do I get the Kafka data written into HDFS? I saw "HDFSSinkConnector" in the official docs but couldn't find a concrete example. Do you have any experience or suggestions here?
I haven't tried exporting data from Kafka directly to HDFS. If there is no open-source component for it, you can implement a Kafka Consumer yourself that writes the data to HDFS; that is fairly simple and controllable.
Hi, my topology starts throwing errors like the following after running for a while, and there are many of them. What could be the cause? It's been bothering me for a long time.
2016-08-25T13:20:33.605+0800 o.a.s.h.b.HdfsBolt [WARN] write/sync failed.
org.apache.hadoop.ipc.RemoteException: 0
at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:505)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3528)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:702)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getAdditionalDatanode(AuthorizationProviderProxyClientProtocol.java:228)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:506)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
at org.apache.hadoop.ipc.Client.call(Client.java:1347) ~[stormjar.jar:na]
at org.apache.hadoop.ipc.Client.call(Client.java:1300) ~[stormjar.jar:na]
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206) ~[stormjar.jar:na]
at com.sun.proxy.$Proxy7.getAdditionalDatanode(Unknown Source) ~[na:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_45]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186) ~[stormjar.jar:na]
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~[stormjar.jar:na]
at com.sun.proxy.$Proxy7.getAdditionalDatanode(Unknown Source) ~[na:na]
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:352) ~[stormjar.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:919) ~[stormjar.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031) ~[stormjar.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823) ~[stormjar.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475) ~[stormjar.jar:na]
Hello, is there a tool for monitoring consumers that use Kafka's low-level (simple) API? I'm consuming Kafka data with Storm, and since Storm uses the low-level API, neither Kafka Eagle nor Kafka Manager seems to show the topic's consumption. The Storm UI shows Storm's own metrics, but I'd like to see Kafka-side monitoring as well.
The offsets can't be read from ZK, so it reads from the beginning every time. The log shows:
19534 [Thread-16-kafkaspout-executor[3 3]] INFO o.a.s.k.PartitionManager – Read partition information from: /kafkaspout/kafkaspout/partition_0 –> null
The code is as follows:
public class GpsMsgTopoplgy {
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        // Configure the ZooKeeper address
        String zk = SysConfig.getProperty("storm.zookeeper.quorum");
        BrokerHosts brokerHosts = new ZkHosts(zk);
        // Configure the Kafka topic to subscribe to, plus the ZooKeeper node path and id
        String topic = SysConfig.getProperty("storm.topic");
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, "/kafkaspout", "kafkaspout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.zkPort = 2181;
        List<String> servers = new ArrayList<String>();
        servers.add("14.215.109.186");
        servers.add("14.215.108.126");
        servers.add("14.215.109.30");
        spoutConfig.zkServers = servers;
        // Configure kafka.broker.properties for the KafkaBolt
        Config conf = new Config();
        Properties prop = new Properties();
        // Configure the Kafka broker addresses
        String brokerList = SysConfig.getProperty("storm.kafka.brokerlist");
        prop.put("bootstrap.servers", brokerList);
        // serializer.class is the message serializer class
        String serClass = SysConfig.getProperty("storm.kafka.serializer.class");
        prop.put("key.serializer", serClass);
        prop.put("value.serializer", serClass);
        prop.put("timeout.ms", "60000");
        prop.put("request.timeout.ms", "60000");
        // Configure the topic the KafkaBolt writes to
        String outputTopic = SysConfig.getProperty("storm.kafka.output.topic");
        prop.put("topic", outputTopic);
        conf.put("topic", outputTopic);
        //spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaspout", new KafkaSpout(spoutConfig), 1); // spout parallelism should match the number of Kafka partitions
        builder.setBolt("computebolt", new GpsMsgComputeBolt()).setNumTasks(1).shuffleGrouping("kafkaspout");
        // builder.setBolt("hbasebolt", new GpsInfoHbaseBolt()).shuffleGrouping("computebolt");
        // builder.setBolt("hbasebolt", new GpsInfoHbaseBolt()).fieldsGrouping("computebolt", new Fields("company"));
        /*
         * builder.setBolt("kafkabolt", new KafkaBolt().withTupleToKafkaMapper(new GpsTupleToKafkaMapperImp())
         *         .withProducerProperties(prop)).fieldsGrouping("computebolt", new Fields("key"));
         */
        // builder.setBolt("hbasebolt", new DistanceComputeToHbaseBolt()).setNumTasks(1).fieldsGrouping("computebolt", new Fields("resmsg", "detailmsg"));
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Topo", conf, builder.createTopology());
            // Utils.sleep(100000);
            // cluster.killTopology("Topo");
            // cluster.shutdown();
        }
    }
}
Could you help me figure out what's causing this? Thanks!
HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl("hdfs://h1:8020")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
When I replace "h1:8020" with my Hadoop cluster's name, it won't run and reports:
java.lang.RuntimeException: Error preparing HdfsBolt: java.net.UnknownHostException: pengtai
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:109) ~[stormjar.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: pengtai
Any idea what's going on? I've searched for days and can't figure out why only a hostname works and the cluster name doesn't.
On each node of your Storm cluster, add a hosts mapping from the hostname pengtai to its IP.
Hi, pengtai is the name of my Hadoop cluster (the HDFS nameservice), not a hostname. I use the cluster name so that when namenode1 goes down it can automatically fail over to namenode2. Thanks, please advise.
Hi, my Storm cluster can access the Hadoop cluster fine; I can list files and do other operations from the command line. It's only when the code uses the cluster name that it errors. Please help, thanks!
Hello, can Kafka + Spark Streaming be used for real-time analytics?
If your application's latency requirements are not very strict, that approach works; Spark Streaming uses a micro-batch model.
For the HDFS-related classes at the end, do we need to add the jars in Maven? The author doesn't seem to have spelled this out.
KeeperErrorCode = NoNode for /kafka/brokers/topics/test/partitions
I followed the steps in the article and hit this error. What should I do? Do you have QQ?
Hi, I'm using Storm 1.0.2; what should I do about the spoutConf.forceFromStart property? Also, even when nothing is sent, it keeps increasing.
I haven't tried the Storm 1.x releases myself; some configuration options have probably changed.
Hi, why does the builder's setBolt method fail when I pass in hdfsBolt? It says the method cannot be resolved. Looking at the source, setBolt expects an IRichBolt implementation, while HdfsBolt extends BaseRichBolt.
I ran into a similar problem.
Hi, I'm using the following versions:
storm-core 1.0.2
storm-kafka 1.0.2
kafka_2.10 0.10.1.0
The configuration is as follows:
kafka
server.properties
listeners=PLAINTEXT://:9092
zookeeper.connect=test-s1:2181,test-s2:2181,test-s3:2181 (using the ZooKeeper root node)
I've checked the various logs.
Now it's the internally invoked spout that errors, with the following messages:
k.c.SimpleConsumer [INFO] Reconnect due to error:
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [stormjar.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) [stormjar.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) [stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) [stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) [stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
2016-11-09 11:39:14.228 o.a.s.k.KafkaUtils [WARN] Network error when fetching messages:
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) ~[?:1.7.0_80]
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.7.0_80]
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.7.0_80]
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[stormjar.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) [stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) [stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
2016-11-09 11:39:14.229 o.a.s.k.KafkaSpout [WARN] Fetch failed
org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:199) ~[stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:200) ~[stormjar.jar:?]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:149) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) [storm-core-1.0.2.jar:1.0.2]
at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_80]
Caused by: java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229) ~[?:1.7.0_80]
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.7.0_80]
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.7.0_80]
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) ~[stormjar.jar:?]
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[stormjar.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:?]
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[stormjar.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:?]
at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[stormjar.jar:?]
… 7 more
The error messages are as shown above.
Check whether the firewalls on the nodes hosting the ZK cluster are still enabled.
The firewalls on all three internal servers are off:
iptables 0:off 1:off 2:off 3:off 4:off 5:off 6:off
Hello, did you ever solve this problem? I've run into it too.
It's probably related to the Kafka config file; you need to add this line, replacing your.host.name with the IP:
listeners = PLAINTEXT://your.host.name:9092
I didn't see your contact info in the top-right corner, and didn't expect such a quick reply. My QQ is 151487510, screen name 七个猕猴桃. If you don't mind, please add me. Thanks!!
Just wondering, where do you write the logs with commons-logging — to a local file?
This post is from 2014; now that it's almost 2017, are there any replacement options?
As long as it works and meets your needs, it's fine. If you're interested in newer technology, there are plenty of alternatives; you could try these open-source frameworks: Flink, Spark Streaming, Apache Apex, Heron, Druid, and so on. Each has its own strengths.
Hi, in the commands and code, do h1, h2, h3 all need to be replaced with actual IP addresses? Thanks for the advice.
I'm using hostnames here, but IPs work too.
Hello, could you help me take a look at this error?
Exception in thread “main” java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
at storm.kafka.KafkaConfig.(KafkaConfig.java:43)
at storm.kafka.SpoutConfig.(SpoutConfig.java:32)
at cn.tt.bigdata.storm_kafka.KafkaTopo.main(KafkaTopo.java:23)
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
… 3 more
Doesn't the exception message tell you already? The class can't be found!
When integrating with HDFS, I can never get my pom dependencies right; the imports in the code still error out. Could you share the pom dependencies you used for the HDFS integration?
See here: https://github.com/shirdrn/opensource/blob/master/opensource-storm/pom.xml
I'd also suggest using a fairly recent Storm release; this write-up was done some time ago, and some of the dependencies don't have great compatibility.
Thanks for sharing, I've benefited a lot. I hit an exception with my Storm topology: it compiles fine with Maven, but running the class file with the java command reports:
[root@stormtestsrv01 classes]# java MyKafkaTopology
Error: A JNI error has occurred, please check your installation and try again
Exception in thread “main” java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
… 7 more
Can you help me find where the problem is? I'd be very grateful.
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
Check whether the JAR file containing that class was included; most likely you didn't bundle the required dependency JARs when packaging.
Hi, can you help me with an error? It happens when I submit the topology to Storm, but local mode runs fine: SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/storm/lib/log4j-slf4j-impl-2.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Exception in thread “main” java.lang.ExceptionInInitializerError
at org.apache.storm.config$read_storm_config.invoke(config.clj:78)
at org.apache.storm.config$fn__908.invoke(config.clj:100)
at org.apache.storm.config__init.load(Unknown Source)
at org.apache.storm.config__init.(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at clojure.lang.RT.classForName(RT.java:2154)
at clojure.lang.RT.classForName(RT.java:2163)
at clojure.lang.RT.loadClassForName(RT.java:2182)
at clojure.lang.RT.load(RT.java:436)
at clojure.lang.RT.load(RT.java:412)
at clojure.core$load$fn__5448.invoke(core.clj:5866)
at clojure.core$load.doInvoke(core.clj:5865)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at clojure.core$load_one.invoke(core.clj:5671)
at clojure.core$load_lib$fn__5397.invoke(core.clj:5711)
at clojure.core$load_lib.doInvoke(core.clj:5710)
at clojure.lang.RestFn.applyTo(RestFn.java:142)
at clojure.core$apply.invoke(core.clj:632)
at clojure.core$load_libs.doInvoke(core.clj:5753)
at clojure.lang.RestFn.applyTo(RestFn.java:137)
at clojure.core$apply.invoke(core.clj:634)
at clojure.core$use.doInvoke(core.clj:5843)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at org.apache.storm.command.config_value$loading__5340__auto____12276.invoke(config_value.clj:16)
at org.apache.storm.command.config_value__init.load(Unknown Source)
at org.apache.storm.command.config_value__init.(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at clojure.lang.RT.classForName(RT.java:2154)
at clojure.lang.RT.classForName(RT.java:2163)
at clojure.lang.RT.loadClassForName(RT.java:2182)
at clojure.lang.RT.load(RT.java:436)
at clojure.lang.RT.load(RT.java:412)
at clojure.core$load$fn__5448.invoke(core.clj:5866)
at clojure.core$load.doInvoke(core.clj:5865)
at clojure.lang.RestFn.invoke(RestFn.java:408)
at clojure.lang.Var.invoke(Var.java:379)
at org.apache.storm.command.config_value.(Unknown Source)
Caused by: java.lang.RuntimeException: java.io.IOException: Found multiple defaults.yaml resources. You’re probably bundling the Storm jars with your topology jar. [jar:file:/storm/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]
at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:383)
at org.apache.storm.utils.Utils.readDefaultConfig(Utils.java:427)
at org.apache.storm.utils.Utils.readStormConfig(Utils.java:463)
at org.apache.storm.utils.Utils.(Utils.java:177)
… 39 more
Caused by: java.io.IOException: Found multiple defaults.yaml resources. You’re probably bundling the Storm jars with your topology jar. [jar:file:/storm/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]
at org.apache.storm.utils.Utils.getConfigFileInputStream(Utils.java:409)
at org.apache.storm.utils.Utils.findAndReadConfigFile(Utils.java:362)
… 42 more
Running: /usr/java/jdk1.8.0_101/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/storm -Dstorm.log.dir=/storm/logs -Djava.library.path= -Dstorm.conf.file= -cp /storm/lib/log4j-1.2.17.jar:/storm/lib/slf4j-api-1.7.21.jar:/storm/lib/kafka_2.10-0.10.2.0.jar:/storm/lib/disruptor-3.3.2.jar:/storm/lib/objenesis-2.1.jar:/storm/lib/storm-rename-hack-1.1.0.jar:/storm/lib/log4j-core-2.8.jar:/storm/lib/clojure-1.7.0.jar:/storm/lib/log4j-over-slf4j-1.6.6.jar:/storm/lib/asm-5.0.3.jar:/storm/lib/ring-cors-0.1.5.jar:/storm/lib/metrics-core-2.2.0.jar:/storm/lib/jopt-simple-5.0.3.jar:/storm/lib/log4j-slf4j-impl-2.8.jar:/storm/lib/snappy-java-1.1.2.6.jar:/storm/lib/kryo-3.0.3.jar:/storm/lib/reflectasm-1.10.1.jar:/storm/lib/storm-core-1.1.0.jar:/storm/lib/servlet-api-2.5.jar:/storm/lib/scala-library-2.10.6.jar:/storm/lib/log4j-api-2.8.jar:/storm/lib/kafka-clients-0.10.2.0.jar:/storm/lib/zkclient-0.10.jar:/storm/lib/minlog-1.3.0.jar:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar:/storm/conf:/storm/bin -Dstorm.jar=/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} MyKafkaTopology 10.123.16.200
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/storm/lib/log4j-slf4j-impl-2.8.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Exception in thread “main” java.lang.RuntimeException: Found multiple defaults.yaml resources. You’re probably bundling the Storm jars with your topology jar. [jar:file:/storm/lib/storm-core-1.1.0.jar!/defaults.yaml, jar:file:/storm/kafkalink/target/kafkalink-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/defaults.yaml]
at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:140)
at backtype.storm.utils.Utils.readDefaultConfig(Utils.java:167)
at backtype.storm.utils.Utils.readStormConfig(Utils.java:191)
at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:92)
at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:160)
at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(StormSubmitter.java:142)
at MyKafkaTopology.main(MyKafkaTopology.java:134)
Thank you.
It's probably a packaging problem with your program; read this hint carefully:
Caused by: java.io.IOException: Found multiple defaults.yaml resources. You're probably bundling the Storm jars with your topology jar.
Check whether a defaults.yaml file ended up inside your own jar.
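For reference, the usual cause is that storm-core (which ships defaults.yaml) was bundled into the topology jar; a common fix, assuming a Maven build, is to mark it as provided so it stays out of the fat jar (adjust the version to the one you run):

```xml
<!-- storm-core ships defaults.yaml; "provided" keeps it out of the topology jar -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.0</version>
    <scope>provided</scope>
</dependency>
```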
In the Kafka + Storm + HDFS architecture, given Storm's bolt replay mechanism, how do you guarantee that each consumed Kafka message is written to HDFS only once?
楼主能发一下你这个storm整合的源代码吗?想学习一下
Wow, I just realized it's already 2018 and this post is still going. Could you share the integration source code?
You can find the source code here: https://github.com/shirdrn/opensource/tree/master/opensource-storm/src/main/java/org/shirdrn/storm/examples
Generally, all the related code can be found on my GitHub.
I'm also here in 2018. Keeping up with new technology is so hard; no idea where 军哥 has moved on to by now.
Wherever I am, this blog will always be here. Feel free to drop your questions anytime ~0_0~
This reply is off-the-charts heart-warming!...
When running KafkaSpout I keep getting errors saying Node /brokers/ids/1 does not exist. Do you know how to fix this?
28064 [refresh-active-timer] INFO o.a.s.d.worker – All connections are ready for worker baa33fd0-5ead-4435-811b-76417fc97fd8:1027 with id d7752aea-f43d-44e6-b1ec-dc5fc20489e2
28283 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor – Preparing bolt __system:(-1)
28300 [Thread-21-__system-executor[-1 -1]] INFO o.a.s.d.executor – Prepared bolt __system:(-1)
28318 [Thread-17-spout-executor[3 3]] INFO o.a.s.d.executor – Opening spout spout:(3)
28331 [Thread-19-__acker-executor[1 1]] INFO o.a.s.d.executor – Preparing bolt __acker:(1)
28335 [Thread-19-__acker-executor[1 1]] INFO o.a.s.d.executor – Prepared bolt __acker:(1)
28336 [Thread-15-bolt1-executor[2 2]] INFO o.a.s.d.executor – Preparing bolt bolt1:(2)
28337 [Thread-15-bolt1-executor[2 2]] INFO o.a.s.d.executor – Prepared bolt bolt1:(2)
28619 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
28782 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
33456 [Thread-17-spout-executor[3 3]-EventThread] INFO o.a.c.f.s.ConnectionStateManager – State change: CONNECTED
33514 [Thread-17-spout-executor[3 3]-EventThread] INFO o.a.c.f.s.ConnectionStateManager – State change: CONNECTED
33580 [Thread-17-spout-executor[3 3]] ERROR o.a.s.k.DynamicBrokersReader – Node /brokers/ids/1 does not exist
33583 [Thread-17-spout-executor[3 3]] ERROR o.a.s.k.DynamicBrokersReader – Node /brokers/ids/1 does not exist
33584 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.DynamicBrokersReader – Read partition info from zookeeper: GlobalPartitionInformation{topic=topic2, partitionMap={}}
33586 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
33592 [Thread-17-spout-executor[3 3]] INFO o.a.s.d.executor – Opened spout spout:(3)
33595 [Thread-17-spout-executor[3 3]] INFO o.a.s.d.executor – Activating spout spout:(3)
33596 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator – Task [1/1] Refreshing partition manager connections
38163 [Thread-17-spout-executor[3 3]-EventThread] INFO o.a.c.f.s.ConnectionStateManager – State change: CONNECTED
38168 [Thread-17-spout-executor[3 3]] ERROR o.a.s.k.DynamicBrokersReader – Node /brokers/ids/1 does not exist
38171 [Thread-17-spout-executor[3 3]] ERROR o.a.s.k.DynamicBrokersReader – Node /brokers/ids/1 does not exist
38172 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.DynamicBrokersReader – Read partition info from zookeeper: GlobalPartitionInformation{topic=topic2, partitionMap={}}
38172 [Thread-17-spout-executor[3 3]] WARN o.a.s.k.KafkaUtils – there are more tasks than partitions (tasks: 1; partitions: 0), some tasks will be idle
38172 [Thread-17-spout-executor[3 3]] WARN o.a.s.k.KafkaUtils – Task [1/1] no partitions assigned
38172 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator – Task [1/1] Deleted partition managers: []
38172 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator – Task [1/1] New partition managers: []
38172 [Thread-17-spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator – Task [1/1] Finished refreshing
1. Check whether your Kafka cluster is healthy.
2. Is the metadata in ZK for the Kafka topic you're using intact?
The Kafka cluster can produce and consume data normally inside the VMs, but zkCli.sh shows nothing under /brokers/ids either. Which part of the configuration did I get wrong?
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[controller_epoch, storm, brokers, zookeeper, admin, consumers, config]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, topics]
[zk: localhost:2181(CONNECTED) 2] ls /ids
Node does not exist: /ids
[zk: localhost:2181(CONNECTED) 3] ls /topics
Node does not exist: /topics
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[tracker, topic1, topic2, wordCount, orderMq, order]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/ids
[]
[zk: localhost:2181(CONNECTED) 6]
Exception in thread “main” java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
at java.lang.Class.getMethod0(Class.java:2856)
at java.lang.Class.getMethod(Class.java:1668)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
… 6 more
I get this error when uploading the jar to the cluster. Why is that?
Check whether the JAR containing storm/kafka/BrokerHosts was included when you packaged your jar.
After some tweaking, the Storm UI now shows this error:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/my-replicated-topic5/partition
b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/my-replicated-topic5/partitions
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.trident.ZkBrokerReader.(ZkBrokerReader.java:42) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__5624$fn__5639.invoke(executor.clj:564) ~[storm-core-0.10.0.jar:0.10.0]
at backtype.storm.util$async_loop$fn__545.invoke(util.clj:477) [storm-core-0.10.0.jar:0.10.0]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_79]
Caused by: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/my-replicated-topic5/partitions
at storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:94) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:65) ~[storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
… 7 more
Please help, much appreciated!
Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/client/HdfsDataOutputStream$SyncFlag
at org.shirdrn.storm.examples.DistributeWordTopology.main(DistributeWordTopology.java:120)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hdfs.client.HdfsDataOutputStream$SyncFlag
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
… 1 more
Why am I getting this error? Please help.
java.lang.NoClassDefFoundError: org/apache/hadoop/hdfs/client/HdfsDataOutputStream$SyncFlag
That class can't be found; check the dependencies in your project, something is probably missing.
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/RemoteIterator
Hi, which package is this class in?
That class is in the HDFS-related jar.
How do you do batch processing with this setup?
There are several options for implementing batch processing:
1. Batch-sync the raw logs previously collected by Flume from the application servers to HDFS, or LOAD them into Hive, and then process the data on HDFS with a suitable batch computing framework such as Spark, Hive, or Impala;
2. Have a Flume sink write the collected data directly to HDFS, or LOAD it into Hive, and then batch-process it;
3. LOAD the data from the Kafka cluster into HDFS, or into Hive, and then batch-process it;
4. In the Storm processing flow, take two branches: one for real-time processing, the other writing the data straight to HDFS, and then batch-process it.
Pick one based on your actual situation; I'd recommend option 1.
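A minimal sketch of option 4 (a fragment building on the spoutConfig and hdfsBolt constructed earlier in the article; RealtimeAnalysisBolt is a placeholder for your own business bolt):

```java
// One KafkaSpout feeding two branches: real-time analysis and HDFS archiving.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConfig), 3);
// Branch 1: real-time processing
builder.setBolt("realtime-bolt", new RealtimeAnalysisBolt(), 2)
        .shuffleGrouping("kafka-reader");
// Branch 2: write the same stream to HDFS for later batch processing
builder.setBolt("hdfs-bolt", hdfsBolt, 2)
        .shuffleGrouping("kafka-reader");
```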
How does Storm on YARN connect to Kafka?
As a total beginner, I'd just like to ask: does using a message middleware add latency to message processing? And if you don't use one, does the spout incur extra work receiving messages, which would also add latency? Thanks!
Hi, my code is erroring out and I badly need a complete pom. Waiting online for your reply, thanks a lot!
When I run KafkaSpout it errors out; could you take a look at why?
21646 [Thread-17-spout-executor[3 3]] INFO o.a.c.f.i.CuratorFrameworkImpl – Starting
36729 [Thread-17-spout-executor[3 3]] ERROR o.a.c.ConnectionState – Connection timed out for connection string (192.168.52.138,192.168.52.135,192.168.52.139) and timeout (15000) / elapsed (15070)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:197) [curator-2.7.0.jar:?]
at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:87) [curator-2.7.0.jar:?]
at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:115) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.getZooKeeper(CuratorFrameworkImpl.java:492) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:1) [curator-2.7.0.jar:?]
at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:199) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191) [curator-2.7.0.jar:?]
at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:1) [curator-2.7.0.jar:?]
at org.apache.storm.kafka.DynamicBrokersReader.getNumPartitions(DynamicBrokersReader.java:111) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:84) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:44) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:58) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:77) [storm-kafka-1.0.1.jar:1.0.1]
at org.apache.storm.daemon.executor$fn__7885$fn__7900.invoke(executor.clj:601) [storm-core-1.0.1.jar:1.0.1]
at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:482) [storm-core-1.0.1.jar:1.0.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Do you have an example of Storm + Druid?