使用Apache Beam实现实时监控统计

网站使用Apache服务器,对于请求网站资源的事件被记录到日志中,我们需要基于该日志文件的数据进行实时监控统计,通过读取IP库数据可以得到每个访问IP的所属地域(国内按城市,国外按国家,IP库中没有的按未知处理)。其实,整个流程我们可以通过Flume收集聚合多个子站日志文件数据,并写入到下游的Kafka消息中间件集群中,然后可以直接从Kafka中进行消费,实现实时监控统计,最后结果更新到Redis中去。
为了简单,我们这里只是通过输入的日志文件作为数据源,下游直接通过Apache Beam来进行实时分析处理,结果输出到多个按时间分组的文件中。我们实现的实时监控功能目标,如下所示:

  • 输入事件日志文件,以及IP库文件;
  • 基于日志文件中的事件时间,每间隔5分钟输出一个统计文件,结果文件中包含“地域”和“访问次数”。

下面是文件格式示例。
事件日志文件的格式,示例如下所示:

113.246.155.26 - - [10/Dec/2017:01:03:28 +0800] "GET /wp-content/themes/media-maven/library/images/bg.jpg HTTP/1.1" 200 8113 "http://shiyanjun.cn/archives/1097.html" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:57.0) Gecko/20100101 Firefox/57.0" qxu1780320105.my3w.com image/jpeg "/usr/home/qxu1780320105/htdocs/wp-content/themes/media-maven/library/images/bg.jpg" 1365
119.4.252.140 - - [10/Dec/2017:01:03:29 +0800] "GET /wp-content/themes/media-maven/library/css/default.css HTTP/1.1" 200 4543 "http://shiyanjun.cn/archives/325.html" "Mozilla/5.0 (Linux; Android 5.0.2; HTC X9u Build/LRX22G) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.98 Mobile Safari/537.36" qxu1780320105.my3w.com text/css "/usr/home/qxu1780320105/htdocs/wp-content/themes/media-maven/library/css/default.css" 1759
42.197.51.105 - - [10/Dec/2017:01:08:06 +0800] "GET /wp-content/themes/media-maven/library/css/default.css HTTP/1.1" 200 4543 "http://shiyanjun.cn/archives/1075.html" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36" qxu1780320105.my3w.com text/css "/usr/home/qxu1780320105/htdocs/wp-content/themes/media-maven/library/css/default.css" 2452

IP库文件的格式,示例如下所示:

1.119.132.165    北京
1.14.146.248    上海
1.192.241.74    郑州
14.116.140.15    广州
14.116.140.17    广州
94.180.155.98    俄罗斯
95.46.246.36    捷克
97.107.132.213    美国

设计思路

这里,我们基于Apache Beam 2.1.0来实现。现在,我们来分析一下设计的要点:

  • 记录按时间分组

每隔5分钟输出一个统计文件,这里需要用到Apache Beam中的固定时间窗口(Fixed Time Window)。

  • IP库数据文件加载共享

输入指定了一个静态的IP库文件,所以需要给ParDo设置一个Side Input,将IP库数据加载到Pipeline中,为处理每个读取到的日志文件中的事件记录数据,提供IP到地域的关系映射,从而汇总得到每个地域对应的访问次数。

  • 输出统计结果分组

每隔5分钟输出一个统计文件,那么需要在输出的时候控制文件的生成,这里需要实现一个PTransform,通过读取到每个Window的开始时间(InternalWindow.start())和结束时间(InternalWindow.end()),并通过拼接开始时间和结束时间字符串来命名输出文件。

编程实现

创建PiplineOptions配置

首先需要创建一个PipelineOptions,设置我们需要的输入文件、Window相关参数等配置,实现代码如下所示:

interface WindowingOptions extends PipelineOptions {

    @Description("Path of the IP library file to read from.")
    String getIpFile();
    void setIpFile(String ipFile);

    @Description("Path of the event log file to read from.")
    String getEventFile();
    void setEventFile(String eventFile);

    @Description("Fixed window duration, in seconds.")
    @Default.Integer(5)
    Integer getWindowSizeSecs();
    void setWindowSizeSecs(Integer value);

    @Description("Fixed number of shards to produce per window.")
    @Default.Integer(1)
    Integer getNumShards();
    void setNumShards(Integer numShards);

    @Description("Directory of the output to write to.")
    String getOutputDir();
    void setOutputDir(String outputDir);

    @Description("Prefix of the output file prefix.")
    @Default.String("result")
    String getOutputFilePrefix();
    void setOutputFilePrefix(String outputFilePrefix);
}

