Apache Flink 1.4.0:Standalone集群模式实践

Apache Flink是一个开源的流处理框架,提供了分布式的、高性能的、高可用的特性,同时能够为流式应用程序提供多种编程语言的API,更多有关Flink的内容,不再累述,请参考官方文档,本文主要以Flink Standalone集群安装配置、编程实践为主。

Flink集群安装配置

首先,选择一主二从共3个节点来安装配置Flink Standalone集群:

Master:ali-bj01-tst-cluster-001.xiweiai.cn
Worker:ali-bj01-tst-cluster-002.xiweiai.cn
Worker:ali-bj01-tst-cluster-003.xiweiai.cn

为了方便安装文件远程拷贝,单独创建一个hadoop用户,并打通从Master节点使用ssh到Worker节点之间的免密码登录。
在Master节点上下载、准备Flink 1.4.0安装文件,执行如下命令:

wget http://mirror.bit.edu.cn/apache/flink/flink-1.4.0/flink-1.4.0-bin-hadoop26-scala_2.11.tgz
tar xvzf flink-1.4.0-bin-hadoop26-scala_2.11.tgz

接着,修改Flink配置文件flink-1.4.0/conf/flink-conf.yaml,修改后的内容如下所示:

env.java.home: /usr/local/java/jdk1.8.0_131/
jobmanager.rpc.address: ali-bj01-tst-cluster-001.xiweiai.cn
# The heap size for the JobManager JVM
jobmanager.heap.mb: 2048
# The heap size for the TaskManager JVM
taskmanager.heap.mb: 2048

通过设置env.java.home属性,指定JAVA_HOME环境变量的值;jobmanager.rpc.address表示Flink集群的JobManager的RPC地址;设置jobmanager.heap.mb和taskmanager.heap.mb分别指定JobManager和TaskManager的堆内存大小,这个需要根据实际节点的资源情况进行配置,这里每个Worker节点都配置为2G大小。
然后,修改conf/slaves文件,增加Worker节点的主机名或IP列表,内容如下所示:

ali-bj01-tst-cluster-002.xiweiai.cn
ali-bj01-tst-cluster-003.xiweiai.cn

最后,将配置完成的Flink安装文件,远程拷贝到各个Worker节点:

scp -r /mnt/bd/data/flink/flink-1.4.0 ali-bj01-tst-cluster-002.xiweiai.cn:/mnt/bd/data/flink/
scp -r /mnt/bd/data/flink/flink-1.4.0 ali-bj01-tst-cluster-003.xiweiai.cn:/mnt/bd/data/flink/

启动集群

启动Flink集群,执行如下命令:

bin/start-cluster.sh

它会在Master节点上启动JobManager进程,然后ssh到Worker节点上启动各个TaskManager进程。
当然可选地,也可以单独启动JobManager和TaskManager,在Master节点上启动JobManager:

bin/jobmanager.sh start

在各个Worker节点上启动TaskManager:

bin/taskmanager.sh start

启动完成后,可以查看对应的日志,验证Flink集群是否启动成功:

tail -100f log/flink-hadoop-jobmanager-0-ali-bj01-tst-cluster-001.xiweiai.cn.log
tail -100f log/flink-hadoop-taskmanager-0-ali-bj01-tst-cluster-002.xiweiai.cn.log 
tail -100f log/flink-hadoop-taskmanager-0-ali-bj01-tst-cluster-003.xiweiai.cn.log 

或者,查看Flink Web Dashboard UI来确认,通过浏览器打开链接:http://ali-bj01-tst-cluster-001.xiweiai.cn:8081/,如下图所示:
Flink-Web-Dashboard
可见,TaskManager都已经注册到JobManager,整个Flink集群具有了计算资源(Task Slot),集群启动成功。

创建Flink项目

通过Flink官方文档,可以看到如何创建quickstart项目,假设我们使用Scala来编写Flink程序,默认使用sbt来管理依赖和程序构建,可以执行如下命令来创建:

bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)

可以根据向导提示,一步一步进行(示例是我们创建的flink-demo项目):

➜  bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 11354  100 11354    0     0   5393      0  0:00:02  0:00:02 --:--:--  5474
This script creates a Flink project using Scala and SBT.

Project name (Flink Project): flink-demo
Organization (org.example): org.shirdrn.flink
Version (0.1-SNAPSHOT): 0.0.1-SNAPSHOT
Scala version (2.11.7): 2.11.7
Flink version (1.3.2): 1.4.0

-----------------------------------------------
Project Name: flink-demo
Organization: org.shirdrn.flink
Version: 0.0.1-SNAPSHOT
Scala version: 2.11.7
Flink version: 1.4.0
-----------------------------------------------
Create Project? (Y/n): Y
Creating Flink project under flink-demo

