编程开源技术交流,分享技术与知识

网站首页 > 开源技术 正文

Spring Boot集成Spark SQL进行大数据分析

wxchong 2024-10-09 21:12:54 开源技术 13 ℃ 0 评论

Spring Boot集成Spark SQL进行大数据分析

一、大数据开发者的春天

最近一段时间学习Spark基础知识,Spark开发现在完成集成了SQL。具有SQL功底的同学可以无缝学习Spark,成为Spark开发工程师。

同Flink一样,Spark Streaming支持实时数据计算。Flink也完全兼容SQL,Hive更是数据仓库人员开发的圣剑。我曾经在电商公司做过专门的Hive SQL数据仓库开发,当时还是SSM架构,Hive定时任务作业。

现在Spring Boot称霸了Spring框架开发鳌头,由此衍生的Spring Cloud微服务架构,扩展出来Spring Cloud Alibaba架构。笔者曾经的文章都有讲到,本文分析Spring Boot集成Spark SQL开发的案例。

Spark SQL可以使用MySQL作为数据源,也可以使用Oracle、Hive作为数据源。本文采用Scala格式连接MySQL和Java格式连接Hive进行讲解,暂时连接Hive只测试了Java格式。

二、Maven项目引入

Maven工程引入Spark坐标,其版本号为

<java.version>1.8</java.version>

<scala.version>2.11</scala.version>

<spark.version>2.4.0</spark.version>

Spring Boot版本号为:

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.1.6.RELEASE</version>

<relativePath/>

</parent>

2.1构建Spring Boot项目

引入需要的Spring Boot依赖坐标:

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-aop</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

2.2引入Spark核心工具

引入Spark核心工具、Scala依赖

dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_${scala.version}</artifactId>

<version>${spark.version}</version>

</dependency>

引入Spark Streaming:

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming_${scala.version}</artifactId>

<version>${spark.version}</version>

</dependency>

引入MySQL数据库坐标:

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

</dependency>

<dependency>

<groupId>org.codehaus.janino</groupId>

<artifactId>janino</artifactId>

<version>3.0.8</version>

</dependency>

2.3引入Spark SQL坐标

引入Spark SQL依赖:

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_${scala.version}</artifactId>

<version>${spark.version}</version>

</dependency>

引入Spring Boot整合器

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-configuration-processor</artifactId>

<optional>true</optional>

</dependency>

三、主启动工程采用MySQL作为数据源的Spark SQL

主启动工程Scala语言启动,因为引入了Scala语法在,支持Scala启动:

@Configuration

@EnableAutoConfiguration

@ComponentScan

@SpringBootApplication

class SpApp

object springsparkdemoApplication extends App{

SpringApplication.run(classOf[SpApp])

}

3.1核心配置

核心Config配置也采用Scala形式,如下:

import org.springframework.context.annotation.{Bean, Configuration}

import org.apache.spark.{SparkConf, SparkContext}


@Configuration

class HSparkStreamconfig {

private var local= "local"


private var sparkHome = "."


private var sparkName = "sparkProd"


@Bean

def SparkConf: SparkConf = {

var spConf = new SparkConf().setAppName(sparkName).setMaster(local)

return spConf

}

@Bean

def SparkContext = new SparkContext(SparkConf)

}

3.2RestController层

3.2.1示例Controller

数据测试层采用Scala格式RestController如下:

@RestController

@RequestMapping (value = Array("SparkController/spark/"))

@CrossOrigin

class SparkController{

...

}

具体的方法:

@Autowired

var sc:SparkContext = _

@GetMapping(value = Array("prod"))

def prod=

{

val url = "jdbc:mysql://192.16.1.39:3306/taskEmmDB?useUnicode=true&characterEncoding=UTF-8&user=emm&password=wty";

val prop = new Properties();


val sqlContext = new SQLContext(sc);

val df = sqlContext.read.jdbc(url, "t_mem", prop);


df.createOrReplaceTempView("t_mem")

//使用MySQL语句进行查询

var sf = sqlContext.sql("select * from t_mem where p_id = 0")


println("1.------------->" + sf.show().toString())

//println("1.------------->" + sf.rdd.partitions.size)

JSON.parseFull("{honke:1}")

}

此种是Scala连接MySQL的测试Spark SQL

四、采用Hive SQL作为数据源

前面讲到了Spark SQL连接MySQL的用法,下面举例使用Java JDBC连接Hive的方法,Hive作为数据源

加入Hive配置坐标:

<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-jdbc</artifactId>

<version>1.2.1</version>

</dependency>

目前测试可用Java JDBC的示例连接Hive为:

String url = "jdbc:hive2://192.16.1.41:10000/default";

try {

Class.forName("org.apache.hive.jdbc.HiveDriver");

} catch (ClassNotFoundException e) {

log.error("异常:", e);

}

Connection conn = DriverManager.getConnection(url,"hadoop","");

Statement stmt = conn.createStatement();

String sql = "SELECT * FROM t_mem limit 10";

ResultSet res = stmt.executeQuery(sql);

while(res.next()){

System.out.println("p_id: "+res.getInt(1)+"\tname: "+res.getString(2)+"\tage:" + res.getInt(3));

}

本文简单讲解了Spark SQL的用法,后面将继续推出Spring Boot集成Hive、集成Flink、微服务系列及阿里微服务系列,敬请期待。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表