当前位置:首页 > 数码 > b-入门指南-b-Spark-从基础概念到通常运行全解析 (bi入门教程)

b-入门指南-b-Spark-从基础概念到通常运行全解析 (bi入门教程)

admin5个月前 (05-09)数码13

在这个数据驱动的时代,信息的处置和剖析变得越来越关键。而在泛滥的大数据处置框架中,「Spark」以其共同的好处锋芒毕露。

本篇文章,咱们将一同走进Spark的环球,探求并了解其相关的基础概念和经常使用方法。本文关键指标是让初学者能够对Spark有一个片面的意识,并能实践运行到各类疑问的处置之中。

一、Spark是什么

学习一个物品之前先要知道这个物品是什么。

Spark是一个开源的大数据处置引擎,它提供了一整套开发API,包括流计算和机器学习。它支持批处置和流处置。

Spark的一个清楚特点是它能够在内存中启动迭代计算,从而放慢数据处置速度。虽然Spark是用Scala开发的,但它也为、Scala、/target=_blankclass=infotextkey>Python和R等初级编程言语提供了开发接口。

1.Spark组件

Spark提供了6大外围组件:

(1)SparkCore

SparkCore是Spark的基础,它提供了内存计算的才干,是散布式处置大数据集的基础。它将散布式数据形象为弹性散布式数据集(RDD),并为运转在其上的下层组件提供API。一切Spark的下层组件都树立在SparkCore的基础之上。

(2)SparkSQL

SparkSQL是一个用于处置结构化数据的Spark组件。它准许经常使用SQL语句查问数据。Spark支持多种数据源,包括Hive表、Parquet和JSON等。

(3)SparkStreaming

SparkStreaming是一个用于处置灵活数据流的Spark组件。它能够开收回弱小的交互和数据查问程序。在处置灵活数据流时,流数据会被宰割成庞大的批处置,这些巨大量处置将会在SparkCore上按时期顺序极速执行。

(4)SparkMLlib

SparkMLlib是Spark的机器学习库。它提供了罕用的机器学习算法和适用程序,包括分类、回归、聚类、协同过滤、降维等。MLlib还提供了一些底层优化原语和高层流水线API,可以协助开发人员更快地创立和调试机器学习流水线。

(5)SparkGraphX

SparkGraphX是Spark的图形计算库。它提供了一种散布式图形处置框架,可以协助开发人员更快地构建和剖析大型图形。

2.Spark的好处

Spark有许多好处,其中一些关键好处包括:

上手写一个繁难的代码例子,上方是一个WordCount的Spark程序:

