• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

spark

武飞扬头像
疼迅扣扣
帮助1

一、安装scale插件

IntelliJ IDEA安装scala插件并创建scala示例 - 打翻了牛顿吃饭的碗 - 博客园

 二、创建maven项目

学新通

学新通

学新通

学新通

删除src

学新通

创建sprak-core module 

学新通 学新通

学新通学新通 学新通

增加scale

学新通

 后面改成scale2.12.2了,因为我用的是spark3.0.0学新通

学新通

 学新通

学新通

 学新通

  1.  
    <dependencies>
  2.  
    <dependency>
  3.  
    <groupId>org.apache.spark</groupId>
  4.  
    <artifactId>spark-core_2.12</artifactId>
  5.  
    <version>3.0.0</version>
  6.  
    </dependency>
  7.  
    </dependencies>
  1.  
    package com.sparkcore
  2.  
     
  3.  
     
  4.  
    import org.apache.spark.{SparkConf, SparkContext}
  5.  
     
  6.  
    object Spark01_WorkCount {
  7.  
    def main(args: Array[String]): Unit = {
  8.  
    //Application
  9.  
     
  10.  
    //Spark框架
  11.  
     
  12.  
    //TODO 建立和spark框架的连接
  13.  
    //JDBC:Connection
  14.  
    var sparkConf = new SparkConf().setMaster("local").setAppName("WordCount");
  15.  
    var sc = new SparkContext(sparkConf);
  16.  
     
  17.  
    //TODO 执行业务操作
  18.  
     
  19.  
    //TODO 关闭连接
  20.  
    sc.stop();
  21.  
    }
  22.  
    }
学新通

 学新通

 统计单词数量

学新通

学新通

学新通

  1.  
    package com.sparkcore
  2.  
     
  3.  
     
  4.  
    import org.apache.spark.rdd.RDD
  5.  
    import org.apache.spark.{SparkConf, SparkContext, rdd}
  6.  
     
  7.  
    object Spark01_WorkCount {
  8.  
    def main(args: Array[String]): Unit = {
  9.  
    //Application
  10.  
     
  11.  
    //Spark框架
  12.  
     
  13.  
    //TODO 建立和spark框架的连接
  14.  
    //JDBC:Connection
  15.  
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount");
  16.  
    val sc = new SparkContext(sparkConf);
  17.  
     
  18.  
    //TODO 执行业务操作
  19.  
    //1、读取文件,获取一个一个的单词(分词)
  20.  
    // hello world
  21.  
    val lines: RDD[String] = sc.textFile("datas")
  22.  
     
  23.  
    //2、将一行数据进行拆分,形成一个一个的单词(分词)
  24.  
    // "hello world" => hello ,world, hello ,world
  25.  
    val words: RDD[String] = lines.flatMap(_.split(" "))
  26.  
     
  27.  
    //3、将数据根据单词进行分组,便于统计
  28.  
    //(hello,hello,hello),(world,world)
  29.  
    val wordGroup: RDD[(String,Iterable[String])] = words.groupBy(word=>word)
  30.  
     
  31.  
    //4、对分词后的数据进行转换
  32.  
    //(hello,hello,hello),(world,world)
  33.  
    //(hello,3),(world,2)
  34.  
    val wordToCount = wordGroup.map{
  35.  
    case (word, list) => {
  36.  
    (word,list.size)
  37.  
    }
  38.  
    }
  39.  
     
  40.  
     
  41.  
    //5、将转换结果采集到控制台打印出来
  42.  
    val array:Array[(String,Int)] =wordToCount.collect()
  43.  
    array.foreach(println)
  44.  
     
  45.  
    //TODO 关闭连接
  46.  
    sc.stop();
  47.  
    }
  48.  
    }
