每天实时处理 50 亿次会话

自从  七个月前我们首次发布 Answers以来,我们对移动社区的广泛采用感到非常兴奋。我们现在看到每天大约有 50 亿次会话,并且还在增长。数以亿计的设备每秒向 Answers 端点发送数百万个事件。在您阅读到此处的这段时间内,Answers 后端将接收并处理大约 10,000,000 个分析事件。

我们面临的挑战是利用这些信息为应用程序开发人员提供对其移动应用程序的可靠、实时和可操作的见解。

在高层次上,我们根据解耦组件、异步通信和优雅服务降级的原则来指导我们的架构决策,以应对灾难性故障。我们利用 Lambda 架构将数据完整性与实时数据更新相结合。

在实践中,我们需要设计一个系统来接收事件、归档它们、执行离线和实时计算,并将这些计算的结果合并成连贯的信息。所有这些都需要以每秒数百万个事件的规模发生。

让我们从第一个挑战开始:接收和处理这些事件。

事件接收
在设计我们的设备-服务器通信时,我们的目标是:减少对电池和网络使用的影响;确保数据可靠性;并尽可能接近实时地获取数据。为了减少对设备的影响,我们分批发送分析事件并在发送前对其进行压缩。为确保有价值的数据始终到达我们的服务器,设备会在随机回退后重试失败的数据传输,并达到设备上的磁盘大小限制。为了尽快将事件传递到服务器,有几个触发器会导致设备尝试传输:当应用程序处于前台时每隔几分钟触发一次的时间触发器、多个事件触发器和一个应用程序进入后台扳机。

这种通信协议导致设备每秒向我们发送数十万个压缩有效载荷。这些有效负载中的每一个都可能包含数十个事件。为了可靠地处理这种负载并以一种允许轻松线性扩展的方式,我们希望使接受事件的服务变得非常简单。

每天处理 50 亿次会话——实时

该服务是用 GOLANG 编写的,以 Amazon Elastic Load Balancer (ELB) 为前端,并且只是将它接收到的每个有效负载排入一个持久的 Kafka 队列

归档
因为 Kafka 将接收到的消息写入磁盘并支持为每条消息保留多个副本,所以它是一个持久存储。因此,一旦信息在其中,我们知道我们可以通过稍后处理或重新处理消息来容忍下游延迟或故障。然而,Kafka 并不是我们历史数据的永久真实来源——按照我们所看到的信息传入速度,我们需要数百个盒子来存储几天的所有数据。因此,我们将 Kafka 集群配置为将信息保留几个小时(足够我们响应任何意外的重大故障),并尽快将数据传输到我们的永久存储 Amazon Simple Storage Service (Amazon S3)。

我们广泛使用 Storm 进行实时数据处理,第一个相关拓扑是从 Kafka 读取信息并将其写入 Amazon S3 的拓扑。

每天处理 50 亿次会话——实时

批量计算
一旦数据进入 Amazon S3,我们就可以通过 Amazon Elastic MapReduce (Amazon EMR) 计算我们的数据允许我们进行的任何事情。这包括我们的客户在他们的仪表板中看到的所有数据的批处理作业,以及我们处理新功能时的实验性作业。

每天处理 50 亿次会话——实时

我们在 Cascading中编写 MapReduce 并通过 Amazon EMR 运行它们。Amazon EMR 读取我们在 Amazon S3 中存档的数据作为输入,并在处理完成后将结果写回 Amazon S3。我们通过在 Storm 中运行的调度程序拓扑检测作业的完成情况,并将 Amazon S3 的输出泵入 Cassandra 集群,以使其可用于亚秒级 API 查询。

速度计算
到目前为止,我们所描述的是一个用于执行分析计算的持久且容错的框架。然而,有一个明显的问题——它不是实时的。一些计算每小时运行一次,而另一些则需要一整天的数据作为输入。计算时间从几分钟到几小时不等,将输出从 Amazon S3 获取到服务层所需的时间也是如此。因此,充其量,我们的数据总是会落后几个小时,并且不会满足我们实时和可操作的目标。

为了解决这个问题,在归档数据的同时,我们对其执行流计算。

每天处理 50 亿次会话——实时

一个独立的 Storm 拓扑使用与我们的归档拓扑相同的 Kafka 主题,并执行与我们的 MapReduce 作业相同的计算,但是是实时的。这些计算的输出被写入不同的独立 Cassandra 集群以进行实时查询。

为了弥补我们在速度层中的时间比批处理更少的事实,并且可能更少的资源,我们使用概率算法,如 Bloom Filters 和 HyperLogLog  (以及一些自制的)。这些算法使我们能够在空间和时间复杂度上比它们的蛮力替代方案获得数量级的增益,代价是精度损失可以忽略不计。

将其组合在一起
既然我们有两组独立生成的数据(批量和速度),我们如何将它们结合起来以提供一个连贯的答案?

每天处理 50 亿次会话——实时

我们将它们与我们的 API 中的逻辑结合起来,在特定条件下利用每个数据集。

因为批量计算是可重复的,并且比速度更容错,我们的 API 总是支持批量生成的数据。因此,例如,如果我们的 API 收到对 30 天时间序列 DAU 图的数据请求,它将首先从批处理服务的 Cassandra 集群请求全部范围。如果这是一个历史查询,那么所有数据都将在那里得到满足。但是,在查询包括当天的更有可能的情况下,查询将主要由批量生成的数据来满足,而最近一两天将由速度数据来满足。

故障场景
处理让我们回顾几个不同的故障场景,看看这个架构如何让我们在面对它们时优雅地降级而不是宕机或丢失数据。

我们已经讨论了设备上的回退后重试策略。重试可确保在客户端网络不可用或后端服务器短暂中断的情况下,数据最终到达我们的服务器。随机回退可确保在单个区域的短暂网络中断或我们的后端服务器短暂不可用后,设备不会压倒 (DDos) 我们的服务器。

如果我们的速度(实时)处理层出现故障会发生什么?我们的待命工程师将得到传呼并解决问题。由于速度处理层的输入是一个持久的 Kafka 集群,因此不会丢失任何数据,一旦速度层恢复运行,它将赶上它在停机期间应该处理的数据。

由于速度层与批处理层完全解耦,批处理层处理将继续进行而不会受到影响。因此,唯一的影响是在速度层中断期间对数据点的实时更新延迟。

如果批处理层出现问题或严重延迟会怎样?我们的 API 将无缝地从速度层查询更多数据。以前可能从速度层收到一天数据的时间序列查询现在将查询它两天或三天的数据。由于速度层与批处理层完全解耦,因此速度层的处理将不受任何影响。同时,我们的随叫随到的工程师将得到寻呼并解决批处理层问题。一旦批处理层备份,它将赶上延迟的数据处理,我们的 API 将再次无缝利用现在可用的批处理生成的数据。

我们的后端架构由四个主要组件组成:事件接收、事件归档、速度计算和批量计算。每个组件之间的持久队列确保其中一个组件的中断不会溢出到其他组件,并且我们以后可以从中断中恢复。我们 API 中的查询逻辑允许我们无缝地优雅降级,然后在计算层之一延迟或关闭然后恢复时恢复。

我们对 Answers 的目标是创建一个仪表板,使了解您的用户群变得非常简单,这样您就可以花时间构建令人惊叹的体验,而不是挖掘数据。 在此处了解有关答案的更多信息 并立即开始。

非常感谢 Answers 团队为使该架构成为现实所做的所有努力。还要感谢 Nathan Marz 的《 大数据》一书