博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark概述
阅读量:5334 次
发布时间:2019-06-15

本文共 28263 字,大约阅读时间需要 94 分钟。

Spark

   

 

        概述
        Spark的技术背景
            无论是工业界还是学术界,都已经广泛使用高级集群编程模型来处理日益增长的数据,如MapReduce和Dryad。这些系统将分布式编程简化为自动提供位置感知性调度、容错以及负载均衡,使得大量用户能够在商用集群上分析超大数据集。
            大多数现有的集群计算系统都是基于非循环的数据流模型。即从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG(Directed Acyclic Graph,有向无环图),然后写回稳定存储。DAG数据流图能够在运行时自动实现任务调度和故障恢复。
            尽管非循环数据流是一种很强大的抽象方法,但仍然有些应用无法使用这种方式描述。这类应用包括:
                ①机器学习和图应用中常用的迭代算法(每一步对数据执行相似的函数)
                ②交互式数据挖掘工具(用户反复查询一个数据子集)
            基于数据流的框架并不明确支持工作集,所以需要将数据输出到磁盘,然后在每次查询时重新加载,这会带来较大的开销。针对上述问题,Spark实现了一种分布式的内存抽象,称为弹性分布式数据集(Resilient Distributed Dataset,RDD )。
            它支持基于工作集的应用,同时具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。
            RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
        Spark是是一种快速、通用、可扩展的大数据分析引擎。它是不断壮大的大数据分析解决方案家族中备受关注的明星成员,为分布式数据集的处理提供了一个有效框架,并以高效的方式处理分布式数据集。
        Spark集批处理、实时流处理、交互式查询、机器学习与图计算于一体,避免了多种运算场景下需要部署不同集群带来的资源浪费。
        Spark  VS  MapReduce
            MapReduce存在的问题
                一个 Hadoop job 通常都是这样的
                    1)从 HDFS 读取输入数据;
                    2)在 Map 阶段使用用户定义的 mapper function, 然后把结果Spill到磁盘;
                    3)在 Reduce 阶段,从各个处于 Map 阶段的机器中读取 Map 计算的中间结果,使用用户定义的 reduce function, 通常最后把结果写回 HDFS;
                Hadoop的问题在于,一个 Hadoop job 会进行多次磁盘读写,比如写入机器本地磁盘,或是写入分布式文件系统中(这个过程包含磁盘的读写以及网络传输)。考虑到磁盘读取比内存读取慢了几个数量级,所以像 Hadoop 这样高度依赖磁盘读写的架构就一定会有性能瓶颈。
                此外,在实际应用中我们通常需要设计复杂算法处理海量数据, 而且不是一个 Hadoop job 可以完成的。比如机器学习领域,需要大量使用迭代的方法训练机器学习模型。而像 Hadoop 的基本模型就只包括了一个 Map 和 一个 Reduce 阶段,想要完成复杂运算就需要切分出无数单独的 Hadoop jobs, 而且每个 Hadoop job 都是磁盘读写大户,这就让 Hadoop 显得力不从心。
            Spark的优势
                Spark 没有像 Hadoop 一样使用磁盘读写,而转用性能高得多的内存存储输入数据、处理中间结果、和存储最终结果。在大数据的场景中,很多计算都有循环往复的特点,像 Spark 这样允许在内存中缓存输入输出,上一个 job 的结果马上可以被下一个使用,性能自然要比 Hadoop MapReduce 好得多。
                同样重要的是,Spark 提供了更多灵活可用的数据操作,比如 filter, join, 以及各种对 key value pair 的方便操作,甚至提供了一个通用接口,让用户根据需要开发定制的数据操作。
                此外,Spark 本身作为平台也开发了 streaming 处理框架 spark streaming, SQL 处理框架 Dataframe, 机器学习库 MLlib, 和图处理库 GraphX. 如此强大,如此开放,基于 Spark 的操作,应有尽有。
            Hadoop 的 MapReduce 为什么不使用内存存储?
                是历史原因。当初 MapReduce 选择磁盘,除了要保证数据存储安全以外,更重要的是当时企业级数据中心购买大容量内存的成本非常高,选择基于内存的架构并不现实;现在 Spark 真的赶上了好时候,企业可以轻松部署多台大内存机器,内存大到可以装载所有要处理的数据。
    Spark架构
        为了更好地理解调度,我们先来鸟瞰一下集群模式下的Spark程序运行架构图。
        1. Driver Program
            用户编写的Spark程序称为Driver Program。每个Driver程序包含一个代表集群环境的SparkContext对象,程序的执行从Driver程序开始,所有操作执行结束后回到Driver程序中,在Driver程序中结束。如果你是用spark shell,那么当你启动 Spark shell的时候,系统后台自启了一个 Spark 驱动器程序,就是在Spark shell 中预加载的一个叫作 sc 的 SparkContext 对象。如果驱动器程序终止,那么Spark 应用也就结束了。
        2. SparkContext对象
            每个Driver Program里都有一个SparkContext对象,职责如下:
                1)SparkContext对象联系 cluster manager(集群管理器),让 cluster manager 为Worker Node分配CPU、内存等资源。此外, cluster manager会在 Worker Node 上启动一个执行器(专属于本驱动程序)。
                2)和Executor进程交互,负责任务的调度分配。
        3. cluster manager 集群管理器
            它对应的是Master进程。集群管理器负责集群的资源调度,比如为Worker Node分配CPU、内存等资源。并实时监控Worker的资源使用情况。一个Worker Node默认情况下分配一个Executor(进程)。
            从图中可以看到sc和Executor之间画了一根线条,这表明:程序运行时,sc是直接与Executor进行交互的。
            所以,cluster manager 只是负责资源的管理调度,而任务的分配和结果处理它不
        4.Worker Node
            Worker节点。集群上的计算节点,对应一台物理机器
        5.Worker进程
            它对应Worder进程,用于和Master进程交互,向Master注册和汇报自身节点的资源使用情况,并管理和启动Executor进程
        6.Executor
            负责运行Task计算任务,并将计算结果回传到Driver中。
        7.Task
            在执行器上执行的最小单元。比如RDD Transformation操作时对RDD内每个分区的计算都会对应一个Task。
        Spark调度模块
            Driver 的sc负责和Executor交互,完成任务的分配和调度,在底层,任务调度模块主要包含两大部分:
                1)DAGScheduler
                2)TaskScheduler
                它们负责将用户提交的计算任务按照DAG划分为不同的阶段并且将不同阶段的计算任务提交到集群进行最终的计算。
            RDD Objects可以理解为用户实际代码中创建的RDD,这些代码逻辑上组成了一个DAG。
            DAGScheduler主要负责分析依赖关系,然后将DAG划分为不同的Stage(阶段),其中每个Stage由可以并发执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。
            在DAGScheduler将这组Task划分完成后,会将这组Task提交到TaskScheduler。TaskScheduler通过Cluster Manager 申请计算资源,比如在集群中的某个Worker Node上启动专属的Executor,并分配CPU、内存等资源。接下来,就是在Executor中运行Task任务,如果缓存中没有计算结果,那么就需要开始计算,同时,计算的结果会回传到Driver或者保存在本地。
            Scheduler的实现概述
                任务调度模块涉及的最重要的三个类是:
                    1)org.apache.spark.scheduler.DAGScheduler  前面提到的DAGScheduler的实现。将一个DAG划分为一个一个的Stage阶段(每个Stage是一组Task的集合)然后把Task Set 交给TaskScheduler模块。
                    2)org.apache.spark.scheduler.TaskScheduler 它的作用是为创建它的SparkContext调度任务,即从DAGScheduler接收不同Stage的任务。向Cluster Manager 申请资源。然后Cluster Manager收到资源请求之后,在Worker为其启动进程
                    3)org.apache.spark.scheduler.SchedulerBackend 是一个trait,作用是分配当前可用的资源,具体就是向当前等待分配计算资源的Task分配计算资源(即Executor),并且在分配的Executor上启动Task,完成计算的调度过程。
                    4)AKKA是一个网络通信框架,类似于Netty,此框架在Spark1.8之后已全部替换成Netty
            任务调度流程图
    Spark Core
        DAG概念
            有向无环图
            Spark会根据用户提交的计算逻辑中的RDD的转换(变换方法)和动作(action方法)来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG。
            RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么;还有就是依赖于parent RDD(s)的哪些Partition(s)。这个关系,就是RDD之间的依赖,org.apache.spark.Dependency。根据依赖于parent RDD(s)的Partitions的不同情况,Spark将这种依赖分为两种,一种是宽依赖,一种是窄依赖。
        DAG的生成与Stage的划分
            DAG的生成
                原始的RDD(s)通过一系列转换就形成了DAG。RDD之间的依赖关系,包含了RDD由哪些Parent RDD(s)转换而来和它依赖parent RDD(s)的哪些Partitions,是DAG的重要属性。
                借助这些依赖关系,DAG可以认为这些RDD之间形成了Lineage(血统,血缘关系)。借助Lineage,能保证一个RDD被计算前,它所依赖的parent RDD都已经完成了计算;同时也实现了RDD的容错性,即如果一个RDD的部分或者全部的计算结果丢失了,那么就需要重新计算这部分丢失的数据。
            Spark的Stage(阶段)
                Spark在执行任务(job)时,首先会根据依赖关系,将DAG划分为不同的阶段(Stage)
                处理流程是:
                    1)Spark在执行Transformation类型操作时都不会立即执行,而是懒执行(计算)
                    2)执行若干步的Transformation类型的操作后,一旦遇到Action类型操作时,才会真正触发执行(计算)
                    3)执行时,从当前Action方法向前回溯,如果遇到的是窄依赖则应用流水线优化,继续向前找,直到碰到某一个宽依赖
                    4)因为宽依赖必须要进行shuffle,无法实现优化,所以将这一次段执行过程组装为一个stage
                    5)再从当前宽依赖开始继续向前找。重复刚才的步骤,从而将这个DAG还分为若干的stage
                在stage内部可以执行流水线优化,而在stage之间没办法执行流水线优化,因为有shuffle。但是这种机制已经尽力的去避免了shuffle
            Spark的Job和Task
                原始的RDD经过一系列转换后(一个DAG),会在最后一个RDD上触发一个动作,这个动作会生成一个Job。
                所以可以这样理解:一个DAG对应一个Spark的Job。
                在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算Spark的Task分为两种:
                    1)org.apache.spark.scheduler.ShuffleMapTask
                    2)org.apache.spark.scheduler.ResultTask
                简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShufffleMapTask。
        RDD
            RDD就是带有分区的集合类型
                RDD是分布式的,弹性的,容错的数据结构
                弹性分布式数据集(RDD),特点是可以并行操作,并且是容错的。有两种方法可以创建RDD:
                    1)执行Transform操作(变换操作),
                    2)读取外部存储系统的数据集,如HDFS,HBase,或任何与Hadoop有关的数据源。
                    注:创建RDD的方式有多种,比如案例一中是基于一个基本的集合类型(Array)转换而来,像parallelize这样的方法还有很多此外,我们也可以在读取数据集时就创建RDD。
                分区概念
                    可以在不同的机器上并行处理
                它是spark提供的一个特殊集合类。诸如普通的集合类型,如传统的Array:(1,2,3,4,5)是一个整体,但转换成RDD后,我们可以对数据进行Partition(分区)处理,这样做的目的就是为了分布式。
                    你可以让这个RDD有两个分区,那么有可能是这个形式:RDD(1,2) (3,4)。
                    这样设计的目的在于:可以进行分布式运算。
            RDD操作
                针对RDD的操作,分两种,一种是Transformation(变换),一种是Actions(执行)。
                Transformation(变换)操作属于懒操作(算子),不会真正触发RDD的处理计算。
                变换方法的共同点:1.不会马上触发计算 2.每当调用一次变换方法,都会产生一个新的RDD,Actions(执行)操作才会真正触发。
            RDD的依赖关系
                RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
                1)窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用
                    对于窄依赖操作,它们只是将Partition的数据根据转换的规则进行转化,并不涉及其他的处理,可以简单地认为只是将数据从一个形式转换到另一个形式。
                    所以对于窄依赖,并不会引入昂贵的Shuffle。所以执行效率非常高。如果整个DAG中存在多个连续的窄依赖,则可以将这些连续的窄依赖整合到一起连续执行,中间不执行shuffle 从而提高效率,这样的优化方式称之为流水线优化。
                    此外,针对窄依赖,如果子RDD某个分区数据丢失,只需要找到父RDD对应依赖的分区,恢复即可。但如果是宽依赖,当分区丢失时,最糟糕的情况是要重算所有父RDD的所有分区。
                2)宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition。
                    对于groupByKey这样的操作,子RDD的所有Partition(s)会依赖于parent RDD的所有Partition(s),子RDD的Partition是parent RDD的所有Partition Shuffle的结果。
                Shuffle概述
                    spark中一旦遇到宽依赖就需要进行shuffle的操作,所谓的shuffle的操作的本质就是将数据汇总后重新分发的过程
                    这个过程数据要汇总到一起,数据量可能很大所以不可避免的需要进行数据落磁盘的操作,会降低程序的性能,所以spark并不是完全内存不读写磁盘,只能说它尽力避免这样的过程来提高效率 。
                    spark中的shuffle,在早期的版本中,会产生多个临时文件,但是这种多临时文件的策略造成大量文件的同时的读写,磁盘的性能被分摊给多个文件,每个文件读写效率都不高,影响spark的执行效率。所以在后续的spark中(1.2.0之后的版本)的shuffle中,只会产生一个文件,并且数据会经过排序再附加索引信息,减少了文件的数量并通过排序索引的方式提升了性能。
            RDD容错机制
                分布式系统通常在一个机器集群上运行,同时运行的几百台机器中某些出问题的概率大大增加,所以容错设计是分布式系统的一个重要能力。
                Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。
                RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark里面叫lineage(血缘关系)。当一个RDD的某个分区丢失时,RDD是有足够的信息记录其如何通过其他RDD进行计算,且只需重新计算该分区,这是Spark的一个创新。
            RDD的缓存
                相比Hadoop MapReduce来说,Spark计算具有巨大的性能优势,其中很大一部分原因是Spark对于内存的充分利用,以及提供的缓存机制
                RDD持久化(缓存)
                    持久化在早期被称作缓存(cache),但缓存一般指将内容放在内存中。虽然持久化操作在绝大部分情况下都是将RDD缓存在内存中,但一般都会在内存不够时用磁盘顶上去(比操作系统默认的磁盘交换性能高很多)。当然,也可以选择不使用内存,而是仅仅保存到磁盘中。所以,现在Spark使用持久化(persistence)这一更广泛的名称。
                默认情况下,RDD只使用一次,用完即扔,再次使用时需要重新计算得到,而持久化(缓存)操作避免了这里的重复计算,实际测试也显示持久化对性能提升明显,这也是Spark刚出现时被人称为内存计算框架的原因。
                持久化的方法是调用persist()函数,除了持久化至内存中,还可以在persist()中指定storage level参数使用其他的类型,具体如下:
                    1)MEMORY_ONLY : 将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中. 如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算. 这是默认的级别。
                    cache()方法对应的级别就是MEMORY_ONLY级别
                    2)MEMORY_AND_DISK:将 RDD 以反序列化的 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
                    3)MEMORY_ONLY_SER :将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serialize时会节省更多的空间,但是在读取时会使得 CPU 的 read 变得更加密集。如果内存空间不够,部分数据分区将不会被缓存,在每次需要用到这些数据时重新进行计算。
                    4)MEMORY_AND_DISK_SER :类似于 MEMORY_ONLY_SER ,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。
                    5)DISK_ONLY:只在磁盘上缓存 RDD。
                    6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. :与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
                    7)OFF_HEAP 将数据存储在 off-heap memory 中。使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。使用堆外内存的好处:可能会利用到更大的内存存储空间。但是对于数据的垃圾回收会有影响,需要程序员来处理
                    注意,可能带来一些GC回收问题。
                缓存数据的清除
                    Spark 会自动监控每个节点上的缓存数据,然后使用 least-recently-used (LRU) 机制来处理旧的缓存数据。如果你想手动清理这些缓存的 RDD 数据而不是去等待它们被自动清理掉,
                    可以使用 RDD.unpersist( ) 方法。
                Spark 也会自动持久化一些在 shuffle 操作过程中产生的临时数据(比如 reduceByKey),即便是用户并没有调用持久化的方法。这样做可以避免当 shuffle 阶段时如果一个节点挂掉了就得重新计算整个数据的问题。如果用户打算多次重复使用这些数据,我们仍然建议用户自己调用持久化方法对数据进行持久化。
        Spark框架核心概念
            1.RDD。弹性分布式数据集,是Spark最核心的数据结构。有分区机制,所以可以分布式进行处理。有容错机制,通过RDD之间的依赖关系来恢复数据。
            2.依赖关系。RDD的依赖关系是通过各种Transformation(变换)来得到的。父RDD和子RDD之间的依赖关系分两种:①窄依赖  ②宽依赖
                ①针对窄依赖:父RDD的分区和子RDD的分区关系是:一对一
                窄依赖不会发生Shuffle,执行效率高,spark框架底层会针对多个连续的窄依赖执行流水线优化,从而提高性能。例如 map  flatMap等方法都是窄依赖方法
                ②针对宽依赖:父RDD的分区和子RDD的分区关系是:一对多
                宽依赖会产生shuffle,会产生磁盘读写,无法优化。
            3.DAG。有向无环图,当一整条RDD的依赖关系形成之后,就形成了一个DAG。一般来说,一个DAG,最后都至少会触发一个Action操作,触发执行。一个Action对应一个Job任务。
            4.Stage。一个DAG会根据RDD之间的依赖关系进行Stage划分,流程是:以Action为基准,向前回溯,遇到宽依赖,就形成一个Stage。遇到窄依赖,则执行流水线优化(将多个连续的窄依赖放到一起执行)
            5.task。任务。一个分区对应一个task。可以这样理解:一个Stage是一组Task的集合
            6.RDD的Transformation(变换)操作:懒执行,并不会立即执行
            7.RDD的Action(执行)操作:触发真正的执行
        Spark Shuffle详解
            Shuffle,翻译成中文就是洗牌。之所以需要Shuffle,还是因为具有某种共同特征的一类数据需要最终汇聚(aggregate)到一个计算节点上进行计算。这些数据分布在各个存储节点上并且由不同节点的计算单元处理。
            数据重新打乱然后汇聚到不同节点的过程就是Shuffle。但是实际上,Shuffle过程可能会非常复杂:
                1)数据量会很大,比如单位为TB或PB的数据分散到几百甚至数千、数万台机器上。
                2)为了将这个数据汇聚到正确的节点,需要将这些数据放入正确的Partition,因为数据大小已经大于节点的内存,因此这个过程中可能会发生多次硬盘续写。
                3)为了节省带宽,这个数据可能需要压缩,如何在压缩率和压缩解压时间中间做一个比较好的选择?
                4)数据需要通过网络传输,因此数据的序列化和反序列化也变得相对复杂。
                一般来说,每个Task处理的数据可以完全载入内存(如果不能,可以减小每个Partition的大小),因此Task可以做到在内存中计算。但是对于Shuffle来说,如果不持久化这个中间结果,一旦数据丢失,就需要重新计算依赖的全部RDD,因此有必要持久化这个中间结果。所以这就是为什么Shuffle过程会产生文件的原因。
                如果Shuffle过程不落地,①可能会造成内存溢出 ②当某分区丢失时,会重新计算所有父分区数据
            Shuffle Write
                Shuffle Write,即数据是如何持久化到文件中,以使得下游的Task可以获取到其需要处理的数据的(即Shuffle Read)。在Spark 0.8之前,Shuffle Write是持久化到缓存的,但后来发现实际应用中,shuffle过程带来的数据通常是巨量的,所以经常会发生内存溢出的情况,所以在Spark 0.8以后,Shuffle Write会将数据持久化到硬盘,再之后Shuffle Write不断进行演进优化,但是数据落地到本地文件系统的实现并没有改变。
                1)Hash Based Shuffle Write
                    在Spark 1.0以前,Spark只支持Hash Based Shuffle。因为在很多运算场景中并不需要排序,因此多余的排序只能使性能变差,比如Hadoop的Map Reduce就是这么实现的,也就是Reducer拿到的数据都是已经排好序的。实际上Spark的实现很简单:每个Shuffle Map Task根据key的哈希值,计算出每个key需要写入的Partition然后将数据单独写入一个文件,这个Partition实际上就对应了下游的一个Shuffle Map Task或者Result Task。因此下游的Task在计算时会通过网络(如果该Task与上游的Shuffle Map Task运行在同一个节点上,那么此时就是一个本地的硬盘读写)读取这个文件并进行计算。
                    Hash Based Shuffle Write存在的问题
                        1)每个节点可能会同时打开多个文件,每次打开文件都会占用一定内存。假设每个Write Handler的默认需要100KB的内存,那么同时打开这些文件需要50GB的内存,对于一个集群来说,还是有一定的压力的。尤其是如果Shuffle Map Task和下游的Task同时增大10倍,那么整体的内存就增长到5TB。
                        2)从整体的角度来看,打开多个文件对于系统来说意味着随机读,尤其是每个文件比较小但是数量非常多的情况。而现在机械硬盘在随机读方面的性能特别差,非常容易成为性能的瓶颈。如果集群依赖的是固态硬盘,也许情况会改善很多,但是随机写的性能肯定不如顺序写的。
                    Hash Based Shuffle的每个Mapper都需要为每个Reducer写一个文件,供Reducer读取,即需要产生M*R个数量的文件,如果Mapper和Reducer的数量比较大,产生的文件数会非常多。
                2)Sort Based Shuffle Write
                    Spark Core的一个重要的升级就是将默认的Hash Based Shuffle换成了Sort Based Shuffle,即spark.shuffle.manager从Hash换成了Sort
                    对应的实现类分别是
                        org.apache.spark.shuffle.hash.HashShuffleManager
                        org.apache.spark.shuffle.sort.SortShuffleManager。
                    Sort Based Shuffle的模式是:每个Shuffle Map Task不会为每个Reducer生成一个单独的文件;相反,它会将所有的结果写到一个文件里,同时会生成一个Index文件,
                    Reducer可以通过这个Index文件取得它需要处理的数据。避免产生大量文件的直接收益就是节省了内存的使用和顺序Disk IO带来的低延时。节省内存的使用可以减少GC的风险和频率。而减少文件的数量可以避免同时写多个文件给系统带来的压力。
                    Sort Based Write实现详解
                        Shuffle Map Task会按照key相对应的Partition ID进行Sort,其中属于同一个Partition的key不会Sort。因为对于不需要Sort的操作来说,这个Sort是负收益的;要知道之前Spark刚开始使用Hash Based的Shuffle而不是Sort Based就是为了避免Hadoop Map Reduce对于所有计算都会Sort的性能损耗。对于那些需要Sort的运算,
                        比如sortByKey,这个Sort在Spark 1.2.0里还是由Reducer完成的。
                        ①答出shuffle的定义
                        ②spark shuffle的特点
                        ③spark shuffle的目的
                        ④spark shuffel的实现类,即对应优缺点
            Shuffle 相关参数配置
                Shuffle是Spark Core比较复杂的模块,它也是非常影响性能的操作之一。
                1)spark.shuffle.manager
                    两种方式的Shuffle 即Hash Based Shuffle和Sort Based Shuffle
                2)spark.shuffle.spill
                    这个参数的默认值是true,用于指定Shuffle过程中如果内存中的数据超过阈值(参考spark.shuffle.memoryFraction的设置)时是否需要将部分数据临时写入外部存储。
                    如果设置为false,那么这个过程就会一直使用内存,会有内存溢出的风险。因此只有在确定内存足够使用时,才可以将这个选项设置为false。
                3)spark.shuffle.memoryFraction
                    在启用spark.shuffle.spill的情况下,spark.shuffle.memoryFraction决定了当Shuffle过程中使用的内存达到总内存多少比例的时候开始spill。在Spark 1.2.0里,这个值是0.2
                    此参数可以适当调大,可以控制在0.4~0.6。
                    通过这个参数可以设置Shuffle过程占用内存的大小,它直接影响了写入到外部存储的频率和垃圾回收的频率。
                    可以适当调大此值,可以减少磁盘I/O次数。
                4)spark.shuffle.blockTransferService
                    在Spark 1.2.0中这个配置的默认值是netty,而在之前的版本中是nio。它主要是用于在各个Executor之间传输Shuffle数据。netty的实现更加简洁,但实际上用户不用太关心这个选项。除非有特殊需求,否则采用默认配置即可。
                5)spark.shuffle.consolidateFiles
                    这个配置的默认值是false。主要是为了解决在Hash Based Shuffle过程中产生过多文件的问题。如果配置选项为true,那么对于同一个Core上运行的Shuffle Map Task不会产生一个新的Shuffle文件而是重用原来的
                6)spark.shuffle.compress和spark.shuffle.spill.compress
                    这两个参数的默认配置都是true。都是用来设置Shuffle过程中是否对Shuffle数据进行压缩
                    前者针对最终写入本地文件系统的输出文件
                    后者针对在处理过程需要写入到外部存储的中间数据,即针对最终的shuffle输出文件。
                7)spark.reducer.maxMbInFlight
                    这个参数用于限制一个Result Task向其他的Executor请求Shuffle数据时所占用的最大内存数,默认是64MB。尤其是如果网卡是千兆和千兆以下的网卡时。默认值是 设置这个值需要综合考虑网卡带宽和内存。
        Spark调优
            更好的序列化实现
                Spark用到序列化的地方
                    1)Shuffle时需要将对象写入到外部的临时文件。
                    2)每个Partition中的数据要发送到worker上,spark先把RDD包装成task对象,将task通过网络发给worker。
                    3)RDD如果支持内存+硬盘,只要往硬盘中写数据也会涉及序列化。
                默认使用的是java的序列化。但java的序列化有两个问题,一个是性能相对比较低,另外它序列化完二进制的内容长度也比较大,造成网络传输时间拉长。业界现在有很多更好的实现,如kryo,比java的序列化快10倍以上。而且生成内容长度也短。时间快,空间小,自然选择它了。
            通过代码使用Kryo
            配置多临时文件目录
                spark.local.dir参数。当shuffle、归并排序(sort、merge)时都会产生临时文件。这些临时文件都在这个指定的目录下。那这个文件夹有很多临时文件,如果都发生读写操作,有的线程在读这个文件,有的线程在往这个文件里写,磁盘I/O性能就非常低。
                可以创建多个文件夹,每个文件夹都对应一个真实的硬盘。假如原来是3个程序同时读写一个硬盘,效率肯定低,现在让三个程序分别读取3个磁盘,这样冲突减少,效率就提高了。这样就有效提高外部文件读和写的效率。怎么配置呢?只需要在这个配置时配置多个路径就可以。中间用逗号分隔。
                spark.local.dir=/home/tmp,/home/tmp2
            启用推测执行机制
                可以设置spark.speculation  true
                开启后,spark会检测执行较慢的Task,并复制这个Task在其他节点运行,最后哪个节点先运行完,就用其结果,然后将慢Task 杀死
            collect速度慢
                collect只适合在测试时,因为把结果都收集到Driver服务器上,数据要跨网络传输,同时要求Driver服务器内存大,所以收集过程慢。解决办法就是直接输出到分布式文件系统中。
            有些情况下,RDD操作使用MapPartitions替代map
                map方法对RDD的每一条记录逐一操作。mapPartitions是对RDD里的每个分区操作
                rdd.map{ x=>conn=getDBConn.conn;write(x.toString);conn close;}
                这样频繁的链接、断开数据库,效率差。
                rdd.mapPartitions{(record:=>conn.getDBConn;for(item<-recorders;write(item.toString);conn close;}
                这样就一次链接一次断开,中间批量操作,效率提升。
            Spark的GC调优
                由于Spark立足于内存计算,常常需要在内存中存放大量数据,因此也更依赖JVM的垃圾回收机制(GC)。并且同时,它也支持兼容批处理和流式处理,对于程序吞吐量和延迟都有较高要求,因此GC参数的调优在Spark应用实践中显得尤为重要。
                主要有两种策略——Parallel GC(吞吐量优先)和CMS GC(低延迟响应)。
                GC算法原理
                    对于内存较大的环境非常友好。因为G1 GC对于内存的使用率特别高,内存越大,此优势越明显。
                选择垃圾收集器
                    park默认使用的是Parallel GC。经调研我们发现,Parallel GC常常受困于Full GC,而每次Full GC都给性能带来了较大的下降。而Parallel GC可以进行参数调优的空间也非常有限,我们只能通过调节一些基本参数来提高性能,如各年代分区大小比例、进入老年代前的拷贝次数等。而且这些调优策略只能推迟Full GC的到来,如果是长期运行的应用,Parallel GC调优的意义就非常有限了。
                将InitiatingHeapOccupancyPercent参数调低(默认值是45),可以使G1 GC收集器更早开始Mixed GC(Minor GC);但另一方面,会增加GC发生频率。(启动并发GC周期时的堆内存占用百分比. G1之类的垃圾收集器用它来触发并发GC周期,基于整个堆的使用率,而不只是某一代内存的使用比. 值为 0 则表示"一直执行GC循环". 默认值为 45.)降低此值,会提高Minor GC的频率,但是会推迟Full GC的到来。
                提高ConcGCThreads的值,在Mixed GC阶段投入更多的并发线程,争取提高每次暂停的效率。但是此参数会占用一定的有效工作线程资源。
                调试这两个参数可以有效降低Full GC出现的概率。Full GC被消除之后,最终的性能获得了大幅提升。
            Spark的内存管理
                Spark的核心概念是RDD,实际运行中内存消耗都与RDD密切相关。Spark允许用户将应用中重复使用的RDD数据持久化缓存起来,从而避免反复计算的开销,而RDD的持久化形态之一就是将全部或者部分数据缓存在JVM的Heap中。当我们观察到GC延迟影响效率时,应当先检查Spark应用本身是否有效利用有限的内存空间。RDD占用的内存空间比较少的话,程序运行的heap空间也会比较宽松,GC效率也会相应提高;而RDD如果占用大量空间的话,则会带来巨大的性能损失
            总结
                对于大量依赖于内存计算的Spark应用,GC调优显得尤为重要。在发现GC问题的时候,不要着急调试GC。而是先考虑是否存在Spark进程内存管理的效率问题,例如RDD缓存的持久化和释放。至于GC参数的调试,首先我们比较推荐使用G1 GC来运行Spark应用。相较于传统的垃圾收集器,随着G1的不断成熟,需要配置的选项会更少,能同时满足高吞吐量和低延迟的寻求。当然,GC的调优不是绝对的,不同的应用会有不同应用的特性,掌握根据GC日志进行调优的方法,才能以不变应万变。最后,也不能忘了先对程序本身的逻辑和代码编写进行考量,例如减少中间变量的创建或者复制,控制大对象的创建,将长期存活对象放在Off-heap中等等。
        Checkpoint机制
            checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面 计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方
            总结:Spark的CheckPoint机制很重要,也很常用,尤其在机器学习中的一些迭代算法中很常见。比如一个算法迭代10000次,如果不适用缓冲机制,如果某分区数据丢失,会导致整个计算链重新计算,所以引入缓存机制。但是光引入缓存,也不完全可靠,比如缓存丢失或缓存存储不下,也会导致重新计算,所以使用CheckPoint机制再做一层保证。
            补充:检查目录的路径,一般都是设置到HDFS上
            Spark懒执行的意义
                Spark中,Transformation方法都是懒操作方法,比如map,flatMap,reduceByKey等。当触发某个Action操作时才真正执行。
                懒操作的意义:
                    ①不运行job就触发计算,避免了大量的无意义的计算,即避免了大量的无意义的中间结果的产生,即避免产生无意义的磁盘I/O及网络传输
                    ②更深层次的意义在于,执行运算时,看到之前的计算操作越多,执行优化的可能性就越高
        Spark共享变量
            Spark程序的大部分操作都是RDD操作,通过传入函数给RDD操作函数来计算。这些函数在不同的节点上并发执行,但每个内部的变量有不同的作用域,不能相互访问,所以有时会不太方便,Spark提供了两类共享变量供编程使用——广播变量和计数器
            1. 广播变量
                这是一个只读对象,在所有节点上都有一份缓存,创建方法是SparkContext.broadcast()
                注意,广播变量是只读的,所以创建之后再更新它的值是没有意义的,一般用val修饰符来定义广播变量。
            2. 计数器
                计数器只能增加,是共享变量,用于计数或求和。
                计数器变量的创建方法是SparkContext.accumulator(v, name),其中v是初始值,name是名称。
        spark解决数据倾斜问题
            将少量的数据转化为Map进行广播,广播会将此 Map 发送到每个节点中,如果不进行广播,每个task执行时都会去获取该Map数据,造成了性能浪费。
    Spark SQL
        概述
            Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。它提供了一个称为DataFrame(数据框)的编程抽象,DF的底层仍然是RDD,并且可以充当分布式SQL查询引擎。
            SparkSQL的由来
                SparkSQL的前身是Shark。在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生,是当时唯一运行在hadoop上的SQL-on-Hadoop工具。但是,MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,运行效率较低。
                为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是
                    1)MapR的Drill
                    2)Cloudera的Impala
                    3)Shark
                    其中Shark是伯克利实验室Spark生态环境的组件之一,它基于Hive实施了一些改进,比如引入缓存管理,改进和优化执行器等,并使之能运行在Spark引擎上,从而使得SQL查询的速度得到10-100倍的提升。
                SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码。
            由于摆脱了对hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。
        SparkSql特点
            1)引入了新的RDD类型SchemaRDD,可以像传统数据库定义表一样来定义SchemaRDD
            2)在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作。
            3)内嵌了查询优化框架,在把SQL解析成逻辑执行计划之后,最后变成RDD的计算
            SparkSql将RDD封装成一个DataFrame对象,这个对象类似于关系型数据库中的表。
        主要sparkSQL在下面几点做了优化:
             SparkSQL的表数据在内存中存储不是采用原生态的JVM对象存储方式,而是采用内存列存储
                该存储方式无论在空间占用量和读取吞吐率上都占有很大优势。
            1)内存列存储(In-Memory Columnar Storage)
                ①海量数据查询时,不存在冗余列问题。如果是基于行存储,查询时会产生冗余列,消除冗余列一般在内存中进行的。或者基于行存储的查询,实现物化索引(建立B-tree B+tree),但是物化索引也是需要耗费cpu的
                ②基于列存储,每一列数据类型都是同质的,好处一可以避免数据在内存中类型的频繁转换。好处二可以采用更高效的压缩算法,比如增量压缩算法,二进制压缩算法。性别:男  女  男  女  0101
            SparkSql的存储方式
                对于内存列存储来说,将所有原生数据类型的列采用原生数组来存储,将Hive支持的复杂数据类型(如array、map等)先序化后并接成一个字节数组来存储。
                此外,基于列存储,每列数据都是同质的,所以可以降低数据类型转换的CPU消耗。此外,可以采用高效的压缩算法来压缩,是的数据更少。比如针对二元数据列,可以用字节编码压缩来实现(010101)
                这样,每个列创建一个JVM对象,从而可以快速的GC和紧凑的数据存储;额外的,还可以使用低廉CPU开销的高效压缩方法(如字典编码、行长度编码等压缩方法)降低内存开销;更有趣的是,对于分析查询中频繁使用的聚合特定列,性能会得到很大的提高,原因就是这些列的数据放在一起,更容易读入内存进行计算。
    Spark Streaming
        概述
            图
            Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力,以吞吐量高和容错能力强著称。
        SparkStreaming VS Storm
            在Spark老版本中,SparkStreaming的延迟级别达到秒级,而Storm可以达到毫秒级别。而在最新的2.0版本之后,SparkStreaming能够达到毫秒级。
            目前,sparkStreaming还不能达到一条一条记录的精细控制,还是以batch为单位。所以像Storm一般用于金融领域,达到每笔交易的精细控制。
            但是两者的基因不同,更具体地说就是核心数据抽象不同。这是无法改变的,而且也不会轻易改变,这样的基因也决定了它们各自最适合的应用场景。
            Spark Streaming的核心抽象是DSTream,里面是RDD,下层是Spark核心DAG调度,所以Spark Streaming的这一基因决定了其粒度是小批量的,无法做更精细地控制。
            数据的可靠性也是以批次为粒度的,但好处也很明显,就是有可能实现更大的吞吐量。
            核心数据抽象的不同导致了它们在计算模式上的本质区别
            另外,得益于Spark平台的良好整合性,完成相同任务的流式计算程序与历史批量处理程序的代码基本相同,而且还可以使用平台上的其他模块比如SQL、机器学习、图计算的计算能力,在开发效率上占有优势
            图
            核心数据抽象的不同导致了它们在计算模式上的本质区别。Spark Streaming在本质上其实是像MR一样的批处理计算,但将批处理的周期从常规的几十分钟级别尽可能缩短至秒级(毫秒级),也算达到了实时计算的延时指标。而且,它支持各类数据源,基本可以实现流式计算的功能,但延时无法进一步缩短了。但Storm的设计初衷就是实时计算,毫秒级的计算当然不在话下,而且后期通过更高级别的Trident也实现了小批次处理功能。
        架构及原理
            SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、ZeroMQ和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。
            Spark Streaming是将流式计算分解成一系列短小的批处理作业,也就是把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据DStream(Discretized-离散化 Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),然后将Spark Streaming中对DStream的Transformations操作变为针对Spark中对RDD的Transformations操作,将RDD经过操作变成中间结果保存在内存中。
            整个流式计算根据业务的需求可以对中间的结果进行叠加或者存储到外部设备。
            对DStream的处理,每个DStream都要按照数据流到达的先后顺序依次进行处理。即SparkStreaming天然确保了数据处理的顺序性。
            这样使所有的批处理具有了一个顺序的特性,其本质是转换成RDD的血缘关系。所以,SparkStreaming对数据天然具有容错性保证。
            为了提高SparkStreaming的工作效率,你应该合理的配置批的时间间隔, 最好能够实现上一个批处理完某个算子,下一个批子刚好到来
        基本概念
            1. StreamingContext
                StreamingContext是Spark Streaming编程的最基本环境对象,就像Spark编程中的SparkContext一样。StreamingContext提供最基本的功能入口,包括从各途径创建最基本的对象DStream(就像Spark编程中的RDD)。
                StreamingContext创建好之后,还需要下面这几步来实现一个完整的Spark流式计算:
                    (1)创建一个输入DStream,用于接收数据;
                    (2)使用作用于DStream上的Transformation和Output操作来定义流式计算(Spark程序是使用Transformation和Action操作);
                    (3)启动计算,使用streamingContext.start();
                    (4)等待计算结束(人为或错误),使用streamingContext.awaitTermination();
                    (5)也可以手工结束计算,使用streamingContext.stop()。
            2. DStream抽象
                DStream(discretized stream)是Spark Streaming的核心抽象,类似于RDD在Spark编程中的地位。DStream表示连续的数据流,要么是从数据源接收到的输入数据流,要求是经过计算产生的新数据流。DStream的内部是一个RDD序列,每个RDD对应一个计算周期。比如,在上面的WordCount示例中,每5秒一个周期,那么每5秒的数据都分别对应一个RDD,
                所有应用在DStream上的操作,都会被映射为对DStream内部的RDD上的操作
                RDD操作将由Spark核心来调度执行,但DStream屏蔽了这些细节,给开发者更简洁的编程体验。当然,我们也可以直接对DStream内部的RDD进行操作
                我们希望能够每隔一段时间重新统计下一段时间的数据,并且能够对设置的批时间进行更细粒度的控制,这样的功能可以通过滑动窗口的方式来实现。
                window(windowLength, slideInterval)
                windowLength:窗口长度
                slideInterval:滑动区间
            Spark On Yarn搭建
                实现步骤:
                1)搭建好Hadoop(版本,2.7)集群
                2)安装和配置scala(版本,2.11)
                上传解压scala-2.11.0.tgz—>配置 /etc/profile文件
                3)在NodeManager节点(04,05,06节点)上安装和配置Spark
                4)进入Spark安装目录的Conf目录,配置:spark-env.sh 文件
                5)配置:slaves文件
                6)在HDFS上,创建一个目录,用来存放 spark的依赖jar包
                执行: hadoop  fs  -mkdir   /spark_jars
                7)进入spark 安装目录的jars目录,
                执行:hadoop fs  -put   ./*   /spark_jars
                8)进入spark安装目录的 conf目录,配置:spark-defaults.conf 文件
                9)至此,完成Spark-Yarn的配置。注意:如果是用虚拟机搭建,可能会由于虚拟机内存过小而导致启动失败,比如内存资源过小,yarn会直接kill掉进程导致rpc连接失败。
                10)启动Hadoop的yarn,进入Hadoop安装目录的sbin目录
                执行:sh start-yarn.sh
                11)启动spark shell,进入Spark安装目录的bin目录
                执行:sh spark-shell --master yarn-client
                至于spark的使用,和之前都是一样的。只不过资源的分配和管理是交给yarn来控制了。
    MLlib
        数据挖掘与机器学习
            数据挖掘体系
            数据挖掘:也就是data mining,是一个很宽泛的概念,也是一个新兴学科,旨在如何从海量数据中挖掘出有用的信息来。
            数据挖掘这个工作BI(商业智能)可以做,统计分析可以做,大数据技术可以做,市场运营也可以做,或者用excel分析数据,发现了一些有用的信息,然后这些信息可以指导你的business,这也属于数据挖掘。
            机器学习:machine learning,是计算机科学和统计学的交叉学科,基本目标是学习一个x->y的函数(映射),来做分类、聚类或者回归的工作。之所以经常和数据挖掘合在一起讲是因为现在好多数据挖掘的工作是通过机器学习提供的算法工具实现的,例如广告的ctr预估,PB级别的点击日志在通过典型的机器学习流程可以得到一个预估模型,从而提高互联网广告的点击率和回报率;个性化推荐,还是通过机器学习的一些算法分析平台上的各种购买,浏览和收藏日志,得到一个推荐模型,来预测你喜欢的商品。
            深度学习:deep learning,机器学习里面现在比较火的一个topic,本身是神经网络算法的衍生,在图像,语音等富媒体的分类和识别上取得了非常好的效果,所以各大研究机构和公司都投入了大量的人力做相关的研究和开发。
            总结:数据挖掘是个很宽泛的概念,数据挖掘常用方法大多来自于机器学习这门学科,深度学习也是来源于机器学习的算法模型,本质上是原来的神经网络。
        监督学习和无监督学习
            监督学习是指:利用一组已知类别的样本调整分类器的参数,使其达到所要求性能的过程,也称为监督训练或有导师训练。
            常见的监督学习算法
            1.线性回归
            2.逻辑回归
            3.朴素贝叶斯
            4.KNN(最近邻算法)
            5.决策树
            6.支持向量机
            7.某些可用于分类或预测功能的神经网络模型
            根据类别未知(没有被标记)的训练样本解决模式识别中的各种问题,称之为无监督学习。
            常见的无监督学习算法
            1.系统聚类
            2.K-means
            3.K-中值聚类
            3.K-众数法
            4.某些神经网络模型,比如BP神经网络等
            5.受限玻尔兹曼机
        概述
            MLlib is Apache Spark's scalable machine learning library.
            MLlib是一个构建在Spark上的、专门针对大数据处理的并发式高速机器学习库,其特点是采用较为先进的迭代式、内存存储的分析计算,使得数据的计算处理速度大大高于普通的数据处理引擎
            目前MLlib中已经有通用的学习算法和工具类,包括统计、分类、回归、聚类、降维等。
            MLlib采用Scala语言编写,Scala语言是运行在JVM上的一种函数式编程语言,特点就是可移植性强,“一次编写,到处运行”是其最重要的特点。
            借助于RDD数据统一输入格式,让用户可以在不同的IDE上编写数据处理程序,通过本地化测试后可以在略微修改运行参数后直接在集群上运行
            对结果的获取更为可视化和直观,不会因为运行系统底层的不同而造成结果的差异与改变。
        MLlib基本数据模型
            RDD是MLlib专用的数据格式,它参考了Scala函数式编程思想,并大胆引入统计分析概念,将存储数据转化成向量和矩阵的形式进行存储和计算,这样将数据定量化表示,能更准确地整理和分析结果。
            MLlib先天就支持较多的数据格式,从最基本的Spark数据集RDD到部署在集群中的向量和矩阵。同样,MLlib还支持部署在本地计算机中的本地化格式。
            一、本地向量
                MLlib使用的本地化存储类型是向量,这里的向量主要由两类构成:稀疏型数据集(spares)和密集型数据集(dense)
            二、向量标签的使用
                向量标签用于对MLlib中机器学习算法的不同值做标记。例如分类问题中,可以将不同的数据集分成若干份,以整型数0、1、2……进行标记,即程序的编写者可以根据自己的需要对数据进行标记。
            三、本地矩阵的使用
                大数据运算中,为了更好地提升计算效率,可以更多地使用矩阵运算进行数据处理。部署在单机中的本地矩阵就是一个很好的存储方法。
            分布式矩阵的使用
                1. 行矩阵
                    行矩阵是最基本的一种矩阵类型。行矩阵是以行作为基本方向的矩阵存储格式,列的作用相对较小。可以将其理解为行矩阵是一个巨大的特征向量的集合。每一行就是一个具有相同格式的向量数据,且每一行的向量内容都可以单独取出来进行操作。
                2. 带有行索引的行矩阵
                    单纯的行矩阵对其内容无法进行直接显示,当然可以通过调用其方法显示内部数据内容。有时候,为了方便在系统调试的过程中对行矩阵的内容进行观察和显示,MLlib提供了另外一个矩阵形式,即带有行索引的行矩阵。
        MLlib统计量基础
            数理统计中,基本统计量包括数据的平均值、方差,这是一组求数据统计量的基本内容。在MLlib中,统计量的计算主要用到Statistics类库。
            计算基本统计量
                这里主要调用colStats方法,接受的是RDD类型数据。
                这里需要注意的是,其工作和计算是以列为基础进行计算,调用不同的方法可以获得不同的统计量值,其方法内容如下表所示。
            二、计算相关系数
                相关系数是一种用来反映变量之间相关关系密切程度的统计指标,在现实中一般用于对两组数据的拟合和相似程度进行定量化分析。常用的一般是皮尔逊相关系数,MLlib中默认的相关系数求法也是使用皮尔逊相关系数法。
        距离度量和相似度度量
            在数据分析和数据挖掘的过程中,我们经常需要知道个体间差异的大小,进而评价个体的相似性和类别。而如何来度量数据之间的差异则成为关键,分类算法或聚类算法的本质都是基于某种度量(距离度量和相似度度量)来实现的。
            距离度量
                距离度量(Distance)用于衡量个体在空间上存在的距离,距离越远说明个体间的差异越大。
                欧几里得距离(Euclidean Distance)
                1.欧氏距离
                2.明可夫斯基距离
                3.曼哈顿距离
                4.切比雪夫距离
                5.马氏距离
            相似度度量
                1.向量空间余弦相似度(Cosine Similarity)
                2.皮尔森相关系数(Pearson Correlation Coefficient)
    Graphx
        概述
            Spark GraphX是一个分布式图处理框架,它是基于Spark平台提供对图计算和图挖掘简洁易用的而丰富的接口,极大的方便了对分布式图处理的需求。
            众所周知·,社交网络中人与人之间有很多关系链,例如Twitter、Facebook、微博和微信等,这些都是大数据产生的地方都需要图计算,现在的图处理基本都是分布式的图处理,而并非单机处理。Spark GraphX由于底层是基于Spark来处理的,所以天然就是一个分布式的图处理系统。
            图的分布式或者并行处理其实是把图拆分成很多的子图,然后分别对这些子图进行计算,计算的时候可以分别迭代进行分阶段的计算,即对图进行并行计算。
            图
            从图中我们可以看出:拿到Wikipedia的文档以后,可以变成Link Table形式的视图,然后基于Link Table形式的视图可以分析成Hyperlinks超链接,最后我们可以使用PageRank去分析得出Top Communities。在下面路径中的Editor Graph到Community,这个过程可以称之为Triangle Computation,这是计算三角形的一个算法,基于此会发现一个社区。从上面的分析中我们可以发现图计算有很多的做法和算法,同时也发现图和表格可以做互相的转换。
        框架
            设计GraphX时,点分割和GAS都已成熟,在设计和编码中针对它们进行了优化,并在功能和性能之间寻找最佳的平衡点。如同Spark本身,每个子模块都有一个核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图。它扩展了Spark RDD的抽象,有Table和Graph两种视图,而只需要一份物理存储。两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。
            图
            如同Spark,GraphX的代码非常简洁。GraphX的核心代码只有3千多行,而在此之上实现的Pregel模式,只要短短的20多行。GraphX的代码结构整体下图所示,其中大部分的实现,都是围绕Partition的优化进行的。这在某种程度上说明了点分割的存储和相应的计算优化,的确是图计算框架的重点和难点。
            版本
                l早在0.5版本,Spark就带了一个小型的Bagel模块,提供了类似Pregel的功能。当然,这个版本还非常原始,性能和功能都比较弱,属于实验型产品。
                l到0.8版本时,鉴于业界对分布式图计算的需求日益见涨,Spark开始独立一个分支Graphx-Branch,作为独立的图计算模块,借鉴GraphLab,开始设计开发GraphX。
                l在0.9版本中,这个模块被正式集成到主干,虽然是Alpha版本,但已可以试用,小面包圈Bagel告别舞台。1.0版本,GraphX正式投入生产使用。
                图
                值得注意的是,GraphX目前依然处于快速发展中,从0.8的分支到0.9和1.0,每个版本代码都有不少的改进和重构。根据观察,在没有改任何代码逻辑和运行环境,只是升级版本、切换接口和重新编译的情况下,每个版本有10%~20%的性能提升。虽然和GraphLab的性能还有一定差距,但凭借Spark整体上的一体化流水线处理,社区热烈的活跃度及快速改进速度,GraphX具有强大的竞争力。
        实现分析
            如同Spark本身,每个子模块都有一个核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一种点和边都带属性的有向多重图。它扩展了Spark RDD的抽象,有Table和Graph两种视图,而只需要一份物理存储。两种视图都有自己独有的操作符,从而获得了灵活操作和执行效率。
            GraphX的底层设计有以下几个关键点。
                对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成。这样对一个图的计算,最终在逻辑上,等价于一系列RDD的转换过程。因此,Graph最终具备了RDD的3个关键特性:Immutable、Distributed和Fault-Tolerant,其中最关键的是Immutable(不变性)。逻辑上,所有图的转换和操作都产生了一个新图;物理上,GraphX会有一定程度的不变顶点和边的复用优化,对用户透明。
                 两种视图底层共用的物理数据,由RDD[Vertex-Partition]和RDD[EdgePartition]这两个RDD组成。点和边实际都不是以表Collection[tuple]的形式存储的,而是由VertexPartition/EdgePartition在内部存储一个带索引结构的分片数据块,以加速不同视图下的遍历速度。不变的索引结构在RDD转换过程中是共用的,降低了计算和存储开销。
                图的分布式存储采用点分割模式,而且使用partitionBy方法,由用户指定不同的划分策略(PartitionStrategy)。划分策略会将边分配到各个EdgePartition,顶点Master分配到各个VertexPartition,EdgePartition也会缓存本地边关联点的Ghost副本。划分策略的不同会影响到所需要缓存的Ghost副本数量,以及每个EdgePartition分配的边的均衡程度,需要根据图的结构特征选取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut这四种策略。
         存储模式
             图存储模式
                巨型图的存储总体上有边分割和点分割两种存储方式。2013年,GraphLab2.0将其存储方式由边分割变为点分割,在性能上取得重大提升,目前基本上被业界广泛接受并使用。
                    l边分割(Edge-Cut):每个顶点都存储一次,但有的边会被打断分到两台机器上。这样做的好处是节省存储空间;坏处是对图进行基于边的计算时,对于一条两个顶点被分到不同机器上的边来说,要跨机器通信传输数据,内网通信流量大。
                    l点分割(Vertex-Cut):每条边只存储一次,都只会出现在一台机器上。邻居多的点会被复制到多台机器上,增加了存储开销,同时会引发数据同步问题。好处是可以大幅减少内网通信量。
                虽然两种方法互有利弊,但现在是点分割占上风,各种分布式图计算框架都将自己底层的存储形式变成了点分割。主要原因有以下两个。
                    1.磁盘价格下降,存储空间不再是问题,而内网的通信资源没有突破性进展,集群计算时内网带宽是宝贵的,时间比磁盘更珍贵。这点就类似于常见的空间换时间的策略。
                    2.在当前的应用场景中,绝大多数网络都是“无尺度网络”,遵循幂律分布,不同点的邻居数量相差非常悬殊。而边分割会使那些多邻居的点所相连的边大多数被分到不同的机器上,这样的数据分布会使得内网带宽更加捉襟见肘,于是边分割存储方式被渐渐抛弃了。
            GraphX存储模式
                Graphx借鉴PowerGraph,使用的是Vertex-Cut(点分割)方式存储图,用三个RDD存储图数据信息:
                lVertexTable(id, data):id为Vertex id,data为Edge data
                lEdgeTable(pid, src, dst, data):pid为Partion id,src为原定点id,dst为目的顶点id
                lRoutingTable(id, pid):id为Vertex id,pid为Partion id
                图
        计算模式
            图计算模式
                目前基于图的并行计算框架已经有很多,比如来自Google的Pregel、来自Apache开源的图计算框架Giraph/HAMA以及最为著名的GraphLab,其中Pregel、HAMA和Giraph都是非常类似的,都是基于BSP(Bulk Synchronous Parallell)模式。
                Bulk Synchronous Parallell,即整体同步并行,它将计算分成一系列的超步(superstep)的迭代(iteration)。从纵向上看,它是一个串行模式,而从横向上看,它是一个并行的模式,每两个superstep之间设置一个栅栏(barrier),即整体同步点,确定所有并行的计算都完成后再启动下一轮superstep。
                每一个超步(superstep)包含三部分内容
                    1.计算compute:每一个processor利用上一个superstep传过来的消息和本地的数据进行本地计算;
                    2.消息传递:每一个processor计算完毕后,将消息传递个与之关联的其它processors
                    3.整体同步点:用于整体同步,确定所有的计算和消息传递都进行完毕后,进入下一个superstep。
            GraphX计算模式
                图
            图的缓存
                每个图是由3个RDD组成,所以会占用更多的内存。相应图的cache、unpersist和checkpoint,更需要注意使用技巧。出于最大限度复用边的理念,GraphX的默认接口只提供了unpersistVertices方法。如果要释放边,调用g.edges.unpersist()方法才行,这给用户带来了一定的不便,但为GraphX的优化提供了便利和空间
                大体之意是根据GraphX中Graph的不变性,对g做操作并赋回给g之后,g已不是原来的g了,而且会在下一轮迭代使用,所以必须cache。另外,必须先用prevG保留住对原来图的引用,并在新图产生后,快速将旧图彻底释放掉。否则,十几轮迭代后,会有内存泄漏问题,很快耗光作业缓存空间。
            邻边聚合
                mrTriplets(mapReduceTriplets)是GraphX中最核心的一个接口。Pregel也基于它而来,所以对它的优化能很大程度上影响整个GraphX的性能
                它的计算过程为:map,应用于每一个Triplet上,生成一个或者多个消息,消息以Triplet关联的两个顶点中的任意一个或两个为目标顶点;reduce,应用于每一个Vertex上,将发送给每一个顶点的消息合并起来。
                mrTriplets最后返回的是一个VertexRDD[A],包含每一个顶点聚合之后的消息(类型为A),没有接收到消息的顶点不会包含在返回的VertexRDD中。
            进化的Pregel模式
                GraphX中的Pregel接口,并不严格遵循Pregel模式,它是一个参考GAS改进的Pregel模式。
                这种基于mrTrilets方法的Pregel模式,与标准Pregel的最大区别是,它的第2段参数体接收的是3个函数参数,而不接收messageList。它不会在单个顶点上进行消息遍历,而是将顶点的多个Ghost副本收到的消息聚合后,发送给Master副本,再使用vprog函数来更新点值。消息的接收和发送都被自动并行化处理,无需担心超级节点的问题。
                GraphX设计这个模式的用意。它综合了Pregel和GAS两者的优点,即接口相对简单,又保证性能,可以应对点分割的图存储模式,胜任符合幂律分布的自然图的大型计算。另外,值得注意的是,官方的Pregel版本是最简单的一个版本。对于复杂的业务场景,根据这个版本扩展一个定制的Pregel是很常见的做法。
            图算法工具包
                GraphX也提供了一套图算法工具包,方便用户对图进行分析。目前最新版本已支持PageRank、数三角形、最大连通图和最短路径等6种经典的图算法。这些算法的代码实现,目的和重点在于通用性。如果要获得最佳性能,可以参考其实现进行修改和扩展满足业务需求。另外,研读这些代码,也是理解GraphX编程最佳实践的好方法。

转载于:https://www.cnblogs.com/Striverchen/p/10557895.html

你可能感兴趣的文章
String比较
查看>>
Django之Models
查看>>
CSS 透明度级别 及 背景透明
查看>>
Linux 的 date 日期的使用
查看>>
PHP zip压缩文件及解压
查看>>
SOAP web service用AFNetWorking实现请求
查看>>
Java变量类型,实例变量 与局部变量 静态变量
查看>>
mysql操作命令梳理(4)-中文乱码问题
查看>>
Python环境搭建(安装、验证与卸载)
查看>>
一个.NET通用JSON解析/构建类的实现(c#)
查看>>
Windows Phone开发(5):室内装修 转:http://blog.csdn.net/tcjiaan/article/details/7269014
查看>>
详谈js面向对象 javascript oop,持续更新
查看>>
关于这次软件以及pda终端的培训
查看>>
jQuery上传插件Uploadify 3.2在.NET下的详细例子
查看>>
如何辨别一个程序员的水平高低?是靠发量吗?
查看>>
新手村之循环!循环!循环!
查看>>
正则表达式的用法
查看>>
线程安全问题
查看>>
SSM集成activiti6.0错误集锦(一)
查看>>
下拉刷新
查看>>