Kafka SCRAM 动态认证授权配置实践

Kafka 使用 JAAS(Java Authentication and Authorization Service)来实现 SASL 认证授权。我们这里通过使用 Kafka 提供的 SCRAM 方式实现动态认证授权的配置,动态认证授权的好处非常直接,不用根据实际需求修改各种配置文件添加账号及授权,可以像操作数据库服务器一样,通过命令行就能实现认证授权的管理。 准备工作 操作系统: 我们使用 CentOS 系统,系统版本是 CentOS-7.9。 我们使用默认的单机安装方式,ZooKeeper 是内置在 Kakfa 安装包里面的,所以不需要单独下载。如果希望单独安装,可以自行准备。 为了方便操作,保证使用 ZooKeeper 的可访问性,这里直接把防火墙关闭掉: systemctl stop firewalld 下载软件: wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz 解压缩包: tar xzvf jdk-17_linux-x64_bin.tar.gz tar kafka_2.13-3.3.1.tgz cd kafka_2.13-3.3.1 JDK 配置这里就不累述了。 SCRAM 动态认证授权配置 基于 SCRAM 的

StarRocks 技术内幕:查询原理浅析

作者 康凯森 | StarRocks 核心研发、StarRocks 查询团队负责人 | 2022/04/26 15:46 | https://my.oschina.net/u/5658056/blog/5519656 一条查询 SQL 在关系型分布式数据库中的处理,通常需要经过 3 大步骤: 将 SQL 文本转换成一个 “最佳的” 分布式物理执行计划 将执行计划调度到计算节点 计算节点执行具体的物理执行计划 本文将详细解释在 StarRocks 中如何完成一条查询 SQL 的处理。 首先来了解 StarRocks 中的基本概念: FE: 负责查询解析,查询优化,查询调度和元数据管理 BE: 负责查询执行和数据存储 #01 从 SQL 文本到执行计划 从 SQL 文本到分布式物理执行计划,在 StarRocks 中,需要经过以下 5 个步骤: SQL Parse:将 SQL 文本转换成一个 AST(抽象语法树) SQL Analyze:基于 AST 进行语法和语义分析 SQL Logical Plan:将 AST 转换成逻辑计划 SQL Optimize:基于关系代数、统计信息、Cost 模型,对逻辑计划进行重写、转换,选择出 Cost “最低” 的物理执行计划 生成 Plan Fragment:将 Optimizer 选择的物理执行计划转换为 BE 可以直接执行

StarRocks-2.1.5 集群安装部署

StarRocks 是新一代的 MPP 数据库,它具有很高的查询性能,能够支持各种场景的数据查询分析使用,主要包含如下几个场景: OLAP多维分析 实时数据分析 高并发查询 统一分析 关于 StarRocks 更详细的介绍,可以查看官方文档,非常详细(见后面参考链接)。 下面,我们基于开源的 StarRocks 社区版,版本是 2.1.5 来进行集群的安装配置。StarRocks 集群的部署模式,采用 FE 与 BE 分离的模式。 集群部署规划 1 基础环境和软件 软件 版本 操作系统 CentOS-7.8 StarRocks 2.1.5 JDK jdk1.8.0_212 MySQL Client 5.7.37 2 集群主机规划 IP 地址 主机名称 角色 备注信息 172.168.0.1 VM-0-1-centos FE FE Master 172.168.0.2 VM-0-2-centos FE FE OBSERVER 172.168.0.3 VM-0-3-centos FE FE FOLLOWER 172.168.0.4 VM-0-4-centos BE BE 172.168.0.5 VM-0-5-centos BE BE 172.168.0.6 VM-0-6-centos BE BE 172.168.0.7 VM-0-7-centos BE BE 172.168.0.8 VM-0-8-centos BE BE 172.168.0.9 VM-0-9-centos BE BE 3 集群主机目录规划

Apache Hudi 架构设计和基本概念