importorg.apache.spark.{SparkConf,SparkContext}objectSparkWordCount{defmn(args:Array[String]):Unit={//setMaster("local[9]")示意在本地运转Spark程序,经常使用9个线程。local[*]示意经常使用一切可用的处置器外围。//这种形式通罕用于本地测试和开发。valconf=newSparkConf().setName("WordCount").setMaster("local[9]");valsc=newSparkContext(conf);sc.setLogLevel("ERROR")val>importorg.apache.spark.sql.SparkSessionobjectWordCount{defmain(args:Array[String]){//创立SparkSession对象,它是SparkApplication的入口valspark=SparkSession.builder.appName("WordCount").getOrCreate()//读取文本文件并创立>#设置driver内存大小driver-memory1024m

3.Master&Worker

在Spark中,Master是独立集群的控制者,而Worker是上班者。

一个Spark独立集群须要启动一个Master和多个Worker。Worker就是物理节点,Worker上方可以启动Executor进程。

4.Executor

在每个Worker上为某运行启动的一个进程,该进程担任运转Task,并且担任将数据存在内存或许磁盘上。

每个义务都有各自独立的Executor。Executor是一个执行Task的容器。实践上它是一组计算资源(cpu外围、memory)的汇合。

一个Worker节点可以有多个Executor。一个Executor可以运转多个Task。

Executor创立成功后,在日志文件会显示如下信息:

INFOExecutor:StartingexecutorID[executorId]onhost[executorHostname]

RDD(ResilientDistributed/>

上图中,Stage示意一个可以顺滑成功的阶段。曲线示意Shuffle环节。

假设Stage能够复用前面的Stage的话,那么会显示灰色。

在Spark中,Shuffle是指在不同阶段之间从新调配数据的环节。它通常出当初须要对数据启动聚合或分组操作的时刻,例如reduceByKey或groupByKey等操作。

在Shuffle环节中,Spark会将数据依照键值启动分区,并将属于同一分区的数据发送到同一个计算节点上。这样,每个计算节点就可以独立地处置属于它自己分区的数据。

10.Stage的划分

Stage的划分,繁难来说是以宽依赖来划分的。

关于窄依赖,Partition的转换处置在Stage中成功计算,不划分(将窄依赖尽量放在在同一个Stage中,可以成功流水线计算)。

关于宽依赖,由于有Shuffle的存在,只能在父RDD处置成功后,才干开局接上去的计算,也就是说须要划分Stage。

Spark会依据Shuffle/宽依赖经常使用回溯算法来对DAG启动Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把以后的RDD参与到以后的Stage阶段中。

至于什么是窄依赖和宽依赖,下文马上就会提及。

11.窄依赖&宽依赖

(1)窄依赖

父RDD的一个分区只会被子RDD的一个分区依赖。比如:map,filter和union,这种依赖称之为「窄依赖」。

窄依赖的多个分区可以并行计算,并且窄依赖的一个分区的数据假设失落只须要从新计算对应的分区的数据就可以了。

(2)宽依赖

指子RDD的分区依赖于父RDD的一切分区,称之为「宽依赖」。

关于宽依赖,必定等到上一阶段计算成功才干计算下一阶段。

有向无环图,其实说白了就是RDD之间的依赖相关图。

三、Spark执行流程

Spark的执行流程大抵如下:

四、Spark运转形式

Spark支持多种运转形式,包括本地形式、独立形式、Mesos形式、YARN形式和Kubees形式。

五、RDD详解

RDD的概念在Spark中十分关键,上方只是繁难的引见了一下,上方详细的对RDD开展引见。

RDD是ResilientDistributed>

转换操作

形容

将函数运行于RDD中的每个元素,并前往一个新的RDD

前往一个新的RDD,其中蕴含满足给定谓词的元素

将函数运行于RDD中的每个元素,并将前往的迭代器展平为一个新的RDD

前往一个新的RDD,其中蕴含两个RDD的元素

前往一个新的RDD,其中蕴含原始RDD中不同的元素

groupByKey

将键值对RDD中具有相反键的元素分组到一同,并前往一个新的RDD

reduceByKey

将键值对RDD中具有相反键的元素聚合到一同,并前往一个新的RDD

前往一个新的键值对RDD,其中元素依照键排序

(2)执行操作(Action)

Action是数据执行局部,其经过执行count,reduce,collect等方法真正执行数据的计算局部。

Action操作

形容

经过函数聚合RDD中的一切元素

将RDD中的一切元素前往到驱动程序

前往RDD中的元素个数

前往RDD中的第一个元素

前往RDD中的前n个元素

takeOrdered

前往RDD中的前n个元素,依照人造顺序或指定的顺序排序

saveAsTextFile

将RDD中的元素保留到文本文件中

将函数运行于RDD中的每个元素

3.RDD的创立方式

创立RDD有3种不同方式:

(1)从外部存储系统

由外部存储系统的数据集创立,包括本地的文件系统,还有一切Hadoop支持的数据集,比如HDFS、Cassandra、HBase等:

valrdd1=sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

(2)从其余RDD

经过已有的RDD经过算子转换生成新的RDD:

valrdd2=rdd1.flatMap(_.split(""))

(3)由一个曾经存在的Scala汇合创立

valrdd3=sc.parallelize(Array(1,2,3,4,5,6,7,8))或许valrdd4=sc.makeRDD(List(1,2,3,4,5,6,7,8))

其实makeRDD方法底层调用了parallelize方法:

4.RDD缓存机制

RDD缓存是在内存存储RDD计算结果的一种优化技术。把两边结果缓存起来以便在须要的时刻重复经常使用,这样才干有效减轻计算压力,优化运算性能。

要耐久化一个RDD,只需调用其cache()或许persist()方法即可。在该RDD第一次性被计算进去时,就会间接缓存在每个节点中。而且Spark的耐久化机制还是智能容错的,假设耐久化的RDD的任何partition失落了,那么Spark会智能经过其源RDD,经常使用transformation操作从新计算该partition。

valrdd1=sc.textFile("hdfs://node01:8020/words.txt")valrdd2=rdd1.flatMap(x=>x.split("")).map((_,1)).reduceByKey(_+_)rdd2.cache//缓存/耐久化rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行耐久化rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,由于rdd2曾经耐久化到内存中了

须要留意的是,在触发action的时刻,才会去执行耐久化。

cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,就是调用persist(MEMORY_ONLY),将数据耐久化到内存中。

假设须要从内存中去除缓存,那么可以经常使用unpersist()方法。

rdd.persist(StorageLevel.MEMORY_ONLY)rdd.unpersist()

5.存储级别

RDD存储级别关键有以下几种。

级别

经常使用空间

CPU时期

能否在内存中

能否在磁盘上

备注

MEMORY_ONLY

经常使用未序列化的Java对象格局,将数据保留在内存中。假设内存不够寄存一切的数据,则数据或许就不会启动耐久化。

MEMORY_ONLY_2

数据存2份

MEMORY_ONLY_SER

基本含意同MEMORY_ONLY。惟一的区别是,会将RDD中的数据启动序列化。这种方式愈加节俭内存

MEMORY_ONLY_SER_2

数据序列化,数据存2份

MEMORY_AND_DISK

中等

局部

局部

假设数据在内存中放不下,则溢写到磁盘

MEMORY_AND_DISK_2

中等

局部

局部

数据存2份

MEMORY_AND_DISK_SER

局部

局部

基本含意同MEMORY_AND_DISK。惟一的区别是,会将RDD中的数据启动序列化

MEMORY_AND_DISK_SER_2

局部

局部

数据存2份

经常使用未序列化的Java对象格局,将数据所有写入磁盘文件中。

DISK_ONLY_2

数据存2份

这个目前是实验型选项,相似MEMORY_ONLY_SER,但是数据是存储在堆外内存的。

关于上述恣意一种耐久化战略,假设加上后缀_2,代表的是将每个耐久化的数据,都复制一份正本,并将正本保留到其余节点上。

这种基于正本的耐久化机制关键用于启动容错。假设某个节点挂掉了,节点的内存或磁盘中的耐久化数据失落了,那么后续对RDD计算时还可以经常使用该数据在其余节点上的正本。假设没有正本的话,就只能将这些数据从源头处从新计算一遍了。

6.RDD的血统相关

血统相关是指RDD之间的依赖相关。当你对一个RDD执行转换操作时,Spark会生成一个新的RDD,并记载这两个RDD之间的依赖相关。这种依赖相关就是血统相关。

血统相关可以协助Spark在出现缺点时复原数据。当一个分区失落时,Spark可以依据血统相关从新计算失落的分区,而不须要从头开局从新计算整个RDD。

血统相关还可以协助Spark优化计算环节。Spark可以依据血统相关兼并多个延续的窄依赖转换,缩小数据传输和通讯开支。

咱们可以执行toDebugString打印RDD的依赖相关。

上方是一个繁难的例子:

valconf=newSparkConf().setAppName("LineageExample").setMaster("local")valsc=newSparkContext(conf)val>(2)MapPartitionsRDD[2]atfilterat<console>:26[]|MapPartitionsRDD[1]atmapat<console>:24[]|ParallelCollectionRDD[0]atparallelizeat<console>:22[]

这个输入示意最终的RDD是经过两个转换操作(map和filter)从原始的ParallelCollectionRDD转换而来的。

六、CheckPoint

CheckPoint可以将RDD从其依赖相关中抽进去,保留到牢靠的存储系统(例如HDFS,S3等),即它可以将数据和元数据保留到审核指向目录中。因此,在程序出现解体的时刻,Spark可以复原此数据,并从中止的任何中央开局。

CheckPoint分为两类:

开发人员可以经常使用RDD.checkpoint()方法来设置审核点。在经常使用审核点之前,必定经常使用SparkContext.setCheckpointDir(directory:String)方法设置审核点目录。

上方是一个繁难的例子:

importorg.apache.spark.{SparkConf,SparkContext}objectCheckpointExample{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("CheckpointExample").setMaster("local")valsc=newSparkContext(conf)//设置checkpoint目录sc.setCheckpointDir("/tmp/checkpoint")val>

参数名

参数说明

master的地址,提交义务到哪里执行,例如spark://host:port,yarn,local。详细指可参考上方关于Master_URL的列表

—deploy-mode

在本地(client)启动driver或在cluster上启动,自动是client

运行程序的主类,仅针对java或scala运行

运行程序的称号

用逗号分隔的本地jar包,设置后,这些jar将蕴含在driver和executor的classpath下

蕴含在driver和executor的classpath中的jar的maven坐标

—exclude-packages

为了防止抵触而指定不蕴含的package

bi入门教程

—repositories

远程repository

—confPROP=VALUE

指定spark性能属性的值,例如-confspark.executor.extraJavaOptinotallow=-XX:MaxPermSize=256m

—properties-file

加载的性能文件,默以为conf/spark-defaults.conf

—driver-memory

Driver内存,自动1G

—driver-java-options

传给driver的额外的Java选项

—driver-library-path

传给driver的额外的库门路

—driver-class-path

传给driver的额外的类门路

—driver-cores

Driver的核数,自动是1。在yarn或许standalone下经常使用

—executor-memory

每个executor的内存,自动是1G

—total-executor-cores

一切executor总共的核数。仅仅在mesos或许standalone下经常使用

—num-executors

启动的executor数量。默以为2。在yarn下经常使用

—executor-core

每个executor的核数。在yarn或许standalone下经常使用

2.Master_URL的值

含意

经常使用1个worker线程在本地运转Spark运行程序

经常使用K个worker线程在本地运转Spark运行程序

经常使用一切残余worker线程在本地运转Spark运行程序

spark://HOST:PORT

衔接到SparkStandalone集群,以便在该集群上运转Spark运行程序

mesos://HOST:PORT

衔接到Mesos集群,以便在该集群上运转Spark运行程序

yarn-client

以client方式衔接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver在client运转。

yarn-cluster

以cluster方式衔接到YARN集群,集群的定位由环境变量HADOOP_CONF_DIR定义,该方式driver也在集群中运转。

八、Spark共享变量

普通状况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上方运转时,Spark操作实践上操作的是这个函数所用变量的一个独立正本。

这些变量被复制到每台机器上,并且这些变量在远程机器上的一切更新都不会传递回驱动程序。通常跨义务的读写变量是低效的,所以,Spark提供了两种共享变量:「广播变量(broadcastvariable)」和「累加器(accumulator)」。

1.广播变量

广播变量准许程序员缓存一个只读的变量在每台机器上方,而不是每个义务保留一份拷贝。说白了其实就是共享变量。

假设Executor端用到了Driver的变量,假设不经常使用广播变量在Executor有多少task就有多少Driver端的变量正本。假设经常使用广播变量在每个Executor中只要一份Driver端的变量正本。

一个广播变量可以经过调用SparkContext.broadcast(v)方法从一个初始变量v中创立。广播变量是v的一个包装变量,它的值可以经过value方法访问,上方的代码说明了这个环节:

importorg.apache.spark.{SparkConf,SparkContext}objectBroadcastExample{defmain(args:Array[String]):Unit={valconf=newSparkConf().setAppName("BroadcastExample").setMaster("local")valsc=newSparkContext(conf)val>importorg.apache.spark.{SparkConf,SparkContext}objectAccumulatorExample{defmain(args:Array[String]){valconf=newSparkConf().setAppName("AccumulatorExample")valsc=newSparkContext(conf)valaccum=sc.longAccumulator("MyAccumulator")sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))println(accum.value)//输入10}}

这个示例中,咱们创立了一个名为MyAccumulator的累加器,并经常使用sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))来对其启动累加。最后,咱们经常使用println(accum.value)来输入累加器的值,结果为10。

咱们可以应用子类AccumulatorParam创立自己的累加器类型。AccumulatorParam接口有两个方法:zero方法为你的数据类型提供一个0值(zerovalue),addInPlace方法计算两个值的和。例如,假定咱们有一个Vector类代表数学上的向量,咱们能够如下定义累加器:

objectVectorAccumulatorParamextendsAccumulatorParam[Vector]{defzero(initialValue:Vector):Vector={Vector.zeros(initialValue.size)}defaddInPlace(v1:Vector,v2:Vector):Vector={v1+=v2}}//Then,createanAccumulatorofthistype:valvecAccum=sc.accumulator(newVector(...))(VectorAccumulatorParam)

九、SparkSQL

Spark为结构化数据处置引入了一个称为SparkSQL的编程模块。它提供了一个称为DataFrame的编程形象,并且可以充任散布式SQL查问引擎。

1.SparkSQL的特性

2.SparkSQL数据类型

数字类型包括:

字符串类型包括:

二进制类型包括:

布尔类型包括:

区间类型包括:

复合类型包括:

3.DataFrame

DataFrame是Spark中用于处置结构化数据的一种数据结构。它相似于相关数据库中的表,具有行和列。每一列都有一个称号和一个类型,每一行都是一条记载。

DataFrame支持多种数据源,包括结构化数据文件、Hive表、外部数据库和现有的RDD。它提供了丰盛的操作,包括挑选、聚合、分组、排序等。

DataFrame的好处在于它提供了一种初级的形象,使得用户可以经常使用相似于SQL的言语启动数据处置,而无需关心底层的成功细节。此外,Spark会智能对>importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("DataFrameExample").getOrCreate()importspark.implicits._val>importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("Create>importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("Create>importorg.apache.spark.sql.{Row,SparkSession}importorg.apache.spark.sql.types.{IntegerType,StringType,StructField,StructType}valspark=SparkSession.builder.appName("Create>importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("DSLandSQL").getOrCreate()importspark.implicits._valdf=Seq(("Alice",25),("Bob",30),("Charlie",35)).toDF("name","age")df.select("name","age").filter($"age">25).show()

SQL是一种结构化查问言语,它用于治理相关数据库系统。在Spark中,可以经常使用SQL对> importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("DSLandSQL").getOrCreate()importspark.implicits._valdf=Seq(("Alice",25),("Bob",30),("Charlie",35)).toDF("name","age")df.createOrReplaceTempView("people")spark.sql("SELECTname,ageFROMpeopleWHEREage>25").show()

DSL和SQL的区别在于语法微格调。DSL经常使用方法调用链来构建查问,而SQL经常使用申明式言语来形容查问。选用哪种方式取决于团体喜好和经常使用场景。

6.SparkSQL数据源

SparkSQL支持多种数据源,包括Parquet、JSON、CSV、JDBC、Hive等。

上方是示例代码:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("DataSourcesExample").getOrCreate()//Parquetvaldf=spark.read.parquet("path/to/parquet/file")//JSONvaldf=spark.read.json("path/to/json/file")//CSVvaldf=spark.read.option("header","true").csv("path/to/csv/file")//JDBCvaldf=spark.read.format("jdbc").option("url","jdbc:://host:port/database").option("dbtable","table").option("user","username").option("password","password").load()df.show()

7.load&save

在Spark中,load函数用于从外部数据源读取数据并创立> importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("LoadandSaveExample").getOrCreate()valdf=spark.read.load("path/to/parquet/file")df.show()

上方是将> importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("LoadandSaveExample").getOrCreate()importspark.implicits._valdf=Seq(("Alice",25),("Bob",30),("Charlie",35)).toDF("name","age")df.write.save("path/to/parquet/file")

8.函数

此外,SparkSQL还支持「自定义函数(User-DefinedFunction,UDF)」,可以让用户编写自己的函数并在查问中经常使用。

上方是一个经常使用SQL语法编写自定义函数的示例代码:

importorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.functions.udfvalspark=SparkSession.builder.appName("UDFExample").getOrCreate()importspark.implicits._valdf=Seq(("Alice",25),("Bob",30),("Charlie",35)).toDF("name","age")df.createOrReplaceTempView("people")valsquare=udf((x:Int)=>x*x)spark.udf.register("square",square)spark.sql("SELECTname,square(age)FROMpeople").show()

在这个示例中,咱们首先定义了一个名为square的自定义函数,它接受一个整数参数并前往它的平方。而后,咱们经常使用createOrReplaceTempView方法创立一个暂时视图,并经常使用udf.register方法注册自定义函数。

最后,咱们经常使用spark.sql方法执行SQL查问,并在查问中调用自定义函数。

DataSet是Spark1.6版本中引入的一种新的数据结构,它提供了RDD的强类型和> importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("Create> importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("Create> importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("Create> valrdd1=testDF.rddvalrdd2=testDS.rdd

RDD转> importspark.implicits._caseclassColtest(col1:String,col2:Int)extendsSerializable//定义字段名和类型valtestDS=rdd.map{line=>Coltest(line._1,line._2)}.toDS

可以留意到,定义每一行的类型(caseclass)时,曾经给出了字段名和类型,前面只需往caseclass外面参与值即可。

Dataset转> importspark.implicits._valtestDF=testDS.toDF

DataFrame转> importspark.implicits._caseclassColtest(col1:String,col2:Int)extendsSerializable//定义字段名和类型valtestDS=testDF.as[Coltest]

这种方法就是在给出每一列的类型后,经常使用as方法,转成Dataset,这在数据类型在DataFrame须要针对各个字段处置时极为繁难。

留意:在经常使用一些不凡的操作时,必定要加上importspark.implicits._不然toDF、toDS无法经常使用。

十、SparkStreaming

SparkStreaming的上班原理是将实时数据流拆分为小批量数据,并经常使用Spark引擎对这些小批量数据启动处置。这种微批处置(Micro-BatchProcessing)的方式使得SparkStreaming能够以近乎实时的提前处置大规模的数据流。

上方是一个繁难的SparkStreaming示例代码:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("SparkStreamingExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKey(_+_)wordCounts.print()ssc.start()ssc.awaitTermination()

咱们首先创立了一个StreamingContext对象,并指定了批处置距离为1秒。而后,咱们经常使用socketTextStream方法从套接字源创立了一个DStream。接上去,咱们对DStream启动了一系列操作,包括flatMap、map和reduceByKey。最后,咱们经常使用print方法打印出单词计数的结果。

1.SparkStreaming优缺陷

SparkStreaming作为一种实时流处置框架,具有以下好处:

但是,SparkStreaming也有一些缺陷:

DStream(团圆化流)是SparkStreaming中用于示意实时数据流的一种形象。它由一系列延续的RDD组成,每个RDD蕴含一段时期内搜集到的数据。

在SparkStreaming中,可以经过以下几种方式创立DStream:

(1)从输入源创立。例如,从套接字源创立DStream:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)lines.print()ssc.start()ssc.awaitTermination()

(2)经过转换操作创立。例如,对现有的DStream启动map操作:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))words.print()ssc.start()ssc.awaitTermination()

(3)经过衔接操作创立。例如,对两个DStream启动union操作:

importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("DStreamExample")valssc=newStreamingContext(conf,Seconds(1))vallines1=ssc.socketTextStream("localhost",9999)vallines2=ssc.socketTextStream("localhost",9998)vallines=lines1.union(lines2)lines.print()ssc.start()ssc.awaitTermination()

总结:繁难来说DStream就是对RDD的封装,你对DStream启动操作,就是对RDD启动操作。关于> importorg.apache.spark.SparkConfimportorg.apache.spark.streaming.{Seconds,StreamingContext}valconf=newSparkConf().setAppName("WindowExample")valssc=newStreamingContext(conf,Seconds(1))vallines=ssc.socketTextStream("localhost",9999)valwords=lines.flatMap(_.split(""))valpairs=words.map(word=>(word,1))valwordCounts=pairs.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(30),Seconds(10))wordCounts.print()ssc.start()ssc.awaitTermination()

在这个示例中,咱们首先创立了一个DStream,并对其启动了一系列转换操作。而后,咱们经常使用reduceByKeyAndWindow函数对DStream启动窗口化处置,指定了窗口大小为30秒,滑动距离为10秒。最后,咱们经常使用print方法打印出单词计数的结果。

4.输入操作

SparkStreaming准许DStream的数据输入到外部系统,如数据库或文件系统,输入的数据可以被外部系统所经常使用,该操作相似于RDD的输入操作。SparkStreaming支持以下输入操作:

十一、StructuredStreaming

StructuredStreaming是Spark2.0版本中引入的一种新的流处置引擎。它基于SparkSQL引擎,提供了一种申明式的API来处置结构化数据流。

与SparkStreaming相比,StructuredStreaming具有以下好处:

上方是一个繁难的StructuredStreaming示例代码:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()importspark.implicits._valwords=lines.as[String].flatMap(_.split(""))valwordCounts=words.groupBy("value").count()valquery=wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()

在这个示例中,咱们首先创立了一个SparkSession对象。而后,咱们经常使用readStream方法从套接字源创立了一个> importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()importspark.implicits._valwords=lines.as[String].flatMap(_.split(""))valwordCounts=words.groupBy("value").count()valquery=wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()

SQL语法:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()lines.createOrReplaceTempView("lines")valwordCounts=spark.sql("""|SELECTvalue,COUNT(*)ascount|FROM(|SELECTexplode(split(value,''))asvalue|FROMlines|)|GROUPBYvalue""".stripMargin)valquery=wordCounts.writeStream.outputMode("complete").format("console").start()query.awaitTermination()

StructuredStreaming支持多种输入源,包括文件源(如文本文件、Parquet文件、JSON文件等)、Kafka、Socket等。上方是一个经常使用Scala言语从Kafka中读取数据的例子:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreaming").getOrCreate()//订阅一个主题valdf=spark.readStream.format("kafka").option("kafka.bootstrap.servers","host1:port1,host2:port2").option("subscribe","topic1").load()df.selectExpr("CAST(keyASSTRING)","CAST(valueASSTRING)").as[(String,String)]

StructuredStreaming支持多种输入方式,包括控制台输入、内存输入、文件输入、数据源输入等。上方是将数据写入到Parquet文件中的例子:

importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreaming").getOrCreate()//从socket中读取数据vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()//将数据写入到Parquet文件中lines.writeStream.format("parquet").option("path","path/to/output/dir").option("checkpointLocation","path/to/checkpoint/dir").start()

3.OutputMode

每当结果表更新时,咱们都宿愿将更改后的结果行写入外部接纳器。

Outputmode指定了数据写入输入接纳器的方式。StructuredStreaming支持以下三种outputmode:

OutputMode

形容

只将流>

每当有更新时,将流>

每当有更新时,只将流> importorg.apache.spark.sql.SparkSessionvalspark=SparkSession.builder.appName("StructuredStreaming").getOrCreate()//从socket中读取数据vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()//将数据写入到Parquet文件中lines.writeStream.format("parquet").option("path","path/to/output/dir").option("checkpointLocation","path/to/checkpoint/dir").start()//将数据写入到Kafka中//selectExpr是一个> importorg.apache.spark.sql.SparkSessionimportorg.apache.spark.sql.functions._objectPVUVExample{defmain(args:Array[String]):Unit={valspark=SparkSession.builder.appName("PVUVExample").getOrCreate()importspark.implicits._//假定咱们有一个蕴含用户ID和访问的URL的输入流vallines=spark.readStream.format("socket").option("host","localhost").option("port",9999).load()val> user1,

那么程序将输入以下结果:

-------------------------------------------Batch:0-------------------------------------------+--------------------+---+|url|pv|+--------------------+---+||3|||3|+--------------------+---+-------------------------------------------Batch:0-------------------------------------------+--------------------+---+|url|uv|+--------------------+---+||2|||3|+--------------------+---+

总结

在此,咱们对Spark的基本概念、经常使用方式以及局部原理启动了繁难的引见。Spark以其弱小的处置才干和灵敏性,曾经成为大数据处置畛域的一个关键工具。但是,这只是冰山一角。Spark的环球里还有许多深度和广度期待着咱们去探求。

作为初学者,你或许会感觉这个畛域庞大且复杂。但请记住,每个都是从初学者开局的。不时的学习和通常,你将能够更好的了解和把握Spark,并将其运行于处置实践疑问。这篇文章或许不能涵盖一切的常识点,但我宿愿它能带给你收获和思索。


如何入门大数据

大数据数据科学并没有一个独立的学科体系,统计学,机器学习,数据挖掘,数据库,分布式计算,云计算,信息可视化等技术或方法来对付数据。 但从狭义上来看,我认为数据科学就是解决三个问题:1. target=_blank>),stackoverflow上有tag-R的问题集(Newest ‘r’ Questions),遇到复杂的问题可在上面搜索,总会找到解决方案的。 这样一来,用这本书拿来入门学习也问题不大。 而且这本书作者写得也比较轻松,紧贴实战。 Data analysis and graphics using R:使用R语言做数据分析的入门书。 这本书的特点也是紧贴实战,没有过多地讲解统计学理论,所以喜欢通过情境应用来学习的人应该会喜欢这本入门书。 而且这本书可读性比较强,也就是说哪怕你手头没电脑写不了代码,有事没事拿出这本书翻一翻,也能读得进去。 但如果你先用R来从事实实在在的数据工作,那么上面两本恐怕不够,还需要这些:Modern applied statistics with S:这本书里统计学的理论就讲得比较多了,好处就是你可以用一本书既复习了统计学,又学了R语言。 (S/Splus和R的关系就类似于Unix和Linux,所以用S教程学习R,一点问题都没有)Data manipulation with R:这本书实务性很强,它教给你怎么从不同格式的原始数据文件里读取、清洗、转换、整合成高质量的数据。 当然和任何一本注重实战的书一样,本书也有丰富的真实数据或模拟数据供你练习。 对于真正从事数据处理工作的人来说,这本书的内容非常重要,因为对于任何研究,一项熟练的数据预处理技能可以帮你节省大量的时间和精力。 否则,你的研究总是要等待你的数据。 R Graphics Cookbook:想用R做可视化,就用这本书吧。 150多个recipes,足以帮你应付绝大多数类型的数据。 以我现在极业余的可视化操作水平来看,R是最容易做出最漂亮的图表的工具了。 An introduction to statistical learning with application in R:这本书算是著名的the element of statistical learning的姊妹篇,后者更注重统计(机器)学习的模型和算法,而前者所涉及的模型和算法原没有后者全面或深入,但却是用R来学习和应用机器学习的很好的入口。 A handbook of statistical analysis using R:这本书内容同样非常扎实,很多统计学的学生就是用这本书来学习用R来进行统计建模的。 PythonThink Python,Think Stats,Think Bayes:这是Allen B. Downey写的著名的Think X series三大卷。 其实是三本精致的小册子,如果想快速地掌握Python在统计方面的操作,好好阅读这三本书,认真做习题,答案链接在书里有。 这三本书学通了,就可以上手用Python进行基本的统计建模了。 Python For target=_blank>的网页展示他的数据可视化作品,这本书告诉你该选择什么样的可视化工具,然后告诉你怎样visualize关系型数据、时间序列、空间数据等,最后你就可以用数据讲故事了。 如果你只想感受一下数据可视化是个什么,可以直接点开下面这个链接感受下吧!A tour through the visualization zoo(A Tour Through the Visualization Zoo)Machine Learning & target=_blank>)和homeworks and solutions: ()PyData:PyData是来自各个domain的用Python做数据的人每年举行一次的聚会,期间会有各路牛人举行一些规模不大的seminar或workshop,有好心人已经把video上传到github,有兴趣的去认领吧(DataTau/datascience-anthology-pydata · GitHub)工具R/Python/MATLAB(必备):如果是做数据分析和模型开发,以我的观察来看,使用这三种工具的最多。 R生来就是一个统计学家开发的软件,所做的事也自然围绕统计学展开。 MATLAB虽然算不上是个专业的数据分析工具,但因为很多人不是专业做数据的,做数据还是为了自己的domain expertise(特别是科学计算、信号处理等),而MATLAB又是个强大无比的Domain expertise工具,所以很多人也就顺带让MATLAB也承担了数据处理的工作,虽然它有时候显得效率不高。 Python虽然不是做数据分析的专业软件,但作为一个面向对象的高级动态语言,其开源的生态使Python拥有无比丰富的库,Numpy, Scipy 实现了矩阵运算/科学计算,相当于实现了MATLAB的功能,Pandas又使Python能够像R一样处理dataframe,scikit-learn又实现了机器学习。 SQL(必备):虽然现在人们都说传统的关系型数据库如Oracle、MySQL越来越无法适应大数据的发展,但对于很多人来说,他们每天都有处理数据的需要,但可能一辈子都没机会接触TB级的数据。 不管怎么说,不论是用关系型还是非关系型数据库,SQL语言是必须要掌握的技能,用什么数据库视具体情况而定。 MongoDB(可选):目前最受欢迎的非关系型数据库NoSQL之一,不少人认为MongoDB完全可以取代mySQL。 确实MongoDB方便易用,扩展性强,Web2.0时代的必需品。 Hadoop/Spark/Storm(可选): MapReduce是当前最著名也是运用最广泛的分布式计算框架,由Google建立。 Hadoop/Spark/storm都是基于MapReduce的框架建立起来的分布式计算系统,要说他们之间的区别就是,Hadoop用硬盘存储数据,Spark用内存存储数据,Storm只接受实时数据流而不存储数据。 一言以蔽之,如果数据是离线的,如果数据比较复杂且对处理速度要求一般,就Hadoop,如果要速度,就Spark,如果数据是在线的实时的流数据,就Storm。 OpenRefine(可选):Google开发的一个易于操作的数据清洗工具,可以实现一些基本的清洗功能。 Tableau(可选):一个可交互的数据可视化工具,操作简单,开箱即用。 而且图表都设计得非常漂亮。 专业版1999美刀,终身使用。 媒体和公关方面用得比较多。 Gephi(可选):跟Tableau类似,都是那种可交互的可视化工具,不需要编程基础,生成的图表在美学和设计上也是花了心血的。 更擅长复杂网络的可视化。

数据分析师要学会什么技能?

数据分析师要学会Excel、掌握SQL Server或者Oracle的SQL语句、掌握可视化工具。

首先是Excel,貌似这个很简单,其实未必。Excel不仅能够做简单二维表、复杂嵌套表,能画折线图、Column chart、Bar chart、Area chart、饼图、雷达图、Combo char、散点图、Win Loss图等,而且能实现更高级的功能。

包括透视表(类似于BI的多维分析模型Cube),以及Vlookup等复杂函数,处理100万条以内的数据没有大问题。最后,很多更高级的工具都有Excel插件,例如一些AI Machine Learning的开发工具。

掌握SQL Server或者Oracle的SQL语句,虽然你是业务分析师,但如果取数据能少依赖于IT人员和IT工具(比如BI的多维分析模型,有时候并不能获取你想要的数据),对于做业务分析,无疑是如虎添翼,我曾经见过华为的会计能写七层嵌套的SQL语句,很吃惊。

包括join、group by、order by、distinct、sum、count、average,各种统计函数等。

掌握可视化工具,比如BI,如Cognos、Tableau、FineBI等,具体看企业用什么工具,像我之前用的是FineBI。

这些工具做可视化非常方便,特别是分析报告能含这些图,一定会吸引高层领导的眼球,一目了然了解,洞察业务的本质。另外,作为专业的分析师,用多维分析模型Cube能够方便地自定义报表,效率大大提升。

免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。

标签: Spark