问题触发的环境:kafka source + memory channel + hdfs sink
日志截图如下:
这里说的内容是:queue空间不足。sink好像没有紧跟source,或者是buffer大小太小。这里的queue代表什么?sink没有紧跟source的具体含义是什么?buffer又是什么?我分析源代码后,将结果在下面铺开向大家展示。
memory channel内部结构
memory channel内部有三个队列,分别是图中的putList,queue,takeList。有两个参数来控制他们的大小,默认值都是100,分别是:
capacity:代表queue的大小,主要的存储event的部分。
transactionCapacity:代表putList和takeList的大小。
channel是如何被使用的
channel之上有一把锁,当source主动向channel放数据或者sink主动从channel取数据时,会抢锁,谁取到锁,谁就可以操作channel。
每次使用时会首先调用tx.begin()开始事务,也就是获取锁。然后调用tx.commit()提交数据或者调用tx.rollback()取消操作。
source往channel放数据
就是一个死循环,source一直试图获取channel锁,然后从kafka获取数据,放入channel中,那每次放入多少个数据呢?在KafkaSource.java中,代码是这样的:
含义就是:每次最多放batchUpperLimit或最多等待batchEndTime的时间,就结束向channel放数据。这两个参数的默认值分别是1000个和1s。分别由batchSize和batchDurationMillis设置。
当获取了足够的数据,首先放入putList中,然后就会调用tx.commit()将putList的全部数据放入queue中。
sink从channel取数据
也是一个死循环,sink一直试图获取channel锁,然后从channel取一批数据,放入sink和takeList(仅仅用于回滚,在调用rollback时takeList的数据会回滚道queue中)。每次取多少个event呢?这在HdfsSink中,代码如下:
batchSize的大小默认是100,由hdfs.batchSize控制,取够了,再调用tx.commit(),将putList的所有数据放入queue。
Space for commit to queue couldn’t be acquired异常如何发生的
经过上面的一系列介绍,已经知道了kafka source、memory channel、hdfs sink协同工作的过程。因为“source往putList放数据,然后提交到queue中”与“sink从channel中取数据到sink和takeList,然后再从putList取数据到queue中”这两部分是分开来,任他们自由抢锁,所以,当前者多次抢到锁,后者没有抢到锁,同时queue的大小又太小,撑不住多次往里放数据,就会导致触发这个异常。
解决这个问题最直接的办法就是增大queue的大小,增大capacity和transacCapacity之间的差距,queue能撑住多次往里面放数据即可。
失败后flume是如何处理的
flume会暂停source向channel放数据,等待几秒钟,这期间sink应该会消费channel中的数据,当source再次开始想channel放数据时channel就有足够的空间了。