网站首页 > 开源技术 正文
Spring Boot集成Spark SQL进行大数据分析
一、大数据开发者的春天
最近一段时间学习Spark基础知识,Spark开发现在完成集成了SQL。具有SQL功底的同学可以无缝学习Spark,成为Spark开发工程师。
同Flink一样,Spark Streaming支持实时数据计算。Flink也完全兼容SQL,Hive更是数据仓库人员开发的圣剑。我曾经在电商公司做过专门的Hive SQL数据仓库开发,当时还是SSM架构,Hive定时任务作业。
现在Spring Boot称霸了Spring框架开发鳌头,由此衍生的Spring Cloud微服务架构,扩展出来Spring Cloud Alibaba架构。笔者曾经的文章都有讲到,本文分析Spring Boot集成Spark SQL开发的案例。
Spark SQL可以使用MySQL作为数据源,也可以使用Oracle、Hive作为数据源。本文采用Scala格式连接MySQL和Java格式连接Hive进行讲解,暂时连接Hive只测试了Java格式。
二、Maven项目引入
Maven工程引入Spark坐标,其版本号为
<java.version>1.8</java.version>
<scala.version>2.11</scala.version>
<spark.version>2.4.0</spark.version>
Spring Boot版本号为:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/>
</parent>
2.1构建Spring Boot项目
引入需要的Spring Boot依赖坐标:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.2引入Spark核心工具
引入Spark核心工具、Scala依赖
dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
引入Spark Streaming:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
引入MySQL数据库坐标:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
2.3引入Spark SQL坐标
引入Spark SQL依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
引入Spring Boot整合器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
三、主启动工程采用MySQL作为数据源的Spark SQL
主启动工程Scala语言启动,因为引入了Scala语法在,支持Scala启动:
@Configuration
@EnableAutoConfiguration
@ComponentScan
@SpringBootApplication
class SpApp
object springsparkdemoApplication extends App{
SpringApplication.run(classOf[SpApp])
}
3.1核心配置
核心Config配置也采用Scala形式,如下:
import org.springframework.context.annotation.{Bean, Configuration}
import org.apache.spark.{SparkConf, SparkContext}
@Configuration
class HSparkStreamconfig {
private var local= "local"
private var sparkHome = "."
private var sparkName = "sparkProd"
@Bean
def SparkConf: SparkConf = {
var spConf = new SparkConf().setAppName(sparkName).setMaster(local)
return spConf
}
@Bean
def SparkContext = new SparkContext(SparkConf)
}
3.2RestController层
3.2.1示例Controller
数据测试层采用Scala格式RestController如下:
@RestController
@RequestMapping (value = Array("SparkController/spark/"))
@CrossOrigin
class SparkController{
...
}
具体的方法:
@Autowired
var sc:SparkContext = _
@GetMapping(value = Array("prod"))
def prod=
{
val url = "jdbc:mysql://192.16.1.39:3306/taskEmmDB?useUnicode=true&characterEncoding=UTF-8&user=emm&password=wty";
val prop = new Properties();
val sqlContext = new SQLContext(sc);
val df = sqlContext.read.jdbc(url, "t_mem", prop);
df.createOrReplaceTempView("t_mem")
//使用MySQL语句进行查询
var sf = sqlContext.sql("select * from t_mem where p_id = 0")
println("1.------------->" + sf.show().toString())
//println("1.------------->" + sf.rdd.partitions.size)
JSON.parseFull("{honke:1}")
}
此种是Scala连接MySQL的测试Spark SQL
四、采用Hive SQL作为数据源
前面讲到了Spark SQL连接MySQL的用法,下面举例使用Java JDBC连接Hive的方法,Hive作为数据源
加入Hive配置坐标:
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
目前测试可用Java JDBC的示例连接Hive为:
String url = "jdbc:hive2://192.16.1.41:10000/default";
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
log.error("异常:", e);
}
Connection conn = DriverManager.getConnection(url,"hadoop","");
Statement stmt = conn.createStatement();
String sql = "SELECT * FROM t_mem limit 10";
ResultSet res = stmt.executeQuery(sql);
while(res.next()){
System.out.println("p_id: "+res.getInt(1)+"\tname: "+res.getString(2)+"\tage:" + res.getInt(3));
}
本文简单讲解了Spark SQL的用法,后面将继续推出Spring Boot集成Hive、集成Flink、微服务系列及阿里微服务系列,敬请期待。
- 上一篇: 数据中台中的数据存储技术(数据中台的内容)
- 下一篇: 苏宁6亿会员是如何做到快速精确分析的?
猜你喜欢
- 2024-10-09 DCOS(数据中心操作系统)到底是什么鬼?
- 2024-10-09 PostgresSql学习-Vacuum清理机制(plsql清理查询缓存)
- 2024-10-09 最全的大数据学习资料整理(上集)(大数据入门知识)
- 2024-10-09 苏宁6亿会员是如何做到快速精确分析的?
- 2024-10-09 数据中台中的数据存储技术(数据中台的内容)
- 2024-10-09 形象理解海量大数据实时系统(大数据形象描述)
- 2024-10-09 数据库内核杂谈:索引优化(数据库索引设计与优化 pdf)
- 2024-10-09 OPC UA服务器数据写入数据库以及数据分析
- 2024-10-09 探索SQL 与 NO-SQL 数据库-到底有啥不一样呢?
- 2024-10-09 无服务器POSTGRESQL中的分支机制(sql无服务器连接)
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)