# breeze-viz **Repository Path**: big01/breeze-viz ## Basic Information - **Project Name**: breeze-viz - **Description**: 清风可视化 空气污染数据的可视化分析平台。 - **Primary Language**: Unknown - **License**: MulanPSL-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2024-12-13 - **Last Updated**: 2025-01-02 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 空气污染数据的可视化分析(breeze-viz) 项目名称:空气污染数据的可视化分析 **数据来源** | 名称 | 来源 | | ---------------------------------- | ------------------------------------------------------------ | | 意大利某城市现场的气体 | https://www.kaggle.com/datasets/fedesoriano/air-quality-data-set | | 印度空气质量数据(2015 - 2020 年) | https://www.kaggle.com/datasets/rohanrao/air-quality-data-in-india | | 全球空气污染数据集 | https://www.heywhale.com/mw/dataset/64aab06558ef2eb35108cc0c/file | ## 项目预处理 #### 字段解析 ##### 全球空气污染数据集 > AirGlobal.csv | 字段 | 含义 | | | :----------------: | :------------: | :--: | | Country | 国家 | | | City | 城市 | | | AQI Value | AQI指数 | | | AQI Category | AQI级别 | | | CO AQI Value | CO的AQI | | | CO AQI Category | CO的AQI级别 | | | Ozone AQI Value | Ozone的AQI | | | Ozone AQI Category | Ozone的AQI级别 | | | NO2 AQI Value | NO2的AQI | | | NO2 AQI Category | NO2的AQI级别 | | | PM2.5 AQI Value | PM2.5的AQI | | | PM2.5 AQI Category | CO的AQI级别 | | > AQI:空气质量指数(Air Quality Index, AQI)的数值。AQI是根据空气中的主要污染物浓度来计算的,数值越高表示空气质量越差。AQI值通常是一个数值,范围从0到500,数值越高表示污染越严重。 > > AQI Category:空气质量指数(AQI)的分类。AQI数值被分为多个级别,以帮助人们理解空气质量的好坏。 > > CO:一氧化碳 > > Ozone:臭氧 > > NO2:二氧化氮 >PM2.5:细颗粒物,PM2.5指的是直径小于等于2.5微米的颗粒物,通常是空气污染的主要成分之一,PM2.5浓度高时对健康的危害更大。 ##### 印度车站数据集 >IndiaStationday.csv | 字段 | 含义 | | | :----------------: | :------------: | :--: | | City | 城市 | | | | | | | AQI Value | AQI指数 | | | AQI Category | AQI级别 | | | CO AQI Value | CO的AQI | | | CO AQI Category | CO的AQI级别 | | | Ozone AQI Value | Ozone的AQI | | | Ozone AQI Category | Ozone的AQI级别 | | | NO2 AQI Value | NO2的AQI | | | NO2 AQI Category | NO2的AQI级别 | | | PM2.5 AQI Value | PM2.5的AQI | | | PM2.5 AQI Category | CO的AQI级别 | | > AQI:空气质量指数(Air Quality Index, AQI)的数值。AQI是根据空气中的主要污染物浓度来计算的,数值越高表示空气质量越差。AQI值通常是一个数值,范围从0到500,数值越高表示污染越严重。 > > AQI Category:空气质量指数(AQI)的分类。AQI数值被分为多个级别,以帮助人们理解空气质量的好坏。 > > CO:一氧化碳 > > Ozone:臭氧 > > NO2:二氧化氮 > > PM2.5:细颗粒物,PM2.5指的是直径小于等于2.5微米的颗粒物,通常是空气污染的主要成分之一,PM2.5浓度高时对健康的危害更大。 ##### 印度城市每日污染数据集 >IndiaCityday.csv | 字段 | 含义 | | | :----------------: | :------------: | :--: | | Country | 国家 | | | City | 城市 | | | AQI Value | AQI指数 | | | AQI Category | AQI级别 | | | CO AQI Value | CO的AQI | | | CO AQI Category | CO的AQI级别 | | | Ozone AQI Value | Ozone的AQI | | | Ozone AQI Category | Ozone的AQI级别 | | | NO2 AQI Value | NO2的AQI | | | NO2 AQI Category | NO2的AQI级别 | | | PM2.5 AQI Value | PM2.5的AQI | | | PM2.5 AQI Category | CO的AQI级别 | | ##### 意大利空气污染数据集 >italy.csv | 字段 | 含义 | | | ----------------- | ------------------------------------- | ---- | | date | 日期 | | | time | 时间 | | | co_gt | 一氧化碳(CO)浓度(GT) | | | pt08_s1_co | 一氧化碳(CO)传感器值(PT08.S1) | | | nmhc_gt | 非甲烷总烃(NMHC)浓度(GT) | | | c6h6_gt | 苯(C6H6)浓度(GT) | | | pt08_s2_nmhc | 非甲烷总烃(NMHC)传感器值(PT08.S2) | | | nox_gt | 氮氧化物(NOx)浓度(GT) | | | pt08_s3_nox | 氮氧化物(NOx)传感器值(PT08.S3) | | | no2_gt | 二氧化氮(NO2)浓度(GT) | | | pt08_s4_no2 | 二氧化氮(NO2)传感器值(PT08.S4) | | | pt08_s5_o3 | 臭氧(O3)传感器值(PT08.S5) | | | temperature | 温度(T) | | | relative_humidity | 相对湿度(RH) | | | absolute_humidity | 绝对湿度(AH) | | > date(日期): 记录测量数据的日期。 > > time(时间): 记录测量数据的时间。 > > co_gt(一氧化碳(CO)浓度(GT)): 记录空气中一氧化碳(CO)的浓度,单位通常是ppm(百万分之一)。 > > pt08_s1_co(一氧化碳(CO)传感器值(PT08.S1)): 一氧化碳(CO)的传感器值,表示传感器对CO浓度的测量结果。 > > nmhc_gt(非甲烷总烃(NMHC)浓度(GT)): 记录空气中非甲烷总烃(NMHC)的浓度,单位通常是ppm。 > > c6h6_gt(苯(C6H6)浓度(GT)): 记录空气中苯(C6H6)的浓度,单位通常是ppm。 > > pt08_s2_nmhc(非甲烷总烃(NMHC)传感器值(PT08.S2)): 非甲烷总烃(NMHC)浓度的传感器值,表示传感器对该污染物浓度的测量结果。 > > nox_gt(氮氧化物(NOx)浓度(GT)): 记录空气中氮氧化物(NOx)的浓度,单位通常是ppm。 > > pt08_s3_nox(氮氧化物(NOx)传感器值(PT08.S3)): 氮氧化物(NOx)浓度的传感器值,表示传感器对该污染物浓度的测量结果。 > > no2_gt(二氧化氮(NO2)浓度(GT)): 记录空气中二氧化氮(NO2)的浓度,单位通常是ppm。 > > pt08_s4_no2(二氧化氮(NO2)传感器值(PT08.S4)): 二氧化氮(NO2)浓度的传感器值,表示传感器对该污染物浓度的测量结果。 > > pt08_s5_o3(臭氧(O3)传感器值(PT08.S5)): 臭氧(O3)浓度的传感器值,表示传感器对臭氧浓度的测量结果。 > > temperature(温度(T)): 测量时的温度,单位通常为摄氏度(°C)。 > > relative_humidity(相对湿度(RH)): 测量时的相对湿度,单位通常为百分比(%) > > absolute_humidity(绝对湿度(AH)): 测量时的绝对湿度,通常单位是g/m³,表示每立方米空气中水蒸气的质量。 上传原始数据到HDFS ```bash hdfs dfs -mkdir -p /air-data/ods ``` ```bash hdfs dfs -put AirGlobal.csv /air-data/ods ``` #### 创建 hive 数据库 ```sql create database air; ``` #### 创建 mysql 数据库 ```sql create database air; ``` ## 工程文件 1. 数据分析 air-spark 2. 数据接口 air-api 3. 数据可视化 air-view ## 数据分析 > [air-spark](./air-spark) ### pom.xml ```xml 4.0.0 cn.lhz air-spark 1.0.0 8 ${jdk.version} ${jdk.version} ${jdk.version} utf-8 utf-8 UTF-8 true true 3.5.3 1.8.1 2.18.0 3.17.0 1.2.24 2.0.53 2.0.53 2.11.0 3.4.0 3.1.3 1.1.2 2.15.4 5.8.34 1.18.34 9.1.0 2.13.15 org.scala-lang scala-library ${scala.version} org.scala-lang scala-compiler ${scala.version} com.github.binarywang java-testdata-generator ${java-testdata-generator.version} org.apache.spark spark-core_2.13 ${spark.version} org.apache.spark spark-sql_2.13 ${spark.version} org.apache.spark spark-streaming_2.13 ${spark.version} org.apache.spark spark-hive_2.13 ${spark.version} org.apache.spark spark-streaming-kafka-0-10_2.13 ${spark.version} org.projectlombok lombok ${lombok.version} org.apache.logging.log4j log4j-slf4j2-impl 2.24.2 org.apache.logging.log4j log4j-core 2.24.2 org.slf4j slf4j-api 2.0.16 org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hive hive-jdbc ${hive.version} org.apache.commons commons-lang3 ${commons-lang3.version} commons-io commons-io ${commons-io.version} com.mysql mysql-connector-j ${mysql.version} com.fasterxml.jackson.core jackson-core ${jackson.version} com.fasterxml.jackson.core jackson-annotations ${jackson.version} com.fasterxml.jackson.core jackson-databind ${jackson.version} com.fasterxml.jackson.datatype jackson-datatype-jsr310 ${jackson.version} ${project.artifactId} org.apache.maven.plugins maven-compiler-plugin 3.13.0 UTF-8 ${jdk.version} ${jdk.version} org.apache.maven.plugins maven-clean-plugin 3.4.0 org.apache.maven.plugins maven-resources-plugin 3.3.1 org.apache.maven.plugins maven-war-plugin 3.4.0 org.apache.maven.plugins maven-jar-plugin 3.4.2 org.apache.maven.plugins maven-surefire-plugin 3.5.2 true net.alchim31.maven scala-maven-plugin 4.9.2 ${scala.version} ${scala.version} scala-compile-first process-resources compile testCompile compile-scala compile add-source compile test-compile-scala test-compile add-source testCompile org.apache.maven.plugins maven-assembly-plugin 3.7.1 jar-with-dependencies make-assembly package single public aliyun nexus https://maven.aliyun.com/repository/public true public aliyun nexus https://maven.aliyun.com/repository/public true false ``` ### 集群配置文件 > `resources` #### hdfs配置文件 > core-site.xml > > hdfs-site.xml > #### hive配置文件 > hive-stie.xml ### log4j.properties ```properties log4j.rootLogger=error, stdout,R log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n log4j.appender.R=org.apache.log4j.RollingFileAppender log4j.appender.R.File=../log/agent.log log4j.appender.R.MaxFileSize=1024KB log4j.appender.R.MaxBackupIndex=1 log4j.appender.R.layout=org.apache.log4j.PatternLayout log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n ``` ### spark工具类 > hive.properties ```properties warehouse.dir=hdfs://lihaozhe:8020/user/hive/warehouse metastore.uris=thrift://lihaozhe:9083 ``` > mysql.properties ```properties url=jdbc:mysql://lihaozhe schema=air user=root password=lihaozhe ``` > cn.lhz.util.spark.SparkUtil.scala ```scala package cn.lhz.util.spark import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import java.util.Properties object SparkUtil { def apply(): SparkSession = { System.setProperty("HADOOP_USER_NAME", "root") val sparkConf = new SparkConf() if (!sparkConf.contains("spark.master")) { sparkConf.setMaster("local") } val prop = new Properties() prop.load(this.getClass.getClassLoader.getResourceAsStream("hive.properties")) val sparkSession: SparkSession = SparkSession .builder() .appName("Spark SQL JDBC") .config(conf = sparkConf) .config("spark.sql.warehouse.dir", prop.getProperty("warehouse.dir")) .config("hive.metastore.uris", prop.getProperty("metastore.uris")) .enableHiveSupport() .getOrCreate() sparkSession } /** * 获取 mysql 连接参数 * * @param tableName 数据表名称 数据库为air * @return */ def mysqlConnectionProperties(tableName: String): Properties = { val prop = new Properties() prop.load(this.getClass.getClassLoader.getResourceAsStream("mysql.properties")) val schema = prop.remove("schema") prop.put("tableName", schema + "." + tableName) prop } /** * 获取 mysql 连接参数 * * @param schema 数据库名称 * @param tableName 数据表名称 * @return */ def mysqlConnectionProperties(schema: String, tableName: String): Properties = { val prop = new Properties() prop.load(this.getClass.getClassLoader.getResourceAsStream("mysql.properties")) prop.remove("schema") prop.put("tableName", schema + "." + tableName) prop } /** * 百分比转换 * * @param number 浮点数 * @return 百分比 */ def rate(number: Double): Double = { BigDecimal(number).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble * 100 } } ``` ### 数据清洗 #### hive 建表 ##### 全球污染数据建表 >AirQuality.csv ```hive create database if not exists air; use air; drop table if exists `dwd_global`; create table `dwd_global` ( `country` string comment '国家', `city` string comment '城市', `aqi_value` int comment '空气质量指数(AQI)值', `aqi_category` string comment '空气质量指数(AQI)分类', `co_aqi_value` int comment '一氧化碳(CO)AQI值', `co_aqi_category` string comment '一氧化碳(CO)AQI分类', `ozone_aqi_value` int comment '臭氧(Ozone)AQI值', `ozone_aqi_category` string comment '臭氧(Ozone)AQI分类', `no2_aqi_value` int comment '二氧化氮(NO2)AQI值', `no2_aqi_category` string comment '二氧化氮(NO2)AQI分类', `pm2_5_aqi_value` int comment '细颗粒物(PM2.5)AQI值', `pm2_5_aqi_category` string comment '细颗粒物(PM2.5)AQI分类' ) row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'; ``` ##### 印度车站每日污染数据建表 >indiaStationDay.csv ```hive create database if not exists air; use air; drop table if exists `dwd_india_station`; create table `dwd_india_station` ( `india_date` string comment '日期', `india_time` string comment '时间', `co_gt` int comment '一氧化碳(CO)浓度(GT)', `pt08_s1_co` int comment '一氧化碳(CO)传感器值(PT08.S1)', `nmhc_gt` int comment '非甲烷总烃(NMHC)浓度(GT)', `pt08_s2_nmhc` int comment '非甲烷总烃(NMHC)传感器值(PT08.S2)', `nox_gt` int comment '氮氧化物(NOx)浓度(GT)', `pt08_s3_nox` int comment '氮氧化物(NOx)传感器值(PT08.S3)', `no2_gt` int comment '二氧化氮(NO2)浓度(GT)', `pt08_s4_no2` int comment '二氧化氮(NO2)传感器值(PT08.S4)', `pt08_s5_o3` int comment '臭氧(O3)传感器值(PT08.S5)', `temperature` int comment '温度(T)', `relative_humidity` int comment '相对湿度(RH)', `absolute_humidity` int comment '绝对湿度(AH)' ) row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'; ``` ##### 印度城市每日污染数据建表 > indiaCityDay.csv ```hive create database if not exists air; use air; drop table if exists `dwd_india_city_day`; create table `dwd_india_city_day` ( `india_city_day_courty` string comment '印度城市', `india_city_day_data` string comment '日期', `india_city_day_pm2_5` float comment 'PM2.5浓度', `india_city_day_no` float comment '一氧化碳浓度', `india_city_day_no2` float comment '二氧化氮浓度', `india_city_day_nox` float comment '氮氧化物浓度', `india_city_day_co` float comment '一氧化碳(CO)浓度', `india_city_day_so2` float comment '二氧化硫浓度' ) row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'; ``` ##### 意大利污染数据建表 > Airital.csv ```hive create database if not exists air; use air; drop table if exists `dwd_air_italy`; create table `dwd_air_italy` ( `date` string comment '日期', `time` string comment '时间', `co_gt` int comment '一氧化碳(CO)浓度(GT)', `pt08_s1_co` int comment '一氧化碳(CO)传感器值(PT08.S1)', `nmhc_gt` int comment '非甲烷总烃(NMHC)浓度(GT)', `c6h6_gt` int comment '苯(C6H6)浓度(GT)', `pt08_s2_nmhc` int comment '非甲烷总烃(NMHC)传感器值(PT08.S2)', `nox_gt` int comment '氮氧化物(NOx)浓度(GT)', `pt08_s3_nox` int comment '氮氧化物(NOx)传感器值(PT08.S3)', `no2_gt` int comment '二氧化氮(NO2)浓度(GT)', `pt08_s4_no2` int comment '二氧化氮(NO2)传感器值(PT08.S4)', `pt08_s5_o3` int comment '臭氧(O3)传感器值(PT08.S5)', `temperature` int comment '温度(T)', `relative_humidity` int comment '相对湿度(RH)', `absolute_humidity` int comment '绝对湿度(AH)' ) row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'; ``` #### ETL ##### global ETL > AirQuality.csv ```scala package cn.lhz.global.etl import cn.lhz.util.spark.SparkUtil import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.IntegerType /** * 对 global.csv 进行数据清洗,过滤掉包含空值的行 * * @author 年阔正 * @version 1.0.0 */ object GlobalEtl { def main(args: Array[String]): Unit = { // 获取 SparkSession 对象 val sparkSession = SparkUtil() // 定义源数据在HDFS文件系统的路径 val originPath: String = "/air-data/ods/global.csv" // 读取 CSV 文件,获取 DataFrame val df = sparkSession.read.option("header", "true").csv(originPath) // 字段集合 val cols = Array("country", "city", "aqi_value", "aqi_category", "co_aqi_value", "co_aqi_category", "ozone_aqi_value", "ozone_aqi_category", "no2_aqi_value", "no2_aqi_category", "pm2_5_aqi_value", "pm2_5_aqi_category") // 过滤掉有空值的数据行 val cleanedDf = df.na.drop(cols) // 为字段指定数据类型 val table = cleanedDf.select( col("country"), col("city"), col("aqi_value") cast (IntegerType), col("aqi_category"), col("co_aqi_value") cast (IntegerType), col("co_aqi_category"), col("ozone_aqi_value") cast (IntegerType), col("ozone_aqi_category"), col("no2_aqi_value") cast (IntegerType), col("no2_aqi_category"), col("pm2_5_aqi_value") cast (IntegerType), col("pm2_5_aqi_category") ) // 保存数据到 Hive 表 table.write.mode(SaveMode.Overwrite).saveAsTable("air.dwd_global") // 释放资源 sparkSession.stop() } } ``` ##### IndiaStationDay ETL > indiaStationDay.csv ```scala package cn.lhz.indiastationday.etl import cn.lhz.util.spark.SparkUtil import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.IntegerType /** * 对 IndiaStationDay.csv 进行数据清洗,过滤掉包含空值的行 * * @author 董奕麟 * @version 1.0.0 */ object IndiaStationEtl { def main(args: Array[String]): Unit = { // 获取 SparkSession 对象 val sparkSession = SparkUtil() // 定义源数据在HDFS文件系统的路径 val originPath: String = "/air-data/ods/IndiaStationDay.csv" // 读取 CSV 文件,获取 DataFrame val df = sparkSession.read.option("header", "true").csv(originPath) // 字段集合 val cols = Array(“stationid”,"date","pm2_5","pm10","no","nox","nh3","co","so2","o3","benzene","toluene","xylene","aqi","aqi_bucket") // 过滤掉有空值的数据行 val cleanedDf = df.na.drop(cols) // 为字段指定数据类型 val table = cleanedDf.select( col("india_date"), col("india_time"), col("co_gt").cast(IntegerType), col("pt08_s1_co").cast(IntegerType), col("nmhc_gt").cast(IntegerType), col("pt08_s2_nmhc").cast(IntegerType), col("nox_gt").cast(IntegerType), col("pt08_s3_nox").cast(IntegerType), col("no2_gt").cast(IntegerType), col("pt08_s4_no2").cast(IntegerType), col("pt08_s5_o3").cast(IntegerType), col("temperature").cast(IntegerType), col("relative_humidity").cast(IntegerType), col("absolute_humidity").cast(IntegerType) ) // 保存数据到 Hive 表 table.write.mode(SaveMode.Overwrite).saveAsTable("air.dwd_india_station") // 释放资源 sparkSession.stop() } } ``` ##### IndiaCityDay ETL >indiaCityDay.csv ```scala package cn.lhz.IndiaCityDay.etl import cn.lhz.util.spark.SparkUtil import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.IntegerType /** * 对 IndiaCityDay.csv 进行数据清洗,过滤掉包含空值的行 * * @author 陈世良 * @version 1.0.0 */ boject indiaCityDayETL { def main(args: Array[String]): Unit = { // 获取 SparkSession 对象 val sparkSession = SparkUtil() // 定义源数据在HDFS文件系统的路径 val originPath: String = "IndiaCityDay.csv" // 读取 CSV 文件,获取 DataFrame val df = sparkSession.read.option("header", "true").csv(originPath) // 字段集合 val indiaCityDaySchema= Array("City", "Date", "PM2.5", "NO", "NO2", "NOx", "CO", "SO2") // 过滤掉有空值的数据行 val indiaCityDaySchema = new StructType() .add("City", StringType, true) .add("Date", StringType, true) .add("PM2.5", FloatType, true) .add("NO", FloatType, true) .add("NO2", FloatType, true) .add("NOx", FloatType, true) .add("CO", FloatType, true) .add("SO2", FloatType, true) // 保存数据到 Hive 表 table.write.mode(SaveMode.Overwrite).saveAsTable("air.dwd_india_city_day") // 释放资源 sparkSession.stop() } } ``` ##### italy ETL >Airital.csv ```scala package cn.lhz.italy.etl import cn.lhz.util.spark.SparkUtil import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.IntegerType /** * 对 italy.csv 进行数据清洗,过滤掉包含空值的行,并保存至 Hive 表 * * @author 胡书源 * @version 1.0.0 */ object ItalyEtl { def main(args: Array[String]): Unit = { // 获取 SparkSession 对象 val sparkSession = SparkUtil() // 定义源数据在HDFS文件系统的路径 val originPath: String = "/air-data/ods/italy.csv" // 读取 CSV 文件,获取 DataFrame val df = sparkSession.read.option("header", "true").csv(originPath) // 字段集合 val cols = Array( "Date", "Time", "CO(GT)", "PT08.S1(CO)", "NMHC(GT)", "C6H6(GT)", "PT08.S2(NMHC)", "NOx(GT)", "PT08.S3(NOx)", "NO2(GT)", "PT08.S4(NO2)", "PT08.S5(O3)", "T", "RH", "AH" ) // 过滤掉有空值的数据行 val cleanedDf = df.na.drop(cols) // 为字段指定数据类型并重命名字段 val table = cleanedDf.select( col("Date").alias("date"), col("Time").alias("time"), col("CO(GT)").cast(IntegerType).alias("co_gt"), col("PT08.S1(CO)").cast(IntegerType).alias("pt08_s1_co"), col("NMHC(GT)").cast(IntegerType).alias("nmhc_gt"), col("C6H6(GT)").cast(IntegerType).alias("c6h6_gt"), col("PT08.S2(NMHC)").cast(IntegerType).alias("pt08_s2_nmhc"), col("NOx(GT)").cast(IntegerType).alias("nox_gt"), col("PT08.S3(NOx)").cast(IntegerType).alias("pt08_s3_nox"), col("NO2(GT)").cast(IntegerType).alias("no2_gt"), col("PT08.S4(NO2)").cast(IntegerType).alias("pt08_s4_no2"), col("PT08.S5(O3)").cast(IntegerType).alias("pt08_s5_o3"), col("T").cast(IntegerType).alias("temperature"), col("RH").cast(IntegerType).alias("relative_humidity"), col("AH").cast(IntegerType).alias("absolute_humidity") ) // 保存数据到 Hive 表 table.write.mode(SaveMode.Overwrite).saveAsTable("air.dwd_air_italy") // 释放资源 sparkSession.stop() } } ``` **服务器提交job** ```bash spark-submit --master yarn --deploy-mode cluster --class cn.lhz.cardio.CardioEtl heart-spark.jar ``` ## 数据接口 > springboot工程 > ### pom.xml ```xml 4.0.0 org.springframework.boot spring-boot-starter-parent 3.3.5 cn.lhz air-api 0.0.1 air-api air-api 21 21 ${jdk.version} ${jdk.version} ${jdk.version} utf-8 utf-8 UTF-8 true true 2.17.0 3.17.0 1.2.23 2.1.4 2.0.53 2.0.53 2.11.0 1.1.2 5.8.32 5.11.2 2.0.0 2.18.0 2.9.18 1.18.34 3.5.9 3.0.4 9.1.0 3.4.0 0.289 org.springframework.boot spring-boot-starter-thymeleaf org.springframework.boot spring-boot-starter-web com.baomidou mybatis-plus-spring-boot3-starter ${mybatis-plus.version} com.baomidou mybatis-plus-jsqlparser ${mybatis-plus.version} com.baomidou mybatis-plus-generator ${mybatis-plus.version} org.springframework.boot spring-boot-configuration-processor true org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test cn.hutool hutool-all ${hutool.version} com.github.binarywang java-testdata-generator 1.1.2 org.apache.commons commons-lang3 ${commons-lang3.version} commons-io commons-io ${commons-io.version} com.google.code.gson gson ${gson.version} org.webjars layui ${layui.version} com.alibaba druid ${druid.version} com.mysql mysql-connector-j ${mysql.version} com.github.xiaoymin knife4j-openapi3-jakarta-spring-boot-starter 4.5.0 org.apache.maven.plugins maven-compiler-plugin 3.13.0 UTF-8 ${jdk.version} ${jdk.version} org.springframework.boot spring-boot-maven-plugin org.projectlombok lombok ${project.name} public aliyun nexus https://maven.aliyun.com/repository/public true public aliyun nexus https://maven.aliyun.com/repository/public true false ```