Apache Flink是一个开源的流处理框架,提供了分布式的、高性能的、高可用的特性,同时能够为流式应用程序提供多种编程语言的API,更多有关Flink的内容,不再累述,请参考官方文档,本文主要以Flink Standalone集群安装配置、编程实践为主。
Flink集群安装配置
首先,选择一主二从共3个节点来安装配置Flink Standalone集群:
1 2 3 | Master:ali-bj01-tst-cluster-001.xiweiai.cn Worker:ali-bj01-tst-cluster-002.xiweiai.cn Worker:ali-bj01-tst-cluster-003.xiweiai.cn |
为了方便安装文件远程拷贝,单独创建一个hadoop用户,并打通从Master节点使用ssh到Worker节点之间的免密码登录。
在Master节点上下载、准备Flink 1.4.0安装文件,执行如下命令:
1 2 | wget http: //mirror .bit.edu.cn /apache/flink/flink-1 .4.0 /flink-1 .4.0-bin-hadoop26-scala_2.11.tgz tar xvzf flink-1.4.0-bin-hadoop26-scala_2.11.tgz |
接着,修改Flink配置文件flink-1.4.0/conf/flink-conf.yaml,修改后的内容如下所示:
1 2 3 4 5 6 | env.java.home: /usr/local/java/jdk1.8.0_131/ jobmanager.rpc.address: ali-bj01-tst-cluster-001.xiweiai.cn # The heap size for the JobManager JVM jobmanager.heap.mb: 2048 # The heap size for the TaskManager JVM taskmanager.heap.mb: 2048 |
通过设置env.java.home属性,指定JAVA_HOME环境变量的值;jobmanager.rpc.address表示Flink集群的JobManager的RPC地址;设置jobmanager.heap.mb和taskmanager.heap.mb分别指定JobManager和TaskManager的堆内存大小,这个需要根据实际节点的资源情况进行配置,这里每个Worker节点都配置为2G大小。
然后,修改conf/slaves文件,增加Worker节点的主机名或IP列表,内容如下所示:
1 2 | ali-bj01-tst-cluster-002.xiweiai.cn ali-bj01-tst-cluster-003.xiweiai.cn |
最后,将配置完成的Flink安装文件,远程拷贝到各个Worker节点:
1 2 | scp -r /mnt/bd/data/flink/flink-1 .4.0 ali-bj01-tst-cluster-002.xiweiai.cn: /mnt/bd/data/flink/ scp -r /mnt/bd/data/flink/flink-1 .4.0 ali-bj01-tst-cluster-003.xiweiai.cn: /mnt/bd/data/flink/ |
启动集群
启动Flink集群,执行如下命令:
1 | bin /start-cluster .sh |
它会在Master节点上启动JobManager进程,然后ssh到Worker节点上启动各个TaskManager进程。
当然可选地,也可以单独启动JobManager和TaskManager,在Master节点上启动JobManager:
1 | bin /jobmanager .sh start |
在各个Worker节点上启动TaskManager:
1 | bin /taskmanager .sh start |
启动完成后,可以查看对应的日志,验证Flink集群是否启动成功:
1 2 3 | tail -100f log /flink-hadoop-jobmanager-0-ali-bj01-tst-cluster-001 .xiweiai.cn.log tail -100f log /flink-hadoop-taskmanager-0-ali-bj01-tst-cluster-002 .xiweiai.cn.log tail -100f log /flink-hadoop-taskmanager-0-ali-bj01-tst-cluster-003 .xiweiai.cn.log |
或者,查看Flink Web Dashboard UI来确认,通过浏览器打开链接:http://ali-bj01-tst-cluster-001.xiweiai.cn:8081/,如下图所示:
可见,TaskManager都已经注册到JobManager,整个Flink集群具有了计算资源(Task Slot),集群启动成功。
创建Flink项目
通过Flink官方文档,可以看到如何创建quickstart项目,假设我们使用Scala来编写Flink程序,默认使用sbt来管理依赖和程序构建,可以执行如下命令来创建:
1 | bash <(curl https: //flink .apache.org /q/sbt-quickstart .sh) |
可以根据向导提示,一步一步进行(示例是我们创建的flink-demo项目):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | ➜ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh) % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 11354 100 11354 0 0 5393 0 0:00:02 0:00:02 --:--:-- 5474 This script creates a Flink project using Scala and SBT. Project name (Flink Project): flink-demo Organization (org.example): org.shirdrn.flink Version (0.1-SNAPSHOT): 0.0.1-SNAPSHOT Scala version (2.11.7): 2.11.7 Flink version (1.3.2): 1.4.0 ----------------------------------------------- Project Name: flink-demo Organization: org.shirdrn.flink Version: 0.0.1-SNAPSHOT Scala version: 2.11.7 Flink version: 1.4.0 ----------------------------------------------- Create Project? (Y/n): Y Creating Flink project under flink-demo |
基于该Demo项目,就可以开发我们自己的Flink程序了。
下面,我们分别选择使用Flink提供的API来开发批量数据处理程序、流式数据处理程序、基于窗口(Windowing)的数据处理,来实现通过Flink API进行编程并提交运行。
批量数据处理
我们基于Flink发行包自带的example代码,修改WordCount程序为HdfsWordCount,支持从HDFS读取文件,然后将计算结果写入到HDFS文件,非常简单,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | import org.apache.flink.api.scala. _ object HdfsWordCount { def main(args : Array[String]) { if (args.length ! = 2 ) { println( "Parameter error!" ) println( "Usage: org.shirdrn.flink.HdfsWordCount <inputFile> <outputFile>" ) System.exit(- 1 ) } val inputFile = args( 0 ) val outputFile = args( 1 ) // set up the execution environment val env = ExecutionEnvironment.getExecutionEnvironment // get input data val text = env.readTextFile(inputFile) val counts = text.flatMap { _ .toLowerCase.split( "\\W+" ) } .map { ( _ , 1 ) } .groupBy( 0 ) .sum( 1 ) // execute and save result counts .map(t = > Seq(t. _ 1 , t. _ 2 ).mkString( "\t" )) .writeAsText(outputFile) val jobName = getClass.getSimpleName env.execute(jobName) } } |
上面代码中,env.readTextFile(inputFile)返回了DataSet[String],表示Flink中对String类型数据元素的数据集的抽象;写入HDFS,调用DataSet的writeAsText方法,传递一个HDFS路径即可。
构建打包程序,执行如下命令:
1 | sbt clean assembly |
可以看到,生成了文件target/scala-2.11/flink-demo-assembly-0.0.1-SNAPSHOT.jar,可以将其提交到Flink集群上运行。有两种方式,一种方式是,通过Flink Web Dashboard,在页面上上传上述jar文件,然后配置输入的参数值,不做过多说明;另一种方式是,通过命令行的方式提交,这种方式比较方便,对应的提交运行命令,如下所示:
1 | bin /flink run --class org.shirdrn.flink.HdfsWordCount flink-demo-assembly-0.0.1-SNAPSHOT.jar hdfs: //ali-bj01-tst-cluster-001 .xiweiai.cn:8020 /data/beam/input/Notre-Dame-de-Paris .txt hdfs: //ali-bj01-tst-cluster-001 .xiweiai.cn:8020 /user/hadoop/count_result .txt |
输入和输出文件,都要加上HDFS前缀: hdfs://ali-bj01-tst-cluster-001.xiweiai.cn:8020,指明HDFS集群的地址。
提交运行后,可以在Flink Web Dashboard看到对应的Job运行情况,可以通过命令查看输出结果文件的内容:
1 | hdfs dfs - cat /user/hadoop/count_result .txt |
流式数据处理
示例中,我们打算从一个具有3节点的Kakfa集群的topic中读取数据,数据来自网站用户浏览的Apache Server的日志,我们需要将日志行中有用的信息抽取出来,然后再写入到一个新的Kakfa topic中,供下游的其它程序处理,比如可以使用的场景有:实时推荐、实时监控,等等。
首先,创建2个Kafka topic,执行如下命令行:
1 2 | kafka-topics --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --partitions 3 --replication-factor 2 --create --topic raw_apache_log_event kafka-topics --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --partitions 3 --replication-factor 2 --create --topic etld_user_behavior_event |
raw_apache_log_event存储原始日志记录,etld_user_behavior_event存储经过抽取、过滤后的记录。
然后,基于上面创建的flink-demo项目,在build.sbt中添加Flink Streaming及其Kakfa connector依赖,如下所示:
1 2 3 4 | val flinkDependencies = Seq( "org.apache.flink" %% "flink-scala" % flinkVersion % "provided" , "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided" , "org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion) |
接着,可以继续开发代码,实现我们计划的逻辑,代码如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | package org.shirdrn.flink import java.util.regex.Pattern import org.apache.flink.api.common.restartstrategy.RestartStrategies import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala. _ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer 09 , FlinkKafkaProducer 09 } /** * Read raw events from a Kafka topic, and then write ETLed events to another Kafka topic. */ object KafkaStreamingETL { private val LINE _ PATTERN = Pattern.compile( "([^\\s]+)\\s\\-\\s\\-\\s\\[([^\\]]+)\\]\\s\"([^\"]+)\"\\s([^\\s]+)\\s\\-\\s\"([^\"]+).*(\\d+)$" ); private def checkParams(params : ParameterTool) = { if (params.getNumberOfParameters < 5 ) { println( "Missing parameters!\n" + "Usage: Kafka " + "--raw-topic <raw_topic> " + "--etled-topic <etled_topic> " + "--bootstrap.servers <kafka brokers> " + "--zookeeper.connect <zk quorum> " + "--group.id <group id>" ) System.exit(- 1 ) } } def main(args : Array[String]) : Unit = { // parse & check input arguments val params = ParameterTool.fromArgs(args) checkParams(params) val env = StreamExecutionEnvironment.getExecutionEnvironment configureEnv(params, env) // create a Kafka streaming source consumer for Kafka 0.9.x val kafkaConsumer = new FlinkKafkaConsumer 09 ( params.getRequired( "raw-topic" ), new SimpleStringSchema, params.getProperties ) // process streaming events val messageStream = env .addSource(kafkaConsumer) .map(parseEvent( _ )) .filter(! _ .isEmpty) // create a Kafka producer for Kafka 0.9.x val kafkaProducer = new FlinkKafkaProducer 09 ( params.getRequired( "etled-topic" ), new SimpleStringSchema, params.getProperties ) // write data into Kafka messageStream.addSink(kafkaProducer) env.execute( "KafkaStreamingETL" ) } private def configureEnv(params : ParameterTool, env : StreamExecutionEnvironment) = { env.getConfig.disableSysoutLogging val restartAttempts = 3 val delayBetweenAttemptsMills = 30000 env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart( restartAttempts, delayBetweenAttemptsMills)) // create a checkpoint every 5 seconds val checkpointIntervalMillis = 1000 env.enableCheckpointing(checkpointIntervalMillis) // make parameters available in the web interface env.getConfig.setGlobalJobParameters(params) } // processed event line example: // 123.150.182.192 28/Dec/2017:01:05:21 /wp-content/plugins/akismet/_inc/form.js?ver=3.0.1 304 http://myhost.com/archives/30025.html 1 def parseEvent(msg : String) : String = { val m = LINE _ PATTERN.matcher(msg) if (m.find()) { val ip = m.group( 1 ) val dt = m.group( 2 ) val resource = m.group( 3 ) val responseCode = m.group( 4 ) val pageUrl = m.group( 5 ) val timeTaken = m.group( 6 ) Seq( ip, dt.split( "\\s+" )( 0 ), resource.split( "\\s+" )( 1 ), responseCode, pageUrl, timeTaken ).mkString( "\t" ) } else { "" } } } |
上面代码中,env.addSource(kafkaConsumer)返回了DataStream[T],后续map和filter操作都作用在该DataStream[T]上,DataStream[T]是Flink对流式数据集的抽象表示。
构建打包后,提交到Flink集群上运行,执行如下命令:
1 2 3 4 5 6 | bin /flink run --parallelism 3 --class org.shirdrn.flink.KafkaStreamingETL flink-demo-assembly-0.0.1-SNAPSHOT.jar \ --raw-topic raw_apache_log_event \ --etled-topic etld_user_behavior_event \ --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \ --bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \ --group. id G_XW_WLD_ETL |
我们可以在Flink Web Dashboard上查看提交的KafkaStreamingETL的运行状况,如下图所示:
也可以通过Kakfa自带的topic消费脚本工具,确认Streaming程序处理结果:
1 | kafka-console-consumer --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --topic raw_apache_log_event --from-beginning |
基于窗口操作(Windowing)的数据处理
我们基于前面经过ETL解析处理的日志事件记录,进行实时Windowing操作。从Kafka中读取数据,然后经过对IP进行Windowing和统计,结果输出到另一个新的Kakfa topic中,实现代码,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | object WindowedUserBrowseringAnalytics { def checkParams(params : ParameterTool) = { if (params.getNumberOfParameters < 6 ) { println( "Missing parameters!\n" + "Usage: Kafka " + "--etled-topic <etled_topic> " + "--windowed-topic <windowed_topic> " + "--bootstrap.servers <kafka brokers> " + "--zookeeper.connect <zk quorum> " + "--group.id <group id> " + "--window-size <window_size>" ) System.exit(- 1 ) } } def main(args : Array[String]) : Unit = { // parse & check input arguments val params = ParameterTool.fromArgs(args) checkParams(params) val env = StreamExecutionEnvironment.getExecutionEnvironment configureEnv(params, env) val windowSize = params.getRequired( "window-size" ).toLong val kafkaConsumer = new FlinkKafkaConsumer 09 ( params.getRequired( "etled-topic" ), new SimpleStringSchema, params.getProperties ) val kafkaProducer = new FlinkKafkaProducer 09 ( params.getRequired( "windowed-topic" ), new SimpleStringSchema, params.getProperties ) val stream : DataStream[String] = env.addSource(kafkaConsumer) stream .map(event = > (event.split( "\t" )( 0 ), 1 L)) .keyBy( 0 ) .window(TumblingEventTimeWindows.of(Time.seconds(windowSize))) .reduce((event 1 , event 2 ) = > (event 1 . _ 1 , event 1 . _ 2 + event 2 . _ 2 )) .map(r = > { val df = new SimpleDateFormat( "yyyy-MM-dd:HH:mm" ) Seq(df.format( new Date()), r. _ 1 , r. _ 2 ).mkString( "\t" ) }) .addSink(kafkaProducer) env.execute(getClass.getSimpleName) } } |
构建打包,提交到Flink集群运行:
1 2 3 4 5 6 7 | bin /flink run --parallelism 2 --class org.shirdrn.flink.WindowedUserBrowseringAnalytics flink-demo-assembly-0.0.1-SNAPSHOT.jar \ --etled-topic etld_user_behavior_event \ --windowed-topic windowed_analytics_result \ --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \ --bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \ --window-size 60 \ --group. id G_XW_MON_ALTS |
可以通过前面类似的方式,查看Job运行状况和结果。
参考链接
- http://www.apache.org/dyn/closer.lua/flink/flink-1.4.0/flink-1.4.0-bin-hadoop26-scala_2.11.tgz
- https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
博主好!我这两天也在尝试搭建Flink环境,根据官网教程(http://doc.flink-china.org/latest/quickstart/setup_quickstart.html#setup-download-and-start-flink)走的,用examples/streaming/SocketWindowWordCount.jar进行测试时,查看taskmanager的.out文件中有word的临时统计结果,查看jobmanager的.out文件中没有任何输出。。。菜鸟求教博主答疑解惑。。。
请问一下flink 的集群模式中一定要用到hdfs 或者nfs 这种嚒?不能都不依赖么?
可以不依赖HDFS, 单建一个Standalone模式的Flink集群,但是要保证你选择的文件系统,必须是Flink支持的。
你好,想问下RunningJobs里的jobName可以设置吗?