Illustrated Big Data: Analyzing COVID-19 Epidemic Data with Spark

Author: Han Xinzi@ShowMeAI
Tutorial address: http://www.showmeai.tech/tutorials/84
Article address: http://www.showmeai.tech/article-detail/176
Notice: All rights reserved. For reprint, please contact the platform and the author, and indicate the source.

Introduction

Since 2020, COVID-19 has changed the world and affected everyone's life. This case study combines data analysis techniques with PySpark to analyze data from the 2020 COVID-19 epidemic.

1. Experimental environment

  • (1) Linux: Ubuntu 16.04
  • (2) Hadoop: 3.1.3
  • (3) Python: 3.8
  • (4) Spark: 2.4.0
  • (5) Jupyter Notebook

2. Data set

1) Dataset download

The COVID-19 case dataset used in this case study comes from the Kaggle platform: a US COVID-19 epidemic dataset named us-counties.csv. It is a CSV file covering the period from the first confirmed COVID-19 case in the United States through 2020-05-19.

Data set download (Baidu online disk)
Link: https://pan.baidu.com/s/1YNY2UREm5lXsNkHM3DZFmA
Extraction code: show

Each row of the data contains five fields: date, county, state, cases (cumulative confirmed cases) and deaths (cumulative deaths).

2) Format conversion

The original data is in CSV format. We first convert the format to make it easier for Spark to read the data and generate an RDD or DataFrame. The conversion code is as follows:

import pandas as pd
# Convert .csv to a tab-separated .txt file
data = pd.read_csv('/home/hadoop/us-counties.csv')
with open('/home/hadoop/us-counties.txt','a+',encoding='utf-8') as f:
    for line in data.values:
        f.write((str(line[0])+'\t'+str(line[1])+'\t'
                +str(line[2])+'\t'+str(line[3])+'\t'+str(line[4])+'\n'))
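
Equivalently, pandas can write the tab-separated file in one call; a minimal alternative that reuses the data DataFrame loaded above:

# Same conversion in one call: tab-separated, no header row, no index column
data.to_csv('/home/hadoop/us-counties.txt', sep='\t', header=False, index=False)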

3) Data upload to HDFS

Then upload "/home/hadoop/us-counties.txt" to the HDFS file system, at the path "/user/hadoop/us-counties.txt". The command is as follows:

./bin/hdfs dfs -put /home/hadoop/us-counties.txt /user/hadoop
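
You can verify the upload with "./bin/hdfs dfs -ls /user/hadoop".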

3. Use Spark to analyze the data

Here, Python is used as the programming language, combined with pyspark for data analysis.

1) Data reading and DataFrame construction

First, we read the data file and generate a Spark DataFrame.
The data used in this case is structured, so Spark can read the source file directly into a DataFrame, which is convenient for the subsequent analysis.

from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func

def toDate(inputStr):
    # Normalize dates such as "2020-1-2" (length 8) or "2020-1-21" (length 9)
    # to ISO "YYYY-MM-DD" by zero-padding the month (and the day when needed),
    # then parse the result into a datetime object
    newStr = ""
    if len(inputStr) == 8:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7]
        newStr = s1+"-"+"0"+s2+"-"+"0"+s3
    else:
        s1 = inputStr[0:4]
        s2 = inputStr[5:6]
        s3 = inputStr[7:]
        newStr = s1+"-"+"0"+s2+"-"+s3
    date = datetime.strptime(newStr, "%Y-%m-%d")
    return date
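
# For example (single-digit months, as in the source file):
#   toDate("2020-1-2")  -> datetime.datetime(2020, 1, 2, 0, 0)
#   toDate("2020-1-21") -> datetime.datetime(2020, 1, 21, 0, 0)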


#Main program:
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

fields = [StructField("date", DateType(), False),
          StructField("county", StringType(), False),
          StructField("state", StringType(), False),
          StructField("cases", IntegerType(), False),
          StructField("deaths", IntegerType(), False)]
schema = StructType(fields)

rdd0 = spark.sparkContext.textFile("/user/hadoop/us-counties.txt")
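# Split each tab-separated line and build a Row whose field types match the schema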
rdd1 = rdd0.map(lambda x:x.split("\t")).map(lambda p: Row(toDate(p[0]),p[1],p[2],int(p[3]),int(p[4])))

shemaUsInfo = spark.createDataFrame(rdd1,schema)

shemaUsInfo.createOrReplaceTempView("usInfo")
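
A quick optional check confirms that the DataFrame was built as expected:

# Optional sanity check: print the schema and preview a few rows
shemaUsInfo.printSchema()
spark.sql("select * from usInfo limit 5").show()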

2) Data analysis

This case study carries out the following statistical analyses. The objective and method of each analysis are as follows:

(1) Compute the cumulative number of confirmed cases and deaths in the United States as of the end of each day.

  • Group by the date field and sum the cases and deaths fields.

(2) Count the number of newly confirmed cases in the United States each day.

  • Since "new count = today's total - yesterday's total", a self-join is used with the join condition t1.date = t2.date + 1; t1.totalCases - t2.totalCases then gives the day's newly confirmed cases.

(3) Count the number of new deaths in the United States each day.

  • The same self-join as in (2) is used; t1.totalDeaths - t2.totalDeaths gives the day's new deaths.

(4) Compute the cumulative number of confirmed cases and deaths in each U.S. state as of May 19.

  • First filter the data down to May 19, then group by the state field and sum the cases and deaths fields.

(5) Find the ten states with the most confirmed cases in the United States as of May 19.

  • Register the state-level result DataFrame from (4) as a temporary table, sort in descending order of confirmed cases, and take the top 10 states.

(6) Find the ten states with the most deaths in the United States as of May 19.

  • Using the temporary table from (4), sort in descending order of deaths and take the top 10 states.

(7) Find the ten states with the fewest confirmed cases in the United States as of May 19.

  • Using the temporary table from (4), sort in ascending order of confirmed cases and take the first 10 states.

(8) Find the ten states with the fewest deaths in the United States as of May 19.

  • Using the temporary table from (4), sort in ascending order of deaths and take the first 10 states.

(9) Compute the case fatality rate of the United States and of each state as of May 19.

  • Case fatality rate = deaths / confirmed cases. Using the temporary table from (4), compute the rate by this formula.

We implement these statistics with the Spark DataFrame API and Spark SQL.

# 1. Calculate the cumulative number of confirmed cases and deaths per day
df = shemaUsInfo.groupBy("date").agg(func.sum("cases"),func.sum("deaths")).sort(shemaUsInfo["date"].asc())

# Rename the aggregated columns
df1 = df.withColumnRenamed("sum(cases)","cases").withColumnRenamed("sum(deaths)","deaths")
df1.repartition(1).write.json("result1.json")                               # Write to HDFS

# Register as a temporary table for the next step
df1.createOrReplaceTempView("ustotal")

# 2. Calculate the number of newly confirmed cases and deaths per day compared with yesterday
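# The self-join pairs each day t1 with the previous day t2 (t1.date = date_add(t2.date, 1)),
# so t1.cases - t2.cases and t1.deaths - t2.deaths give the day-over-day increases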
df2 = spark.sql("select t1.date,t1.cases-t2.cases as caseIncrease,t1.deaths-t2.deaths as deathIncrease from ustotal t1,ustotal t2 where t1.date = date_add(t2.date,1)")

df2.sort(df2["date"].asc()).repartition(1).write.json("result2.json")           # Write to HDFS

# 3. Count the cumulative number of confirmed cases and deaths in U.S. states as of May 19
df3 = spark.sql("select date,state,sum(cases) as totalCases,sum(deaths) as totalDeaths,round(sum(deaths)/sum(cases),4) as deathRate from usInfo  where date = to_date('2020-05-19','yyyy-MM-dd') group by date,state")

df3.sort(df3["totalCases"].desc()).repartition(1).write.json("result3.json") # Write to HDFS

df3.createOrReplaceTempView("eachStateInfo")

# 4. Find the 10 states with the most confirmed cases in the United States
df4 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases desc limit 10")
df4.repartition(1).write.json("result4.json")

# 5. Find the 10 states with the most deaths in the United States
df5 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths desc limit 10")
df5.repartition(1).write.json("result5.json")

# 6. Find the 10 states with the fewest confirmed cases in the United States
df6 = spark.sql("select date,state,totalCases from eachStateInfo  order by totalCases asc limit 10")
df6.repartition(1).write.json("result6.json")

# 7. Find the 10 states with the fewest deaths in the United States
df7 = spark.sql("select date,state,totalDeaths from eachStateInfo  order by totalDeaths asc limit 10")
df7.repartition(1).write.json("result7.json")

# 8. Calculate the case fatality rate of the United States overall and of each state as of May 19 (sign=1 marks the national row, sign=2 the per-state rows)
df8 = spark.sql("select 1 as sign,date,'USA' as state,round(sum(totalDeaths)/sum(totalCases),4) as deathRate from eachStateInfo group by date union select 2 as sign,date,state,deathRate from eachStateInfo").cache()
df8.sort(df8["sign"].asc(),df8["deathRate"].desc()).repartition(1).write.json("result8.json")
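
As an aside, the day-over-day increase in step 2 can also be computed with the DataFrame API via a window function instead of a self-join. A minimal sketch (a stylistic alternative, not the tutorial's method), operating on the df1 DataFrame from step 1:

from pyspark.sql import Window
from pyspark.sql.functions import lag, col

# lag() fetches the previous day's totals over a date-ordered window,
# so no explicit self-join is needed
w = Window.orderBy("date")
increase = (df1
    .withColumn("caseIncrease", col("cases") - lag("cases", 1).over(w))
    .withColumn("deathIncrease", col("deaths") - lag("deaths", 1).over(w))
    .na.drop())                      # the first day has no predecessor; drop its null row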

3) Result file

Save the above Spark results as JSON files to facilitate the subsequent visualization. Since it is inconvenient to read the HDFS file system directly from Python, dump the result files from HDFS to the local file system with the following command:

./bin/hdfs dfs -get /user/hadoop/result1.json/*.json /home/hadoop/result/result1
...

For result2 through result8, use the same command with the paths changed accordingly. Note that, depending on your Spark version, the downloaded part file may have a longer name (for example part-00000-<uuid>.json); if so, rename it to part-00000.json, which is the file name the visualization code below expects.

4. Data visualization

1) Visualizer selection and code

We use the Python third-party library pyecharts as the visualization tool.

Before using it, you need to install pyecharts. The installation command is as follows:

pip install pyecharts

The specific visualization code is as follows:

from pyecharts import options as opts
from pyecharts.charts import Bar
from pyecharts.charts import Line
from pyecharts.components import Table
from pyecharts.charts import WordCloud
from pyecharts.charts import Pie
from pyecharts.charts import Funnel
from pyecharts.charts import Scatter
from pyecharts.charts import PictorialBar
from pyecharts.options import ComponentTitleOpts
from pyecharts.globals import SymbolType
import json
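
Each result produced above is a line-delimited JSON file, and the drawing functions below all repeat the same read loop. A small optional helper (not part of the original code) could centralize that reading:

# Optional helper: read a line-delimited JSON file into a list of dicts
# (the functions below keep the original explicit loops instead)
def load_json_lines(path):
    records = []
    with open(path, 'r') as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records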

Cumulative confirmed cases and deaths per day → double bar chart

#1. Draw the cumulative number of confirmed cases and deaths per day → double bar chart

def drawChart_1(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # readline() returns an empty string at EOF; exit the loop
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['cases']))
            deaths.append(int(js['deaths']))

    d = (
    Bar()
    .add_xaxis(date)
    .add_yaxis("Cumulative number of confirmed cases", cases, stack="stack1")
    .add_yaxis("Cumulative deaths", deaths, stack="stack1")
    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    .set_global_opts(title_opts=opts.TitleOpts(title="Cumulative number of confirmed cases and deaths per day in the United States"))
    .render("/home/hadoop/result/result1/result1.html")
    )

Number of newly confirmed cases and deaths per day → line chart

#2. Draw the daily number of newly confirmed cases and deaths → line chart
def drawChart_2(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    date = []
    cases = []
    deaths = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # readline() returns an empty string at EOF; exit the loop
                break
            js = json.loads(line)
            date.append(str(js['date']))
            cases.append(int(js['caseIncrease']))
            deaths.append(int(js['deathIncrease']))

    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="New diagnosis",
        y_axis=cases,
        markpoint_opts=opts.MarkPointOpts(
            data=[
                opts.MarkPointItem(type_="max", name="Maximum")

            ]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[opts.MarkLineItem(type_="average", name="Average")]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="Daily newly confirmed cases in the United States", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result1.html")
    )
    (
    Line(init_opts=opts.InitOpts(width="1600px", height="800px"))
    .add_xaxis(xaxis_data=date)
    .add_yaxis(
        series_name="New deaths",
        y_axis=deaths,
        markpoint_opts=opts.MarkPointOpts(
            data=[opts.MarkPointItem(type_="max", name="Maximum")]
        ),
        markline_opts=opts.MarkLineOpts(
            data=[
                opts.MarkLineItem(type_="average", name="Average"),
                opts.MarkLineItem(symbol="none", x="90%", y="max"),
                opts.MarkLineItem(symbol="circle", type_="max", name="Peak"),
            ]
        ),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="Daily new deaths in the United States", subtitle=""),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
    )
    .render("/home/hadoop/result/result2/result2.html")
    )

Cumulative number of confirmed cases, deaths and case fatality rate by state as of May 19 → table

#3. Draw a table of the cumulative confirmed cases, deaths and case fatality rate by state as of May 19
def drawChart_3(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    allState = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # readline() returns an empty string at EOF; exit the loop
                break
            js = json.loads(line)
            row = []
            row.append(str(js['state']))
            row.append(int(js['totalCases']))
            row.append(int(js['totalDeaths']))
            row.append(float(js['deathRate']))
            allState.append(row)

    table = Table()

    headers = ["State name", "Total cases", "Total deaths", "Death rate"]
    rows = allState
    table.add(headers, rows)
    table.set_global_opts(
        title_opts=ComponentTitleOpts(title="Epidemic overview of US states", subtitle="")
    )
    table.render("/home/hadoop/result/result3/result1.html")

10 states with the most confirmed cases in the United States → word cloud

#4. Draw the top 10 states by confirmed cases in the United States → word cloud
def drawChart_4(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # readline() returns an empty string at EOF; exit the loop
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)

    c = (
    WordCloud()
    .add("", data, word_size_range=[20, 100], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="Top 10 US states by confirmed cases"))
    .render("/home/hadoop/result/result4/result1.html")
    )

10 states with the most deaths in the United States → pictorial bar chart

#5. Draw the 10 states with the most deaths in the United States → pictorial bar chart
def drawChart_5(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    state = []
    totalDeath = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # readline() returns an empty string at EOF; exit the loop
                break
            js = json.loads(line)
            state.insert(0,str(js['state']))
            totalDeath.insert(0,int(js['totalDeaths']))

    c = (
    PictorialBar()
    .add_xaxis(state)
    .add_yaxis(
        "",
        totalDeath,
        label_opts=opts.LabelOpts(is_show=False),
        symbol_size=18,
        symbol_repeat="fixed",
        symbol_offset=[0, 0],
        is_symbol_clip=True,
        symbol=SymbolType.ROUND_RECT,
    )
    .reversal_axis()
    .set_global_opts(
        title_opts=opts.TitleOpts(title="PictorialBar-Death toll in US states Top10"),
        xaxis_opts=opts.AxisOpts(is_show=False),
        yaxis_opts=opts.AxisOpts(
            axistick_opts=opts.AxisTickOpts(is_show=False),
            axisline_opts=opts.AxisLineOpts(
                linestyle_opts=opts.LineStyleOpts(opacity=0)
            ),
        ),
    )
    .render("/home/hadoop/result/result5/result1.html")
    )

The 10 states with the fewest confirmed cases in the United States → word cloud

#6. Find the 10 states with the fewest confirmed cases in the United States → word cloud
def drawChart_6(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # readline() returns an empty string at EOF; exit the loop
                break
            js = json.loads(line)
            row=(str(js['state']),int(js['totalCases']))
            data.append(row)

    c = (
    WordCloud()
    .add("", data, word_size_range=[100, 20], shape=SymbolType.DIAMOND)
    .set_global_opts(title_opts=opts.TitleOpts(title="10 states with the least confirmed cases in the United States"))
    .render("/home/hadoop/result/result6/result1.html")
    )

10 states with the fewest deaths in the United States → funnel chart

#7. Find the 10 states with the fewest deaths in the United States → funnel chart
def drawChart_7(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    data = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # readline() returns an empty string at EOF; exit the loop
                break
            js = json.loads(line)
            data.insert(0,[str(js['state']),int(js['totalDeaths'])])

    c = (
    Funnel()
    .add(
        "State",
        data,
        sort_="ascending",
        label_opts=opts.LabelOpts(position="inside"),
    )
    .set_global_opts(title_opts=opts.TitleOpts(title=""))
    .render("/home/hadoop/result/result7/result1.html")
    )

Case fatality rate in the United States → pie chart

#8. Case fatality rate in the United States → pie chart
def drawChart_8(index):
    root = "/home/hadoop/result/result" + str(index) +"/part-00000.json"
    values = []
    with open(root, 'r') as f:
        while True:
            line = f.readline()
            if not line:                            # readline() returns an empty string at EOF; exit the loop
                break
            js = json.loads(line)
            if str(js['state'])=="USA":
                values.append(["Death(%)",round(float(js['deathRate'])*100,2)])
                values.append(["No-Death(%)",100-round(float(js['deathRate'])*100,2)])
    c = (
    Pie()
    .add("", values)
    .set_colors(["black","orange"])
    .set_global_opts(title_opts=opts.TitleOpts(title="Case fatality rate in the United States"))
    .set_series_opts(label_opts=opts.LabelOpts(formatter="{b}: {c}"))
    .render("/home/hadoop/result/result8/result1.html")
    )

# Render all charts in turn: drawChart_i reads the result<i> file and writes its HTML
index = 1
while index<9:
    funcStr = "drawChart_" + str(index)
    eval(funcStr)(index)
    index+=1
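
The eval-based dispatch above works because the function names follow a fixed pattern; an equivalent loop over an explicit function list (a stylistic alternative, not part of the original code) avoids eval():

# Equivalent dispatch without eval(): iterate over the functions directly
charts = [drawChart_1, drawChart_2, drawChart_3, drawChart_4,
          drawChart_5, drawChart_6, drawChart_7, drawChart_8]
for index, draw in enumerate(charts, start=1):
    draw(index)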

2) Result display

The visualization results are HTML files. The chart for result1 is saved at "/home/hadoop/result/result1/result1.html", the chart for result2 at "/home/hadoop/result/result2/result1.html", and so on for the rest. The charts are as follows:

(1) Cumulative number of confirmed cases and deaths per day in the United States → double bar chart

(2) Daily number of newly confirmed cases in the United States → line chart

(3) Daily new deaths in the United States → line chart

(4) By the end of May 19, the cumulative number of confirmed cases, deaths and case fatality rates in various states of the United States → table

(5) As of May 19, the top 10 states in the United States by cumulative confirmed cases → word cloud

(6) As of May 19, the top 10 states in the United States by cumulative deaths → pictorial bar chart

(7) As of May 19, the 10 states with the fewest cumulative confirmed cases in the United States → word cloud

(8) As of May 19, the 10 states with the lowest cumulative death toll in the United States → funnel chart

(9) Case fatality rate in the United States as of May 19 → pie chart

