第十周,大数据建模学习应用于模具行业的培训班
早上, 很早到书城, 今天最后一天培训, 还是在 SPARK 复习。
开发 SPARK 使用 scala 、java 、 python 、 api 以及 shell 语言开发的 搜索引擎。
缺点: 不适合 web服务 , dao层 、web爬虫
优点: 伯克利数据分析 生态圈 , 机器学习 , 数据挖掘, 数据库, 信息检索, 自然语言处理, 语音识别
以 spark core 为核心 , 从 hdfs , amazon s3 hbase 等持久读取数据
SPARK SQL 基于内存的,
查看数据库
这个在前端也能看到,点击进去, 详细说明。
HIVE 不支持事物的, 增删改查。 ACRD 使用
在那一台机器完成的, 可以看看
提交的人物, 由系统分配节点去跑。
HIVE 主要做 数据仓库 DW ,
插入数据完成, 做一下 查询数据
将 HIVE 的内核, 改成 SPARK 以后, 提升速度 80倍
上面是 HOVE 的插入和查询的过程。
接着跑 SPARK SQL 的过程
所有的 数据库和表, 都是存储在 HDFS 上的。
现在要用 SPARK SQL 来执行。
show database
use
select
这个是 SPARK的管理页面
DAG 的有向性的结合图
DAG Scheduler
RDD 经过 4个 聚合步骤后, 形成一个 DAG , 多个 DAG 组合编程 DAG计划表
下面是 scala 写的
要统计下面文件中的个数
上述虽然是 一行代码, 但是也显示的淋漓尽致
运行的状况
管理页面的结果
环境配置
SPARK 单词统计的 DEMO , 接下来再看看
SPARK STREM 消费kaFka 的数据, 管理者确定是 ZooKeeper
下午讲解 Klin 做一些报表的功能, SPARK
要搭建不同的集群, 由原来的 store 的程序, 迁移到 spark 上来, Flink 是把交互式查询和实时计算合在一起了。
Flink 也是用 Scala 来写。
1、客户端提交作业后,启动Driver,Driver是Spark作业的Master(也就是通过Driver来启动Receiver,定时去启动任务的处理,
注意的是,驱动启动任务会受前一个任务执行的影响。也就是前一个任务没有执行完成后,是不会启动后边的任务的。
所以,注意你的streaming的执行时间,绝对不要超过Recive数据的时间)
2、每个作业包含多个Executor,每个Executor以线程的方式运行task,Spark Streaming至少包含一个Receiver task。
(一个Executor就是一个spark进程,在yarn中就是一个container,这个大家应该知道。
然后Receiver task是在driver中创建的,我理解一个Receiver是运行在一个Executor中的。
然后如果想要创建多个Receiver,那么需要大概这样做(1 to 10).map(_.createStream…),这样就能创建10个receiver task啦。
注意这个数量当然不能超过你的结点数量啦。
还有个问题,通常使用kafka比较合适,因为kafka是stream向kafka来poll数据。
而他妈的flume默认只支持pull,如果想支持poll,那需要定制sink,那真是太恶心了。)
3、Receiver接收数据后生成Block,并把BlockId汇报给Driver,然后备份到另外一个Executor上。
(默认情况下接受数据是200毫秒生成一个block,我理解一个block应该是一个partition?
这个还不确定,需要对照源代码看一下;
然后会把生成的Block随机扔到不同的Executor,同时,driver去派发任务时,也会找到就近的Executor。
我理解,节点中的所有executor都应该会有数据才对)
4、ReceiverTracker维护Receiver汇报的BlockId。
(这个ReceiverTracker应该是维护在Driver中,Driver会根据维护的这些数据块进行任务的派发)
5、Driver定时生成JobGenerator,根据DStream的关系生成逻辑RDD,然后创建Jobset,交给JobScheduler。
6、JobScheduler负责调度Jobset,交给DAGScheduler,DAGScheduler根据逻辑RDD,生成相应的Stages,
每个stage包含一到多个task。(我记得DAGScheduler会对任务做一层优化)
7、TaskScheduler负责把task调度到Executor上,并维护task的运行状态。
8、当tasks,stages,jobset完成后,单个batch(批处理)才算完成。
具体流程:
客户端提交作业后启动Driver,Driver是spark作业的Master。
每个作业包含多个Executor,每个Executor以线程的方式运行task,Spark Streaming至少包含一个receiver task。
Receiver接收数据后生成Block,并把BlockId汇报给Driver,然后备份到另外一个Executor上。
ReceiverTracker维护Reciver汇报的BlockId。
Driver定时启动JobGenerator,根据Dstream的关系生成逻辑RDD,然后创建Jobset,交给JobScheduler。
JobScheduler负责调度Jobset,交给DAGScheduler,DAGScheduler根据逻辑RDD,生成相应的Stages,每个stage包含一到多个task。
TaskScheduler负责把task调度到Executor上,并维护task的运行状态。
当tasks,stages,jobset完成后,单个batch才算完成。
--------------------------------------------------- 第十周 卢老师 讲课笔记
Spark Streaming原理:
Spark Streaming是基于spark的流式批处理引擎,其基本原理是把输入的数据以某一时间间隔批量的处理,当批处理缩短到秒级时,便可以用于处理实时数据流。
kafka
flume HDFS
hdfs -------> spark streaming --------> DataBase
zeromq | Dashboards
|
twitter |
|
input data streaming -->sparkstreaming-->batches of input data--->spark engine --->batches of processed data
live input stream --> spark Streaming --->t t t --->spark job --->results
sparkstreaming架构
spark streaming作业流程:
Driver:
receivertracker
jobGenerator
JobScheduler
DagScheduler
TaskScheduler
BlockManagerMaster
Executor:
Receiver
BlockManagerSlave
Executor:
Task
BlockManagerSlave
运行机制:
1、客户端提交作业后启动一个Driver,Driver是spark作业的Master。
2、每个作业包含多个Executor,每个Executor以线程的方式运行Task,Spark Streaing至少包含一个receiver task。
3、Receiver接收数据后生成Block,并把Blockid汇报给Driver,然后备份到另外一个Executor上
4、ReceiverTracker维护Recivver汇报的BlockId。
5、Driver定时启动JobGenerator,根据Dstream的关系生成逻辑RDD,然后创建JobSet,交给JobScheduler.
6、JobScheduler负责调度JobSet,交给DAGScheduler,DAGScheduler根据逻辑RDD,生成相应的Stages,每个Stages包含一个到多个Task。
7、TaskScheduler负责把task调度到Executor上,并维护task的运行状态。
8、当tasks、stages、jobset完成后,单个batch才算完成。
Spark Streaming消费Kafka数据:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
Spark统计单词次数:
text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")
SparkSQL数据查询:
// Creates a DataFrame based on a table named "people"
// stored in a MySQL database.
val url =
"jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"
val df = sqlContext
.read
.format("jdbc")
.option("url", url)
.option("dbtable", "people")
.load()
// Looks the schema of this DataFrame.
df.printSchema()
// Counts people by age
val countsByAge = df.groupBy("age").count()
countsByAge.show()
// Saves countsByAge to S3 in the JSON format.
countsByAge.write.format("json").save("s3a://...")
Spark机器学习实现逻辑回归算法:
// Every record of this DataFrame contains the label and
// features represented by a vector.
val df = sqlContext.createDataFrame(data).toDF("label", "features")
// Set parameters for the algorithm.
// Here, we limit the number of iterations to 10.
val lr = new LogisticRegression().setMaxIter(10)
// Fit the model to the data.
val model = lr.fit(df)
// Inspect the model: get the feature weights.
val weights = model.weights
// Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
-------------------------------------------------------------------------------------------------------
最后做一次复习 , 有始有终。
-------在最下面增加 2行代码。
如果有 几千台服务器, 必须配置免登录, 生成的密钥, 追加到这个文件中去。
hadoop 的官方网站, 有非常详细的内容。
多种方式都可以查看是不是启动成功
50070端口, HDFS的端口
搭建完成后,
下面是 Yarn 的配置信息
需要 shuffle 的, 需要增加 各类 shuffle .
FIFO FAIR 先进先出 公平调度
上面是 Yarn 的配置文件, 这是第三周讲的
第四周讲 Hbase , 是 google 的大表实现
单机版和集群版的不同
第5周 、 第六周
一切阶是对像
以上是 5周 Scala