基于该Demo项目,就可以开发我们自己的Flink程序了。
下面,我们分别选择使用Flink提供的API来开发批量数据处理程序、流式数据处理程序、基于窗口(Windowing)的数据处理,来实现通过Flink API进行编程并提交运行。

批量数据处理

我们基于Flink发行包自带的example代码,修改WordCount程序为HdfsWordCount,支持从HDFS读取文件,然后将计算结果写入到HDFS文件,非常简单,代码如下所示:

import org.apache.flink.api.scala._

object HdfsWordCount {
  def main(args: Array[String]) {

    if(args.length != 2) {
      println("Parameter error!")
      println("Usage: org.shirdrn.flink.HdfsWordCount <inputFile> <outputFile>")
      System.exit(-1)
    }

    val inputFile = args(0)
    val outputFile = args(1)

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.readTextFile(inputFile)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // execute and save result
    counts
      .map(t => Seq(t._1, t._2).mkString("\t"))
      .writeAsText(outputFile)

    val jobName = getClass.getSimpleName
    env.execute(jobName)
  }
}

上面代码中,env.readTextFile(inputFile)返回了DataSet[String],表示Flink中对String类型数据元素的数据集的抽象;写入HDFS,调用DataSet的writeAsText方法,传递一个HDFS路径即可。
构建打包程序,执行如下命令:

sbt clean assembly

可以看到,生成了文件target/scala-2.11/flink-demo-assembly-0.0.1-SNAPSHOT.jar,可以将其提交到Flink集群上运行。有两种方式,一种方式是,通过Flink Web Dashboard,在页面上上传上述jar文件,然后配置输入的参数值,不做过多说明;另一种方式是,通过命令行的方式提交,这种方式比较方便,对应的提交运行命令,如下所示:

bin/flink run --class org.shirdrn.flink.HdfsWordCount flink-demo-assembly-0.0.1-SNAPSHOT.jar hdfs://ali-bj01-tst-cluster-001.xiweiai.cn:8020/data/beam/input/Notre-Dame-de-Paris.txt hdfs://ali-bj01-tst-cluster-001.xiweiai.cn:8020/user/hadoop/count_result.txt

输入和输出文件,都要加上HDFS前缀: hdfs://ali-bj01-tst-cluster-001.xiweiai.cn:8020,指明HDFS集群的地址。
提交运行后,可以在Flink Web Dashboard看到对应的Job运行情况,可以通过命令查看输出结果文件的内容:

hdfs dfs -cat /user/hadoop/count_result.txt

流式数据处理

示例中,我们打算从一个具有3节点的Kakfa集群的topic中读取数据,数据来自网站用户浏览的Apache Server的日志,我们需要将日志行中有用的信息抽取出来,然后再写入到一个新的Kakfa topic中,供下游的其它程序处理,比如可以使用的场景有:实时推荐、实时监控,等等。
首先,创建2个Kafka topic,执行如下命令行:

kafka-topics --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --partitions 3 --replication-factor 2 --create --topic raw_apache_log_event
kafka-topics --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --partitions 3 --replication-factor 2 --create --topic etld_user_behavior_event

raw_apache_log_event存储原始日志记录,etld_user_behavior_event存储经过抽取、过滤后的记录。
然后,基于上面创建的flink-demo项目,在build.sbt中添加Flink Streaming及其Kakfa connector依赖,如下所示:

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.9" % flinkVersion)

接着,可以继续开发代码,实现我们计划的逻辑,代码如下所示:

package org.shirdrn.flink

import java.util.regex.Pattern
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}

/**
 * Read raw events from a Kafka topic, and then write ETLed events to another Kafka topic.
 */
object KafkaStreamingETL {

  private val LINE_PATTERN = Pattern.compile(
    "([^\\s]+)\\s\\-\\s\\-\\s\\[([^\\]]+)\\]\\s\"([^\"]+)\"\\s([^\\s]+)\\s\\-\\s\"([^\"]+).*(\\d+)$"
  );

  private def checkParams(params: ParameterTool) = {
    if (params.getNumberOfParameters < 5) {
      println("Missing parameters!\n"
        + "Usage: Kafka "
        + "--raw-topic <raw_topic> "
        + "--etled-topic <etled_topic> "
        + "--bootstrap.servers <kafka brokers> "
        + "--zookeeper.connect <zk quorum> "
        + "--group.id <group id>")
      System.exit(-1)
    }
  }

  def main(args: Array[String]): Unit = {
    // parse & check input arguments
    val params = ParameterTool.fromArgs(args)
    checkParams(params)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    configureEnv(params, env)

    // create a Kafka streaming source consumer for Kafka 0.9.x
    val kafkaConsumer = new FlinkKafkaConsumer09(
      params.getRequired("raw-topic"),
      new SimpleStringSchema, params.getProperties
    )

    // process streaming events
    val messageStream = env
      .addSource(kafkaConsumer)
      .map(parseEvent(_))
      .filter(!_.isEmpty)

    // create a Kafka producer for Kafka 0.9.x
    val kafkaProducer = new FlinkKafkaProducer09(
      params.getRequired("etled-topic"),
      new SimpleStringSchema, params.getProperties
    )

    // write data into Kafka
    messageStream.addSink(kafkaProducer)

    env.execute("KafkaStreamingETL")
  }