Apache Hudi 是一个 Data Lakes 的开源方案,Hudi 是 Hadoop Updates and Incrementals 的简写,它是由 Uber 开发并开源的 Data Lakes 解决方案。Hudi 具有如下基本特性/能力: Hudi 能够摄入(Ingest)和管理(Manage)基于 HDFS 之上的大型分析数据集,主要目的是高效的减少入库延时。 Hudi 基于 Spark 来对 HDFS 上的数据进行更新、插入、删除等。 Hudi 在 HDFS 数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。 Hudi 可以对 HDFS 上的 parquet 格式数据进行插入/更新操作。 Hudi 通过自定义 InputFormat 与 Hadoop 生态系统(Spark、Hive、Parquet)集成。 Hudi 通过 Savepoint 来实现数据恢复。 目前,Hudi 支持 Spark 2.x 版本,建议使用 2.4.4+ 版本的 Spark。 基本架构 与 Kudu 相比,Kudu 是一个支持 OLTP workload 的数据存储系统,而 Hudi 的设计目标是基于 Hadoop 兼容的文件系统(如 HDFS、S3 等),重度依赖 Spark 的数据处理能力来实现增量处理和丰富的查询能力,Hudi 支持 Incremental Pulling 而 Kudu 不

Flink 批处理生成最佳执行计划(上篇)

生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。 生成最佳执行计划是一个递归计算的过程:正向从 DataSinkNode 开始直到 DataSourceNode,分别计算每个 OptimizerNode 的最佳计划,然后反向逐步将整个 OptimizerNode DAG 图转换为 PlanNode DAG 图,得到一个最优计划。其中,PlanNode 的类继承结构,如下图所示: 通过与 OptimizerNode 对应的节点结构类图对比,PlanNode 更加抽象了一个层次,更关注 Operator 之间的数据交换策略。 其中,生成最佳执行计划的过程,可以在 Optimizer 类中看到,如下代码所示: // the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); 这是一个递归的过程,每个 OptimizerNode 都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从 OptimizerNode DAG 转换为 PlanNode DAG。后面,我们会对 SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode 创建的处理过程

Flink 批处理生成 OptimizerNode DAG 图

OptimizerNode DAG,是基于程序 Plan 中创建好的 Operator DAG,并以 OptimizerNode 作为 DAG 图节点构建完成的。所以,我们先看一下组成 OptimizerNode DAG,都有哪些类型的 OptimizerNode,如下图所示: 通过上面类图,可以看到其中主要有 SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode 等这几大类,根据各个 OptimizerNode 具体实现类名称,可以对应到具体操作功能的 Operator 实现类。比如,MapNode 对应 map 操作,JoinNode 对应 join 操作,等等。 OptimizerNode 结构 下面看一下 OptimizerNode 的数据结构,了解它都抽象了哪些内容,如下图所示: 上图中的结构是每个 OptimizerNode 都包含的内容,具体到每个 OptimizerNode 实现,都会根据 Operator 的属性及行为,补充一些其他的信息。上面的 cachedPlans 是一个基于 PlanNode 的 DAG 的列表,在生成 OptimizerNode 过程中并没有用到,而是基于 OptimizerNode DAG 生成最佳执行计划的时候,通过 PlanNode DAG 来表示的,后续文章我们会单独详细分析,这里先跳过。 下面将对 OptimizerNod

Flink 批处理生成执行计划

我们这里所说的优化执行计划,是指从生成 Plan 之后,一直到生成 JobGraph 之前这一期间对 Plan 进行各种转换、增强、优化处理。从用户编程的 Flink 批处理程序开始,一直到生成 JobGraph 的整个过程,这个过程中,会创建很多数据结构来完成执行计划的优化工作,而且,通过分层增强与优化的方式,有利于我们对每层进行了哪些处理有一个很好地理解。 我们从整理流程上来看,都有哪些核心的处理,然后会对每一步进行概要说明,从宏观上理解每一步都做了哪些操作。批处理执行计划生成及优化的主要流程,如下图所示: 根据上图,我们将Flink批处理执行计划生成的流程,分为如下几个阶段进行说明。 构建用户编程 Operator DAG 把用户程序中设置的各种 DataSource、Transformation、DataSink 及其配置信息进行收集,以最原始的形态保存在 ExecutionEnvironment 上线文环境对象中,当然也跟踪了从 DataSource 到 DataSink 之间的操作顺序,最后通过 ExecutionEnvironment 对象,基于 DataSink 就可以从后向前遍历,直接链接到 DataSource。 在上面的处理过程中,涉及到3

Flink 批处理生成 JobGraph 流程分析

我们先通过一个简单的批量处理 WordCount 的程序,来了解一下,在编程 API 层面的基本使用方法,以及 Flink 是如何基于 Pipeline 风格的 API 实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount 程序代码,示例如下所示: final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .s

Flink 编程 API 设计分析

