StarRocks 技术内幕:查询原理浅析

作者 康凯森 | StarRocks 核心研发、StarRocks 查询团队负责人 | 2022/04/26 15:46 | https://my.oschina.net/u/5658056/blog/5519656 一条查询 SQL 在关系型分布式数据库中的处理,通常需要经过 3 大步骤: 将 SQL 文本转换成一个 “最佳的” 分布式物理执行计划 将执行计划调度到计算节点 计算节点执行具体的物理执行计划 本文将详细解释在 StarRocks 中如何完成一条查询 SQL 的处理。 首先来了解 StarRocks 中的基本概念: FE: 负责查询解析,查询优化,查询调度和元数据管理 BE: 负责查询执行和数据存储 #01 从 SQL 文本到执行计划 从 SQL 文本到分布式物理执行计划,在 StarRocks 中,需要经过以下 5 个步骤: SQL Parse:将 SQL 文本转换成一个 AST(抽象语法树) SQL Analyze:基于 AST 进行语法和语义分析 SQL Logical Plan:将 AST 转换成逻辑计划 SQL Optimize:基于关系代数、统计信息、Cost 模型,对逻辑计划进行重写、转换,选择出 Cost “最低” 的物理执行计划 生成 Plan Fragment:将 Optimizer 选择的物理执行计划转换为 BE 可以直接执行

StarRocks-2.1.5 集群安装部署

StarRocks 是新一代的 MPP 数据库,它具有很高的查询性能,能够支持各种场景的数据查询分析使用,主要包含如下几个场景: OLAP多维分析 实时数据分析 高并发查询 统一分析 关于 StarRocks 更详细的介绍,可以查看官方文档,非常详细(见后面参考链接)。 下面,我们基于开源的 StarRocks 社区版,版本是 2.1.5 来进行集群的安装配置。StarRocks 集群的部署模式,采用 FE 与 BE 分离的模式。 集群部署规划 1 基础环境和软件 软件 版本 操作系统 CentOS-7.8 StarRocks 2.1.5 JDK jdk1.8.0_212 MySQL Client 5.7.37 2 集群主机规划 IP 地址 主机名称 角色 备注信息 172.168.0.1 VM-0-1-centos FE FE Master 172.168.0.2 VM-0-2-centos FE FE OBSERVER 172.168.0.3 VM-0-3-centos FE FE FOLLOWER 172.168.0.4 VM-0-4-centos BE BE 172.168.0.5 VM-0-5-centos BE BE 172.168.0.6 VM-0-6-centos BE BE 172.168.0.7 VM-0-7-centos BE BE 172.168.0.8 VM-0-8-centos BE BE 172.168.0.9 VM-0-9-centos BE BE 3 集群主机目录规划

Apache Hudi架构设计和基本概念

Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi具有如下基本特性/能力: Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。 Hudi基于Spark来对HDFS上的数据进行更新、插入、删除等。 Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。 Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。 Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。 Hudi通过Savepoint来实现数据恢复。 目前,Hudi支持Spark 2.x版本,建议使用2.4.4+版本的Spark。 基本架构 与Kudu相比,Kudu是一个支持OLTP workload的数据存储系统,而Hudi的设计目标是基于Hadoop兼容的文件系统(如HDFS、S3等),重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力,Hudi支持Incremental Pulling而Kudu不支持。 Hudi能够整合Batch和Streaming处理的能力,这是通过利用

Flink批处理生成最佳执行计划(上篇)

生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。 生成最佳执行计划是一个递归计算的过程:正向从DataSinkNode开始直到DataSourceNode,分别计算每个OptimizerNode的最佳计划,然后反向逐步将整个OptimizerNode DAG图转换为PlanNode DAG图,得到一个最优计划。其中,PlanNode的类继承结构,如下图所示: 通过与OptimizerNode对应的节点结构类图对比,PlanNode更加抽象了一个层次,更关注Operator之间的数据交换策略。 其中,生成最佳执行计划的过程,可以在Optimizer类中看到,如下代码所示: // the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); 这是一个递归的过程,每个OptimizerNode都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从OptimizerNode DAG转换为PlanNode DAG。后面,我们会对SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode创建的处理过程进行详细说明。 基本数据

经典的卷积神经网络

这里,我们主要简单介绍在卷积神经网络发展过程中,一些经常用的改进模型,主要包括LeNet-5、AlexNet、VGGNet、GoogLeNet、ResNet、DenseNet、ZFNet这7个模型。本文不会非常深入讲解各个CNN模型,而是希望能够快速了解到各个模型起源,基本结构是什么样子,以及其它模型相比有什么明显的不同。 LeNet-5 LeNet-5是第一个由Yann LeCun提出的卷积神经网络,它也是最基础的一个卷积神经网络,网络结构可以参考论文《Gradient-Based Learning Applied To Document Recognition》,如下图所示: LeNet-5是一个8层CNN网络(包含输入层),其中包含卷积层块和全连接层块两个部分。卷积层用来识别图像里的空间模式,如线条和物体局部,之后的最大池化层则用来降低卷积层对位置的敏感性,卷积层块由两个这样的基本单位重复堆叠构成。当卷积层块的输出传入全连接层块时,全连接层块会将小批量中每个样本变平(Flatten)。 AlexNet AexNet模型的名字来源于论文第一作者Alex Krizhevsky的名字,使用了8层卷积神经网络,并以很大的优势赢得了ImageNet 2012图像识别挑战赛。AlexNe

卷积神经网络介绍

