基于YARN集群构建运行PySpark Application

Spark Application可以直接运行在YARN集群上,这种运行模式,会将资源的管理与协调统一交给YARN集群去处理,这样能够实现构建于YARN集群之上Application的多样性,比如可以运行MapReduc程序,可以运行HBase集群,也可以运行Storm集群,还可以运行使用Python开发机器学习应用程序,等等。
我们知道,Spark on YARN又分为client模式和cluster模式:在client模式下,Spark Application运行的Driver会在提交程序的节点上,而该节点可能是YARN集群内部节点,也可能不是,一般来说提交Spark Application的客户端节点不是YARN集群内部的节点,那么在客户端节点上可以根据自己的需要安装各种需要的软件和环境,以支撑Spark Application正常运行。在cluster模式下,Spark Application运行时的所有进程都在YARN集群的NodeManager节点上,而且具体在哪些NodeManager上运行是由YARN的调度策略所决定的。
对比这两种模式,最关键的是Spark Application运行时Driver所在的节点不同,而且,如果想要对Driver所在节点的运行环境进行配置,区别很大,但这对于PySpark Application运行来说是非常关键的。
PySpark是Spark为使用Python程序编写Spark Application而实现的客户端库,通过PySpark也可以编写Spark Application并在Spark集群上运行。Python具有非常丰富的科学计算、机器学习处理库,如numpy、pandas、scipy等等。为了能够充分利用这些高效的Python模块,很多机器学习程序都会使用Python实现,同时也希望能够在Spark集群上运行。

PySpark Application运行原理

理解PySpark Application的运行原理,有助于我们使用Python编写Spark Application,并能够对PySpark Application进行各种调优。PySpark构建于Spark的Java API之上,数据在Python脚本里面进行处理,而在JVM中缓存和Shuffle数据,数据处理流程如下图所示(来自Apache Spark Wiki):
pyspark-dataflow
Spark Application会在Driver中创建pyspark.SparkContext对象,后续通过pyspark.SparkContext对象来构建Job DAG并提交DAG运行。使用Python编写PySpark Application,在Python编写的Driver中也有一个pyspark.SparkContext对象,该pyspark.SparkContext对象会通过Py4J模块启动一个JVM实例,创建一个JavaSparkContext对象。PY4J只用在Driver上,后续在Python程序与JavaSparkContext对象之间的通信,都会通过PY4J模块来实现,而且都是本地通信。
PySpark Application中也有RDD,对Python RDD的Transformation操作,都会被映射到Java中的PythonRDD对象上。对于远程节点上的Python RDD操作,Java PythonRDD对象会创建一个Python子进程,并基于Pipe的方式与该Python子进程通信,将用户编写Python处理代码和数据发送到Python子进程中进行处理。

下面,我们基于Spark on YARN模式,并根据当前企业所具有的实际集群运行环境情况,来说明如何在Spark集群上运行PySpark Application,大致分为如下3种情况:

  • YARN集群配置Python环境

这种情况,如果是初始安装YARN、Spark集群,并考虑到了当前应用场景需要支持Python程序运行在Spark集群之上,这时可以准备好对应Python软件包、依赖模块,在YARN集群中的每个节点上进行安装。这样,YARN集群的每个NodeManager上都具有Python环境,可以编写PySpark Application并在集群上运行。目前比较流行的是直接安装Python虚拟环境,使用Anaconda等软件,可以极大地简化Python环境的管理工作。
这种方式的缺点是,如果后续使用Python编写Spark Application,需要增加新的依赖模块,那么就需要在YARN集群的每个节点上都进行该新增模块的安装。而且,如果依赖Python的版本,可能还需要管理不同版本Python环境。因为提交PySpark Application运行,具体在哪些NodeManager上运行该Application,是由YARN的调度器决定的,必须保证每个NodeManager上都具有Python环境(基础环境+依赖模块)。

  • YARN集群不配置Python环境

这种情况,更适合企业已经安装了规模较大的YARN集群,并在开始使用时并未考虑到后续会使用基于Python来编写Spark Application,并且不想在YARN集群的NodeManager上安装Python基础环境及其依赖模块。我们参考了Benjamin Zaitlen的博文(详见后面参考链接),并基于Anaconda软件环境进行了实践和验证,具体实现思路如下所示:

  1. 在任意一个Linux OS的节点上,安装Anaconda软件
  2. 通过Anaconda创建虚拟Python环境
  3. 在创建好的Python环境中下载安装依赖的Python模块
  4. 将整个Python环境打成zip包
  5. 提交PySpark Application时,并通过--archives选项指定zip包路径

下面进行详细说明:
首先,我们在CentOS 7.2上,基于Python 2.7,下载了Anaconda2-5.0.0.1-Linux-x86_64.sh安装软件,并进行了安装。Anaconda的安装路径为/root/anaconda2。
然后,创建一个Python虚拟环境,执行如下命令:

conda create -n mlpy_env --copy -y -q python=2 numpy pandas scipy

上述命令创建了一个名称为mlpy_env的Python环境,--copy选项将对应的软件包都安装到该环境中,包括一些C的动态链接库文件。同时,下载numpy、pandas、scipy这三个依赖模块到该环境中。
接着,将该Python环境打包,执行如下命令:

cd /root/anaconda2/envs
zip -r mlpy_env.zip mlpy_env

该zip文件大概有400MB左右,将该zip压缩包拷贝到指定目录中,方便后续提交PySpark Application:

