使用 Flink 处理数据时,可以基于 Flink 提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API 来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为 Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为 Sink Operator,包含了对应输出数据的处理逻辑。这里,我们只关注输出的 Sink Operator 实现。
Flink 批式处理模式,运行 Flink Batch Job 时作用在有界的输入数据集上,所以 Job 运行的时间是有时限的,一旦 Job 运行完成,对应的整个数据处理应用就已经结束,比如,输入是一个数据文件,或者一个 Hive SQL 查询对应的结果集,等等。在批式处理模式下处理数据的输出时,主要需要实现一个自定义的 OutputFormat,然后基于该 OutputFormat 来构建一个 Sink,下面看下 OutputFormat 接口的定义,如下所示:
@Public public interface OutputFormat<IT> extends Serializable { void configure(Configuration parameters); void open(int taskNumber, int numTasks) throws IOException; void writeRecord(IT record) throws IOException; void close() throws IOException; }
上面,configure() 方法用来配置一个 OutputFormat 的一些输出参数;open() 方法用来实现与外部存储系统建立连接;writeRecord() 方法用来实现对 Flink Batch Job 处理后,将数据记录输出到外部存储系统。开发 Batch Job 时,通过调用 DataSet的output() 方法,参数值使用一个 OutputFormat 的具体实现即可。后面,我们会基于 Elasticsearch 来实现上面接口中的各个方法。
Flink 流式处理模式,运行 Flink Streaming Job 时一般输入的数据集为流数据集,也就是说输入数据元素会持续不断地进入到 Streaming Job 的处理过程中,但你仍然可以使用一个HDFS数据文件作为 Streaming Job 的输入,即使这样,一个 Flink Streaming Job 启动运行后便会永远运行下去,除非有意外故障或有计划地操作使其终止。在流式处理模式下处理数据的输出时,我们需要是实现一个 SinkFunction,它指定了如下将流数据处理后的结果,输出到指定的外部存储系统中,下面看下 SinkFunction 的接口定义,如下所示:
@Public public interface SinkFunction<IN> extends Function, Serializable { @Deprecated default void invoke(IN value) throws Exception {} default void invoke(IN value, Context context) throws Exception { invoke(value); } @Public interface Context<T> { long currentProcessingTime(); long currentWatermark(); Long timestamp(); } }
通过上面接口可以看到,需要实现一个 invoke() 方法,实现该方法来将一个输入的 IN value 输出到外部存储系统中。一般情况下,对一些主流的外部存储系统,Flink 实现了一下内置(社区贡献)的 SinkFunction,我们只需要配置一下就可以直接使用。而且,对于 Streaming Job 来说,实现的 SinkFunction 比较丰富一些,可以减少自己开发的工作量。开发 Streaming Job 时,通过调用 DataStream的addSink() 方法,参数是一个 SinkFlink 的具体实现。
下面,我们分别基于批式处理模式和批式处理模式,分别使用或实现对应组件将 Streaming Job 和 Batch Job 的处理结果输出到 Elasticsearch 中:
基于 Flink DataSteam API 实现
在开发基于 Flink 的应用程序过程中,发现 Flink Streaming API 对 Elasticsearch 的支持还是比较好的,比如,如果想要从 Kafka 消费事件记录,经过处理最终将数据记录索引到 Elasticsearch 5.x,可以直接在 Maven 的 POM 文件中添加如下依赖即可:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch5_2.11</artifactId> <version>1.5.3</version> </dependency>
我们使用 Flink Streaming API 来实现将流式数据处理后,写入到 Elasticsearch 中。其中,输入数据源是 Kafka 中的某个 Topic;输出处理结果到 Elasticsearch 中,我们使用使用 Transport API 的方式来连接 Elasticsearch,需要指定 Transport 地址和端口。具体实现,对应的 Scala 代码,如下所示:
def main(args: Array[String]): Unit = { // parse input arguments val params = ParameterTool.fromArgs(args) if (params.getNumberOfParameters < 9) { val cmd = getClass.getName println("Missing parameters!\n" + "Usage: " + cmd + " --input-topic <topic> " + "--es-cluster-name <es cluster name> " + "--es-transport-addresses <es address> " + "--es-port <es port> " + "--es-index <es index> " + "--es-type <es type> " + "--bootstrap.servers <kafka brokers> " + "--zookeeper.connect <zk quorum> " + "--group.id <some id> [--prefix <prefix>]") return } val env = StreamExecutionEnvironment.getExecutionEnvironment val kafkaConsumer = new FlinkKafkaConsumer010[String]( params.getRequired("input-topic"), new SimpleStringSchema(), params.getProperties ) val dataStream = env .addSource(kafkaConsumer) .filter(!_.isEmpty) val esClusterName = params.getRequired("es-cluster-name") val esAddresses = params.getRequired("es-transport-addresses") val esPort = params.getInt("es-port", 9300) val transportAddresses = new java.util.ArrayList[InetSocketAddress] val config = new java.util.HashMap[String, String] config.put("cluster.name", esClusterName) // This instructs the sink to emit after every element, otherwise they would be buffered config.put("bulk.flush.max.actions", "100") esAddresses.split(",").foreach(address => { transportAddresses.add(new InetSocketAddress(InetAddress.getByName(address), esPort)) }) val esIndex = params.getRequired("es-index") val esType = params.getRequired("es-type") val sink = new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] { def createIndexRequest(element: String): IndexRequest = { return Requests.indexRequest() .index(esIndex) .`type`(esType) .source(element) } override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { requestIndexer.add(createIndexRequest(t)) } }) dataStream.addSink(sink) val jobName = getClass.getSimpleName env.execute(jobName) }
上面有关数据索引到 Elasticsearch 的处理中, 最核心的就是创建一个 ElasticsearchSink,然后通过 DataStream 的 API 调用 addSink() 添加一个 Sink,实际是一个 SinkFunction 的实现,可以参考 Flink对应DataStream 类的 addSink() 方法代码,如下所示:
def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T] = stream.addSink(sinkFunction)
基于 Flink DataSet API 实现
目前,Flink 还没有在 Batch 处理模式下实现对应 Elasticsearch 对应的 Connector,需要自己根据需要实现,所以我们基于 Flink 已经存在的 Streaming 处理模式下已经实现的 Elasticsearch Connector 对应的代码,经过部分修改,可以直接拿来在 Batch 处理模式下,将数据记录批量索引到 Elasticsearch 中。
我们基于 Flink 1.6.1 版本,以及 Elasticsearch 6.3.2 版本,并且使用 Elasticsearch 推荐的 High Level REST API 来实现(为了复用 Flink 1.6.1 中对应的 Streaming 处理模式下的 Elasticsearch 6 Connector 实现代码,我们选择使用该 REST Client),需要在 Maven 的 POM 文件中添加如下依赖:
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.3.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.3.2</version> </dependency>
我们实现的各个类的类图及其关系,如下图所示:
如果熟悉 Flink Streaming 处理模式下 Elasticsearch 对应的 Connector 实现,可以看到上面的很多类都在 org.apache.flink.streaming.connectors.elasticsearch 包里面存在,其中包括批量向 Elasticsearch 中索引数据(内部实现了使用 BulkProcessor)。上图中引入的 ElasticsearchApiCallBridge,目的是能够实现对 Elasticsearch 不同版本的支持,只需要根据 Elasticsearch 不同版本中不同 Client 实现,进行一些适配,上层抽象保持不变。
如果需要在 Batch 处理模式下批量索引数据到 Elasticsearch,可以直接使用 ElasticsearchOutputFormat 即可实现。但是创建 ElasticsearchOutputFormat,需要几个参数:
private ElasticsearchOutputFormat( Map<String, String> bulkRequestsConfig, List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction, DocWriteRequestFailureHandler failureHandler, RestClientFactory restClientFactory) { super(new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory), bulkRequestsConfig, elasticsearchSinkFunction, failureHandler); }
当然,我们可以通过代码中提供的 Builder 来非常方便的创建一个 ElasticsearchOutputFormat。下面,我们看下我们 Flink Batch Job 实现逻辑。
- 实现 ElasticsearchSinkFunction
我们需要实现 ElasticsearchSinkFunction 接口,实现一个能够索引数据到 Elasticsearch 中的功能,代码如下所示:
final ElasticsearchSinkFunction<String> elasticsearchSinkFunction = new ElasticsearchSinkFunction<String>() { @Override public void process(String element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element, parameterTool)); } private IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { LOG.info("Create index req: " + element); JSONObject o = JSONObject.parseObject(element); return Requests.indexRequest() .index(parameterTool.getRequired("es-index")) .type(parameterTool.getRequired("es-type")) .source(o); } };
上面代码,主要是把一个将要输出的数据记录,通过 RequestIndexer 来实现索引到 Elasticsearch 中。
- 读取 Elasticsearch 配置参数
配置连接 Elasticsearch 的参数。从程序输入的 ParameterTool 中读取 Elasticsearch 相关的配置:
String esHttpHosts = parameterTool.getRequired("es-http-hosts"); LOG.info("Config: esHttpHosts=" + esHttpHosts); int esHttpPort = parameterTool.getInt("es-http-port", 9200); LOG.info("Config: esHttpPort=" + esHttpPort); final List<HttpHost> httpHosts = Arrays.asList(esHttpHosts.split(",")) .stream() .map(host -> new HttpHost(host, esHttpPort, "http")) .collect(Collectors.toList()); int bulkFlushMaxSizeMb = parameterTool.getInt("bulk-flush-max-size-mb", 10); int bulkFlushIntervalMillis = parameterTool.getInt("bulk-flush-interval-millis", 10 * 1000); int bulkFlushMaxActions = parameterTool.getInt("bulk-flush-max-actions", 1);
- 创建 ElasticsearchOutputFormat
创建一个我们实现的 ElasticsearchOutputFormat,代码片段如下所示:
final ElasticsearchOutputFormat outputFormat = new Builder<>(httpHosts, elasticsearchSinkFunction) .setBulkFlushBackoff(true) .setBulkFlushBackoffRetries(2) .setBulkFlushBackoffType(ElasticsearchApiCallBridge.FlushBackoffType.EXPONENTIAL) .setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb) .setBulkFlushInterval(bulkFlushIntervalMillis) .setBulkFlushMaxActions(bulkFlushMaxActions) .build();
上面很多配置项指定了向 Elasticsearch 中进行批量写入的行为,在 ElasticsearchOutputFormat 内部会进行设置并创建 Elasticsearch6BulkProcessorIndexer,优化索引数据处理的性能。
- 实现 Batch Job 主控制流程
最后我们就可以构建我们的 Flink Batch 应用程序了,代码如下所示:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.readTextFile(file) .filter(line -> !line.isEmpty()) .map(line -> line) .output(outputFormat); final String jobName = ImportHDFSDataToES.class.getSimpleName(); env.execute(jobName);
我们输入的 HDFS 文件中,是一些已经加工好的 JSON 格式记录行,这里为了简单,直接将原始 JSON 字符串索引到 Elasticsearch 中,而没有进行更多其他的处理操作。
有关 Flink 批式处理模式下,Elasticsearch 对应的 OutputFormat 实现的完整代码,可以参考这里:
https://github.com/shirdrn/flink-app-jobs/tree/master/src/main/java/org/shirdrn/flink/connector/batch/elasticsearch。
参考链接
- https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/elasticsearch.html
- https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/#data-sinks
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。