引言
清楚数据的类型有助于我们设计一个性能更高,更有针对性的数据系统,比如在线系统,离线系统(批处理)。近实时系统(流处理)等等。比如说批处理系统,这显然就适合用户对实时性要求不高,而对于吞吐量邀请较高,且数据流可以分为多个数据块,比如生成索引,显然我们需要一整块的数据,然后经过一系列的处理可以生成一个索引。那么流处理系统呢,这就适合与用户对于数据的实时性有要求,可以在事件发生后不久进行处理,这里数据是无界的,所以显然无法像批处理一样分批处理,就比如某个业务其实一直在进行,且我们希望处理这些业务的请求,并生成一些数据,但是不希望像批处理一样一次处理一段时间内的请求,这样实时性不好,所以需要把数据当做水流一样,源源不断的进行处理,这样的场景也是很多的,比如用户的活动事件、传感器的感应事件,数据库的写入操作等。
如何发送事件流
上面提到了批处理系统其实并不适合流式数据处理,所以我们需要一些工具来处理流式数据,当然数据库也可以完成这个任务,生产者发送任务到数据库中,消费者轮询查找,但是当数据多了以后显然效率上是有问题的,所以这其实是需要某些通知机制或者更为高效的模型,如下:
- 发布订阅系统
- 生产者消费者直接通信
- 消息队列
这里其实就有一点豁然开朗的感觉,以前只是知道这是分布式通信的方法,却不知道这其实本质是流式数据处理。
我们拿消息队列和数据库作对比,其实本质上和数据库非常类似,但还是存在差异:
- 数据库一般数据在明确要求删除时删除,而消息队列实在数据被消费时删除。
- 如果消息队列内数据过多的话要么进行限流,要么把数据转储回磁盘,效率降低。
- 数据库通常支持加快查找的方法,而消息队列确没有。
- 消息队列不支持查询,但可以支持发布订阅。
这些被视为传统观点,体现在JMS/AMQP这样的标准中,但因为其对序列不敏感的原因,并不是只有这样的实现。一些MQ实现了日志式的消息队列,它们保证数据的持久性,且保证顺序,这样就导致一个处理时间长的任务可能阻塞后面任务的处理。比如kafka,RoketMQ等。因为这些原因,当消息处理代价过高,希望并行处理,且消息排序并不重要的时候我们可以使用JMS/AMQP类型的消息队列,但对于消息处理很快,且对于顺序要求高的任务,基于日志的消息队列工作的更好。
有一点值得一提,就是在消息队列上客户端出现的幂等问题,这里的通用解决方案是偏移量,也就是客户端在处理了每个消息以后向MQ发送自己的偏移量,下一次的消息处理从偏移量开始。但这样仍有危险,就是在处理了消息后还没有记录偏移量,在重新启动后还会处理那些已经处理过的消息,如果从MQ下手问题可能不好解决,这里我们可以在客户端下手,保证幂等性。
流处理
现在我们清楚了流的来源,知道了流的传输,那么我们该如何处理这些数据呢?可能有如下几种可能:
- 写入数据库,缓存等存储系统,直接客户端使用。
- 直接把数据推送到用户,这样的话人是流的最终消费者(比如热点新闻)。
- 处理输入流然后产生另一个数据流,向批处理系统一样最后的数据流向前两步一样。
我们可以发现一个有意思的事实,就是流式数据处理和批处理的过程非常类似,但是我们仍能发现以下差异:
- 排序对于流式数据来说没有什么意义(无界的)。
- 容错机制不再相同,批处理中可以选择重新运行作业,因为输入不变,但是流处理不可能重新运行整个数据集,因为可能已经运行了很长时间,这并不现实。
那么流处理适用于哪些事件呢?如下:
- 复杂事件处理:我们可以其实就是在流中搜索特定类型的事件,我们只需要在流处理系统中维护一个所要匹配的特定模式即可。当发现匹配模式的事件时产生一个输出。
- 流分析:这里我们更为关心大量事件的累计效果和统计指标。比如测量某种事件的速率,统计一段时间某个值的平均值等。
- 在流中搜索:与复杂事件处理类似,不过这里是匹配单个事件而不是模式匹配。
不可靠的时钟
以上我们提到了一个问题,就是时间段,我们知道在多机器之间时钟是不可信任的,那么如果确定时间段呢?首先可以使用本地时间的话,这样的话优点是简单,但是可能本地时钟与客户端存在明显的滞后,且还有不可控的网络,这样的话可能准确度就没办法保证。如果使用客户端时间呢?显然存在很大的问题,就是客户端时间不可信任,如果恶意攻击的话会有很大的安全问题。再来考虑一个问题,我们现在需要统计每一分钟的流量,假如现在需要统计第十分钟的流量,十分钟已经过去了,现在到了十一分钟或者十二分钟,那么何时终止第十分钟的计算呢,因为还可能有第十分钟的事件没有到来,此时有两种方法,一个是维护一个全局偏移量,当在十一分钟收到十分钟数据时简单的丢弃,或者针对这个滞后时间发布一个更正事件,显然后者会增加系统的复杂度。
那么时钟问题如何解决呢,一种有效的方法是记录多个事件戳:
- 根据设备的时钟记录时间按发生的时间。
- 根据设备的时钟记录将事件发送到服务器的时间。
- 根据服务器的时钟记录服务器收到事件的时间。
这样我们可以使用第三个时间戳减去第二个时间戳,去估计出服务器和设备之间的偏移量,然后可以将该偏移量应用于事件时间戳。这样我们可以确定真实发生的时间戳。这个时候我们想要确定一个时间段就容易了,因为我们获取了近似的真实时间戳,然后只需要选择一个合理的窗口用以表示时间段即可。
容错
我们上面提到流处理的容错机制与批处理有所不同,主要原因就是流处理系统是无界的,也就是说我们不能简单的重启任务,所以我们需要其他的方法。
- 微批处理:其实就是把流式处理看做一个个小的批处理,这需要我们存储一段时间内的数据,这里通常为1S左右,显然较小的批处理会导致更大的协调开销,较大的微处理会导致较高的延迟,这显然是一个性能折中的考虑,这样我们只需要在宕机的时候重启这个微处理即可。
- 原子提交:我们可以把多个输入当做一个事务,并在失败时重试,这样我们可以丢弃掉失败任务的部分输出(未提交)。其实要达到相同的目的还有其他的方法,就是依赖于幂等性,我们只要保证处理一个输入不会造成多余的问题就可以达到丢弃失败任务的部分输出。上面我们提到了偏移量可以解决这个问题。
总结
流式数据显然是一种非常普遍的数据场景,这也是很多相关框架崛起的原因。从处理数据中得出的见解(insights)是有价值的。这样的见解(insights)并非都是生来平等的。一些见解(insights)在发生后不久就具有很高的价值,并且随着时间的流逝,这种价值会迅速减少。流处理针对这样的场景。流处理的关键优势在于相比于批处理它能够更快地提供见解(insights)。
参考:
- 博文《浅谈流处理》
- 书籍《Designing Data-Intensive Application》