1、项目介绍

技术栈

采用 Python 语言开发,基于 Django 框架搭建后端服务,使用 MySQL 数据库进行数据存储,前端利用 Echarts 实现可视化展示并结合 HTML 构建页面。通过 requests 爬虫技术从天气后报网站采集空气质量数据,运用机器学习线性回归模型根据 PM2.5、SO₂、NO₂、O₃ 四个指标预测 AQI,集成大数据技术栈(Hadoop、Hive、Spark)进行分布式数据处理。

功能模块

· 首页(各城市日期筛选查看 AQI 均值分析、气体分析)
· 数据列表
· 空气质量年度分析
· AQI 月度分析
· 气体分析
· 城市分析
· 词云图
· AQI 预测
· 注册登录
· Spark 分析
· 爬虫(数据采集)
· 后台用户管理

项目介绍

本系统基于 Django 框架构建空气质量数据可视化分析平台,通过爬虫技术从天气后报网站批量采集多城市空气质量数据,存入 MySQL 数据库,并利用 Spark 进行分布式计算与统计分析。系统提供首页多城市 AQI 与气体分析、数据列表查询、年度与月度 AQI 变化趋势、PM 颗粒物分析、气体污染物浓度分析、城市 AQI 地理分布、空气质量词云图等功能。用户可输入 PM2.5、SO₂、NO₂、O₃ 特征值,通过线性回归模型预测 AQI 数值与等级。系统还包含注册登录与后台用户管理模块。

2、项目界面

1 、首页-各城市日期筛选查看AQI、各气体分析
该页面为空气质量数据可视化分析系统,支持选择日期与城市,通过柱状图展示多城市AQI均值,以仪表盘呈现单城市AQI等级,用折线图分析各气体污染物含量,直观展示城市空气质量相关数据。
在这里插入图片描述

2、空气质量、颗粒物年度分析–年度城市筛选分析
该页面为空气质量数据可视化分析系统的年度分析模块,支持选择年份与城市,通过折线图展示选定城市全年AQI最大值与最小值的月度变化趋势,同时可查看PM颗粒物等污染物的相关分析,直观呈现城市空气质量的年度波动情况。
在这里插入图片描述

3、AQI月度分析—月份城市筛选
该页面为空气质量数据可视化分析系统的月度分析模块,支持选择年份、月份与城市,通过柱状图与折线图结合的方式展示选定城市当月每日AQI值及排名,直观呈现月度内每日空气质量的变化情况,辅助月度空气质量分析。
在这里插入图片描述

4、城市分析----中国地图各城市分布、按月份筛选
该页面为空气质量数据可视化分析系统的城市分布模块,支持选择年份与月份,通过中国地图热力图的形式直观展示对应时段全国各城市AQI均值的地理分布情况,可查看单个城市的具体空气质量数据,直观呈现区域空气质量的空间分布特征。

在这里插入图片描述

5、气体分析-----SO2 NO2 CO O3
该页面为空气质量数据可视化分析系统的气体分析模块,通过折线图展示多城市二氧化硫与二氧化氮的最大值对比,同时以饼图分别呈现一氧化碳、臭氧的含量分布情况,直观呈现不同城市各类气体污染物的浓度特征与占比规律。

在这里插入图片描述

6、数据中心
该页面为空气质量数据可视化分析系统的数据总览模块,以表格形式集中展示各城市每日空气质量相关数据,支持分页浏览、搜索查询与条目数量调整,可查看城市、污染物浓度等详细信息,实现空气质量数据的集中查阅与管理。
在这里插入图片描述

7、词云图分析
该页面为空气质量数据可视化分析系统的数据词云图模块,通过词云的形式直观展示各类空气质量等级的出现频次,高频等级以更大字号呈现,可快速识别整体空气质量的主要特征与分布情况,直观呈现空气质量等级的统计规律。
在这里插入图片描述

