作者 康凯森 | StarRocks 核心研发、StarRocks 查询团队负责人 | 2022/04/26 15:46 | https://my.oschina.net/u/5658056/blog/5519656 一条查询 SQL 在关系型分布式数据库中的处理,通常需要经过 3 大步骤: 将 SQL 文本转换成一个 “最佳的” 分布式物理执行计划 将执行计划调度到计算节点 计算节点执行具体的物理执行计划 本文将详细解释在 StarRocks 中如何完成一条查询 SQL 的处理。 首先来了解 StarRocks 中的基本概念: FE: 负责查询解析,查询优化,查询调度和元数据管理 BE: 负责查询执行和数据存储 #01 从 SQL 文本到执行计划 从 SQL 文本到分布式物理执行计划,在 StarRocks 中,需要经过以下 5 个步骤: SQL Parse:将 SQL 文本转换成一个 AST(抽象语法树) SQL Analyze:基于 AST 进行语法和语义分析 SQL Logical Plan:将 AST 转换成逻辑计划 SQL Optimize:基于关系代数、统计信息、Cost 模型,对逻辑计划进行重写、转换,选择出 Cost “最低” 的物理执行计划 生成 Plan Fragment:将 Optimizer 选择的物理执行计划转换为 BE 可以直接执行
按分类浏览文章: 开源技术
StarRocks-2.1.5 集群安装部署
StarRocks 是新一代的 MPP 数据库,它具有很高的查询性能,能够支持各种场景的数据查询分析使用,主要包含如下几个场景: OLAP多维分析 实时数据分析 高并发查询 统一分析 关于 StarRocks 更详细的介绍,可以查看官方文档,非常详细(见后面参考链接)。 下面,我们基于开源的 StarRocks 社区版,版本是 2.1.5 来进行集群的安装配置。StarRocks 集群的部署模式,采用 FE 与 BE 分离的模式。 集群部署规划 1 基础环境和软件 软件 版本 操作系统 CentOS-7.8 StarRocks 2.1.5 JDK jdk1.8.0_212 MySQL Client 5.7.37 2 集群主机规划 IP 地址 主机名称 角色 备注信息 172.168.0.1 VM-0-1-centos FE FE Master 172.168.0.2 VM-0-2-centos FE FE OBSERVER 172.168.0.3 VM-0-3-centos FE FE FOLLOWER 172.168.0.4 VM-0-4-centos BE BE 172.168.0.5 VM-0-5-centos BE BE 172.168.0.6 VM-0-6-centos BE BE 172.168.0.7 VM-0-7-centos BE BE 172.168.0.8 VM-0-8-centos BE BE 172.168.0.9 VM-0-9-centos BE BE 3 集群主机目录规划
Apache Hudi架构设计和基本概念
Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi具有如下基本特性/能力: Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。 Hudi基于Spark来对HDFS上的数据进行更新、插入、删除等。 Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。 Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。 Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。 Hudi通过Savepoint来实现数据恢复。 目前,Hudi支持Spark 2.x版本,建议使用2.4.4+版本的Spark。 基本架构 与Kudu相比,Kudu是一个支持OLTP workload的数据存储系统,而Hudi的设计目标是基于Hadoop兼容的文件系统(如HDFS、S3等),重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力,Hudi支持Incremental Pulling而Kudu不支持。 Hudi能够整合Batch和Streaming处理的能力,这是通过利用
Flink批处理生成最佳执行计划(上篇)
生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。 生成最佳执行计划是一个递归计算的过程:正向从DataSinkNode开始直到DataSourceNode,分别计算每个OptimizerNode的最佳计划,然后反向逐步将整个OptimizerNode DAG图转换为PlanNode DAG图,得到一个最优计划。其中,PlanNode的类继承结构,如下图所示: 通过与OptimizerNode对应的节点结构类图对比,PlanNode更加抽象了一个层次,更关注Operator之间的数据交换策略。 其中,生成最佳执行计划的过程,可以在Optimizer类中看到,如下代码所示: // the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); 这是一个递归的过程,每个OptimizerNode都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从OptimizerNode DAG转换为PlanNode DAG。后面,我们会对SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode创建的处理过程进行详细说明。 基本数据
Flink批处理生成OptimizerNode DAG图
OptimizerNode DAG,是基于程序Plan中创建好的Operator DAG,并以OptimizerNode作为DAG图节点构建完成的。所以,我们先看一下组成OptimizerNode DAG,都有哪些类型的OptimizerNode,如下图所示: 通过上面类图,可以看到其中主要有SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode等这几大类,根据各个OptimizerNode具体实现类名称,可以对应到具体操作功能的Operator实现类。比如,MapNode对应map操作,JoinNode对应join操作,等等。 OptimizerNode结构 下面看一下OptimizerNode的数据结构,了解它都抽象了哪些内容,如下图所示: 上图中的结构是每个OptimizerNode都包含的内容,具体到每个OptimizerNode实现,都会根据Operator的属性及行为,补充一些其他的信息。上面的cachedPlans是一个基于PlanNode的DAG的列表,在生成OptimizerNode过程中并没有用到,而是基于OptimizerNode DAG生成最佳执行计划的时候,通过PlanNode DAG来表示的,后续文章我们会单独详细分析,这里先跳过。 下面将对OptimizerNode节点结构包含的一些重要属性信息进行说明,如
Flink批处理生成执行计划
我们这里所说的优化执行计划,是指从生成Plan之后,一直到生成JobGraph之前这一期间对Plan进行各种转换、增强、优化处理。从用户编程的Flink批处理程序开始,一直到生成JobGraph的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。 我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示: 根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。 构建用户编程Operator DAG 把用户程序中设置的各种DataSource、Transformation、DataSink及其配置信息进行收集,以最原始的形态保存在ExecutionEnvironment上线文环境对象中,当然也跟踪了从DataSource到DataSink之间的操作顺序,最后通过ExecutionEnvironment对象,基于DataSink就可以从后向前遍历,直接链接到DataSource。 在上面的处理过程中,涉及到3个比较重要的内容:一是用
Flink批处理生成JobGraph流程分析
我们先通过一个简单的批量处理WordCount的程序,来了解一下,在编程API层面的基本使用方法,以及Flink是如何基于Pipeline风格的API实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount程序代码,示例如下所示: final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1);
Flink编程API设计分析
使用Flink开发批式或流式Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是Flink提供的基本框架,是如何在API层面进行统一处理的,或者说尽量使API统一,这样有助于我们对Flink框架更深入地理解。目前使用Flink 1.10版本开发批式和流式Job,在API层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。 Flink数据流编程模型的分层设计,如下图所示: 这里,我们关注的是上面的Core APIs层,结合批式和流式Job开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的JobGraph,以及生成的过程有什么不同。 编程API设计 Flink编程API中,我们开发的数据处理Job程序,都是始于一个Source,对应的输入数据源,终于一个Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程API对批式Job DAG中每个节点的抽象使用的是DataSet,而流式编程API中对应的是DataStream。 对于批式Job DAG中,Data
使用Flink实现索引数据到Elasticsearch
使用Flink处理数据时,可以基于Flink提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为Sink Operator,包含了对应输出数据的处理逻辑。这里,我们只关注输出的Sink Operator实现。 Flink批式处理模式,运行Flink Batch Job时作用在有界的输入数据集上,所以Job运行的时间是有时限的,一旦Job运行完成,对应的整个数据处理应用就已经结束,比如,输入是一个数据文件,或者一个Hive SQL查询对应的结果集,等等。在批式处理模式下处理数据的输出时,主要需要实现一个自定义的OutputFormat,然后基于该OutputFormat来构建一个Sink,下面看下OutputFormat接口的定义,如下所示: @Public public interface OutputFormat<IT> extends Serializable { void configure(Configuration parameters); void open(int taskNumber, int numTasks) throws IOException; void wri
Flink使用Broadcast State实现流处理配置实时更新
Broadcast State是Flink支持的一种Operator State。使用Broadcast State,可以在Flink程序的一个Stream中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个Task中,使得这些数据记录能够为所有的Task所共享,比如一些用于配置的数据记录。这样,每个Task在处理其所对应的Stream中记录的时候,读取这些配置,来满足实际数据处理需要。 另外,在一定程度上,Broadcast State能够使得Flink Job在运行过程中与外部的其他系统解耦合。比如,通常Flink会使用YARN来管理计算资源,使用Broadcast State就可以不用直接连接MySQL数据库读取相关配置信息了,也无需对MySQL做额外的授权操作。因为在一些场景下,会使用Flink on YARN部署模式,将Flink Job运行的资源申请和释放交给YARN去管理,那么就存在Hadoop集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如MySQL等进行连接操作授权,如果忘记对MysQL访问授权,Flink Job被调度到新增的某个新增节点上连接并读取MySQL配置信息就会出错。 Broadcast State API 通常,我们首先会创建一个Keyed或
Flink Checkpoint、Savepoint配置与实践
Flink Checkpoint Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator的状态来生成Snapshot,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些Snapshot进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下Flink Checkpoint机制,如官网下图所示: Checkpoint指定触发生成时间间隔后,每当需要触发Checkpoint时,会向Flink程序运行时的多个分布式的Stream Source中插入一个Barrier标记,这些Barrier会根据Stream中的数据记录一起流向下游的各个Operator。当一个Operator接收到一个Barrier时,它会暂停处理Steam中新接收到的数据记录。因为一个Operator可能存在多个输入的Stream,而每个Stream中都会存在对应的Barrier,该Operator要等到所有的输入Stream中的Barrier都到达。当所有Stream中的Barrier都已经到达该Operator,这时所有的Barrier在时间上看来是同一个时刻点(表示已经对齐),在等待所有Barrier到达的过程中,Operator的Buffer中可
Azkaban集群内部调度原理分析
Azkaban是一个非常简单实用,而且开源的作业调度系统。在2.x版本中不支持集群模式部署,在3.x版本中支持集群模式部署,适用于作业量比较大一些的应用场景。有关Azkaban更多详细信息,如特点、功能、特性、作业定义等,可以参考官方文档,这里不再详述。 Azkaban集群架构 下面我们看一下Azkaban集群模式的架构,如下图所示: 从上图可见,Azkaban集群部署模式,主要有3个核心的组件: Azkaban WebServer Azkaban WebServer,是整个调度集群的核心,负责所有作业的管理和调度。 Azkaban ExecutorServer Azkaban ExecutorServer,整个调度集群中实际运行作业的节点,该类节点可能是作为一个作业提交的客户端,比如Spark on YARN部署模式下,cluster运行模式时只作为客户端使用,client运行模式时会有部分计算逻辑;比如普通的Java程序需要处理量级较小的数据作业,这时Executor Server节点可能有较大的工作负载,占用较多节点资源(内存、CPU)。 DB DB,是集群中所有节点运行共用的数据存储,包含作业信息、各种调度元数据等等。 核心调度概述 Azkaban WebServ