第十周,大数据建模学习应用于模具行业的培训班

2019-11-03 16:09:50

早上, 很早到书城, 今天最后一天培训, 还是在  SPARK 复习。


        开发 SPARK 使用  scala 、java 、 python 、 api 以及    shell 语言开发的  搜索引擎。


       缺点:    不适合  web服务  ,   dao层    、web爬虫 


       优点:    伯克利数据分析 生态圈  , 机器学习 , 数据挖掘, 数据库, 信息检索, 自然语言处理, 语音识别

                      以  spark core 为核心 , 从  hdfs  , amazon  s3  hbase 等持久读取数据


 image.png


SPARK SQL  基于内存的, 



image.png

查看数据库

深圳塑胶模具厂,深圳市模具厂,深圳模具厂,深圳模具,深圳塑胶模具

image.png

image.png

image.png

image.png


image.png



这个在前端也能看到,点击进去, 详细说明。


image.png

       image.png

image.png

image.png



image.png

image.png

image.pngimage.png


HIVE 不支持事物的, 增删改查。 ACRD 使用 


image.png



在那一台机器完成的, 可以看看

image.png


提交的人物, 由系统分配节点去跑。


image.png

image.png



image.png





HIVE 主要做 数据仓库  DW   , 

image.png



插入数据完成, 做一下  查询数据

image.png



将  HIVE 的内核, 改成  SPARK 以后, 提升速度 80倍

image.png



image.png



image.png




上面是   HOVE 的插入和查询的过程。


接着跑  SPARK SQL 的过程


image.png


image.png


所有的  数据库和表, 都是存储在  HDFS 上的。


现在要用  SPARK SQL 来执行。


image.png


image.png


image.png


show database  

use  

select 

image.png


image.png




这个是 SPARK的管理页面


image.png



image.png

image.png


image.png

image.png




image.png

image.png

image.png

image.png

image.png

image.png

image.png


image.png

image.png

image.png


image.png


DAG 的有向性的结合图



DAG Scheduler 





image.png


RDD 经过 4个 聚合步骤后, 形成一个  DAG , 多个 DAG 组合编程  DAG计划表


下面是  scala 写的


image.png



要统计下面文件中的个数


image.png


image.png

上述虽然是  一行代码, 但是也显示的淋漓尽致


image.png


image.png




运行的状况


image.png



image.png


image.png




image.png


image.png


image.png



管理页面的结果image.png


image.png


环境配置image.png


image.png



image.png


image.png


SPARK   单词统计的 DEMO  , 接下来再看看


image.png

image.png


image.png



SPARK STREM 消费kaFka 的数据,  管理者确定是   ZooKeeper


image.png



image.png



image.png


下午讲解     Klin       做一些报表的功能, SPARK 




image.png

要搭建不同的集群, 由原来的 store 的程序, 迁移到  spark 上来,    Flink 是把交互式查询和实时计算合在一起了。

Flink 也是用 Scala 来写。

image.png


1、客户端提交作业后,启动Driver,Driver是Spark作业的Master(也就是通过Driver来启动Receiver,定时去启动任务的处理,

注意的是,驱动启动任务会受前一个任务执行的影响。也就是前一个任务没有执行完成后,是不会启动后边的任务的。 

所以,注意你的streaming的执行时间,绝对不要超过Recive数据的时间)


2、每个作业包含多个Executor,每个Executor以线程的方式运行task,Spark Streaming至少包含一个Receiver task。

(一个Executor就是一个spark进程,在yarn中就是一个container,这个大家应该知道。

然后Receiver task是在driver中创建的,我理解一个Receiver是运行在一个Executor中的。

然后如果想要创建多个Receiver,那么需要大概这样做(1 to 10).map(_.createStream…),这样就能创建10个receiver task啦。 

注意这个数量当然不能超过你的结点数量啦。 

还有个问题,通常使用kafka比较合适,因为kafka是stream向kafka来poll数据。

而他妈的flume默认只支持pull,如果想支持poll,那需要定制sink,那真是太恶心了。)


3、Receiver接收数据后生成Block,并把BlockId汇报给Driver,然后备份到另外一个Executor上。

(默认情况下接受数据是200毫秒生成一个block,我理解一个block应该是一个partition?

这个还不确定,需要对照源代码看一下;

然后会把生成的Block随机扔到不同的Executor,同时,driver去派发任务时,也会找到就近的Executor。

我理解,节点中的所有executor都应该会有数据才对)


4、ReceiverTracker维护Receiver汇报的BlockId。

    (这个ReceiverTracker应该是维护在Driver中,Driver会根据维护的这些数据块进行任务的派发)


5、Driver定时生成JobGenerator,根据DStream的关系生成逻辑RDD,然后创建Jobset,交给JobScheduler。


6、JobScheduler负责调度Jobset,交给DAGScheduler,DAGScheduler根据逻辑RDD,生成相应的Stages,

      每个stage包含一到多个task。(我记得DAGScheduler会对任务做一层优化)


7、TaskScheduler负责把task调度到Executor上,并维护task的运行状态。


8、当tasks,stages,jobset完成后,单个batch(批处理)才算完成。


image.png

具体流程:

  1. 客户端提交作业后启动Driver,Driver是spark作业的Master

  2. 每个作业包含多个Executor,每个Executor以线程的方式运行task,Spark Streaming至少包含一个receiver task

  3. Receiver接收数据后生成Block,并把BlockId汇报给Driver,然后备份到另外一个Executor上。

  4. ReceiverTracker维护Reciver汇报的BlockId。

  5. Driver定时启动JobGenerator,根据Dstream的关系生成逻辑RDD,然后创建Jobset,交给JobScheduler。

  6. JobScheduler负责调度Jobset,交给DAGScheduler,DAGScheduler根据逻辑RDD,生成相应的Stages,每个stage包含一到多个task。

  7. TaskScheduler负责把task调度到Executor上,并维护task的运行状态。

  8. 当tasks,stages,jobset完成后,单个batch才算完成。