8、AQI预测----输入特征值:PM值 SO2值 NO2值 O3值
该页面为空气质量数据可视化分析系统的空气质量预测模块,支持输入PM值、SO2值、NO2值、O3值等污染物特征数据,点击查看后即可输出对应的AQI数值与空气质量等级预测结果,实现基于污染物浓度的空气质量智能预测。

在这里插入图片描述

9、注册登录
该页面为空气质量数据分析可视化系统的登录入口,提供用户名、密码输入框,支持记住我功能,同时提供注册入口,用于验证用户身份,保障系统访问安全,是进入系统各功能模块的前置入口。

在这里插入图片描述

10、Spark大数据分析
该页面是空气质量数据分析可视化系统的后端数据处理模块,基于Spark实现空气质量数据的分布式计算,可完成城市AQI均值统计等数据处理任务,并将计算结果写入数据库,为前端可视化分析提供数据支撑,同时支持Hive数据仓库的集成与数据读取。
在这里插入图片描述

11、数据采集
该页面是空气质量数据分析可视化系统的数据采集模块,基于Python爬虫实现,可按指定年份、月份,批量爬取多个目标城市的空气质量数据,包含AQI、各类污染物浓度等信息,为系统的数据分析与可视化提供原始数据支撑。

在这里插入图片描述

12、后台数据管理
该页面为空气质量数据分析可视化系统的后台管理用户表模块,以列表形式展示系统用户信息,支持分页浏览、用户增加与删除操作,可对系统用户进行集中管理,保障系统用户权限与账号的有序维护。

在这里插入图片描述

3、项目说明

一、技术栈简要说明

本系统采用 Python 语言开发,基于 Django 框架搭建后端服务,使用 MySQL 数据库进行数据存储,前端利用 Echarts 实现可视化展示并结合 HTML 构建页面。通过 requests 爬虫技术从天气后报网站采集空气质量数据,运用机器学习线性回归模型根据 PM2.5、SO₂、NO₂、O₃ 四个指标预测 AQI,集成大数据技术栈(Hadoop、Hive、Spark)进行分布式数据处理。

二、功能模块详细介绍

· 首页(各城市日期筛选查看 AQI 均值分析、气体分析)
该页面支持选择日期与城市,通过柱状图展示多城市 AQI 均值,以仪表盘呈现单城市 AQI 等级,用折线图分析 PM2.5、PM10、SO₂、NO₂、CO、O₃ 等气体污染物含量,直观展示城市空气质量相关数据。

· 数据列表
该页面以表格形式集中展示各城市每日空气质量相关数据,支持分页浏览、搜索查询与条目数量调整,可查看城市、各类污染物浓度等详细信息,实现空气质量数据的集中查阅与管理。

· 空气质量年度分析
该模块支持选择年份与城市,通过折线图展示选定城市全年 AQI 最大值与最小值的月度变化趋势,同时可查看 PM 颗粒物等污染物的相关分析,直观呈现城市空气质量的年度波动情况。

· AQI 月度分析
该模块支持选择年份、月份与城市,通过柱状图与折线图结合的方式展示选定城市当月每日 AQI 值及排名,直观呈现月度内每日空气质量的变化情况,辅助月度空气质量分析。

· 气体分析
该模块通过折线图展示多城市二氧化硫与二氧化氮的最大值对比,同时以饼图分别呈现一氧化碳、臭氧的含量分布情况,直观呈现不同城市各类气体污染物的浓度特征与占比规律。

· 城市分析
该模块支持选择年份与月份,通过中国地图热力图的形式直观展示对应时段全国各城市 AQI 均值的地理分布情况,可查看单个城市的具体空气质量数据,直观呈现区域空气质量的空间分布特征。

· 词云图
该模块通过词云的形式直观展示各类空气质量等级的出现频次,高频等级以更大字号呈现,可快速识别整体空气质量的主要特征与分布情况,直观呈现空气质量等级的统计规律。

