运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

· 更多精彩内容,请下载阅读全本《Elastic Stack实战手册》

· 加入创作人行列,一起交流碰撞,参与技术圈年度盛事吧

创作人:刘晓国

目前 COVID-19 有很多的地方发表数据。

在本文中,我们使用 Elastic Stack 来分析相关的数据,并对数据进行分析。我们将使用 Kibana 来对所有的数据进行可视化分析。

特别指出的是:我们将使用 Elasticsearch 的 processors 来对我们的数据进行解析。

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

数据来源

我们将使用在地址:https://www.datawrapper.de/_/Gnfyw/下载我们想要的数据。我们点击链接 Get the data 来下载我们想要的数据。

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们下载后的数据就像是下面的:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

上面的数据是一个 csv 格式的文件。我们把下载后的数据存入到一个我们喜欢的目录中,并命令为 covid19.csv 。针对 CSV 格式的数据导入,我们可以采用 Logstash 来把它导入到 Elasticsearch 中。具体如何操作,我们可以参照以下文章:

  • Logstash:导入 zipcode CSV 文件和 Geo Search 体验

https://blog.csdn.net/UbuntuTouch/article/details/100606857

  • Logstash: 应用实践 - 装载 CSV 文档到 Elasticsearch

https://blog.csdn.net/UbuntuTouch/article/details/100606857

当然我们也可以直接通过 Kibana 提供的便利,直接把数据导入到 Elasticsearch 中:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

针对我们今天的这个 COVID-19 数据,我们将使用 Filebeat 来对它处理。

安装

  • 安装 Elasticsearch 及 Kibana
  • 安装 Filebeat
  • 打开 Kibana

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

虽然 csv 部署 log 范畴,但是在上面的 logs 里含有针对各个操作系统的 filebeat 的安装指南。我们点击上面的 Add log data 按钮:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们点击 System logs:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

在上面我们选择我们的操作系统,并按照上面的安装指令来完成和我们 Elasticsearch 版本相匹配的 Filbeat 的安装。我们可以不启动相应的模块,只做相应的安装即可。

配置 Filebeat 及导入数据到 Elasticsearch

为了能够把我们的数据导入到 Elasticsearch 中,我们可以采用 Filebeat。为此,我们必须来配置 filebeat。我们首先来创建一个定制的 filebeat_covid19.yml 配置文件,并把这个文件存于和我们上面的 covid19.csv 同一个目录中:

filebeat_covid19.yml

filebeat.inputs:
- type: log
  paths:
    - /Users/liuxg/data/covid19/covid19.csv
  exclude_lines: ['^Lat']
 
output.elasticsearch:
  hosts: ["http://localhost:9200"]
  index: covid19
 
setup.ilm.enabled: false
setup.template.name: covid19
setup.template.pattern: covid19

在上面我们定义了一个 type 为 log 的 filebeat.inputs。我们定了我们的 log 的路径。你需要根据自己的实际路径来修改上面的路径。

我们定义了数据的 index 为 covid19 的索引。值得注意的是,由于 csv 文件的第一行是数据的 header,我们需要去掉这一行。为此,我们采用了 exclude_lines: ['^Lat'] 来去掉第一行。

等我们定义好上面的配置后,我们运行如下的命令:

./filebeat -e -c ~/data/covid19/filebeat_covid19.yml

上面的命令将把我们的数据导入到 Elasticsearch 中。

Filebeat 的 registry 文件存储 Filebeat 用于跟踪上次读取位置的状态和位置信息。如果由于某种原因,我们想重复对这个 csv 文件的处理,我们可以删除如下的目录:

  • data/registry 针对 .tar.gz and .tgz 归档文件安装
  • /var/lib/filebeat/registry 针对 DEB 及 RPM 安装包
  • c:\ProgramData\filebeat\registry 针对 Windows zip 文件

如果上面的命令成功后,我们可以在 Kibana 中查看新生产的 covid19 索引:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

如果可以对数据进行查询:

GET covid19/_search

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

在上面,它显示了 message 字段。

显然这个数据是最原始的数据,它并不能让我们很方便地对这个数据进行分析。那么我们该如何处理呢?我们可以通过 Elasticsearch 的 ingest node 提供的强大的 processors 来帮我们处理这个数据。

利用 Processors 来加工数据

去掉无用的字段

在我们的文档里,我们可以看到有很多我们并不想要的字段,比如 ecs, host,log 等等。我们想把这些字段去掉,那么我们该如何做呢?我们可以通过定义一 个 pipleline 来帮我们处理。为此,我们定义一个如下的 pipeline:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    }
  ]
}

上面的 pipeline 定义了一个叫做 remove 的 processor。它检查 log,input, ecs, host 及 agent 都不为空的情况下,删除字段 log, input,ecs, host 及 agent。我们在 Kibana 中执行上面的命令。

为了能够使得我们的 pipleline 起作用,我们通过如下指令来执行:

POST covid19/_update_by_query?pipeline=covid19_parser

当我们执行完上面的指令后,我们重新查看我们的文档:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

在上面我们可以看出来,所有的我们不想要的字段都已经被去掉了

替换引号

我们可以看到导入的 message 数据为:

"""37.1232245,-78.4927721,"Virginia, US",Virginia,",",US,221,0,0"""