上面代码中,主要指定了如下配置选项:

  • IP库文件路径
  • 事件日志文件路径
  • 固定时间窗口(Fixed Time Window)的时间长度(Duration),默认是5秒
  • 每个Window输出的分区文件个数,默认是1个
  • 输出目录字符串
  • 输出文件名称前缀字符串,默认前缀是result

在Apache Beam中,通过解析输入参数,获取到配置参数值,代码如下所示:

WindowingOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(WindowingOptions.class);

String ipFile = options.getIpFile();
String eventFile = options.getEventFile();
String output = new File(options.getOutputDir(),
        options.getOutputFilePrefix()).getAbsolutePath();

PipelineOptionsFactory通过反射机制,创建一个WindowingOptions对象,然后就可以获取到输入的参数值,方便在Pipeline配置过程中使用。

实现IP库的Side Input处理

将IP库文件读取出来,通过Map结构来保存对应的IP到地域的关系映射,这样能够在处理每个日志事件的过程中,直接将从事件记录中解析出来IP转换为地域名称,为后续统计做准备。
在Apache Beam中,首先需要创建一个PCollectionView,作为ParDo的Side Input传入到Pipeline中,代码如下所示:

final PCollectionView<Map<String, String>> ipToAreaMapView =
        pipeline.apply(TextIO.read().from(ipFile))
                .apply(ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String[] ipAreaPair = c.element().split("\t");
                if(ipAreaPair.length == 2) {
                    c.output(KV.of(ipAreaPair[0], ipAreaPair[1]));
                }
            }
        })).apply(View.<String, String>asMap());

然后,就可以在处理日志事件的时候读取到ipToAreaMapView中对应的Map结构的数据,代码如下所示:

// read input event source
PCollection<String> events = pipeline.apply(TextIO.read().from(eventFile))
        .apply(ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String event = c.element();
                    try {
                        String[] a = event.split("\\s+");
                        String ip = a[0];
                        String time = a[3].substring(1);

                        long ts = parseToTimestamp(time);
                        Instant instant = new Instant(ts);
                        String area = c.sideInput(ipToAreaMapView).get(ip);
                        area = (area == null ? "未知地域" : area);
                        c.outputWithTimestamp(area, instant);
                    } catch (ParseException e) {
                        // ignore it
                    }
                }
            }).withSideInputs(ipToAreaMapView)
        );

上面代码,我们解析出日志行中的时间戳,在调用ProcessContext.outputWithTimestamp()方法时指定为事件时间,供后续配置Window函数使用。上述代码通过ProcessContext输出每一个处理过的记录,包含地域、时间戳信息,后面可以根据这两个字段信息对进行分组统计。

配置Window函数

现在,我们需要为上面准备好的事件记录PCollection对象events,配置Window函数,控制输入数据元素的分组逻辑,使用Apache Beam内建实现的固定时间窗口函数即可,代码如下所示:

// configure windowing settings
PCollection<String> windowedEvents =
        events.apply(
                Window.<String>into(FixedWindows.of(
                        Duration.standardSeconds(options.getWindowSizeSecs()))));

在名称为events的PCollection对象上进行配置,通过Window.into()方法,设置一个WindowFn函数,返回一个作用于该PCollection对象的PTransform,对该数据集中数据元素进行分组处理。只有配置的Window函数,在上面代码中得到的windowedEvents之上添加PTransform,才会对每个Window中的数据进行操作。
配置好了Window函数的PCollection对象windowedEvents,对其执行按地域分组汇总操作,代码如下所示:

// count by (window, area)
PCollection<KV<String, Long>> areaCounts =
        windowedEvents.apply(Count.<String>perElement());

这里,会根据我们设置的固定时间窗口的时间长度,分割为多个Window,这些信息是不暴露给Bean应用开发人员的。我们只需要知道前面输出的是每条访问网站事件记录信息的地域名称即可,这里在实际执行中,会以(Window, 地域)作为Key进行分组汇总,Window可能是[12:00, 12:05)这种形式。汇总得到的结果,是每个地域对应一个访问次数,结果类似于一个三元组:([12:00, 12:05), 地域, 次数),也就是汇总每个Window中具有相同的地域记录的数量。

输出统计结果

输出最终的统计结果,实现代码如下所示:

// control to output final result
final PTransform<PCollection<String>, PDone> writer =
        new PerWindowOneFileWriter(output, options.getNumShards());

// format & output windowed result
areaCounts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
            @Override
            public String apply(KV<String, Long> input) {
                return input.getKey() + "\t" + input.getValue();
            }
        })).apply(writer);