· AQI 预测
该模块支持用户输入 PM 值、SO₂ 值、NO₂ 值、O₃ 值等污染物特征数据,系统利用线性回归模型输出对应的 AQI 数值与空气质量等级预测结果,实现基于污染物浓度的空气质量智能预测。

· 注册登录
该页面提供用户名、密码输入框,支持记住我功能,同时提供注册入口,用于验证用户身份,保障系统访问安全,是进入系统各功能模块的前置入口。

· Spark 分析
该模块基于 Spark 实现空气质量数据的分布式计算,可完成城市 AQI 均值统计等数据处理任务,并将计算结果写入数据库,为前端可视化分析提供数据支撑,同时支持 Hive 数据仓库的集成与数据读取。

· 爬虫(数据采集)
该模块基于 Python 爬虫实现,可按指定年份、月份批量爬取多个目标城市的空气质量数据,包含 AQI、各类污染物浓度等信息,为系统的数据分析与可视化提供原始数据支撑。

· 后台用户管理
该模块以列表形式展示系统用户信息,支持分页浏览、用户增加与删除操作,可对系统用户进行集中管理,保障系统用户权限与账号的有序维护。

三、项目总结

本系统基于 Django 框架构建空气质量数据可视化分析平台,通过爬虫技术从天气后报网站批量采集多城市空气质量数据,存入 MySQL 数据库,并利用 Spark 进行分布式计算与统计分析。系统提供首页多城市 AQI 与气体分析、数据列表查询、年度与月度 AQI 变化趋势、PM 颗粒物分析、气体污染物浓度分析、城市 AQI 地理分布、空气质量词云图等可视化功能。用户可输入 PM2.5、SO₂、NO₂、O₃ 特征值,通过线性回归模型预测 AQI 数值与等级。系统还包含注册登录与后台用户管理模块,为环境监测、健康出行和污染治理提供了数据支撑与决策参考。

4、核心代码

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import count,mean,col,sum,when,year,month,max,min,avg
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,FloatType