显然,这里的数据有很多的引号"字符,我们想把这些字符替换为符号'。为此,我们需要 gsub processors 来帮我们处理。我重新修改我们的 pipeline:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    }    
  ]
}

在 Kibana 中运行上面的指令,并同时执行:

POST covid19/_update_by_query?pipeline=covid19_parser

经过上面的 pipeline 的处理后,我们重新来查看我们的文档:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

从上面的显示中,我们也看出来我们已经成功都去掉了引号。我们的 message 的信息如下:

"37.1232245,-78.4927721,'Virginia, US',Virginia,',',US,221,0,0"

解析信息

在上面我们已经很成功地把我们的信息转换为我们所希望的数据类型。接下来我们来使用 grok 来解析我们的数据。grok 的数据解析,基本上是一种正则解析的方法。我们首先使用Kibana所提供的 Grok Debugger 来帮助我们分析数据。我们将使用如下的 grok pattern 来解析我们的message:

%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们点击 Grok Debugger,并把我们的相应的文档拷入到相应的输入框中,并用上面的 grok pattern 来解析数据。上面显示,它可以帮我们成功地解析我们想要的数据。显然这个被解析的信息更适合我们做数据的分析。为此,我们需要重新修改 pipeline:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    },
    {
     "grok": {
        "field": "message",
        "patterns": [
          "%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}"
        ]
      }
    }        
  ]
}

我们运行上面的 pipeline,并使用如下的命令来重新对数据进行分析:

POST covid19/_update_by_query?pipeline=covid19_parser

我们重新来查看文档:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

在上面我们可以看到新增加的 country,infected,address 等等的字段。

添加location字段

在上面我们可以看到 lon 及 lat 字段。这些字段是文档的经纬度信息。这些信息并不能为我们所使用,因为首先他们是分散的,并不处于一个通过叫做 location 的字段中。为此,我们需要创建一个新的 location 字段。为此我们更新 pipeline 为:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    },
    {
     "grok": {
        "field": "message",
        "patterns": [
          "%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}"
        ]
      }
    },
    {
      "set": {
        "field": "location.lat",
        "value": "{{lat}}"
      }
    },
    {
      "set": {
        "field": "location.lon",
        "value": "{{lon}}"
      }
    }              
  ]
}

在上面我们设置了一个叫做 location.lat 及 location.lon 的两个字段。它们的值分别是 {{lat}} 及 {{lon}}。我们执行上面的命令。

由于 location 是一个新增加的字段,在默认的情况下,它的两个字段都会被 Elasticsearch 设置为 text 的类型。为了能够让我们的数据在地图中进行显示,它必须是一个 geo_point 的数据类型。为此,我们必须通过如下命令来设置它的数据类型:

PUT covid19/_mapping
{
  "properties": {
    "location": {
      "type": "geo_point"
    }
  }
}

执行上面的指令,我们再使用如下的命令来对我们的数据重新进行处理:

POST covid19/_update_by_query?pipeline=covid19_parser

等执行完上面的命令后,我们重新来查看我们的文档:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

从上面我们可以看到一个叫做 location 的新字段。它含有 lon 及 lat 两个字段。我们同时也可以查看 covid19 的 Mapping。

GET covid19/_mapping

我们可以发现 Location 的数据类型为:

 "location" : {
     "type" : "geo_point"
  }

它显示 location 的数据类型是对的。

到目前为止,我们已经成功地把数据导入到 Elasticsearch 中。我们接下来针对 covid19 来进行数据分析。

展示并分析数据

创建 index pattern

为了对 covid19 进行分析,我们必须创建一个 index pattern:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

点击上面的 Create index pattern 按钮:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

点击 Next step:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

点击 Create index pattern:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

分析数据

创建 Maps visualization

我们打开 Visualization:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

点击 Create visualization:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

选择 Maps:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

选择 Documents:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们接着定制这个 Map:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

这样颜色越深,代表感染的越严重。点击 Save & close:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们把上面的 Visualization 保存为 covid-1。

找出感染和死亡最多的国家

同样地,我们创建一个叫做 Horizontal Bar 的

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

点击 Horizontal Bar:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

选择 covid-19:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们保存该 Visualization 为 covid-2。

同样地,我们得到死亡最多的前十个国家:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们保存该 Visualization 为 covid-3。

找出感染人数最多的地区

我们可以选择一个 pie 统计:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们保存上面的 Visualization 为 covid-3。

建立一个表格

我们最早得到的表格也是非常不错的。我们想把之前的那个表格的内容也放到我们的Dashboard 里,这样,我们可以在 Dashboard 里进行搜索。我们首先点击 Discover:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们选中我们的 Index pattern covid19*。同时在左下方选择我们想要的字段,并点击 add。我们添加 city, country, death, infected 等字段:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们保存这个表格为 covid-4。

创建 Dashboard

根据之前的 Visualization,我们来创建一个 Dashboard:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

我们把之前创建的 Visualization 一个一个地添加进来,并形成我们最终的 Dashboard:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

在上面图中,如果我们点击 China,那么所有的图形将变为:

运用 Elastic Stack 分析COVID-19数据—Elastic Stack 实战手册

从上面我们可以看出来整个表格都发送了变化,而且我们的饼状图也发生了相应的变化。

参考:


上一篇:【全观测系列】Elasticsearch应用性能监控最佳实践


下一篇:行在说 | 从阿里巴巴大数据之路看企业中台战略