卷积神经网络(Convolutional Neural Networks,CNN)是由纽约大学的Yann Lecun于1998年提出的,其本质是一个多层感知机,它是一类包含卷积计算且具有深度结构的前馈神经网络(Feedforward Neural Networks),是深度学习(Deep Learning)的代表算法之一。卷积神经网络是一种特殊的多层神经网络,像其它的神经网络一样,卷积神经网络也使用一种反向传播算法来进行训练,不同之处在于网络的结构。 卷积神经网络(CNN)具有一些传统技术所没有的优点: 良好的容错能力、并行处理能力和自学习能力,可处理环境信息复杂,背景知识不清楚,推理规则不明确情况下的问题; 它允许样本有较大的缺损、畸变,运行速度快,自适应性能好,具有较高的分辨率; 它是通过结构重组和减少权值将特征抽取功能融合进多层感知器,省略识别前复杂的图像特征抽取过程。 CNN基本特征 下面,我们根据网上大家分享的有关卷积神经网络(CNN)的内容,梳理总结CNN所具有的一些特征,如下所示: 具有多层层次网络结构 卷积神经网络(CNN)被认为是第一个真正成功的、采用多层层次结构网络的

使用TensorFlow处理MNIST手写体数字识别问题

使用TensorFlow官方提供了一个例子,基于MNIST数据集,实现一个图片分类的应用,本文是基于TensorFlow 2.0.0版本来学习和试验的。 MNIST数据集是一个非常出名的手写体数字识别数据集,它包含了60000张图片作为训练集,10000张图片作为测试集,每张图片中的手写体数字是0~9中的一个,图片是28×28像素大小,并且每个数字都是位于图片的正中间的。 使用TensorFlow对MNIST数据集进行分类,整个实现对应的完整的Python代码,如下所示: from __future__ import absolute_import, division, print_function, unicode_literals import tensorflow as tf # 下载 MNIST 数据集 mnist = tf.keras.datasets.mnist (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train, x_test = x_train / 255.0, x_test / 255.0 # 创建 tf.keras.Sequential 模型 model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.k

Flink批处理生成OptimizerNode DAG图

OptimizerNode DAG,是基于程序Plan中创建好的Operator DAG,并以OptimizerNode作为DAG图节点构建完成的。所以,我们先看一下组成OptimizerNode DAG,都有哪些类型的OptimizerNode,如下图所示: 通过上面类图,可以看到其中主要有SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode等这几大类,根据各个OptimizerNode具体实现类名称,可以对应到具体操作功能的Operator实现类。比如,MapNode对应map操作,JoinNode对应join操作,等等。 OptimizerNode结构 下面看一下OptimizerNode的数据结构,了解它都抽象了哪些内容,如下图所示: 上图中的结构是每个OptimizerNode都包含的内容,具体到每个OptimizerNode实现,都会根据Operator的属性及行为,补充一些其他的信息。上面的cachedPlans是一个基于PlanNode的DAG的列表,在生成OptimizerNode过程中并没有用到,而是基于OptimizerNode DAG生成最佳执行计划的时候,通过PlanNode DAG来表示的,后续文章我们会单独详细分析,这里先跳过。 下面将对OptimizerNode节点结构包含的一些重要属性信息进行说明,如

Flink批处理生成执行计划

我们这里所说的优化执行计划,是指从生成Plan之后,一直到生成JobGraph之前这一期间对Plan进行各种转换、增强、优化处理。从用户编程的Flink批处理程序开始,一直到生成JobGraph的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。 我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示: 根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。 构建用户编程Operator DAG 把用户程序中设置的各种DataSource、Transformation、DataSink及其配置信息进行收集,以最原始的形态保存在ExecutionEnvironment上线文环境对象中,当然也跟踪了从DataSource到DataSink之间的操作顺序,最后通过ExecutionEnvironment对象,基于DataSink就可以从后向前遍历,直接链接到DataSource。 在上面的处理过程中,涉及到3个比较重要的内容:一是用

Flink批处理生成JobGraph流程分析

我们先通过一个简单的批量处理WordCount的程序,来了解一下,在编程API层面的基本使用方法,以及Flink是如何基于Pipeline风格的API实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount程序代码,示例如下所示: final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1);

Flink编程API设计分析

使用Flink开发批式或流式Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是Flink提供的基本框架,是如何在API层面进行统一处理的,或者说尽量使API统一,这样有助于我们对Flink框架更深入地理解。目前使用Flink 1.10版本开发批式和流式Job,在API层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。 Flink数据流编程模型的分层设计,如下图所示: 这里,我们关注的是上面的Core APIs层,结合批式和流式Job开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的JobGraph,以及生成的过程有什么不同。 编程API设计 Flink编程API中,我们开发的数据处理Job程序,都是始于一个Source,对应的输入数据源,终于一个Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程API对批式Job DAG中每个节点的抽象使用的是DataSet,而流式编程API中对应的是DataStream。 对于批式Job DAG中,Data

使用Flink实现索引数据到Elasticsearch

使用Flink处理数据时,可以基于Flink提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为Sink Operator,包含了对应输出数据的处理逻辑。这里,我们只关注输出的Sink Operator实现。 Flink批式处理模式,运行Flink Batch Job时作用在有界的输入数据集上,所以Job运行的时间是有时限的,一旦Job运行完成,对应的整个数据处理应用就已经结束,比如,输入是一个数据文件,或者一个Hive SQL查询对应的结果集,等等。在批式处理模式下处理数据的输出时,主要需要实现一个自定义的OutputFormat,然后基于该OutputFormat来构建一个Sink,下面看下OutputFormat接口的定义,如下所示: @Public public interface OutputFormat<IT> extends Serializable { void configure(Configuration parameters); void open(int taskNumber, int numTasks) throws IOException; void wri