「Spark大数据分析及调度」气象数据分析
in 数据分析 with 0 comment

「Spark大数据分析及调度」气象数据分析

in 数据分析 with 0 comment

项目需求

需求:使用PySpark分析空气质量

数据来源:U.S. Department of State Air Quality Monitoring Program

空气质量标准(来源网络)

空气质量指数pm2.5 健康建议
0-50健康
51-100中等
101-150对敏感人群不健康
151-200不健康
201-300非常不健康
301-500危险
>500爆表

数据来源城市:北京、成都、广州、上海、沈阳

数据流程:数据分析→ES→Kibana

将数据通过PySpark进行数据分析后存入ES,再通过Kibana进行数据可视化展示。

具体处理架构如下:

img

本文不涉及工具部署环节,如有朋友在部署上遇到问题,欢迎通过邮箱或评论联系我

PySpark 数据分析

分析指标:北京市2015年、2016年、2017年空气质量等级对比分析

以下指标分析代码还未完成,正在敲代码中...

  • 同一个城市不同年份的对比

  • 相同年份的不同城市的对比

  • 月份为统计维度

  • 小时为统计维度

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf


def get_grade(value):
    """
    判断输入的pm2.5指数判断空气质量
    0-50    健康
    51-100  中等
    101-150 对敏感人群不健康
    151-200 不健康
    201-300 非常不健康
    301-500 危险
    >500    爆表
    :param value: pm2.5指数
    :return: 空气质量等级
    """
    if value <= 50:
        return '健康'
    elif value <= 100:
        return '中等'
    elif value <= 150:
        return '对敏感人群不健康'
    elif value <= 200:
        return '不健康'
    elif value <= 300:
        return '非常不健康'
    elif value <= 500:
        return '危险'
    elif value > 500:
        return '爆表'
    else:
        return None


spark = SparkSession.builder.appName('project').getOrCreate()

"""
2017年、2016年、2015年数据读取
header:true 设置表头为true
inferSchema:true 设置各字段数据类型自动识别
"""
data2017 = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('hdfs:///hadoop-1:50070/data/Beijing/Beijing_2017_HourlyPM25_created20170803.csv') \
    .select('Year', 'Month', 'Day', 'Hour', 'Value', 'Duration')

data2016 = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('hdfs:///hadoop-1:50070/data/Beijing/Beijing_2016_HourlyPM25_created20170201.csv') \
    .select('Year', 'Month', 'Day', 'Hour', 'Value', 'Duration')

data2015 = spark.read.format('csv') \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .load('hdfs:///hadoop-1:50070/data/Beijing/Beijing_2015_HourlyPM25_created20160201.csv') \
    .select('Year', 'Month', 'Day', 'Hour', 'Value', 'Duration')


#   因需要使用withColumn功能返回的是column类型,get_grade()返回的是字符串类型,需通过udf进行转换
grade_function_udf = udf(get_grade, StringType())

#   统计每年Grade出现的次数
group_2017 = data2017\
    .withColumn('Grade', grade_function_udf(data2017['Value']))\
    .groupBy('Grade').count()

group_2016 = data2016\
    .withColumn('Grade', grade_function_udf(data2016['Value']))\
    .groupBy('Grade').count()

group_2015 = data2015.withColumn(
    'Grade', grade_function_udf(
        data2015['Value'])) .groupBy('Grade').count()

#   统计Grade在每年中的占比
result_2017 = group_2017.select('Grade', 'count')\
    .withColumn('precent', group_2017['count'] / data2017.count() * 100)

result_2016 = group_2016.select('Grade', 'count')\
    .withColumn('precent', group_2016['count'] / data2016.count() * 100)

result_2015 = group_2015.select('Grade', 'count')\
    .withColumn('precent', group_2015['count'] / data2015.count() * 100)

#   将计算结果存入ES
result_2017.selectExpr("Grade as grade", "count", "precent")\
    .write.format("org.elasticsearch.spark.sql")\
    .option("es.nodes", "192.168.85.128:9200")\
    .mode("overwrite")\
    .save("beijing_2017/pm")

result_2016.selectExpr("Grade as grade", "count", "precent")\
    .write.format("org.elasticsearch.spark.sql")\
    .option("es.nodes", "192.168.85.128:9200")\
    .mode("overwrite")\
    .save("beijing_2016/pm")

result_2015.selectExpr("Grade as grade", "count", "precent")\
    .write.format("org.elasticsearch.spark.sql")\
    .option("es.nodes", "192.168.85.128:9200")\
    .mode("overwrite")\
    .save("beijing_2015/pm")

spark.stop()

使用Azkaban调度流程

#weather.job

type=command
command=sh ./wea.sh

#wea.sh

/home/hadoop/app/spark-2.3.0-bin-2.6.0/bin/spark-submit --master local[2] --jars ./elasticsearch-spark-20_2.11-6.3.0.jar ./weather.py

将代码文件weather.py和依赖jar包一同打包至weather.zip

image-20210603191633395.png

image-20210603191759716.png

通过Kibana进行可视化展示

image-20210603192235508.png

image-20210603192310052.png