这里,先是创建了一个PerWindowOneFileWriter,它是一个PTransform,在最后输出时使用它,同时对输出数据进行格式化,使用TAB见分隔“地域”和“访问次数”,形成一行记录字符串,输出到文件中,其实是一个Window中数据对应一个输出文件(如果我们设置了WindowingOptions配置对象中的numShards值为1)。
下面,看下实现输出文件的PTransform PerWindowOneFileWriter的实现,代码如下所示:

package org.shirdrn.beam.monitoring;

import com.google.common.base.Verify;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

public class PerWindowOneFileWriter extends PTransform<PCollection<String>, PDone> {

  private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
  private String filenamePrefix;
  private Integer numShards;

  public PerWindowOneFileWriter(String filenamePrefix, Integer numShards) {
    this.filenamePrefix = filenamePrefix;
    this.numShards = numShards;
  }

  @Override
  public PDone expand(PCollection<String> input) {
    String prefix = "";
    ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
    if (!resource.isDirectory()) {
      prefix = Verify.verifyNotNull(resource.getFilename(),
          "A non-directory resource should have a non-null filename: %s",
          resource);
    }

    TextIO.Write write = TextIO.write()
        .to(resource.getCurrentDirectory())
        .withFilenamePolicy(new PerWindowFiles(prefix))
        .withWindowedWrites();
    write = write.withNumShards(numShards == null ? 1 : numShards);
    return input.apply(write);
  }

  public static class PerWindowFiles extends FilenamePolicy {

    private final String prefix;

    public PerWindowFiles(String prefix) {
      this.prefix = prefix;
    }

    private String generateFilenamePrefix(IntervalWindow window) {
      return String.format("%s_%s-%s", prefix,
              FORMATTER.print(window.start()),
              FORMATTER.print(window.end()));
    }

    @Override
    public ResourceId windowedFilename(
            ResourceId outputDirectory, WindowedContext context, String extension) {
      IntervalWindow window = (IntervalWindow) context.getWindow();
      int numShards = context.getNumShards();
      String filename;
      String prefix = generateFilenamePrefix(window);
      if(numShards == 1) {
        filename = String.format("%s", prefix);
      } else {
        filename = String.format("%s_%s_%s%s",
                prefix, context.getShardNumber(), context.getNumShards(), extension);
      }
      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
            ResourceId outputDirectory, Context context, String extension) {
      throw new UnsupportedOperationException("Unsupported.");
    }
  }
}

上面代码实现,是参考Apache Beam发行包中自带的例子修改而来,最核心的就是控制文件生成的FilenamePolicy的实现,它通过实现windowedFilename()方法,基于WindowedContext获取到Window对应的信息(Window开始时间和结束时间),从而生成带有窗口时间范围的字符串,作为最终输出的文件名称的一部分。

运行程序

运行上述程序,需要指定输入的文件路径以及Apache Beam相关的参数,示例如下所示:
如果我们通过命令行的方式,启动程序运行,可以参考如下的配置:

args = new String[] {
        "—-ipFile=/Users/yanjun/Data/beam/events/ips.txt",
        "—-eventFile=/Users/yanjun/Data/beam/events/apache_event_20171210.log",
        "—-outputDir=/Users/yanjun/Data/beam/events/output/",
        "—-outputFilePrefix=result",
        "—-windowSizeSecs=300",
        "—-numShards=1"
};

这样,就可以获取到对应的参数值,在Pipeline中可以读取并使用。运行配置好的Pipeline,代码如下所示:

// execute beam pipeline
PipelineResult result = pipeline.run();
try {
    result.waitUntilFinish();
} catch (Exception exception) {
    result.cancel();
}

运行程序后,可以看到生成的结果文件,文件名称类似如下的文件列表:

result_00:00-00:05
result_00:05-00:10
result_00:10-00:15
result_00:15-00:20
result_00:20-00:25
result_00:25-00:30
result_00:30-00:35
result_00:35-00:40
result_00:40-00:45
result_00:45-00:50
result_00:50-00:55
result_00:55-01:00
result_01:00-01:05
result_01:05-01:10
result_01:10-01:15

查看其中某个结果文件内容,类似如下所示:

厦门    11
福州    101
武汉    12
成都    210
南京    13
杭州    219
广州    35
美国    16
上海    107
北京    617
未知地域    108

可以看到,文件输出的结果,正是我们希望的。
如果不使用文件输出,那就需要修改最后控制输出的PTransform(PerWindowOneFileWriter)的处理逻辑,将结果数据写入到任何需要的存储系统中,保存以供其他系统使用(如实时动态地显示监控图表)。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>