使用Fluentd读写OSS

前言

Fluentd是一个实时开源的数据收集器,基于CRuby实现,td-agent是其商业化版本,由Treasure Data公司维护。本文将介绍如何使Fluentd能够读写OSS。

安装

首先下载并安装td-agent,笔者使用的是td-agent-3.3.0-1.el7.x86_64.rpm,使用rpm命令安装:

[root@apache ~]# rpm -ivh td-agent-3.3.0-1.el7.x86_64.rpm

然后,需要安装Fluentd的OSS plugin:

[root@apache ~]# /usr/sbin/td-agent-gem install fluent-plugin-aliyun-oss

这里请注意,因为我们使用的是td-agent,安装Fluentd plugin时需要使用td-agent的td-agent-gem(/usr/sbin/td-agent-gem)。原因是td-agent有自己的Ruby,你需要将plugin安装到它的Ruby里面,而不是其他的Ruby,否则将会找不到已经安装好的plugin。

具体可以参见Fluentd的官方文档

安装完成后,我们可以查看安装的OSS plugin:

[root@apache ~]# /usr/sbin/td-agent-gem list fluent-plugin-aliyun-oss

*** LOCAL GEMS ***

fluent-plugin-aliyun-oss (0.0.1)

fluent-plugin-aliyun-oss这个plugin包含两部分:

Fluent OSS output plugin
将数据缓存在本地,达到设定的条件后,将缓存的数据(压缩后,如果设置的话)上传到OSS。

Fluent OSS input plugin
首先,OSS的bucket需要配置事件通知,这篇文章介绍了如何设置,得到MNS的Queue与Endpoint。
设置好之后,这个plugin会定时地从MNS拉取消息,从消息中获取上传的Objects,最后从OSS读取这些Objects,再发往下游。

下面将分别介绍如何配置,具体的配置参数说明参见github: https://github.com/aliyun/fluent-plugin-oss

配置(向OSS写数据)

下面是一个例子,将读到的数据,每分钟一个文件写到OSS中。其中endpoint/bucket/access_key_id/access_key_secret是必填项,其他的都是可选项。

<system>
  workers 6
</system>

<match input.*>
  @type oss
  endpoint <OSS endpoint to connect to>
  bucket <Your Bucket>
  access_key_id <Your Access Key>
  access_key_secret <Your Secret Key>
  upload_crc_enable false
  path "fluent-oss/logs"
  auto_create_bucket true
  key_format "%{path}/%{time_slice}/events_%{index}_%{thread_id}.%{file_extension}"
  #key_format %{path}/events/ts=%{time_slice}/events_%{index}_%{thread_id}.%{file_extension}
  time_slice_format %Y%m%d-%H
  store_as gzip
  <buffer tag,time>
    @type file
    path /var/log/fluent/oss/${ENV['SERVERENGINE_WORKER_ID']}
    timekey 60 # 1 min partition
    timekey_wait 1s
    # timekey_use_utc true
    flush_thread_count 1
  </buffer>
  <format>
    @type json
  </format>
</match>

我们可以从OSS控制台看到效果:
使用Fluentd读写OSS

配置(从OSS读数据)

下面是一个配置示例,其中endpoint/bucket/access_key_id/access_key_secret和MNS的endpoint/queue是必填项,其他的都是可选项。

<source>
  @type oss
  endpoint <OSS endpoint to connect to>
  bucket <Your Bucket>
  access_key_id <Your Access Key>
  access_key_secret <Your Secret Key>
  store_local false
  store_as gzip
  flush_batch_lines 800
  flush_pause_milliseconds 1

  download_crc_enable false
  <mns>
    endpoint <MNS endpoint to connect to, E.g.,{account-id}.mns.cn-zhangjiakou-internal.aliyuncs.com>
    queue <MNS queue>
    poll_interval_seconds 1
  </mns>
  <parse>
    @type json
  </parse>
</source>

我们可以从log中看一下运行状态

2019-04-23 15:38:14 +0800 [info]: #5 start to poll message from MNS queue fluentd-oss
2019-04-23 15:38:14 +0800 [info]: #5 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages
2019-04-23 15:38:14 +0800 [info]: #5 read object fluent-oss/logs/20190423-12/events_10_70226640160100.gz, size 4389548 from OSS
2019-04-23 15:38:15 +0800 [info]: #1 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages?ReceiptHandle=0BC1EA4E51483D4EAC69736941044AAE-MjY5ODkgMTU1NjAwNTAwODMwNSAzNjAwMDA
2019-04-23 15:38:16 +0800 [info]: #1 start to poll message from MNS queue fluentd-oss
2019-04-23 15:38:16 +0800 [info]: #1 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages
2019-04-23 15:38:16 +0800 [info]: #1 read object fluent-oss/logs/20190423-09/events_50_69939581261780.gz, size 6750045 from OSS

参考资料

https://github.com/aliyun/fluent-plugin-oss
https://rubygems.org/gems/fluent-plugin-aliyun-oss
https://www.fluentd.org/
https://docs.fluentd.org/v1.0/articles/quickstart

上一篇:如何将Elasticsearch的快照备份至OSS


下一篇:[家里蹲大学数学杂志]第187期实数集到非负实数集的双射有无穷多个间断点