计算机毕业设计:Python大气污染物浓度预测与可视化系统 Django框架 Spark 线性回归 可视化 大数据 机器学习 深度学习(建议收藏)✅
1、项目介绍
技术栈
采用 Python 语言开发,基于 Django 框架搭建后端服务,使用 MySQL 数据库进行数据存储,前端利用 Echarts 实现可视化展示并结合 HTML 构建页面。通过 requests 爬虫技术从天气后报网站采集空气质量数据,运用机器学习线性回归模型根据 PM2.5、SO₂、NO₂、O₃ 四个指标预测 AQI,集成大数据技术栈(Hadoop、Hive、Spark)进行分布式数据处理。
功能模块
· 首页(各城市日期筛选查看 AQI 均值分析、气体分析)
· 数据列表
· 空气质量年度分析
· AQI 月度分析
· 气体分析
· 城市分析
· 词云图
· AQI 预测
· 注册登录
· Spark 分析
· 爬虫(数据采集)
· 后台用户管理
项目介绍
本系统基于 Django 框架构建空气质量数据可视化分析平台,通过爬虫技术从天气后报网站批量采集多城市空气质量数据,存入 MySQL 数据库,并利用 Spark 进行分布式计算与统计分析。系统提供首页多城市 AQI 与气体分析、数据列表查询、年度与月度 AQI 变化趋势、PM 颗粒物分析、气体污染物浓度分析、城市 AQI 地理分布、空气质量词云图等功能。用户可输入 PM2.5、SO₂、NO₂、O₃ 特征值,通过线性回归模型预测 AQI 数值与等级。系统还包含注册登录与后台用户管理模块。
2、项目界面
1 、首页-各城市日期筛选查看AQI、各气体分析
该页面为空气质量数据可视化分析系统,支持选择日期与城市,通过柱状图展示多城市AQI均值,以仪表盘呈现单城市AQI等级,用折线图分析各气体污染物含量,直观展示城市空气质量相关数据。
2、空气质量、颗粒物年度分析–年度城市筛选分析
该页面为空气质量数据可视化分析系统的年度分析模块,支持选择年份与城市,通过折线图展示选定城市全年AQI最大值与最小值的月度变化趋势,同时可查看PM颗粒物等污染物的相关分析,直观呈现城市空气质量的年度波动情况。
3、AQI月度分析—月份城市筛选
该页面为空气质量数据可视化分析系统的月度分析模块,支持选择年份、月份与城市,通过柱状图与折线图结合的方式展示选定城市当月每日AQI值及排名,直观呈现月度内每日空气质量的变化情况,辅助月度空气质量分析。
4、城市分析----中国地图各城市分布、按月份筛选
该页面为空气质量数据可视化分析系统的城市分布模块,支持选择年份与月份,通过中国地图热力图的形式直观展示对应时段全国各城市AQI均值的地理分布情况,可查看单个城市的具体空气质量数据,直观呈现区域空气质量的空间分布特征。

5、气体分析-----SO2 NO2 CO O3
该页面为空气质量数据可视化分析系统的气体分析模块,通过折线图展示多城市二氧化硫与二氧化氮的最大值对比,同时以饼图分别呈现一氧化碳、臭氧的含量分布情况,直观呈现不同城市各类气体污染物的浓度特征与占比规律。

6、数据中心
该页面为空气质量数据可视化分析系统的数据总览模块,以表格形式集中展示各城市每日空气质量相关数据,支持分页浏览、搜索查询与条目数量调整,可查看城市、污染物浓度等详细信息,实现空气质量数据的集中查阅与管理。
7、词云图分析
该页面为空气质量数据可视化分析系统的数据词云图模块,通过词云的形式直观展示各类空气质量等级的出现频次,高频等级以更大字号呈现,可快速识别整体空气质量的主要特征与分布情况,直观呈现空气质量等级的统计规律。
8、AQI预测----输入特征值:PM值 SO2值 NO2值 O3值
该页面为空气质量数据可视化分析系统的空气质量预测模块,支持输入PM值、SO2值、NO2值、O3值等污染物特征数据,点击查看后即可输出对应的AQI数值与空气质量等级预测结果,实现基于污染物浓度的空气质量智能预测。

9、注册登录
该页面为空气质量数据分析可视化系统的登录入口,提供用户名、密码输入框,支持记住我功能,同时提供注册入口,用于验证用户身份,保障系统访问安全,是进入系统各功能模块的前置入口。

10、Spark大数据分析
该页面是空气质量数据分析可视化系统的后端数据处理模块,基于Spark实现空气质量数据的分布式计算,可完成城市AQI均值统计等数据处理任务,并将计算结果写入数据库,为前端可视化分析提供数据支撑,同时支持Hive数据仓库的集成与数据读取。
11、数据采集
该页面是空气质量数据分析可视化系统的数据采集模块,基于Python爬虫实现,可按指定年份、月份,批量爬取多个目标城市的空气质量数据,包含AQI、各类污染物浓度等信息,为系统的数据分析与可视化提供原始数据支撑。