if __name__ == '__main__':
    #构建
    spark = SparkSession.builder.appName("sparkSQL").master("local[*]").\
        config("spark.sql.shuffle.partitions", 2). \
        config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \
        config("hive.metastore.uris", "thrift://node1:9083"). \
        enableHiveSupport().\
        getOrCreate()

    sc = spark.sparkContext

    #读取
    airdata = spark.read.table("airdata")

    #需求1 城市平均AQI
    print("-------------------需求1 城市平均AQI--------------------")
    result1 = airdata.groupby("city")\
        .agg(mean("AQI").alias("avg_AQI"))\
        .orderBy("avg_AQI",ascending=False)

    #df
    df = result1.toPandas()
    # print(df)

    # sql
    result1.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "avgCityAqi"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    result1.write.mode("overwrite").saveAsTable("avgCityAqi", "parquet")
    spark.sql("select * from avgCityAqi").show()

    #需求2 各气体分析
    print("-------------------需求2 各气体分析--------------------")
    result2 = airdata.groupby("city") \
        .agg(
        mean("PM").alias("avg_PM"),
        mean("PM10").alias("avg_PM10"),
        mean("So2").alias("avg_So2"),
        mean("No2").alias("avg_No2"),
        mean("Co").alias("avg_Co"),
        mean("O3").alias("avg_O3"),
             )

    # sql
    result2.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "avgCitySix"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    result2.write.mode("overwrite").saveAsTable("avgCitySix", "parquet")
    spark.sql("select * from avgCitySix").show()


    #需求三 年度空气质量分析
    print("-------------------需求3 年度空气质量分析--------------------")
    airdata = airdata.withColumn("date",airdata["date"].cast("date"))

    result3 = airdata.groupby("city",year("date").alias("year"),month("date").alias("month"))\
        .agg(
        max("AQI").alias("max_AQI"),
        min("AQI").alias("min_AQI")
    )

    result3.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "maxCityAqi"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    result3.write.mode("overwrite").saveAsTable("maxCityAqi", "parquet")
    spark.sql("select * from maxCityAqi").show()

    #需求四
    print("-------------------需求4  计算每个城市每年每月的 PM 和 PM10 的平均值--------------------")
    result4 = airdata.groupby("city", year("date").alias("year"), month("date").alias("month")) \
        .agg(
        avg("PM").alias("max_PM"),
        avg("PM10").alias("min_PM10")
    )

    result4.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "avgCityPM"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    result4.write.mode("overwrite").saveAsTable("avgCityPM", "parquet")
    spark.sql("select * from avgCityPM").show()

    #需求五
    print("-------------------需求5  计算每个城市每年每月 AQI 小于 50 的天数。--------------------")
    result5 = airdata.groupby("city",year("date").alias("year"),month("date").alias("month"))\
        .agg(
        count(when(airdata["AQI"] < 50, True)).alias("greatAirCount")
    )
    result5.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "greatAir"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    result5.write.mode("overwrite").saveAsTable("greatAir", "parquet")
    spark.sql("select * from greatAir").show()


    #需求6
    print("-------------------需求6 计算每个城市的最大 So2 和 No2 值--------------------")
    result6 = airdata.groupby("city")\
        .agg(
        max("So2").alias("max_So2"),
        max("No2").alias("max_No2")
    )

    result6.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "maxSN"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    result6.write.mode("overwrite").saveAsTable("maxSN", "parquet")
    spark.sql("select * from maxSN").show()

    #需求7
    print("-------------------需求7 将 Co 值分为不同的区间,并计算每个区间的记录数--------------------")
    airdata = airdata.withColumn(
        "Co_category",
        when((col("Co") >= 0) & (col("Co") < 0.25),'0-0.25')
        .when((col("Co") >= 0.25) & (col("Co") < 0.5),'0.25-0.5')
        .when((col("Co") >= 0.5)& (col("Co") < 0.75), '0.5-0.75')
        .when((col("Co") >= 0.75) & (col("Co") < 1.0), '0.75-1')
        .otherwise("1以上")
    )
    result7 = airdata.groupby("Co_category").agg(count('*').alias('Co_count'))

    result7.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "CoCategory"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    result7.write.mode("overwrite").saveAsTable("CoCategory", "parquet")
    spark.sql("select * from CoCategory").show()

    #需求8
    print("-------------------需求8 O3 值分为不同的区间,并计算每个区间的记录数--------------------")
    airdata = airdata.withColumn(
        "O3_category",
        when((col("O3") >= 0) & (col("O3") < 25), '0-25')
        .when((col("O3") >= 0.25) & (col("O3") < 50), '25-50')
        .when((col("O3") >= 50) & (col("O3") < 75), '50-75')
        .when((col("O3") >= 75) & (col("O3") < 100), '75-100')
        .otherwise("100以上")
    )
    result8 = airdata.groupby("O3_category").agg(count('*').alias('O3_count'))

    result8.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "O3Category"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    result8.write.mode("overwrite").saveAsTable("O3Category", "parquet")
    spark.sql("select * from O3Category").show()


    #需求9
    print("-------------------需求9 计算每个城市每年每月的平均 AQI--------------------")
    reuslt9 = airdata.groupby("city",year("date").alias("year"), month("date").alias("month"))\
        .agg(avg("AQI").alias("month_AQI"))

    reuslt9.write.mode("overwrite"). \
        format("jdbc"). \
        option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
        option("dbtable", "monthAQI"). \
        option("user", "root"). \
        option("password", "root"). \
        option("encoding", "utf-8"). \
        save()

    reuslt9.write.mode("overwrite").saveAsTable("monthAQI", "parquet")
    spark.sql("select * from monthAQI").show()

5、项目列表

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