Spark编程基础

基本概念

每个 Spark 应用程序由一个 main 函数和执行各种并行操作的 driver program(驱动程序)组成。

Spark 提出的最主要抽象概念是 弹性分布式数据集,它是一个有容错机制(划分到集群的各个节点上)并可以被并行操作的元素集合。

  • 分布在集群中的对象集合
  • 存储在磁盘或者内存中
  • 通过并行转换操作构造
  • 失效后自动重构

可以通过2种方式创建RDD:

  • 并行集合:接收一个已经存在的数据集合,然后进行各种并行计算。
  • 外部数据集:外部存储系统,例如一个共享的文件系统,HDFS、HBase以及任何支持 Hadoop InputFormat 的数据源。
    这两种类型的 RDD 都可以通过相同的方式进行操作。用户可以让 Spark 保留一个 RDD 在内存中,使其能在并行操作中被有效的重复使用,并且,RDD 能自动从节点故障中恢复。

Spark 的第二个抽象概念是共享变量,可以在并行操作中使用。在默认情况下,Spark 通过不同节点上的一系列任务来运行一个函数,它将每一个函数中用到的变量的拷贝传递到每一个任务中。有时候,一个变量需要在任务之间,或任务与驱动程序之间被共享。

Spark 支持两种类型的共享变量:广播变量,可以在内存的所有的结点上缓存变量;累加器:只能用于做加法的变量,例如计数或求和。

如何编程

初始化 SparkContext

在一个Spark程序中要做的第一件事就是创建一个 SparkContext 对象来告诉 Spark 如何连接一个集群。为了创建 SparkContext ,你首先需要创建一个 SparkConf 对象,这个对象会包含你的应用的一些相关信息。这个通常是通过下面的构造器来实现的:

1
new SparkContext(master, appName, [sparkHome], [jars])

参数说明:

  • master :用于指定所连接的 Spark 或者 Mesos 集群的 URL。
  • appName :应用的名称,将会在集群的 Web 监控 UI 中显示。
  • sparkHome :可选,你的集群机器上 Spark 的安装路径(所有机器上路径必须一致)。
  • jars :可选,在本地机器上的 JAR 文件列表,其中包括你应用的代码以及任何的依赖,Spark 将会把他们部署到所有的集群结点上。

在 python 中初始化,示例代码如下:

1
2
3
//sc = SparkContext("local", "Hello Spark")
conf = SparkConf().setAppName("Hello Spark").setMaster("local")
sc = SparkContext(conf=conf)

说明:如果部署到集群,在分布式模式下运行,最后两个参数是必须的,第一个参数可以是以下任一种形式:

Master URL 含义
local 默认值,使用一个 Worker 线程本地化运行(完全不并行)
local[N] 使用 N 个 Worker 线程本地化运行,N 为 * 时,表示使用系统中所有核
local[N,M] 第一个代表的是用到的核个数;第二个参数代表的是容许该作业失败M次
spark://HOST:PORT 连接到指定的 Spark 单机版集群 master 进程所在的主机和端口,端口默认是7077
mesos://HOST:PORT 连接到指定的 Mesos 集群。host 参数是Moses master的hostname。端口默认是5050

如果你在一个集群上运行 spark-shell,则 master 参数默认为 local。在实际使用中,当你在集群中运行你的程序,你一般不会把 master 参数写死在代码中,而是通过用 spark-submit 运行程序来获得这个参数。但是,在本地测试以及单元测试时,你仍需要自行传入 local 来运行Spark程序。

运行代码

运行代码有几种方式,一是通过 spark-shell 来运行 scala 代码,一是编写 java 代码并打成包以 spark on yarn 方式运行,还有一种是通过 PySpark 来运行 python 代码。

在 spark-shell 和 PySpark 命令行中,一个特殊的集成在解释器里的 SparkContext 变量已经建立好了,变量名叫做 sc,创建你自己的 SparkContext 不会起作用。

弹性分布式数据集(RDD)

并行集合

并行集合是通过调用 SparkContext 的 parallelize 方法,在一个已经存在的 Scala 集合上创建一个 Seq 对象。

parallelize 方法还可以接受一个参数 slices,表示数据集切分的份数。Spark 将会在集群上为每一份数据起一个任务。典型地,你可以在集群的每个 CPU 上分布 2-4个 slices。一般来说,Spark 会尝试根据集群的状况,来自动设定 slices 的数目,当然,你也可以手动设置。

Python 示例程序:

1
2
3
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.reduce(lambda a, b: a + b)

外部数据源

Spark可以从存储在 HDFS,或者 Hadoop 支持的其它文件系统(包括本地文件,Amazon S3, Hypertable, HBase 等等)上的文件创建分布式数据集。Spark 支持 TextFileSequenceFiles 以及其他任何 Hadoop InputFormat 格式的输入。

TextFile 的 RDD 可以通过下面方式创建,该方法接受一个文件的 URI 地址,该地址可以是本地路径,或者 hdfs://s3n:// 等 URL 地址。

1
distFile = sc.textFile("data.txt")

注意

  • 使用本地文件路径时,要保证在worker节点上这个文件也能够通过这个路径访问。这点可以通过将这个文件拷贝到所有worker上或者使用网络挂载的共享文件系统来解决。
  • 包括textFile在内的所有基于文件的Spark读入方法,都支持将文件夹压缩文件、包含通配符的路径作为参数。比如,以下代码都是合法的:
    1
    2
    3
    sc.textFile("/my/directory")
    sc.textFile("/my/directory/*.txt")
    sc.textFile("/my/directory/*.gz")
  • textFile方法可以传入第二个可选参数来控制文件的分片数量。默认情况下,Spark会为文件的每一个块(在HDFS中块的大小默认是64MB)创建一个分片。但是你也可以通过传入一个更大的值来要求Spark建立更多的分片。注意,分片的数量绝不能小于文件块的数量。
  • SparkContext.wholeTextFiles 方法可以读取一个包含多个小文件的目录,并以 <filename,content> 键值对的方式返回结果。
  • 对于 SequenceFiles,可以使用 SparkContext 的 sequenceFile[K, V] 方法创建。像 IntWritable 和 Text 一样,它们必须是 Hadoop 的 Writable 接口的子类。另外,对于几种通用 Writable 类型,Spark 允许你指定原生类型来替代。例如:sequencFile[Int, String] 将会自动读取 IntWritable 和 Texts。
  • 对于其他类型的 Hadoop 输入格式,你可以使用 SparkContext.hadoopRDD 方法,它可以接收任意类型的 JobConf 和输入格式类,键类型和值类型。按照像 Hadoop 作业一样的方法设置输入源就可以了。
  • RDD.saveAsObjectFileSparkContext.objectFile 提供了以 Java 序列化的简单方式来保存 RDD。虽然这种方式没有 Avro 高效,但也是一种简单的方式来保存任意的 RDD。
有收获,赞赏下