计算机毕业设计:Python猫眼电影数据分析与票房预测系统 Spark Hadoop Django 协同过滤 随机森林 大数据(建议收藏)✅
1、项目介绍
一、技术栈
大数据、Hadoop、Spark、Hive、虚拟机、Python语言、Django框架、MySQL数据库、猫眼电影、协同过滤推荐算法、机器学习随机森林回归模型、票房预测
二、功能模块
· 电影数据大屏
· 电影类型分析
· 电影评分分析
· 电影时间分析
· 票房分析
· 数据中心
· 电影推荐
· 词云图分析
· 票房预测
· 我的收藏
· 后台管理
· Spark数据分析
· 数据采集
三、项目介绍
本系统基于Spark大数据框架与Python语言开发,采用Django构建Web应用架构,结合Hadoop和Hive实现海量电影数据的分布式存储与高效计算。系统通过爬虫技术从猫眼电影平台采集电影信息,利用Echarts进行可视化展示。核心功能包括电影数据多维度分析、基于随机森林回归模型的票房预测、以及运用协同过滤算法的个性化电影推荐。系统覆盖数据采集、Spark统计分析、Web展示、推荐系统与后台管理全流程,为电影行业数据分析与决策提供技术支撑。
2、项目界面
(1)电影数据大屏
该电影数据可视化分析大屏展示了电影类型分布环形图、核心统计卡片、TOP10电影票房评分双轴图表、类型票房趋势折线图、发行地区分布条形图及星级占比金字塔图,同时设有进入系统入口,可实现电影数据多维度可视化展示与分析。

(2)电影类型分析
该电影数据分析可视化系统的类型分析页面展示了指定类型电影的票房TOP10折线图、评分分布柱状图及各类型电影平均时长环形图,左侧导航栏提供评分分析、时间分析、票房分析、电影数据、票房预测、电影推荐、词云分析、个人信息管理、数据大屏及后台管理等功能入口,可实现电影数据多维度可视化分析与全流程管理。
(3)电影评分分析
该电影数据分析可视化系统的评分分析页面展示了年度评分均值柱状图、地区评分均值折线图及类型评分分析模块,左侧导航栏提供类型分析、时间分析、票房分析、电影数据、票房预测、电影推荐、词云分析、个人信息管理、数据大屏及后台管理等功能入口,可实现电影评分维度多维度可视化分析与全流程数据管理。
(4)电影时间分析
该电影数据分析可视化系统的时间分析页面展示了电影年度产量柱状图、年月产量柱状图及时长分类饼图,左侧导航栏提供类型分析、评分分析、票房分析、电影数据、票房预测、电影推荐、词云分析、个人信息管理、数据大屏及后台管理等功能入口,可实现电影时间维度的多维度可视化分析与全流程数据管理。

(4)票房分析
该页面为电影数据分析可视化系统的票房分析模块,展示了各类型票房均值、年度票房均值、各地区票房分析的可视化图表,可直观呈现不同维度下的电影票房分布与趋势,同时系统还具备类型分析、评分分析、时间分析、数据总览、票房预测、电影推荐及数据词云、个人信息管理、后台管理等功能模块。
(5)数据中心
该页面为电影数据分析可视化系统的电影数据模块,以表格形式呈现电影相关信息,支持收藏、内容搜索功能,同时系统还具备类型分析、评分分析、时间分析、票房分析、票房预测、电影推荐、数据词云、个人信息管理、数据大屏及后台管理等功能模块。

(6)电影推荐
该页面为电影数据分析可视化系统的电影推荐模块,以卡片形式展示推荐电影的封面、简介、基本信息及收藏功能,同时系统还具备类型分析、评分分析、时间分析、票房分析、票房预测、电影数据、数据词云、个人信息管理、数据大屏及后台管理等功能模块。
(7)词云图分析
该页面为电影数据分析可视化系统的电影词云模块,展示呈现电影相关关键词的词云图,直观呈现电影相关信息的高频词汇特征,同时系统还具备数据分析可视化、类型分析、评分分析、时间分析、票房分析、票房预测、电影推荐、数据词云、个人信息管理、数据大屏及后台管理等功能模块。

(8)票房预测
该页面为电影数据分析可视化系统的票房预测模块,支持选择电影类型、地区及输入首周票房后提交预测,可得到对应的票房预测结果,同时系统还具备类型分析、评分分析、时间分析、票房分析、电影数据、电影推荐、数据词云、个人信息管理、数据大屏及后台管理等功能模块。