学新通

 运行结果:学新通

  1.  
    package com.sparkcore
  2.  
     
  3.  
    import org.apache.spark.rdd.RDD
  4.  
    import org.apache.spark.{SparkConf, SparkContext}
  5.  
     
  6.  
    object Spark02_WorkCount {
  7.  
    def main(args: Array[String]): Unit = {
  8.  
    //Application
  9.  
     
  10.  
    //Spark框架
  11.  
     
  12.  
    //TODO 建立和spark框架的连接
  13.  
    //JDBC:Connection
  14.  
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount");
  15.  
    val sc = new SparkContext(sparkConf);
  16.  
     
  17.  
    //TODO 执行业务操作
  18.  
    //1、读取文件,获取一个一个的单词(分词)
  19.  
    // hello world
  20.  
    val lines: RDD[String] = sc.textFile("datas")
  21.  
     
  22.  
    //2、将一行数据进行拆分,形成一个一个的单词(分词)
  23.  
    // "hello world" => hello ,world, hello ,world
  24.  
    val words: RDD[String] = lines.flatMap(_.split(" "))
  25.  
    val wordToOne = words.map(
  26.  
    word => (word, 1)
  27.  
    )
  28.  
    val wordGroup: RDD[(String,Iterable[(String,Int)])] = wordToOne.groupBy(
  29.  
    t => t._1
  30.  
    )
  31.  
     
  32.  
    val wordToCount = wordGroup.map {
  33.  
    case (word, list) => {
  34.  
    list.reduce(
  35.  
    (t1, t2) => {
  36.  
    (t1._1, t1._2 t2._2)
  37.  
    }
  38.  
    )
  39.  
    }
  40.  
    }
  41.  
    val array: Array[(String,Int)]= wordToCount.collect()
  42.  
    array.foreach(println)
  43.  
     
  44.  
     
  45.  
    //TODO 关闭连接
  46.  
    sc.stop();
  47.  
    }
  48.  
    }
学新通

学新通

下载spark 

Apache Downloads

并上传到linux服务器上并解压缩学新通

重命名:

学新通 此处省略集群安装。

启动集群

在master节点执行sbin/start-all.sh ,启动后可以在master节点看到Master,在node1节点看到Worker节点

学新通

学新通

 在master-backup节点执行sbin/start-master.sh

学新通

 在/usr/local/spark/data/下新增word.txt

内容:

  1.  
    hello scale
  2.  
    hello world
  3.  
    hello spark
  4.  
    hello spark

使用xsync同步到其他节点。xsync word.txt

在master节点执行bin/spark-shell

学新通

然后执行sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ _).collect

 学新通

学新通

学新通

提交应用程序

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077,master-backup:7077 \
./examples/jars/spark-examples_2.11-2.4.8.jar \
10

说明:

--class表示要执行程序的主函数的类,也可以替换成我们自己的应用程序

--master 部署模式。local[2]表示本地模式,数字表示分配的虚拟CPU核数量。spark://master:7077,master-backup:7077表示集群模式。Yarn环境。

spark-examples_2.11-2.4.8.jar为运行的应用类所在的jar包,实际使用时,可以设定为自己的jar包

10表示程序main函数的入口参数,用于设定当前应用的任务数量

学新通

配置历史服务

由于spark-shell停止后,集群监控master:4040就看不到历史任务的运行情况了,所以开发时都配置历史服务器记录任务运行情况。

1)修改spark-defaults.conf.template为spark-defaults.conf

2)修改spark-defaults.conf文件,配置日志存储路径

  1.  
    spark.eventLog.enabled true
  2.  
    spark.eventLog.dir hdfs://master:9000/directory

注意:需要启动Hadoop集群,HDFS上的diretory 目录需要提前存在。

sbin/start-dfs.sh

hadoop fs -mkdir /directory

3)修改spark-env.sh文件,添加日志配置

  1.  
    export SPARK_HISTORY_OPTS="
  2.  
    -Dspark.history.ui.port=18080
  3.  
    -Dspark.history.fs.logDirectory=hdfs://master:9000/directory
  4.  
    -Dspark.history.retainedApplications=30"

参数1:WEB UI访问的端口号为18080

参数2:指定历史服务器日志存储路径

参数3:指定保存Application历史记录的个数,如果超过了这个数,旧的应用程序信息将被删除,这个是内存中的应用数,而不是页面上显示 的应用数

注意:此处的端口要看Hadoop的core-site.xml和hdfs-site.xml配置

hdfs-site.xml

学新通

 core-site.xml

学新通

4)分发

xsync conf

5)重启集群和历史服务

sbin/start-all.sh

sbin/start-history-server.sh

学新通

学新通

学新通 执行bin/spark-shell

学新通

学新通学新通

点击“Show incomplete applications”可以看到这个任务。

学新通

 学新通

学新通

学新通 执行bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077,master-backup:7077 \
./examples/jars/spark-examples_2.11-2.4.8.jar \
10时发现State一直是waiting状态,停掉spark-shell后就可以了,不知道是什么原因。可能是因为只有一个worker

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaceai
系列文章
更多 icon
同类精品
更多 icon
继续加载