使用 Flink 开发批式或流式 Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是 Flink 提供的基本框架,是如何在 API 层面进行统一处理的,或者说尽量使 API 统一,这样有助于我们对 Flink 框架更深入地理解。目前使用 Flink 1.10 版本开发批式和流式 Job,在 API 层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。 Flink 数据流编程模型的分层设计,如下图所示: 这里,我们关注的是上面的 Core APIs 层,结合批式和流式 Job 开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的 JobGraph,以及生成的过程有什么不同。 编程 API 设计 Flink 编程 API 中,我们开发的数据处理 Job 程序,都是始于一个 Source,对应的输入数据源,终于一个 Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程 API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程 API 对批式 Job DAG 中每个节点的抽象使用的是 DataSet,而流式编程 API 中对应的

使用 Flink 实现索引数据到 Elasticsearch

使用 Flink 处理数据时,可以基于 Flink 提供的批式处理(Batch Processing)和流式处理(Streaming Processing)API 来实现,分别能够满足不同场景下应用数据的处理。这两种模式下,输入处理都被抽象为 Source Operator,包含对应输入数据的处理逻辑;输出处理都被抽象为 Sink Operator,包含了对应输出数据的处理逻辑。这里,我们只关注输出的 Sink Operator 实现。 Flink 批式处理模式,运行 Flink Batch Job 时作用在有界的输入数据集上,所以 Job 运行的时间是有时限的,一旦 Job 运行完成,对应的整个数据处理应用就已经结束,比如,输入是一个数据文件,或者一个 Hive SQL 查询对应的结果集,等等。在批式处理模式下处理数据的输出时,主要需要实现一个自定义的 OutputFormat,然后基于该 OutputFormat 来构建一个 Sink,下面看下 OutputFormat 接口的定义,如下所示: @Public public interface OutputFormat<IT> extends Serializable { void configure(Configuration parameters); void open(int taskNumber, int numTasks) throws IO

Flink 使用 Broadcast State 实现流处理配置实时更新

Broadcast State 是 Flink 支持的一种 Operator State。使用 Broadcast State,可以在 Flink 程序的一个 Stream 中输入数据记录,然后将这些数据记录广播(Broadcast)到下游的每个 Task 中,使得这些数据记录能够为所有的 Task 所共享,比如一些用于配置的数据记录。这样,每个 Task 在处理其所对应的 Stream 中记录的时候,读取这些配置,来满足实际数据处理需要。 另外,在一定程度上,Broadcast State 能够使得 Flink Job 在运行过程中与外部的其他系统解耦合。比如,通常 Flink 会使用 YARN 来管理计算资源,使用 Broadcast State 就可以不用直接连接 MySQL 数据库读取相关配置信息了,也无需对 MySQL 做额外的授权操作。因为在一些场景下,会使用 Flink on YARN 部署模式,将 Flink Job 运行的资源申请和释放交给 YARN 去管理,那么就存在 Hadoop 集群节点扩缩容的问题,如新加节点可能需要对一些外部系统的访问,如 MySQL 等进行连接操作授权,如果忘记对 MysQL 访问授权,Flink Job 被调度到新增的某个新增节点上连接并读取 MySQL 配置信息就会出错。 Broad

Flink Checkpoint、Savepoint 配置与实践

Flink Checkpoint Checkpoint 是 Flink 实现容错机制最核心的功能,它能够根据配置周期性地基于 Stream 中各个 Operator 的状态来生成 Snapshot,从而将这些状态数据定期持久化存储下来,当 Flink 程序一旦意外崩溃时,重新运行程序时可以有选择地从这些 Snapshot 进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下 Flink Checkpoin t机制,如官网下图所示: Checkpoint 指定触发生成时间间隔后,每当需要触发 Checkpoint 时,会向 Flink 程序运行时的多个分布式的 Stream Source 中插入一个 Barrier 标记,这些 Barrier 会根据 Stream 中的数据记录一起流向下游的各个 Operator。当一个 Operator 接收到一个 Barrier 时,它会暂停处理 Steam 中新接收到的数据记录。因为一个 Operator 可能存在多个输入的 Stream,而每个 Stream 中都会存在对应的 Barrier,该 Operator 要等到所有的输入 Stream 中的 Barrier 都到达。当所有 Stream 中的 Barrier 都已经到达该 Operator,这时所有的 Barrier 在时间上看来是同一个时刻点(表示已经对齐)