# breeze-viz
**Repository Path**: big01/breeze-viz
## Basic Information
- **Project Name**: breeze-viz
- **Description**: 清风可视化
空气污染数据的可视化分析平台。
- **Primary Language**: Unknown
- **License**: MulanPSL-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2024-12-13
- **Last Updated**: 2025-01-02
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# 空气污染数据的可视化分析(breeze-viz)
项目名称:空气污染数据的可视化分析
**数据来源**
| 名称 | 来源 |
| ---------------------------------- | ------------------------------------------------------------ |
| 意大利某城市现场的气体 | https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set |
| 印度空气质量数据(2015 - 2020 年) | https://www.kaggle.com/datasets/rohanrao/air-quality-data-in-india |
| 全球空气污染数据集 | https://www.heywhale.com/mw/dataset/64aab06558ef2eb35108cc0c/file |
## 项目预处理
#### 字段解析
##### 全球空气污染数据集
> AirGlobal.csv
| 字段 | 含义 | |
| :----------------: | :------------: | :--: |
| Country | 国家 | |
| City | 城市 | |
| AQI Value | AQI指数 | |
| AQI Category | AQI级别 | |
| CO AQI Value | CO的AQI | |
| CO AQI Category | CO的AQI级别 | |
| Ozone AQI Value | Ozone的AQI | |
| Ozone AQI Category | Ozone的AQI级别 | |
| NO2 AQI Value | NO2的AQI | |
| NO2 AQI Category | NO2的AQI级别 | |
| PM2.5 AQI Value | PM2.5的AQI | |
| PM2.5 AQI Category | CO的AQI级别 | |
> AQI:空气质量指数(Air Quality Index, AQI)的数值。AQI是根据空气中的主要污染物浓度来计算的,数值越高表示空气质量越差。AQI值通常是一个数值,范围从0到500,数值越高表示污染越严重。
>
> AQI Category:空气质量指数(AQI)的分类。AQI数值被分为多个级别,以帮助人们理解空气质量的好坏。
>
> CO:一氧化碳
>
> Ozone:臭氧
>
> NO2:二氧化氮
>PM2.5:细颗粒物,PM2.5指的是直径小于等于2.5微米的颗粒物,通常是空气污染的主要成分之一,PM2.5浓度高时对健康的危害更大。
##### 印度车站数据集
>IndiaStationday.csv
| 字段 | 含义 | |
| :----------------: | :------------: | :--: |
| City | 城市 | |
| | | |
| AQI Value | AQI指数 | |
| AQI Category | AQI级别 | |
| CO AQI Value | CO的AQI | |
| CO AQI Category | CO的AQI级别 | |
| Ozone AQI Value | Ozone的AQI | |
| Ozone AQI Category | Ozone的AQI级别 | |
| NO2 AQI Value | NO2的AQI | |
| NO2 AQI Category | NO2的AQI级别 | |
| PM2.5 AQI Value | PM2.5的AQI | |
| PM2.5 AQI Category | CO的AQI级别 | |
> AQI:空气质量指数(Air Quality Index, AQI)的数值。AQI是根据空气中的主要污染物浓度来计算的,数值越高表示空气质量越差。AQI值通常是一个数值,范围从0到500,数值越高表示污染越严重。
>
> AQI Category:空气质量指数(AQI)的分类。AQI数值被分为多个级别,以帮助人们理解空气质量的好坏。
>
> CO:一氧化碳
>
> Ozone:臭氧
>
> NO2:二氧化氮
>
> PM2.5:细颗粒物,PM2.5指的是直径小于等于2.5微米的颗粒物,通常是空气污染的主要成分之一,PM2.5浓度高时对健康的危害更大。
##### 印度城市每日污染数据集
>IndiaCityday.csv
| 字段 | 含义 | |
| :----------------: | :------------: | :--: |
| Country | 国家 | |
| City | 城市 | |
| AQI Value | AQI指数 | |
| AQI Category | AQI级别 | |
| CO AQI Value | CO的AQI | |
| CO AQI Category | CO的AQI级别 | |
| Ozone AQI Value | Ozone的AQI | |
| Ozone AQI Category | Ozone的AQI级别 | |
| NO2 AQI Value | NO2的AQI | |
| NO2 AQI Category | NO2的AQI级别 | |
| PM2.5 AQI Value | PM2.5的AQI | |
| PM2.5 AQI Category | CO的AQI级别 | |
##### 意大利空气污染数据集
>italy.csv
| 字段 | 含义 | |
| ----------------- | ------------------------------------- | ---- |
| date | 日期 | |
| time | 时间 | |
| co_gt | 一氧化碳(CO)浓度(GT) | |
| pt08_s1_co | 一氧化碳(CO)传感器值(PT08.S1) | |
| nmhc_gt | 非甲烷总烃(NMHC)浓度(GT) | |
| c6h6_gt | 苯(C6H6)浓度(GT) | |
| pt08_s2_nmhc | 非甲烷总烃(NMHC)传感器值(PT08.S2) | |
| nox_gt | 氮氧化物(NOx)浓度(GT) | |
| pt08_s3_nox | 氮氧化物(NOx)传感器值(PT08.S3) | |
| no2_gt | 二氧化氮(NO2)浓度(GT) | |
| pt08_s4_no2 | 二氧化氮(NO2)传感器值(PT08.S4) | |
| pt08_s5_o3 | 臭氧(O3)传感器值(PT08.S5) | |
| temperature | 温度(T) | |
| relative_humidity | 相对湿度(RH) | |
| absolute_humidity | 绝对湿度(AH) | |
> date(日期): 记录测量数据的日期。
>
> time(时间): 记录测量数据的时间。
>
> co_gt(一氧化碳(CO)浓度(GT)): 记录空气中一氧化碳(CO)的浓度,单位通常是ppm(百万分之一)。
>
> pt08_s1_co(一氧化碳(CO)传感器值(PT08.S1)): 一氧化碳(CO)的传感器值,表示传感器对CO浓度的测量结果。
>
> nmhc_gt(非甲烷总烃(NMHC)浓度(GT)): 记录空气中非甲烷总烃(NMHC)的浓度,单位通常是ppm。
>
> c6h6_gt(苯(C6H6)浓度(GT)): 记录空气中苯(C6H6)的浓度,单位通常是ppm。
>
> pt08_s2_nmhc(非甲烷总烃(NMHC)传感器值(PT08.S2)): 非甲烷总烃(NMHC)浓度的传感器值,表示传感器对该污染物浓度的测量结果。
>
> nox_gt(氮氧化物(NOx)浓度(GT)): 记录空气中氮氧化物(NOx)的浓度,单位通常是ppm。
>
> pt08_s3_nox(氮氧化物(NOx)传感器值(PT08.S3)): 氮氧化物(NOx)浓度的传感器值,表示传感器对该污染物浓度的测量结果。
>
> no2_gt(二氧化氮(NO2)浓度(GT)): 记录空气中二氧化氮(NO2)的浓度,单位通常是ppm。
>
> pt08_s4_no2(二氧化氮(NO2)传感器值(PT08.S4)): 二氧化氮(NO2)浓度的传感器值,表示传感器对该污染物浓度的测量结果。
>
> pt08_s5_o3(臭氧(O3)传感器值(PT08.S5)): 臭氧(O3)浓度的传感器值,表示传感器对臭氧浓度的测量结果。
>
> temperature(温度(T)): 测量时的温度,单位通常为摄氏度(°C)。
>
> relative_humidity(相对湿度(RH)): 测量时的相对湿度,单位通常为百分比(%)
>
> absolute_humidity(绝对湿度(AH)): 测量时的绝对湿度,通常单位是g/m³,表示每立方米空气中水蒸气的质量。
上传原始数据到HDFS
```bash
hdfs dfs -mkdir -p /air-data/ods
```
```bash
hdfs dfs -put AirGlobal.csv /air-data/ods
```
#### 创建 hive 数据库
```sql
create database air;
```
#### 创建 mysql 数据库
```sql
create database air;
```
## 工程文件
1. 数据分析 air-spark
2. 数据接口 air-api
3. 数据可视化 air-view
## 数据分析
> [air-spark](./air-spark)
### pom.xml
```xml
4.0.0
cn.lhz
air-spark
1.0.0
8
${jdk.version}
${jdk.version}
${jdk.version}
utf-8
utf-8
UTF-8
true
true
3.5.3
1.8.1
2.18.0
3.17.0
1.2.24
2.0.53
2.0.53
2.11.0
3.4.0
3.1.3
1.1.2
2.15.4
5.8.34
1.18.34
9.1.0
2.13.15
org.scala-lang
scala-library
${scala.version}
org.scala-lang
scala-compiler
${scala.version}
com.github.binarywang
java-testdata-generator
${java-testdata-generator.version}
org.apache.spark
spark-core_2.13
${spark.version}
org.apache.spark
spark-sql_2.13
${spark.version}
org.apache.spark
spark-streaming_2.13
${spark.version}
org.apache.spark
spark-hive_2.13
${spark.version}
org.apache.spark
spark-streaming-kafka-0-10_2.13
${spark.version}
org.projectlombok
lombok
${lombok.version}
org.apache.logging.log4j
log4j-slf4j2-impl
2.24.2
org.apache.logging.log4j
log4j-core
2.24.2
org.slf4j
slf4j-api
2.0.16
org.apache.hadoop
hadoop-client
${hadoop.version}
org.apache.hive
hive-jdbc
${hive.version}
org.apache.commons
commons-lang3
${commons-lang3.version}
commons-io
commons-io
${commons-io.version}
com.mysql
mysql-connector-j
${mysql.version}
com.fasterxml.jackson.core
jackson-core
${jackson.version}
com.fasterxml.jackson.core
jackson-annotations
${jackson.version}
com.fasterxml.jackson.core
jackson-databind
${jackson.version}
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
${jackson.version}
${project.artifactId}
org.apache.maven.plugins
maven-compiler-plugin
3.13.0
UTF-8
${jdk.version}
${jdk.version}
org.apache.maven.plugins
maven-clean-plugin
3.4.0
org.apache.maven.plugins
maven-resources-plugin
3.3.1
org.apache.maven.plugins
maven-war-plugin
3.4.0
org.apache.maven.plugins
maven-jar-plugin
3.4.2
org.apache.maven.plugins
maven-surefire-plugin
3.5.2
true
net.alchim31.maven
scala-maven-plugin
4.9.2
${scala.version}
${scala.version}
scala-compile-first
process-resources
compile
testCompile
compile-scala
compile
add-source
compile
test-compile-scala
test-compile
add-source
testCompile
org.apache.maven.plugins
maven-assembly-plugin
3.7.1
jar-with-dependencies
make-assembly
package
single
public
aliyun nexus
https://maven.aliyun.com/repository/public
true
public
aliyun nexus
https://maven.aliyun.com/repository/public
true
false
```
### 集群配置文件
> `resources`
#### hdfs配置文件
> core-site.xml
>
> hdfs-site.xml
>
#### hive配置文件
> hive-stie.xml
### log4j.properties
```properties
log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
```
### spark工具类
> hive.properties
```properties
warehouse.dir=hdfs://lihaozhe:8020/user/hive/warehouse
metastore.uris=thrift://lihaozhe:9083
```
> mysql.properties
```properties
url=jdbc:mysql://lihaozhe
schema=air
user=root
password=lihaozhe
```
> cn.lhz.util.spark.SparkUtil.scala
```scala
package cn.lhz.util.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import java.util.Properties
object SparkUtil {
def apply(): SparkSession = {
System.setProperty("HADOOP_USER_NAME", "root")
val sparkConf = new SparkConf()
if (!sparkConf.contains("spark.master")) {
sparkConf.setMaster("local")
}
val prop = new Properties()
prop.load(this.getClass.getClassLoader.getResourceAsStream("hive.properties"))
val sparkSession: SparkSession = SparkSession
.builder()
.appName("Spark SQL JDBC")
.config(conf = sparkConf)
.config("spark.sql.warehouse.dir", prop.getProperty("warehouse.dir"))
.config("hive.metastore.uris", prop.getProperty("metastore.uris"))
.enableHiveSupport()
.getOrCreate()
sparkSession
}
/**
* 获取 mysql 连接参数
*
* @param tableName 数据表名称 数据库为air
* @return
*/
def mysqlConnectionProperties(tableName: String): Properties = {
val prop = new Properties()
prop.load(this.getClass.getClassLoader.getResourceAsStream("mysql.properties"))
val schema = prop.remove("schema")
prop.put("tableName", schema + "." + tableName)
prop
}
/**
* 获取 mysql 连接参数
*
* @param schema 数据库名称
* @param tableName 数据表名称
* @return
*/
def mysqlConnectionProperties(schema: String, tableName: String): Properties = {
val prop = new Properties()
prop.load(this.getClass.getClassLoader.getResourceAsStream("mysql.properties"))
prop.remove("schema")
prop.put("tableName", schema + "." + tableName)
prop
}
/**
* 百分比转换
*
* @param number 浮点数
* @return 百分比
*/
def rate(number: Double): Double = {
BigDecimal(number).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble * 100
}
}
```
### 数据清洗
#### hive 建表
##### 全球污染数据建表
>AirQuality.csv
```hive
create database if not exists air;
use air;
drop table if exists `dwd_global`;
create table `dwd_global`
(
`country` string comment '国家',
`city` string comment '城市',
`aqi_value` int comment '空气质量指数(AQI)值',
`aqi_category` string comment '空气质量指数(AQI)分类',
`co_aqi_value` int comment '一氧化碳(CO)AQI值',
`co_aqi_category` string comment '一氧化碳(CO)AQI分类',
`ozone_aqi_value` int comment '臭氧(Ozone)AQI值',
`ozone_aqi_category` string comment '臭氧(Ozone)AQI分类',
`no2_aqi_value` int comment '二氧化氮(NO2)AQI值',
`no2_aqi_category` string comment '二氧化氮(NO2)AQI分类',
`pm2_5_aqi_value` int comment '细颗粒物(PM2.5)AQI值',
`pm2_5_aqi_category` string comment '细颗粒物(PM2.5)AQI分类'
)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe';
```
##### 印度车站每日污染数据建表
>indiaStationDay.csv
```hive
create database if not exists air;
use air;
drop table if exists `dwd_india_station`;
create table `dwd_india_station`
(
`india_date` string comment '日期',
`india_time` string comment '时间',
`co_gt` int comment '一氧化碳(CO)浓度(GT)',
`pt08_s1_co` int comment '一氧化碳(CO)传感器值(PT08.S1)',
`nmhc_gt` int comment '非甲烷总烃(NMHC)浓度(GT)',
`pt08_s2_nmhc` int comment '非甲烷总烃(NMHC)传感器值(PT08.S2)',
`nox_gt` int comment '氮氧化物(NOx)浓度(GT)',
`pt08_s3_nox` int comment '氮氧化物(NOx)传感器值(PT08.S3)',
`no2_gt` int comment '二氧化氮(NO2)浓度(GT)',
`pt08_s4_no2` int comment '二氧化氮(NO2)传感器值(PT08.S4)',
`pt08_s5_o3` int comment '臭氧(O3)传感器值(PT08.S5)',
`temperature` int comment '温度(T)',
`relative_humidity` int comment '相对湿度(RH)',
`absolute_humidity` int comment '绝对湿度(AH)'
)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe';
```
##### 印度城市每日污染数据建表
> indiaCityDay.csv
```hive
create database if not exists air;
use air;
drop table if exists `dwd_india_city_day`;
create table `dwd_india_city_day`
(
`india_city_day_courty` string comment '印度城市',
`india_city_day_data` string comment '日期',
`india_city_day_pm2_5` float comment 'PM2.5浓度',
`india_city_day_no` float comment '一氧化碳浓度',
`india_city_day_no2` float comment '二氧化氮浓度',
`india_city_day_nox` float comment '氮氧化物浓度',
`india_city_day_co` float comment '一氧化碳(CO)浓度',
`india_city_day_so2` float comment '二氧化硫浓度'
)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe';
```
##### 意大利污染数据建表
> Airital.csv
```hive
create database if not exists air;
use air;
drop table if exists `dwd_air_italy`;
create table `dwd_air_italy`
(
`date` string comment '日期',
`time` string comment '时间',
`co_gt` int comment '一氧化碳(CO)浓度(GT)',
`pt08_s1_co` int comment '一氧化碳(CO)传感器值(PT08.S1)',
`nmhc_gt` int comment '非甲烷总烃(NMHC)浓度(GT)',
`c6h6_gt` int comment '苯(C6H6)浓度(GT)',
`pt08_s2_nmhc` int comment '非甲烷总烃(NMHC)传感器值(PT08.S2)',
`nox_gt` int comment '氮氧化物(NOx)浓度(GT)',
`pt08_s3_nox` int comment '氮氧化物(NOx)传感器值(PT08.S3)',
`no2_gt` int comment '二氧化氮(NO2)浓度(GT)',
`pt08_s4_no2` int comment '二氧化氮(NO2)传感器值(PT08.S4)',
`pt08_s5_o3` int comment '臭氧(O3)传感器值(PT08.S5)',
`temperature` int comment '温度(T)',
`relative_humidity` int comment '相对湿度(RH)',
`absolute_humidity` int comment '绝对湿度(AH)'
)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe';
```
#### ETL
##### global ETL
> AirQuality.csv
```scala
package cn.lhz.global.etl
import cn.lhz.util.spark.SparkUtil
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
/**
* 对 global.csv 进行数据清洗,过滤掉包含空值的行
*
* @author 年阔正
* @version 1.0.0
*/
object GlobalEtl {
def main(args: Array[String]): Unit = {
// 获取 SparkSession 对象
val sparkSession = SparkUtil()
// 定义源数据在HDFS文件系统的路径
val originPath: String = "/air-data/ods/global.csv"
// 读取 CSV 文件,获取 DataFrame
val df = sparkSession.read.option("header", "true").csv(originPath)
// 字段集合
val cols = Array("country", "city", "aqi_value", "aqi_category",
"co_aqi_value", "co_aqi_category", "ozone_aqi_value", "ozone_aqi_category",
"no2_aqi_value", "no2_aqi_category", "pm2_5_aqi_value", "pm2_5_aqi_category")
// 过滤掉有空值的数据行
val cleanedDf = df.na.drop(cols)
// 为字段指定数据类型
val table = cleanedDf.select(
col("country"),
col("city"),
col("aqi_value") cast (IntegerType),
col("aqi_category"),
col("co_aqi_value") cast (IntegerType),
col("co_aqi_category"),
col("ozone_aqi_value") cast (IntegerType),
col("ozone_aqi_category"),
col("no2_aqi_value") cast (IntegerType),
col("no2_aqi_category"),
col("pm2_5_aqi_value") cast (IntegerType),
col("pm2_5_aqi_category")
)
// 保存数据到 Hive 表
table.write.mode(SaveMode.Overwrite).saveAsTable("air.dwd_global")
// 释放资源
sparkSession.stop()
}
}
```
##### IndiaStationDay ETL
> indiaStationDay.csv
```scala
package cn.lhz.indiastationday.etl
import cn.lhz.util.spark.SparkUtil
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
/**
* 对 IndiaStationDay.csv 进行数据清洗,过滤掉包含空值的行
*
* @author 董奕麟
* @version 1.0.0
*/
object IndiaStationEtl {
def main(args: Array[String]): Unit = {
// 获取 SparkSession 对象
val sparkSession = SparkUtil()
// 定义源数据在HDFS文件系统的路径
val originPath: String = "/air-data/ods/IndiaStationDay.csv"
// 读取 CSV 文件,获取 DataFrame
val df = sparkSession.read.option("header", "true").csv(originPath)
// 字段集合
val cols = Array(“stationid”,"date","pm2_5","pm10","no","nox","nh3","co","so2","o3","benzene","toluene","xylene","aqi","aqi_bucket")
// 过滤掉有空值的数据行
val cleanedDf = df.na.drop(cols)
// 为字段指定数据类型
val table = cleanedDf.select(
col("india_date"),
col("india_time"),
col("co_gt").cast(IntegerType),
col("pt08_s1_co").cast(IntegerType),
col("nmhc_gt").cast(IntegerType),
col("pt08_s2_nmhc").cast(IntegerType),
col("nox_gt").cast(IntegerType),
col("pt08_s3_nox").cast(IntegerType),
col("no2_gt").cast(IntegerType),
col("pt08_s4_no2").cast(IntegerType),
col("pt08_s5_o3").cast(IntegerType),
col("temperature").cast(IntegerType),
col("relative_humidity").cast(IntegerType),
col("absolute_humidity").cast(IntegerType)
)
// 保存数据到 Hive 表
table.write.mode(SaveMode.Overwrite).saveAsTable("air.dwd_india_station")
// 释放资源
sparkSession.stop()
}
}
```
##### IndiaCityDay ETL
>indiaCityDay.csv
```scala
package cn.lhz.IndiaCityDay.etl
import cn.lhz.util.spark.SparkUtil
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
/**
* 对 IndiaCityDay.csv 进行数据清洗,过滤掉包含空值的行
*
* @author 陈世良
* @version 1.0.0
*/
boject indiaCityDayETL {
def main(args: Array[String]): Unit = {
// 获取 SparkSession 对象
val sparkSession = SparkUtil()
// 定义源数据在HDFS文件系统的路径
val originPath: String = "IndiaCityDay.csv"
// 读取 CSV 文件,获取 DataFrame
val df = sparkSession.read.option("header", "true").csv(originPath)
// 字段集合
val indiaCityDaySchema= Array("City", "Date", "PM2.5", "NO", "NO2", "NOx", "CO", "SO2")
// 过滤掉有空值的数据行
val indiaCityDaySchema = new StructType()
.add("City", StringType, true)
.add("Date", StringType, true)
.add("PM2.5", FloatType, true)
.add("NO", FloatType, true)
.add("NO2", FloatType, true)
.add("NOx", FloatType, true)
.add("CO", FloatType, true)
.add("SO2", FloatType, true)
// 保存数据到 Hive 表
table.write.mode(SaveMode.Overwrite).saveAsTable("air.dwd_india_city_day")
// 释放资源
sparkSession.stop()
}
}
```
##### italy ETL
>Airital.csv
```scala
package cn.lhz.italy.etl
import cn.lhz.util.spark.SparkUtil
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
/**
* 对 italy.csv 进行数据清洗,过滤掉包含空值的行,并保存至 Hive 表
*
* @author 胡书源
* @version 1.0.0
*/
object ItalyEtl {
def main(args: Array[String]): Unit = {
// 获取 SparkSession 对象
val sparkSession = SparkUtil()
// 定义源数据在HDFS文件系统的路径
val originPath: String = "/air-data/ods/italy.csv"
// 读取 CSV 文件,获取 DataFrame
val df = sparkSession.read.option("header", "true").csv(originPath)
// 字段集合
val cols = Array(
"Date", "Time", "CO(GT)", "PT08.S1(CO)", "NMHC(GT)", "C6H6(GT)", "PT08.S2(NMHC)",
"NOx(GT)", "PT08.S3(NOx)", "NO2(GT)", "PT08.S4(NO2)", "PT08.S5(O3)", "T", "RH", "AH"
)
// 过滤掉有空值的数据行
val cleanedDf = df.na.drop(cols)
// 为字段指定数据类型并重命名字段
val table = cleanedDf.select(
col("Date").alias("date"),
col("Time").alias("time"),
col("CO(GT)").cast(IntegerType).alias("co_gt"),
col("PT08.S1(CO)").cast(IntegerType).alias("pt08_s1_co"),
col("NMHC(GT)").cast(IntegerType).alias("nmhc_gt"),
col("C6H6(GT)").cast(IntegerType).alias("c6h6_gt"),
col("PT08.S2(NMHC)").cast(IntegerType).alias("pt08_s2_nmhc"),
col("NOx(GT)").cast(IntegerType).alias("nox_gt"),
col("PT08.S3(NOx)").cast(IntegerType).alias("pt08_s3_nox"),
col("NO2(GT)").cast(IntegerType).alias("no2_gt"),
col("PT08.S4(NO2)").cast(IntegerType).alias("pt08_s4_no2"),
col("PT08.S5(O3)").cast(IntegerType).alias("pt08_s5_o3"),
col("T").cast(IntegerType).alias("temperature"),
col("RH").cast(IntegerType).alias("relative_humidity"),
col("AH").cast(IntegerType).alias("absolute_humidity")
)
// 保存数据到 Hive 表
table.write.mode(SaveMode.Overwrite).saveAsTable("air.dwd_air_italy")
// 释放资源
sparkSession.stop()
}
}
```
**服务器提交job**
```bash
spark-submit --master yarn --deploy-mode cluster --class cn.lhz.cardio.CardioEtl heart-spark.jar
```
## 数据接口
> springboot工程
>
### pom.xml
```xml
4.0.0
org.springframework.boot
spring-boot-starter-parent
3.3.5
cn.lhz
air-api
0.0.1
air-api
air-api
21
21
${jdk.version}
${jdk.version}
${jdk.version}
utf-8
utf-8
UTF-8
true
true
2.17.0
3.17.0
1.2.23
2.1.4
2.0.53
2.0.53
2.11.0
1.1.2
5.8.32
5.11.2
2.0.0
2.18.0
2.9.18
1.18.34
3.5.9
3.0.4
9.1.0
3.4.0
0.289
org.springframework.boot
spring-boot-starter-thymeleaf
org.springframework.boot
spring-boot-starter-web
com.baomidou
mybatis-plus-spring-boot3-starter
${mybatis-plus.version}
com.baomidou
mybatis-plus-jsqlparser
${mybatis-plus.version}
com.baomidou
mybatis-plus-generator
${mybatis-plus.version}
org.springframework.boot
spring-boot-configuration-processor
true
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
cn.hutool
hutool-all
${hutool.version}
com.github.binarywang
java-testdata-generator
1.1.2
org.apache.commons
commons-lang3
${commons-lang3.version}
commons-io
commons-io
${commons-io.version}
com.google.code.gson
gson
${gson.version}
org.webjars
layui
${layui.version}
com.alibaba
druid
${druid.version}
com.mysql
mysql-connector-j
${mysql.version}
com.github.xiaoymin
knife4j-openapi3-jakarta-spring-boot-starter
4.5.0
org.apache.maven.plugins
maven-compiler-plugin
3.13.0
UTF-8
${jdk.version}
${jdk.version}
org.springframework.boot
spring-boot-maven-plugin
org.projectlombok
lombok
${project.name}
public
aliyun nexus
https://maven.aliyun.com/repository/public
true
public
aliyun nexus
https://maven.aliyun.com/repository/public
true
false
```