spark
一、安装scale插件
IntelliJ IDEA安装scala插件并创建scala示例 - 打翻了牛顿吃饭的碗 - 博客园
二、创建maven项目
删除src
创建sprak-core module
增加scale
后面改成scale2.12.2了,因为我用的是spark3.0.0
-
<dependencies>
-
<dependency>
-
<groupId>org.apache.spark</groupId>
-
<artifactId>spark-core_2.12</artifactId>
-
<version>3.0.0</version>
-
</dependency>
-
</dependencies>
-
package com.sparkcore
-
-
-
import org.apache.spark.{SparkConf, SparkContext}
-
-
object Spark01_WorkCount {
-
def main(args: Array[String]): Unit = {
-
//Application
-
-
//Spark框架
-
-
//TODO 建立和spark框架的连接
-
//JDBC:Connection
-
var sparkConf = new SparkConf().setMaster("local").setAppName("WordCount");
-
var sc = new SparkContext(sparkConf);
-
-
//TODO 执行业务操作
-
-
//TODO 关闭连接
-
sc.stop();
-
}
-
}
统计单词数量
-
package com.sparkcore
-
-
-
import org.apache.spark.rdd.RDD
-
import org.apache.spark.{SparkConf, SparkContext, rdd}
-
-
object Spark01_WorkCount {
-
def main(args: Array[String]): Unit = {
-
//Application
-
-
//Spark框架
-
-
//TODO 建立和spark框架的连接
-
//JDBC:Connection
-
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount");
-
val sc = new SparkContext(sparkConf);
-
-
//TODO 执行业务操作
-
//1、读取文件,获取一个一个的单词(分词)
-
// hello world
-
val lines: RDD[String] = sc.textFile("datas")
-
-
//2、将一行数据进行拆分,形成一个一个的单词(分词)
-
// "hello world" => hello ,world, hello ,world
-
val words: RDD[String] = lines.flatMap(_.split(" "))
-
-
//3、将数据根据单词进行分组,便于统计
-
//(hello,hello,hello),(world,world)
-
val wordGroup: RDD[(String,Iterable[String])] = words.groupBy(word=>word)
-
-
//4、对分词后的数据进行转换
-
//(hello,hello,hello),(world,world)
-
//(hello,3),(world,2)
-
val wordToCount = wordGroup.map{
-
case (word, list) => {
-
(word,list.size)
-
}
-
}
-
-
-
//5、将转换结果采集到控制台打印出来
-
val array:Array[(String,Int)] =wordToCount.collect()
-
array.foreach(println)
-
-
//TODO 关闭连接
-
sc.stop();
-
}
-
}
运行结果:
-
package com.sparkcore
-
-
import org.apache.spark.rdd.RDD
-
import org.apache.spark.{SparkConf, SparkContext}
-
-
object Spark02_WorkCount {
-
def main(args: Array[String]): Unit = {
-
//Application
-
-
//Spark框架
-
-
//TODO 建立和spark框架的连接
-
//JDBC:Connection
-
val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount");
-
val sc = new SparkContext(sparkConf);
-
-
//TODO 执行业务操作
-
//1、读取文件,获取一个一个的单词(分词)
-
// hello world
-
val lines: RDD[String] = sc.textFile("datas")
-
-
//2、将一行数据进行拆分,形成一个一个的单词(分词)
-
// "hello world" => hello ,world, hello ,world
-
val words: RDD[String] = lines.flatMap(_.split(" "))
-
val wordToOne = words.map(
-
word => (word, 1)
-
)
-
val wordGroup: RDD[(String,Iterable[(String,Int)])] = wordToOne.groupBy(
-
t => t._1
-
)
-
-
val wordToCount = wordGroup.map {
-
case (word, list) => {
-
list.reduce(
-
(t1, t2) => {
-
(t1._1, t1._2 t2._2)
-
}
-
)
-
}
-
}
-
val array: Array[(String,Int)]= wordToCount.collect()
-
array.foreach(println)
-
-
-
//TODO 关闭连接
-
sc.stop();
-
}
-
}
下载spark
并上传到linux服务器上并解压缩
重命名:
此处省略集群安装。
启动集群
在master节点执行sbin/start-all.sh ,启动后可以在master节点看到Master,在node1节点看到Worker节点
在master-backup节点执行sbin/start-master.sh
在/usr/local/spark/data/下新增word.txt
内容:
-
hello scale
-
hello world
-
hello spark
-
hello spark
使用xsync同步到其他节点。xsync word.txt
在master节点执行bin/spark-shell
然后执行sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).collect
提交应用程序
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077,master-backup:7077 \
./examples/jars/spark-examples_2.11-2.4.8.jar \
10
说明:
--class表示要执行程序的主函数的类,也可以替换成我们自己的应用程序
--master 部署模式。local[2]表示本地模式,数字表示分配的虚拟CPU核数量。spark://master:7077,master-backup:7077表示集群模式。Yarn环境。
spark-examples_2.11-2.4.8.jar为运行的应用类所在的jar包,实际使用时,可以设定为自己的jar包
10表示程序main函数的入口参数,用于设定当前应用的任务数量
配置历史服务
由于spark-shell停止后,集群监控master:4040就看不到历史任务的运行情况了,所以开发时都配置历史服务器记录任务运行情况。
1)修改spark-defaults.conf.template为spark-defaults.conf
2)修改spark-defaults.conf文件,配置日志存储路径
-
spark.eventLog.enabled true
-
spark.eventLog.dir hdfs://master:9000/directory
注意:需要启动Hadoop集群,HDFS上的diretory 目录需要提前存在。
sbin/start-dfs.sh
hadoop fs -mkdir /directory
3)修改spark-env.sh文件,添加日志配置
-
export SPARK_HISTORY_OPTS="
-
-Dspark.history.ui.port=18080
-
-Dspark.history.fs.logDirectory=hdfs://master:9000/directory
-
-Dspark.history.retainedApplications=30"
参数1:WEB UI访问的端口号为18080
参数2:指定历史服务器日志存储路径
参数3:指定保存Application历史记录的个数,如果超过了这个数,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示 的应用数
注意:此处的端口要看Hadoop的core-site.xml和hdfs-site.xml配置
hdfs-site.xml
core-site.xml
4)分发
xsync conf
5)重启集群和历史服务
sbin/start-all.sh
sbin/start-history-server.sh
执行bin/spark-shell
点击“Show incomplete applications”可以看到这个任务。
执行bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077,master-backup:7077 \
./examples/jars/spark-examples_2.11-2.4.8.jar \
10时发现State一直是waiting状态,停掉spark-shell后就可以了,不知道是什么原因。可能是因为只有一个worker
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgaceai
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24