cp mlpy_env.zip /tmp/

最后,我们可以提交我们的PySpark Application,执行如下命令:

PYSPARK_PYTHON=./ANACONDA/mlpy_env/bin/python spark-submit \
 --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/mlpy_env/bin/python \
 --master yarn-cluster \
 --archives /tmp/mlpy_env.zip#ANACONDA \
 /var/lib/hadoop-hdfs/pyspark/test_pyspark_dependencies.py

上面的test_pyspark_dependencies.py文件中,使用了numpy、pandas、scipy这三个依赖包的函数,通过上面提到的YARN集群的cluster模式可以运行在Spark集群上。
可以看到,上面的依赖zip压缩包将整个Python的运行环境都包含在里面,在提交PySpark Application时会将该环境zip包上传到运行Application的所在的每个节点上,并解压缩后为Python代码提供运行时环境。如果不想每次都从客户端将该环境文件上传到集群中运行PySpark Application的节点上,也可以将zip包上传到HDFS上,并修改--archives参数的值为hdfs:///tmp/mlpy_env.zip#ANACONDA,也是可以的。
另外,需要说明的是,如果我们开发的/var/lib/hadoop-hdfs/pyspark/test_pyspark_dependencies.py文件中,也依赖的一些我们自己实现的处理函数,具有多个Python依赖的文件,想要通过上面的方式运行,必须将这些依赖的Python文件拷贝到我们创建的环境中,对应的目录为mlpy_env/lib/python2.7/site-packages/下面。

  • 基于混合编程语言环境

假如我们还是希望使用Spark on YARN模式来运行PySpark Application,但并不将Python程序提交到YARN集群上运行。这时,我们可以考虑使用混合编程语言的方式,来处理数据任务。比如,机器学习Application具有迭代计算的特性,更适合在一个高配的节点上运行;而普通的ETL数据处理具有多机并行处理的特点,适合放到集群上进行分布式处理。
一个完整的机器学习Application的设计与构建,可以将算法部分和数据准备部分分离出来,使用Scala/Java进行数据预处理,输出一个机器学习算法所需要(更便于迭代、寻优计算)的输入数据格式,这会极大地压缩算法输入数据的规模,从而使算法迭代计算充分利用单机本地的资源(内存、CPU、网络),这可能会比直接放到集群中计算要快得多。
因此,我们在对机器学习Application准备数据时,使用原生的Scala编程语言实现Spark Application来处理数据,包括转换、统计、压缩等等,将满足算法输入格式的数据输出到HDFS指定目录中。在性能方面,对数据规模较大的情况下,在Spark集群上处理数据,Scala/Java实现的Spark Application运行性能要好一些。然后,算法迭代部分,基于丰富、高性能的Python科学计算模块,使用Python语言实现,其实直接使用PySpark API实现一个机器学习PySpark Application,运行模式为YARN client模式。这时,就需要在算法运行的节点上安装好Python环境及其依赖模块(而不需要在YARN集群的节点上安装),Driver程序从HDFS中读取输入数据(缓存到本地),然后在本地进行算法的迭代计算,最后输出模型。

总结

对于重度使用PySpark的情况,比如偏向机器学习,可以考虑在整个集群中都安装好Python环境,并根据不同的需要进行依赖模块的统一管理,能够=极大地方便PySpark Application的运行。
不在YARN集群上安装Python环境的方案,会使提交的Python环境zip包在YARN集群中传输带来一定开销,而且每次提交一个PySpark Application都需要打包一个环境zip文件,如果有大量的Python实现的PySpark Application需要在Spark集群上运行,开销会越来越大。另外,如果PySpark应用程序修改,可能需要重新打包环境。但是这样做确实不在需要考虑YARN集群集群节点上的Python环境了,任何版本Python编写的PySpark Application都可以使用集群资源运行。
关于该问题,SPARK-13587(详见下面参考链接)也在讨论如果优化该问题,后续应该会有一个比较合适的解决方案。

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(11): “基于YARN集群构建运行PySpark Application

  1. 博主你好,为什么我打包了环境之后用archives上传,但还是报错说找不到依赖的库呢?

    • 估计你是直接在本地的Python环境中下载的Python模块,比如直接通过pip下载安装,而不是通过Anaconda来安装的,这样就没能够把依赖安装到Anaconda的虚拟环境中。

  2. 博主,你好;如果我这些需要在jupyter上运行pyspark的cilent模式,是否也是要将python的anaconda文件打包压缩传到hdfs上

    • 如果不希望每次提交PySpark程序都上传这个几百M的压缩包,那就给传到HDFS上去吧,让集群去分发处理,客户端提交就比较轻一些;如果这个压缩包内容频繁修改,那就需要在客户端提交了(否则,每次改完后,都要上传到HDFS也很麻烦;或者你可以脚本化这些操作,就是每次上传可能会多花一些时间)。

  3. 博主你好,之前有个问题:

    为什么我打包了环境之后用archives上传,但还是报错说找不到依赖的库呢?

    这个问题,我也按照你的回复用conda安装,并且 scikit-learn已经在condalist中,

    但仍然会出现 ModuleNotFoundError: No module named ‘sklearn’ 的情况。(我已经测试import numpy通过,但是sklearn一直不能正常import)
    conda list中 sklearn的名字叫 scikit-learn,但一般在python环境中import sklearn就可以,当然,我也尝试了把代码里的import sklearn 改成import scikit-learn,也是不行。

    请问你遇到过这种情况么? 谢谢!

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>