(9)我的收藏
该页面为电影数据分析可视化系统的个人收藏模块,以卡片形式展示收藏电影的封面、类型、地区、时长等信息,提供取消收藏操作功能,同时系统还具备数据分析可视化、类型分析、评分分析、时间分析、票房分析、票房预测、电影推荐、数据词云、数据大屏及后台管理等功能模块。

(10)后台管理
该页面为电影数据分析可视化系统的后台管理模块,提供快捷操作入口,可访问历史表、用户表、用户及组相关管理功能,还能查看最近动作记录,同时系统还具备数据分析可视化、类型分析、评分分析、时间分析、票房分析、票房预测、电影推荐、数据词云、个人信息管理等功能模块。

(11)spark数据分析
该页面为电影数据分析项目的代码开发界面,包含基于Spark的电影数据统计分析代码模块,可实现国家平均评分、类型平均评分、时间分析、时长分析、平均票房等统计分析,并将结果存储到数据库,同时项目还包含爬虫、数据处理、Web应用、推荐系统等功能模块。

(12)数据采集
该页面为电影数据分析项目的爬虫开发界面,包含猫眼电影数据爬虫模块,可自动采集电影的标题、类型、地区、时长、评分、简介、导演、演员及票房等信息并保存为csv文件,同时项目还包含数据处理、Spark分析、Web应用、推荐系统、可视化展示及后台管理等功能模块。

3、项目说明
一、技术栈简要说明
本系统基于大数据技术体系构建,采用Hadoop实现分布式存储,Spark进行高效计算,Hive完成数据仓库管理。后端使用Python语言结合Django框架搭建Web应用,MySQL数据库存储结构化数据。数据采集通过爬虫技术从猫眼电影平台获取电影信息。机器学习部分采用随机森林回归模型进行票房预测,协同过滤算法实现个性化电影推荐,前端利用Echarts完成数据可视化展示。
二、功能模块详细介绍
· 电影数据大屏
该页面为系统数据总览入口,集中展示电影类型分布环形图、核心统计指标卡片、TOP10电影票房与评分双轴组合图、类型票房趋势折线图、发行地区分布条形图以及星级占比金字塔图,为用户提供多维度的数据可视化概览。
· 电影类型分析
页面支持选择特定电影类型,展示该类型下票房TOP10折线图、评分分布柱状图以及各类型电影平均时长环形图,帮助用户深入分析不同类型电影的市场表现与特征差异。
· 电影评分分析
该模块从年度评分均值、地区评分均值、类型评分三个维度展开分析,通过柱状图与折线图呈现评分变化趋势,揭示不同时间、地域和类型电影的评分规律。
· 电影时间分析
页面展示电影年度产量柱状图、年月产量柱状图以及时长分类饼图,从时间维度分析电影生产数量的变化趋势与电影时长的分布特征。
· 票房分析
该模块提供各类型票房均值、年度票房均值、各地区票房分析的可视化图表,直观呈现不同维度下的票房分布规律与市场热点。
· 数据中心
以表格形式展示电影详细信息,包含标题、类型、地区、时长、评分、票房等字段,支持关键词搜索与电影收藏功能,方便用户查询与管理电影数据。
· 电影推荐
基于协同过滤推荐算法,为用户生成个性化电影推荐列表。推荐结果以卡片形式呈现,包含电影封面、简介、基本信息及收藏入口,帮助用户发现感兴趣的电影。
· 词云图分析
通过词云图展示电影相关关键词的高频词汇,直观呈现电影标题、类型或简介中的热点词汇分布,帮助用户快速把握电影内容的主题特征。
· 票房预测
该模块支持用户选择电影类型、发行地区并输入首周票房数据,基于随机森林回归模型输出总票房预测结果,为电影市场评估提供参考依据。
· 我的收藏
展示用户收藏的电影列表,以卡片形式呈现收藏电影的封面、类型、地区、时长等信息,支持取消收藏操作,方便用户管理个人偏好。
· 后台管理
面向管理员的系统管理界面,提供历史表、用户表、用户及组管理等快捷操作入口,支持查看最近操作记录,实现系统数据的统一维护。
· Spark数据分析
该模块为代码开发界面,包含基于Spark的统计分析代码,可实现国家平均评分、类型平均评分、时间分析、时长分析、平均票房等统计计算,并将结果存储至数据库。
· 数据采集
内置爬虫代码编辑与运行界面,通过爬虫模块从猫眼电影平台自动采集电影的标题、类型、地区、时长、评分、简介、导演、演员及票房等信息,支持数据保存为CSV文件。
三、项目总结
本系统实现了从数据采集、Spark统计分析到Web可视化展示与机器学习应用的全流程闭环。通过Hadoop与Spark构建的大数据处理能力,系统能够高效处理海量电影数据。多维度分析模块从类型、评分、时间、票房等角度深入挖掘电影市场规律,票房预测模块基于随机森林回归模型提供可靠的票房预估,推荐模块利用协同过滤算法实现个性化电影推荐。系统各模块协同工作,为电影行业的数据分析、市场研究与决策支持提供了完整的技术解决方案。