12、后台数据管理
该页面为空气质量数据分析可视化系统的后台管理用户表模块,以列表形式展示系统用户信息,支持分页浏览、用户增加与删除操作,可对系统用户进行集中管理,保障系统用户权限与账号的有序维护。

3、项目说明
一、技术栈简要说明
本系统采用 Python 语言开发,基于 Django 框架搭建后端服务,使用 MySQL 数据库进行数据存储,前端利用 Echarts 实现可视化展示并结合 HTML 构建页面。通过 requests 爬虫技术从天气后报网站采集空气质量数据,运用机器学习线性回归模型根据 PM2.5、SO₂、NO₂、O₃ 四个指标预测 AQI,集成大数据技术栈(Hadoop、Hive、Spark)进行分布式数据处理。
二、功能模块详细介绍
· 首页(各城市日期筛选查看 AQI 均值分析、气体分析)
该页面支持选择日期与城市,通过柱状图展示多城市 AQI 均值,以仪表盘呈现单城市 AQI 等级,用折线图分析 PM2.5、PM10、SO₂、NO₂、CO、O₃ 等气体污染物含量,直观展示城市空气质量相关数据。
· 数据列表
该页面以表格形式集中展示各城市每日空气质量相关数据,支持分页浏览、搜索查询与条目数量调整,可查看城市、各类污染物浓度等详细信息,实现空气质量数据的集中查阅与管理。
· 空气质量年度分析
该模块支持选择年份与城市,通过折线图展示选定城市全年 AQI 最大值与最小值的月度变化趋势,同时可查看 PM 颗粒物等污染物的相关分析,直观呈现城市空气质量的年度波动情况。
· AQI 月度分析
该模块支持选择年份、月份与城市,通过柱状图与折线图结合的方式展示选定城市当月每日 AQI 值及排名,直观呈现月度内每日空气质量的变化情况,辅助月度空气质量分析。
· 气体分析
该模块通过折线图展示多城市二氧化硫与二氧化氮的最大值对比,同时以饼图分别呈现一氧化碳、臭氧的含量分布情况,直观呈现不同城市各类气体污染物的浓度特征与占比规律。
· 城市分析
该模块支持选择年份与月份,通过中国地图热力图的形式直观展示对应时段全国各城市 AQI 均值的地理分布情况,可查看单个城市的具体空气质量数据,直观呈现区域空气质量的空间分布特征。
· 词云图
该模块通过词云的形式直观展示各类空气质量等级的出现频次,高频等级以更大字号呈现,可快速识别整体空气质量的主要特征与分布情况,直观呈现空气质量等级的统计规律。
· AQI 预测
该模块支持用户输入 PM 值、SO₂ 值、NO₂ 值、O₃ 值等污染物特征数据,系统利用线性回归模型输出对应的 AQI 数值与空气质量等级预测结果,实现基于污染物浓度的空气质量智能预测。
· 注册登录
该页面提供用户名、密码输入框,支持记住我功能,同时提供注册入口,用于验证用户身份,保障系统访问安全,是进入系统各功能模块的前置入口。
· Spark 分析
该模块基于 Spark 实现空气质量数据的分布式计算,可完成城市 AQI 均值统计等数据处理任务,并将计算结果写入数据库,为前端可视化分析提供数据支撑,同时支持 Hive 数据仓库的集成与数据读取。
· 爬虫(数据采集)
该模块基于 Python 爬虫实现,可按指定年份、月份批量爬取多个目标城市的空气质量数据,包含 AQI、各类污染物浓度等信息,为系统的数据分析与可视化提供原始数据支撑。
· 后台用户管理
该模块以列表形式展示系统用户信息,支持分页浏览、用户增加与删除操作,可对系统用户进行集中管理,保障系统用户权限与账号的有序维护。
三、项目总结
本系统基于 Django 框架构建空气质量数据可视化分析平台,通过爬虫技术从天气后报网站批量采集多城市空气质量数据,存入 MySQL 数据库,并利用 Spark 进行分布式计算与统计分析。系统提供首页多城市 AQI 与气体分析、数据列表查询、年度与月度 AQI 变化趋势、PM 颗粒物分析、气体污染物浓度分析、城市 AQI 地理分布、空气质量词云图等可视化功能。用户可输入 PM2.5、SO₂、NO₂、O₃ 特征值,通过线性回归模型预测 AQI 数值与等级。系统还包含注册登录与后台用户管理模块,为环境监测、健康出行和污染治理提供了数据支撑与决策参考。
4、核心代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import count,mean,col,sum,when,year,month,max,min,avg
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,FloatType
if __name__ == '__main__':
#构建
spark = SparkSession.builder.appName("sparkSQL").master("local[*]").\
config("spark.sql.shuffle.partitions", 2). \
config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \
config("hive.metastore.uris", "thrift://node1:9083"). \
enableHiveSupport().\
getOrCreate()
sc = spark.sparkContext
#读取
airdata = spark.read.table("airdata")
#需求1 城市平均AQI
print("-------------------需求1 城市平均AQI--------------------")
result1 = airdata.groupby("city")\
.agg(mean("AQI").alias("avg_AQI"))\
.orderBy("avg_AQI",ascending=False)
#df
df = result1.toPandas()
# print(df)
# sql
result1.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "avgCityAqi"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result1.write.mode("overwrite").saveAsTable("avgCityAqi", "parquet")
spark.sql("select * from avgCityAqi").show()
#需求2 各气体分析
print("-------------------需求2 各气体分析--------------------")
result2 = airdata.groupby("city") \
.agg(
mean("PM").alias("avg_PM"),
mean("PM10").alias("avg_PM10"),
mean("So2").alias("avg_So2"),
mean("No2").alias("avg_No2"),
mean("Co").alias("avg_Co"),
mean("O3").alias("avg_O3"),
)
# sql
result2.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "avgCitySix"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result2.write.mode("overwrite").saveAsTable("avgCitySix", "parquet")
spark.sql("select * from avgCitySix").show()
#需求三 年度空气质量分析
print("-------------------需求3 年度空气质量分析--------------------")
airdata = airdata.withColumn("date",airdata["date"].cast("date"))
result3 = airdata.groupby("city",year("date").alias("year"),month("date").alias("month"))\
.agg(
max("AQI").alias("max_AQI"),
min("AQI").alias("min_AQI")
)
result3.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "maxCityAqi"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result3.write.mode("overwrite").saveAsTable("maxCityAqi", "parquet")
spark.sql("select * from maxCityAqi").show()
#需求四
print("-------------------需求4 计算每个城市每年每月的 PM 和 PM10 的平均值--------------------")
result4 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month")) \
.agg(
avg("PM").alias("max_PM"),
avg("PM10").alias("min_PM10")
)
result4.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "avgCityPM"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result4.write.mode("overwrite").saveAsTable("avgCityPM", "parquet")
spark.sql("select * from avgCityPM").show()
#需求五
print("-------------------需求5 计算每个城市每年每月 AQI 小于 50 的天数。--------------------")
result5 = airdata.groupby("city",year("date").alias("year"),month("date").alias("month"))\
.agg(
count(when(airdata["AQI"] < 50, True)).alias("greatAirCount")
)
result5.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "greatAir"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result5.write.mode("overwrite").saveAsTable("greatAir", "parquet")
spark.sql("select * from greatAir").show()
#需求6
print("-------------------需求6 计算每个城市的最大 So2 和 No2 值--------------------")
result6 = airdata.groupby("city")\
.agg(
max("So2").alias("max_So2"),
max("No2").alias("max_No2")
)
result6.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "maxSN"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result6.write.mode("overwrite").saveAsTable("maxSN", "parquet")
spark.sql("select * from maxSN").show()
#需求7
print("-------------------需求7 将 Co 值分为不同的区间,并计算每个区间的记录数--------------------")
airdata = airdata.withColumn(
"Co_category",
when((col("Co") >= 0) & (col("Co") < 0.25),'0-0.25')
.when((col("Co") >= 0.25) & (col("Co") < 0.5),'0.25-0.5')
.when((col("Co") >= 0.5)& (col("Co") < 0.75), '0.5-0.75')
.when((col("Co") >= 0.75) & (col("Co") < 1.0), '0.75-1')
.otherwise("1以上")
)
result7 = airdata.groupby("Co_category").agg(count('*').alias('Co_count'))
result7.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "CoCategory"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result7.write.mode("overwrite").saveAsTable("CoCategory", "parquet")
spark.sql("select * from CoCategory").show()
#需求8
print("-------------------需求8 O3 值分为不同的区间,并计算每个区间的记录数--------------------")
airdata = airdata.withColumn(
"O3_category",
when((col("O3") >= 0) & (col("O3") < 25), '0-25')
.when((col("O3") >= 0.25) & (col("O3") < 50), '25-50')
.when((col("O3") >= 50) & (col("O3") < 75), '50-75')
.when((col("O3") >= 75) & (col("O3") < 100), '75-100')
.otherwise("100以上")
)
result8 = airdata.groupby("O3_category").agg(count('*').alias('O3_count'))
result8.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "O3Category"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result8.write.mode("overwrite").saveAsTable("O3Category", "parquet")
spark.sql("select * from O3Category").show()
#需求9
print("-------------------需求9 计算每个城市每年每月的平均 AQI--------------------")
reuslt9 = airdata.groupby("city",year("date").alias("year"), month("date").alias("month"))\
.agg(avg("AQI").alias("month_AQI"))
reuslt9.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "monthAQI"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
reuslt9.write.mode("overwrite").saveAsTable("monthAQI", "parquet")
spark.sql("select * from monthAQI").show()
5、项目列表





AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)