  private def configureEnv(params: ParameterTool, env: StreamExecutionEnvironment) = {
    env.getConfig.disableSysoutLogging

    val restartAttempts = 3
    val delayBetweenAttemptsMills = 30000
    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      restartAttempts, delayBetweenAttemptsMills))

    // create a checkpoint every 5 seconds
    val checkpointIntervalMillis = 1000
    env.enableCheckpointing(checkpointIntervalMillis)
    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)
  }

  // processed event line example:
  // 123.150.182.192    28/Dec/2017:01:05:21    /wp-content/plugins/akismet/_inc/form.js?ver=3.0.1    304    http://myhost.com/archives/30025.html    1
  def parseEvent(msg: String): String = {
    val m = LINE_PATTERN.matcher(msg)
    if(m.find()) {
      val ip = m.group(1)
      val dt = m.group(2)
      val resource = m.group(3)
      val responseCode = m.group(4)
      val pageUrl = m.group(5)
      val timeTaken = m.group(6)
      Seq(
        ip, dt.split("\\s+")(0),
        resource.split("\\s+")(1),
        responseCode, pageUrl, timeTaken
      ).mkString("\t")
    } else {
      ""
    }
  }

}

上面代码中,env.addSource(kafkaConsumer)返回了DataStream[T],后续map和filter操作都作用在该DataStream[T]上,DataStream[T]是Flink对流式数据集的抽象表示。
构建打包后,提交到Flink集群上运行,执行如下命令:

bin/flink run --parallelism 3 --class org.shirdrn.flink.KafkaStreamingETL flink-demo-assembly-0.0.1-SNAPSHOT.jar \
  --raw-topic raw_apache_log_event \
  --etled-topic etld_user_behavior_event \
  --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
  --bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
  --group.id G_XW_WLD_ETL

我们可以在Flink Web Dashboard上查看提交的KafkaStreamingETL的运行状况,如下图所示:
Flink-KafkaStreamingETL
也可以通过Kakfa自带的topic消费脚本工具,确认Streaming程序处理结果:

kafka-console-consumer --zookeeper 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 --topic raw_apache_log_event --from-beginning

基于窗口操作(Windowing)的数据处理

我们基于前面经过ETL解析处理的日志事件记录,进行实时Windowing操作。从Kafka中读取数据,然后经过对IP进行Windowing和统计,结果输出到另一个新的Kakfa topic中,实现代码,如下所示:

object WindowedUserBrowseringAnalytics {

  def checkParams(params: ParameterTool) = {
    if (params.getNumberOfParameters < 6) {
      println("Missing parameters!\n"
        + "Usage: Kafka "
        + "--etled-topic <etled_topic> "
        + "--windowed-topic <windowed_topic> "
        + "--bootstrap.servers <kafka brokers> "
        + "--zookeeper.connect <zk quorum> "
        + "--group.id <group id> "
        + "--window-size <window_size>")
      System.exit(-1)
    }
  }

  def main(args: Array[String]): Unit = {
    // parse & check input arguments
    val params = ParameterTool.fromArgs(args)
    checkParams(params)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    configureEnv(params, env)
    val windowSize = params.getRequired("window-size").toLong

    val kafkaConsumer = new FlinkKafkaConsumer09(
      params.getRequired("etled-topic"),
      new SimpleStringSchema, params.getProperties
    )

    val kafkaProducer = new FlinkKafkaProducer09(
      params.getRequired("windowed-topic"),
      new SimpleStringSchema, params.getProperties
    )

    val stream: DataStream[String] = env.addSource(kafkaConsumer)
    stream
      .map(event => (event.split("\t")(0), 1L))
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
      .reduce((event1, event2) => (event1._1, event1._2 + event2._2))
      .map(r => {
        val df = new SimpleDateFormat("yyyy-MM-dd:HH:mm")
        Seq(df.format(new Date()), r._1, r._2).mkString("\t")
      })
      .addSink(kafkaProducer)

    env.execute(getClass.getSimpleName)
  }
}

构建打包,提交到Flink集群运行:

bin/flink run --parallelism 2 --class org.shirdrn.flink.WindowedUserBrowseringAnalytics flink-demo-assembly-0.0.1-SNAPSHOT.jar \
  --etled-topic etld_user_behavior_event \
  --windowed-topic windowed_analytics_result \
  --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
  --bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
  --window-size 60 \
  --group.id G_XW_MON_ALTS

可以通过前面类似的方式,查看Job运行状况和结果。

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>