image.png



---------------------------------------------------   第十周  卢老师 讲课笔记


Spark Streaming原理:


Spark Streaming是基于spark的流式批处理引擎,其基本原理是把输入的数据以某一时间间隔批量的处理,当批处理缩短到秒级时,便可以用于处理实时数据流。


kafka


flume                                        HDFS


hdfs      -------> spark streaming --------> DataBase


zeromq                   |                   Dashboards

                               | 

twitter                    |

                              |

input data streaming -->sparkstreaming-->batches of input data--->spark engine --->batches of processed data




live input stream --> spark Streaming --->t t t --->spark job --->results



sparkstreaming架构


spark streaming作业流程:


Driver:

receivertracker

jobGenerator

JobScheduler

DagScheduler

TaskScheduler


BlockManagerMaster


Executor:

Receiver

BlockManagerSlave


Executor:

Task

BlockManagerSlave



运行机制:

1、客户端提交作业后启动一个Driver,Driver是spark作业的Master。


2、每个作业包含多个Executor,每个Executor以线程的方式运行Task,Spark Streaing至少包含一个receiver task。


3、Receiver接收数据后生成Block,并把Blockid汇报给Driver,然后备份到另外一个Executor上


4、ReceiverTracker维护Recivver汇报的BlockId。


5、Driver定时启动JobGenerator,根据Dstream的关系生成逻辑RDD,然后创建JobSet,交给JobScheduler.


6、JobScheduler负责调度JobSet,交给DAGScheduler,DAGScheduler根据逻辑RDD,生成相应的Stages,每个Stages包含一个到多个Task。


7、TaskScheduler负责把task调度到Executor上,并维护task的运行状态。


8、当tasks、stages、jobset完成后,单个batch才算完成。





Spark Streaming消费Kafka数据:


import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.streaming.kafka010._

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


val kafkaParams = Map[String, Object](

  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",

  "key.deserializer" -> classOf[StringDeserializer],

  "value.deserializer" -> classOf[StringDeserializer],

  "group.id" -> "use_a_separate_group_id_for_each_stream",

  "auto.offset.reset" -> "latest",

  "enable.auto.commit" -> (false: java.lang.Boolean)

)


val topics = Array("topicA", "topicB")

val stream = KafkaUtils.createDirectStream[String, String](

  streamingContext,

  PreferConsistent,

  Subscribe[String, String](topics, kafkaParams)

)


stream.map(record => (record.key, record.value))





Spark统计单词次数:


text_file = sc.textFile("hdfs://...")

counts = text_file.flatMap(lambda line: line.split(" ")) \

             .map(lambda word: (word, 1)) \

             .reduceByKey(lambda a, b: a + b)

counts.saveAsTextFile("hdfs://...")




SparkSQL数据查询:


// Creates a DataFrame based on a table named "people"

// stored in a MySQL database.

val url =

  "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"

val df = sqlContext

  .read

  .format("jdbc")

  .option("url", url)

  .option("dbtable", "people")

  .load()


// Looks the schema of this DataFrame.

df.printSchema()


// Counts people by age

val countsByAge = df.groupBy("age").count()

countsByAge.show()


// Saves countsByAge to S3 in the JSON format.

countsByAge.write.format("json").save("s3a://...")




Spark机器学习实现逻辑回归算法:


// Every record of this DataFrame contains the label and

// features represented by a vector.

val df = sqlContext.createDataFrame(data).toDF("label", "features")


// Set parameters for the algorithm.

// Here, we limit the number of iterations to 10.

val lr = new LogisticRegression().setMaxIter(10)


// Fit the model to the data.

val model = lr.fit(df)


// Inspect the model: get the feature weights.

val weights = model.weights


// Given a dataset, predict each point's label, and show the results.

model.transform(df).show()









-------------------------------------------------------------------------------------------------------


最后做一次复习 ,  有始有终。


image.png


image.png

-------在最下面增加 2行代码。


image.png

如果有 几千台服务器, 必须配置免登录,  生成的密钥, 追加到这个文件中去。


image.png


image.png

image.png


hadoop 的官方网站, 有非常详细的内容。


image.png


image.png


多种方式都可以查看是不是启动成功


image.png


image.png


image.png


image.png

50070端口, HDFS的端口


搭建完成后,     


image.png



下面是  Yarn 的配置信息

image.png


需要 shuffle  的, 需要增加 各类  shuffle .


FIFO     FAIR    先进先出    公平调度  


上面是 Yarn 的配置文件, 这是第三周讲的


第四周讲   Hbase , 是 google  的大表实现

image.png

image.png

image.png

image.png

image.png

image.png


image.png


image.png


image.png


image.png


image.png



image.png

image.png


image.png


image.png


image.pngimage.png

image.png


image.png


单机版和集群版的不同

image.png


image.png

image.png


image.png

image.png


image.png

image.png

image.png

image.png



image.pngimage.png


image.png


第5周 、 第六周 


image.png


image.png

image.png


image.png

image.png

一切阶是对像


image.png

image.png


image.png

image.png

image.png



image.png


image.png


以上是   5周   Scala  



image.png




image.png


image.png


image.png



image.png



image.png


image.png


image.png



image.png

image.png


image.png


image.png




















首页
产品
新闻
联系