4、核心代码
#导包
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,FloatType
from pyspark.sql.functions import count,mean,col,sum,when,max,min,avg,explode,split,row_number,year,month
from pyspark.sql.window import Window
if __name__ == '__main__':
#构建
spark = SparkSession.builder.appName("sparkSQL").master("local[*]"). \
config("spark.sql.shuffle.partitions", 2). \
config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \
config("hive.metastore.uris", "thrift://node1:9083"). \
enableHiveSupport(). \
getOrCreate()
#读取
catMovieData = spark.read.table("catMovieData")
#需求一 类型分析 统计
explode_df = catMovieData.withColumn("type",explode(split(col("type"),"-")))
result1 = explode_df.groupBy("type").agg(count("type").alias("type_count"))
# sql
result1.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "movieTypeCount"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result1.write.mode("overwrite").saveAsTable("movieTypeCount", "parquet")
spark.sql("select * from movieTypeCount").show()
#需求二 票房TOP10
sorted_df =catMovieData.orderBy(col("allBoxOffice").desc())
result2 = sorted_df.limit(10)
# sql
result2.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "boxTopMovie"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result2.write.mode("overwrite").saveAsTable("boxTopMovie", "parquet")
spark.sql("select * from boxTopMovie").show()
#需求三 类型最大票房
result3 = explode_df.groupBy("type").agg(
max("firstBoxOffice").alias("max_firstBoxOffice"),
max("allBoxOffice").alias("max_allBoxOffice")
)
# sql
result3.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "typeMaxBox"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result3.write.mode("overwrite").saveAsTable("typeMaxBox", "parquet")
spark.sql("select * from typeMaxBox").show()
#需求四 国家统计
explode_df2 = catMovieData.withColumn("country",explode(split(col("country"),",")))
result4 = explode_df2.groupBy("country").agg(count("country").alias("country_count"))
# sql
result4.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mcountryCount"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result4.write.mode("overwrite").saveAsTable("mcountryCount", "parquet")
spark.sql("select * from mcountryCount").show()
#需求五 评分分类
catMovieData = catMovieData.withColumn(
"rateCategory",
when(col("rate") == 0, None).otherwise(
when((col("rate") >= 10) & (col("rate") < 20), "半星").
when((col("rate") >= 20) & (col("rate") < 30), "1星").
when((col("rate") >= 30) & (col("rate") < 40), "1.5星").
when((col("rate") >= 40) & (col("rate") < 50), "2星").
when((col("rate") >= 50) & (col("rate") < 60), "2.5星").
when((col("rate") >= 60) & (col("rate") < 70), "3星").
when((col("rate") >= 70) & (col("rate") < 80), "3.5星").
when((col("rate") >= 80) & (col("rate") < 90), "4星").
when((col("rate") >= 90) & (col("rate") < 100), "4.5星")
)
)
filter_df = catMovieData.filter(col("rate") != 0)
result5 = filter_df.groupBy("rateCategory").count()
# sql
result5.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "starCategory"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result5.write.mode("overwrite").saveAsTable("starCategory", "parquet")
spark.sql("select * from starCategory").show()
#需求6 按类型票房分析
grouped_df = explode_df.groupBy("type","title").agg({"allBoxOffice":"sum"})
grouped_df = grouped_df.withColumnRenamed("sum(allBoxOffice)","allBoxOffice")
window = Window.partitionBy("type").orderBy(col("allBoxOffice").desc())
ranked_df = grouped_df.withColumn("row_num",row_number().over(window))
#过滤出每组的前十
result6 = ranked_df.filter(col("row_num") <= 10).drop("row_num")
# sql
result6.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "perTypeBox"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result6.write.mode("overwrite").saveAsTable("perTypeBox", "parquet")
spark.sql("select * from perTypeBox").show()
#需求7 按类型评分
filter_df2 = explode_df.filter(col("rate") != 0)
result7 = filter_df2.groupBy("type","rate").agg(count("rate").alias("rate_count"))
# sql
result7.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "perTypeRate"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result7.write.mode("overwrite").saveAsTable("perTypeRate", "parquet")
spark.sql("select * from perTypeRate").show()
#需求8类型平均时长
result8 = explode_df.groupBy("type").agg(avg("duration").alias("avg_duration"))
# sql
result8.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "typeAvgTime"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result8.write.mode("overwrite").saveAsTable("typeAvgTime", "parquet")
spark.sql("select * from typeAvgTime").show()
#需求9 年度平均评分分析
time_df1 = catMovieData.withColumn("releaseTime",col("releaseTime").cast("date"))
time_df2 = catMovieData.withColumn("year",year(col("releaseTime")))
result9 = time_df2.groupBy("year").agg(avg("rate").alias("year_rate"))
# sql
result9.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mYearRate"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result9.write.mode("overwrite").saveAsTable("mYearRate", "parquet")
spark.sql("select * from mYearRate").show()
#需求十 国家平均
result10 = explode_df2.groupBy("country").agg(avg("rate").alias("avg_rate"))
# sql
result10.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mCountryRate"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result10.write.mode("overwrite").saveAsTable("mCountryRate", "parquet")
spark.sql("select * from mCountryRate").show()
#需求十一 类型
result11 = explode_df.groupBy("type").agg(avg("rate").alias("avg_rate"))
# sql
result11.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mTypeRate"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result11.write.mode("overwrite").saveAsTable("mTypeRate", "parquet")
spark.sql("select * from mTypeRate").show()
#需求十二 时间分析
result12 = time_df2.groupBy("year").agg(count("year").alias("year_count"))
# sql
result12.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mYearCount"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result12.write.mode("overwrite").saveAsTable("mYearCount", "parquet")
spark.sql("select * from mYearCount").show()
#月度
time_df3 = time_df1.withColumn("month",month(col("releaseTime")))
result13 = time_df3.groupBy("month").agg(count("month").alias("year_count"))
# sql
result13.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mMonthCount"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result13.write.mode("overwrite").saveAsTable("mMonthCount", "parquet")
spark.sql("select * from mMonthCount").show()
#时长分析
catMovieData = catMovieData.withColumn(
"durationCategory",
when(col("duration") == 0, None).otherwise(
when((col("duration") >= 0) & (col("rate") < 50), "很短").
when((col("duration") >= 50) & (col("rate") < 80), "较短").
when((col("duration") >= 80) & (col("rate") < 120), "中").
when((col("duration") >= 120) & (col("rate") < 150), "较长").
otherwise('很长')
)
)
filter_df4 = catMovieData.filter(col("duration") != 0)
result14 = filter_df4.groupBy("durationCategory").count()
# sql
result14.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mTimeCategory"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result14.write.mode("overwrite").saveAsTable("mTimeCategory", "parquet")
spark.sql("select * from mTimeCategory").show()
#需求15 各类型平均票房
result15 = explode_df.groupBy("type").agg(
avg("allBoxOffice").alias("avg_allBoxOffice")
)
# sql
result15.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mTypeAvgBox"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result15.write.mode("overwrite").saveAsTable("mTypeAvgBox", "parquet")
spark.sql("select * from mTypeAvgBox").show()
#各国家票房
result16 = explode_df2.groupBy("country").agg(
avg("allBoxOffice").alias("avg_allBoxOffice")
)
# sql
result16.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mCountryAvgBox"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result16.write.mode("overwrite").saveAsTable("mCountryAvgBox", "parquet")
spark.sql("select * from mCountryAvgBox").show()
#年度平均票房
result17 = time_df2.groupBy("year").agg(
avg("allBoxOffice").alias("avg_allBoxOffice")
)
# sql
result17.write.mode("overwrite"). \
format("jdbc"). \
option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8"). \
option("dbtable", "mYearAvgBox"). \
option("user", "root"). \
option("password", "root"). \
option("encoding", "utf-8"). \
save()
result17.write.mode("overwrite").saveAsTable("mYearAvgBox", "parquet")
spark.sql("select * from mYearAvgBox").show()
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)