Source Code Analysis | The Flink Timer Mechanism and Its Implementation

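Introduction to Timers
A Timer is the mechanism that the Flink Streaming API provides for detecting and reacting to changes in processing time and event time. The official Flink site describes it as follows: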
Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.
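For ordinary users, the most common place to use Timers explicitly is KeyedProcessFunction. We register a Timer in its processElement() method and override its onTimer() method to supply the callback logic that runs when the Timer fires. Depending on the time characteristic:
Processing time: register with Context.timerService().registerProcessingTimeTimer(); onTimer() fires when the system clock reaches the Timer's timestamp.
Event time: register with Context.timerService().registerEventTimeTimer(); onTimer() fires when Flink's internal watermark reaches or passes the Timer's timestamp.
For example, to compute metrics per day in real time, keep them in state, and clear that state at midnight so that counting starts over, we can register a Timer in processElement().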
ctx.timerService().registerProcessingTimeTimer(
tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1
);
public static long tomorrowZeroTimestampMs(long now, int timeZone) {
return now - (now + timeZone * 3600000) % 86400000 + 86400000;
}
Then call state.clear() in onTimer(), and that is all there is to it.
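The arithmetic in tomorrowZeroTimestampMs() is easy to get wrong, so here is a minimal, Flink-free sketch that cross-checks it against java.time. The class name MidnightTimerSketch and the helper tomorrowZeroViaJavaTime() are hypothetical names introduced only for this illustration, and the check assumes a fixed UTC offset without daylight saving time and a non-negative epoch timestamp.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

final class MidnightTimerSketch {
    static final long HOUR_MS = 3_600_000L;
    static final long DAY_MS = 86_400_000L;

    /** Same arithmetic as the article's helper: epoch millis of the next local midnight. */
    static long tomorrowZeroTimestampMs(long now, int timeZone) {
        // (now + offset) % DAY_MS is the time elapsed since local midnight today.
        return now - (now + timeZone * HOUR_MS) % DAY_MS + DAY_MS;
    }

    /** Hypothetical cross-check that derives the same instant with java.time. */
    static long tomorrowZeroViaJavaTime(long now, int timeZone) {
        ZoneOffset offset = ZoneOffset.ofHours(timeZone);
        LocalDate today = Instant.ofEpochMilli(now).atOffset(offset).toLocalDate();
        return today.plusDays(1).atStartOfDay().toInstant(offset).toEpochMilli();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("modulo arithmetic: " + tomorrowZeroTimestampMs(now, 8));
        System.out.println("java.time:         " + tomorrowZeroViaJavaTime(now, 8));
    }
}
```

Both methods should print the same value for UTC+8. The article's snippet then adds 1 ms so that the Timer fires just after midnight.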
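Besides KeyedProcessFunction, Timers also play an important role in the windowing mechanism. Windows naturally bring Triggers to mind. Here is part of the code of Flink's built-in EventTimeTrigger, the default trigger under the event-time characteristic.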
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
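}
As the code shows, if the watermark has not yet reached the window's right edge, the trigger registers a Timer whose timestamp is window.maxTimestamp(). When that Timer expires, onEventTime() is called, which in turn fires the Trigger associated with that window.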
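To make the interplay between the watermark and the registered Timer concrete, here is a small stand-alone simulation. It is only a sketch: EventTimeTriggerSketch, Window, and Result are hypothetical stand-ins for the Flink classes, the sorted set replaces the real timer service, and maxTimestamp() is assumed to be end - 1 for a window covering [start, end).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class EventTimeTriggerSketch {
    enum Result { CONTINUE, FIRE }

    /** Hypothetical stand-in for a time window covering [start, end). */
    static final class Window {
        final long start;
        final long end;
        Window(long start, long end) { this.start = start; this.end = end; }
        long maxTimestamp() { return end - 1; } // last timestamp that belongs to the window
    }

    private long watermark = Long.MIN_VALUE;
    private final TreeSet<Long> timers = new TreeSet<>(); // sorted and free of duplicates

    /** Fires at once for a late window, otherwise registers a timer at its right edge. */
    Result onElement(Window window) {
        if (window.maxTimestamp() <= watermark) {
            return Result.FIRE;
        }
        timers.add(window.maxTimestamp());
        return Result.CONTINUE;
    }

    /** Called for each due timer; fires only if the timer belongs to this window's edge. */
    Result onEventTime(long time, Window window) {
        return time == window.maxTimestamp() ? Result.FIRE : Result.CONTINUE;
    }

    /** Advances the watermark and returns the timestamps of all timers that became due. */
    List<Long> advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        List<Long> due = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= newWatermark) {
            due.add(timers.pollFirst());
        }
        return due;
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch trigger = new EventTimeTriggerSketch();
        Window window = new Window(0, 1000);
        System.out.println(trigger.onElement(window));
        for (long time : trigger.advanceWatermark(999)) {
            System.out.println(trigger.onEventTime(time, window));
        }
    }
}
```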
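The blog post quoted at the beginning of this article lists four key characteristics of Flink Timers from the user's point of view: Timers are registered on a KeyedStream, are automatically deduplicated, are checkpointed, and can be deleted.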

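With the introduction above, we have two entry points (KeyedProcessFunction and Trigger) for analyzing the details of Timers. We start with the former, so let's get our hands dirty.
TimerService and InternalTimerService
The operator that actually executes a KeyedProcessFunction is KeyedProcessOperator. It implements the context class Context required by KeyedProcessFunction as an inner class, shown below.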
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {
private final TimerService timerService;
private StreamRecord<IN> element;
ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
}
@Override
public Long timestamp() {
checkState(element != null);
if (element.hasTimestamp()) {
return element.getTimestamp();
} else {
return null;
}
}
@Override
public TimerService timerService() {
return timerService;
}
// Rest omitted...
}
As we can see, timerService() returns a TimerService instance passed in from outside, so let's go back to KeyedProcessOperator and look at its implementation.
public class KeyedProcessOperator<K, IN, OUT>
extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
private static final long serialVersionUID = 1L;
private transient TimestampedCollector<OUT> collector;
private transient ContextImpl context;
private transient OnTimerContextImpl onTimerContext;
public KeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> function) {
super(function);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
TimerService timerService = new SimpleTimerService(internalTimerService);
context = new ContextImpl(userFunction, timerService);
onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
@Override
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.eraseTimestamp();
invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
userFunction.processElement(element.getValue(), context, collector);
context.element = null;
}
private void invokeUserFunction(
TimeDomain timeDomain,
InternalTimer<K, VoidNamespace> timer) throws Exception {
onTimerContext.timeDomain = timeDomain;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
// Rest omitted...
}
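From the code above we can conclude:
The implementation of the TimerService interface is SimpleTimerService, which is in fact a very thin proxy around InternalTimerService (so thin that its code is omitted here).
The InternalTimerService instance is obtained from getInternalTimerService(), a method defined in AbstractStreamOperator, the base class of all operators. It is important, and we come back to it below.
KeyedProcessOperator.processElement() calls the user function's processElement() and passes in the ContextImpl instance, which is how users obtain the TimerService and register Timers.
In the code, a Timer is called InternalTimer (an interface).
When a Timer fires, onProcessingTime() or onEventTime() is called depending on the time characteristic (both methods come from the Triggerable interface), which in turn invokes the user function's onTimer() callback. We will meet these methods again later.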
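Since the proxy code is omitted, a rough sketch of the delegation pattern may help. This is not Flink's SimpleTimerService: the interfaces UserTimerService and NamespacedTimerService, the NoNamespace placeholder, and the RecordingService test double are all hypothetical names that only mimic the roles of TimerService, InternalTimerService, and VoidNamespace.

```java
import java.util.ArrayList;
import java.util.List;

final class TimerServiceProxySketch {
    /** Placeholder namespace for callers that need none (plays the role of VoidNamespace). */
    enum NoNamespace { INSTANCE }

    /** User-facing view: no namespace parameter. */
    interface UserTimerService {
        void registerProcessingTimeTimer(long time);
        void registerEventTimeTimer(long time);
    }

    /** Internal view: every call carries a namespace. */
    interface NamespacedTimerService<N> {
        void registerProcessingTimeTimer(N namespace, long time);
        void registerEventTimeTimer(N namespace, long time);
    }

    /** Thin proxy that fills in the placeholder namespace and delegates. */
    static final class SimpleProxy implements UserTimerService {
        private final NamespacedTimerService<NoNamespace> delegate;

        SimpleProxy(NamespacedTimerService<NoNamespace> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            delegate.registerProcessingTimeTimer(NoNamespace.INSTANCE, time);
        }

        @Override
        public void registerEventTimeTimer(long time) {
            delegate.registerEventTimeTimer(NoNamespace.INSTANCE, time);
        }
    }

    /** Test double that records the calls it receives. */
    static final class RecordingService implements NamespacedTimerService<NoNamespace> {
        final List<String> calls = new ArrayList<>();

        @Override
        public void registerProcessingTimeTimer(NoNamespace namespace, long time) {
            calls.add("processing/" + namespace + "@" + time);
        }

        @Override
        public void registerEventTimeTimer(NoNamespace namespace, long time) {
            calls.add("event/" + namespace + "@" + time);
        }
    }

    public static void main(String[] args) {
        RecordingService internal = new RecordingService();
        UserTimerService userFacing = new SimpleProxy(internal);
        userFacing.registerProcessingTimeTimer(10L);
        userFacing.registerEventTimeTimer(20L);
        System.out.println(internal.calls);
    }
}
```

The point of the design is that user code never has to know a namespace exists, while the internal service keeps one uniform signature for both user timers and window timers.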
Next, let's see how the InternalTimerService is obtained.
/**
* Returns a {@link InternalTimerService} that can be used to query current processing time
* and event time and to set timers. An operator can have several timer services, where
* each has its own namespace serializer. Timer services are differentiated by the string
* key that is given when requesting them, if you call this method with the same key
* multiple times you will get the same timer service instance in subsequent requests.
*
* Timers are always scoped to a key, the currently active key of a keyed stream operation.
* When a timer fires, this key will also be set as the currently active key.
*
* Each timer has attached metadata, the namespace. Different timer services
* can have a different namespace type. If you don't need namespace differentiation you
* can use {@link VoidNamespaceSerializer} as the namespace serializer.
*
* @param name The name of the requested timer service. If no service exists under the given
* name a new one will be created and returned.
* @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
* @param triggerable The {@link Triggerable} that should be invoked when timers fire
* @param <N> The type of the timer namespace.
*/
public <K, N> InternalTimerService<N> getInternalTimerService(
String name,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable) {
checkTimerServiceInitialization();
KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
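}
The method's Javadoc is very clear, so it is pasted here as well. In short:
Each operator can have one or more InternalTimerServices.
An InternalTimerService is defined by four elements: a name, the namespace type N (and its serializer), the key type K (and its serializer), and an implementation of the Triggerable interface described above.
An InternalTimerService is obtained through InternalTimeServiceManager.getInternalTimerService().
For example, the InternalTimerService initialized by KeyedProcessOperator above is named "user-timers", its namespace type is empty (VoidNamespace), and its Triggerable implementation is the operator itself. For WindowOperator, the InternalTimerService is named "window-timers" and its namespace type is Window.
InternalTimerService is itself still an interface; its code follows. Apart from the extra namespace parameter (which is invisible to users), its method signatures are the same as those of TimerService.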
public interface InternalTimerService<N> {
long currentProcessingTime();
long currentWatermark();
void registerProcessingTimeTimer(N namespace, long time);
void deleteProcessingTimeTimer(N namespace, long time);
void registerEventTimeTimer(N namespace, long time);
void deleteEventTimeTimer(N namespace, long time);
// ...
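}
Going one step further, let's see how InternalTimeServiceManager is implemented.
InternalTimeServiceManager and TimerHeapInternalTimer
As the name suggests, InternalTimeServiceManager manages the individual InternalTimerServices. Part of its code is shown below: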
public class InternalTimeServiceManager<K> {
@VisibleForTesting
static final String TIMER_STATE_PREFIX = "_timer_state";
@VisibleForTesting
static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";
@VisibleForTesting
static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";
private final KeyGroupRange localKeyGroupRange;
private final KeyContext keyContext;
private final PriorityQueueSetFactory priorityQueueSetFactory;
private final ProcessingTimeService processingTimeService;
private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;
private final boolean useLegacySynchronousSnapshots;
@SuppressWarnings("unchecked")
public <N> InternalTimerService<N> getInternalTimerService(
String name,
TimerSerializer<K, N> timerSerializer,
Triggerable<K, N> triggerable) {
InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);
timerService.startTimerService(
timerSerializer.getKeySerializer(),
timerSerializer.getNamespaceSerializer(),
triggerable);
return timerService;
}
@SuppressWarnings("unchecked")
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
if (timerService == null) {
timerService = new InternalTimerServiceImpl<>(
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
timerServices.put(name, timerService);
}
return timerService;
}
private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
String name,
TimerSerializer<K, N> timerSerializer) {
return priorityQueueSetFactory.create(
name,
timerSerializer);
}
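// Rest omitted...
}
From the code above we learn:
The final implementation of InternalTimerService in Flink is the InternalTimerServiceImpl class, and the final implementation of InternalTimer is the TimerHeapInternalTimer class.
InternalTimeServiceManager uses a HashMap to maintain, for a given key type K, the mapping from each InternalTimerService's name to its instance. If a name already exists, the existing instance is returned instead of a new one being created.
When an InternalTimerServiceImpl is initialized, two priority queues of TimerHeapInternalTimer are created (the priority queue is Flink's own implementation), one for event-time Timers and one for processing-time Timers.
The most important point to take away is that, with the heap-based queue analyzed here, Timers are kept in JVM heap memory, so registering a large number of Timers frequently, or firing many at the same time, carries a significant cost.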
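The register-or-get behavior can be sketched in a few lines of plain Java. This is an illustration under simplifying assumptions, not the Flink class: TimerServiceRegistrySketch and Service are hypothetical names, timers are reduced to bare Long timestamps, and java.util.PriorityQueue stands in for Flink's own queue. Only the two queue-name prefixes are taken from the constants in the listing above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

final class TimerServiceRegistrySketch {
    // Same values as the prefixes built from TIMER_STATE_PREFIX in the listing above.
    static final String PROCESSING_PREFIX = "_timer_state/processing_";
    static final String EVENT_PREFIX = "_timer_state/event_";

    /** Hypothetical stand-in for a timer service: two named queues of timestamps. */
    static final class Service {
        final String processingQueueName;
        final String eventQueueName;
        final PriorityQueue<Long> processingTimers = new PriorityQueue<>();
        final PriorityQueue<Long> eventTimers = new PriorityQueue<>();

        Service(String name) {
            this.processingQueueName = PROCESSING_PREFIX + name;
            this.eventQueueName = EVENT_PREFIX + name;
        }
    }

    private final Map<String, Service> services = new HashMap<>();

    /** Returns the service registered under the name, creating it on first request. */
    Service registerOrGet(String name) {
        return services.computeIfAbsent(name, Service::new);
    }

    int serviceCount() {
        return services.size();
    }

    public static void main(String[] args) {
        TimerServiceRegistrySketch manager = new TimerServiceRegistrySketch();
        Service first = manager.registerOrGet("user-timers");
        Service second = manager.registerOrGet("user-timers");
        System.out.println(first == second);
        System.out.println(first.processingQueueName);
    }
}
```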
TimerHeapInternalTimer is fairly simple: it consists mainly of four fields and one method. To save some typing, the comments are included as well.
/**
* The key for which the timer is scoped.
*/
@Nonnull
private final K key;
/**
* The namespace for which the timer is scoped.
*/
@Nonnull
private final N namespace;
/**
* The expiration timestamp.
*/
private final long timestamp;
/**
* This field holds the current physical index of this timer when it is managed by a timer heap so that we can
* support fast deletes.
*/
private transient int timerHeapIndex;
@Override
public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
return Long.compare(timestamp, other.getTimestamp());
}
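}
So a Timer has two scopes: the key of the record and the namespace. Users never see the namespace, however, so we can simply regard Timers as registered per key (the first of the four Timer characteristics). Estimating the number of keys correctly helps us keep the number of Timers under control.
timerHeapIndex is the index at which this Timer is stored in the priority queue. Priority queues are usually implemented as binary heaps, and a binary heap can be stored directly in an array, so letting each Timer hold its own index makes removing it from the queue fast.
comparePriorityTo() determines a Timer's priority; clearly the Timer priority queue is a min-heap ordered by Timer timestamp. Let's take a brief look at how that min-heap is implemented.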
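The benefit of storing the heap index inside the element can be seen in a minimal array-backed min-heap. The following is a sketch of the general technique, not Flink's heap implementation; IndexedMinHeapSketch, Timer, and heapIndex are hypothetical names. Because each element knows its position, remove() skips the linear search that java.util.PriorityQueue.remove(Object) needs and only pays for the O(log n) re-heapify.

```java
import java.util.ArrayList;
import java.util.List;

final class IndexedMinHeapSketch {
    static final class Timer {
        final long timestamp;
        int heapIndex = -1; // position in the heap array, -1 while not enqueued
        Timer(long timestamp) { this.timestamp = timestamp; }
    }

    private final List<Timer> heap = new ArrayList<>();

    int size() { return heap.size(); }

    Timer peek() { return heap.isEmpty() ? null : heap.get(0); }

    void add(Timer t) {
        heap.add(t);
        t.heapIndex = heap.size() - 1;
        siftUp(t.heapIndex);
    }

    Timer poll() { return heap.isEmpty() ? null : removeAt(0); }

    /** Removes a timer by jumping straight to its stored index. */
    boolean remove(Timer t) {
        int i = t.heapIndex;
        if (i < 0 || i >= heap.size() || heap.get(i) != t) {
            return false;
        }
        removeAt(i);
        return true;
    }

    private Timer removeAt(int i) {
        Timer removed = heap.get(i);
        Timer last = heap.remove(heap.size() - 1);
        if (last != removed) {
            place(i, last);           // fill the hole with the former last element
            siftUp(i);                // it may need to move up ...
            siftDown(last.heapIndex); // ... or down from wherever it ended up
        }
        removed.heapIndex = -1;
        return removed;
    }

    private void place(int i, Timer t) {
        heap.set(i, t);
        t.heapIndex = i;
    }

    private void swap(int a, int b) {
        Timer ta = heap.get(a);
        Timer tb = heap.get(b);
        place(a, tb);
        place(b, ta);
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(parent).timestamp <= heap.get(i).timestamp) {
                break;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left).timestamp < heap.get(smallest).timestamp) smallest = left;
            if (right < n && heap.get(right).timestamp < heap.get(smallest).timestamp) smallest = right;
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }
}
```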
HeapPriorityQueueSet
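The priority queue created by PriorityQueueSetFactory.create() in the code above is actually a HeapPriorityQueueSet. Its basic idea is the same as Java's built-in PriorityQueue, but it adds deduplication on top (the second of the four Timer characteristics): a key can hold at most one Timer for a given timestamp and namespace. Part of its code is listed below.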
private final HashMap<T, T>[] deduplicationMapsByKeyGroup;
private final KeyGroupRange keyGroupRange;
@Override
@Nullable
public T poll() {
final T toRemove = super.poll();
return toRemove != null ? getDedupMapForElement(toRemove).remove(toRemove) : null;
}
@Override
public boolean add(@Nonnull T element) {
return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
}
@Override
public boolean remove(@Nonnull T toRemove) {
T storedElement = getDedupMapForElement(toRemove).remove(toRemove);
return storedElement != null && super.remove(storedElement);
}
private HashMap<T, T> getDedupMapForKeyGroup(
@Nonnegative int keyGroupId) {
return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupId)];
}
private HashMap<T, T> getDedupMapForElement(T element) {
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
keyExtractor.extractKeyFromElement(element),
totalNumberOfKeyGroups);
return getDedupMapForKeyGroup(keyGroup);
}
private int globalKeyGroupToLocalIndex(int keyGroup) {
checkArgument(keyGroupRange.contains(keyGroup), "%s does not contain key group %s", keyGroupRange, keyGroup);
return keyGroup - keyGroupRange.getStartKeyGroup();
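}
To understand this code, we first need to explain KeyGroup and KeyGroupRange. A KeyGroup is the atomic unit of keyed state inside Flink, that is, a group of keys. The number of KeyGroups in a Flink application equals its maximum parallelism, and a key is assigned to a KeyGroup in the classic way: take the key's hashCode, then take it modulo the maximum parallelism. A KeyGroupRange is a range of consecutive KeyGroups, and each Flink sub-task holds exactly one KeyGroupRange. In other words, a KeyGroupRange can be seen as all the keys maintained locally by the current sub-task.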
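The relationship between keys, KeyGroups, and per-sub-task ranges can be worked through with a few lines of arithmetic. This sketch makes two simplifying assumptions that should not be read as Flink's exact code: the key-to-group mapping uses a plain non-negative modulo of hashCode(), and the range formulas are one even way of splitting consecutive KeyGroups across sub-tasks. All names (KeyGroupSketch, rangeStart, rangeEnd, subtaskFor) are hypothetical.

```java
final class KeyGroupSketch {
    /** Maps a key to a key group in [0, maxParallelism) with a non-negative modulo. */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** First key group owned by the sub-task (assumed even split). */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the sub-task (assumed even split). */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Sub-task that owns the given key group under the same split. */
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("sub-task " + subtask + " owns key groups ["
                    + rangeStart(maxParallelism, parallelism, subtask) + ", "
                    + rangeEnd(maxParallelism, parallelism, subtask) + "]");
        }
        int keyGroup = assignToKeyGroup("user-42", maxParallelism);
        System.out.println("key group " + keyGroup + " belongs to sub-task "
                + subtaskFor(keyGroup, maxParallelism, parallelism));
    }
}
```

Because the ranges are consecutive, a sub-task can translate a global KeyGroup id into a local array index by subtracting the start of its range, which is exactly what globalKeyGroupToLocalIndex() does in the listing above.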
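With that explained, it is easy to see that the HashMap array in the code above is the container that deduplicates Timers at the KeyGroup level; each element of the array corresponds to one KeyGroup. Taking the insertion of a Timer as an example:
Extract the key from the Timer and compute which KeyGroup it belongs to;
Compute the offset of that KeyGroup within the whole KeyGroupRange and use the offset as the index into the HashMap array;
By the semantics of putIfAbsent(), the Timer is inserted into the min-heap only if the corresponding HashMap does not already contain an equal Timer.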
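The three insertion steps can be sketched with standard collections. This is a simplified illustration, not HeapPriorityQueueSet itself: DedupTimerQueueSketch and its Timer class are hypothetical, java.util.PriorityQueue replaces Flink's heap, the namespace is left out, and the KeyGroup is computed with a plain non-negative modulo of the key's hashCode().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;

final class DedupTimerQueueSketch {
    /** Hypothetical timer: equal when both key and timestamp match. */
    static final class Timer {
        final String key;
        final long timestamp;
        Timer(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer other = (Timer) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }
        @Override public int hashCode() { return Objects.hash(key, timestamp); }
    }

    private final int maxParallelism;
    private final int startKeyGroup;
    private final List<Map<Timer, Timer>> dedupMaps = new ArrayList<>(); // one map per local key group
    private final PriorityQueue<Timer> heap =
            new PriorityQueue<Timer>(Comparator.comparingLong((Timer t) -> t.timestamp));

    DedupTimerQueueSketch(int maxParallelism, int startKeyGroup, int endKeyGroup) {
        this.maxParallelism = maxParallelism;
        this.startKeyGroup = startKeyGroup;
        for (int i = startKeyGroup; i <= endKeyGroup; i++) {
            dedupMaps.add(new HashMap<>());
        }
    }

    /** Steps 1 and 2: key -> key group -> offset inside the local range. */
    private Map<Timer, Timer> dedupMapFor(Timer t) {
        int keyGroup = Math.floorMod(t.key.hashCode(), maxParallelism);
        int localIndex = keyGroup - startKeyGroup;
        if (localIndex < 0 || localIndex >= dedupMaps.size()) {
            throw new IllegalArgumentException("key group " + keyGroup + " is not local");
        }
        return dedupMaps.get(localIndex);
    }

    /** Step 3: only timers that are not yet known reach the heap. */
    boolean add(Timer t) {
        return dedupMapFor(t).putIfAbsent(t, t) == null && heap.add(t);
    }

    boolean remove(Timer t) {
        Timer stored = dedupMapFor(t).remove(t);
        return stored != null && heap.remove(stored);
    }

    Timer poll() {
        Timer head = heap.poll();
        if (head != null) {
            dedupMapFor(head).remove(head);
        }
        return head;
    }

    int size() { return heap.size(); }
}
```

Registering the same key and timestamp twice therefore leaves a single entry in the heap, while the same key with a different timestamp is a separate Timer.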
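Now back to the main flow: InternalTimerServiceImpl.
InternalTimerServiceImpl
Here we can finally see the lowest-level implementation of registering and removing Timers. Note that ProcessingTimeService is the service inside Flink that produces processing-time timestamps.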
private final ProcessingTimeService processingTimeService;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
private ScheduledFuture<?> nextTimer;
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
nextTimer = processingTimeService.registerTimer(time, this);
}
}
}
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
@Override
public void deleteProcessingTimeTimer(N namespace, long time) {
processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
@Override
public void deleteEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
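}
So registering a Timer simply means giving it a timestamp, a key, and a namespace and adding it to the corresponding priority queue. In particular, when a processing-time Timer is registered, its timestamp is first compared with that of the Timer currently at the top of the min-heap. If the new Timer is earlier, the callback scheduled for the old head is cancelled and one is scheduled for the new timestamp instead, because processing time only ever moves forward.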
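The rescheduling rule for processing-time Timers can be traced with a tiny simulation. This is a sketch under stated assumptions: ProcessingTimerRegistrationSketch is a hypothetical class, timers are bare timestamps for a single key, a TreeSet stands in for the deduplicating priority queue, and the scheduledFor field plus a log replace the real ScheduledFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class ProcessingTimerRegistrationSketch {
    private final TreeSet<Long> queue = new TreeSet<>();       // sorted, rejects duplicates
    private final List<Long> scheduleLog = new ArrayList<>();  // every (re)scheduling decision
    private Long scheduledFor;                                 // stand-in for the pending callback

    void registerProcessingTimeTimer(long time) {
        Long oldHead = queue.isEmpty() ? null : queue.first();
        if (queue.add(time)) { // false for a duplicate, in which case nothing changes
            long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
            if (time < nextTriggerTime) {
                // The new timer is due before the current head: replace the pending callback.
                scheduledFor = time;
                scheduleLog.add(time);
            }
        }
    }

    Long scheduledFor() { return scheduledFor; }

    List<Long> scheduleLog() { return scheduleLog; }

    public static void main(String[] args) {
        ProcessingTimerRegistrationSketch service = new ProcessingTimerRegistrationSketch();
        service.registerProcessingTimeTimer(100L);
        service.registerProcessingTimeTimer(200L); // later than the head, no rescheduling
        service.registerProcessingTimeTimer(50L);  // earlier than the head, reschedule
        System.out.println(service.scheduleLog());
    }
}
```

Only one callback is pending at any time, always for the earliest Timer, which keeps the number of scheduled tasks independent of the number of registered Timers.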
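How does a Timer fire once it has been registered? Let's look at processing time first.
InternalTimerServiceImpl implements the ProcessingTimeCallback interface, which means it can receive processing-time callbacks. The interface requires only one method, shown below.
private Triggerable<K, N> triggerTarget;
@Override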
public void onProcessingTime(long time) throws Exception {
nextTimer = null;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}
if (timer != null && nextTimer == null) {
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
}
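}
When onProcessingTime() is called back, it takes from the queue, in order, every Timer whose timestamp is less than or equal to time and calls Triggerable.onProcessingTime() on each one. That is the method of the same name in KeyedProcessOperator shown earlier, so the user-defined onTimer() logic runs. If Timers remain in the queue afterwards, a callback is scheduled for the new head.
Finally we arrive at SystemProcessingTimeService, the implementation class of ProcessingTimeService, which uses a scheduled thread pool to implement the callbacks. The relevant code follows.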
private final ScheduledThreadPoolExecutor timerService;
@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
try {
return timerService.schedule(
new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(delay);
} else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
} else {
throw e;
}
}
}
// Note: this is the run() method of TriggerTask
@Override
public void run() {
synchronized (lock) {
try {
if (serviceStatus.get() == STATUS_ALIVE) {
target.onProcessingTime(timestamp);
}
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
}
}
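}
As we can see, onProcessingTime() is called back from a TriggerTask running on the scheduler thread pool, and each TriggerTask is scheduled according to the Timer's timestamp. That covers processing-time Timers.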
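The same scheduling idea can be tried out with nothing but java.util.concurrent. The sketch below is not SystemProcessingTimeService: ScheduledCallbackSketch, Callback, and fireOnceAndWait() are hypothetical names, status handling is left out, and exceptions are merely printed. It keeps the two details worth noticing in the listing above, namely the delay computed as max(timestamp - now, 0) + 1 and the lock held while the callback runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class ScheduledCallbackSketch {
    /** Hypothetical counterpart of a processing-time callback. */
    interface Callback {
        void onProcessingTime(long timestamp) throws Exception;
    }

    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final Object lock = new Object();

    /** Schedules the callback to run once the wall clock has passed the timestamp. */
    ScheduledFuture<?> registerTimer(long timestamp, Callback target) {
        long delay = Math.max(timestamp - System.currentTimeMillis(), 0) + 1;
        return executor.schedule(() -> {
            synchronized (lock) { // serialize callbacks with other work that takes the lock
                try {
                    target.onProcessingTime(timestamp);
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    /** Demo helper: registers one timer and reports whether it fired with its own timestamp. */
    static boolean fireOnceAndWait(long delayMs) {
        ScheduledCallbackSketch service = new ScheduledCallbackSketch();
        CountDownLatch done = new CountDownLatch(1);
        AtomicLong seen = new AtomicLong(-1);
        long timestamp = System.currentTimeMillis() + delayMs;
        service.registerTimer(timestamp, t -> {
            seen.set(t);
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            service.shutdown();
        }
        return seen.get() == timestamp;
    }

    public static void main(String[] args) {
        System.out.println("fired: " + fireOnceAndWait(50));
    }
}
```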
Now for event time. Event time is independent of the system clock; it is driven by watermarks. Below is the code of InternalTimerServiceImpl.advanceWatermark().
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
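}
The logic is similar to the processing-time case, except that the callback is onEventTime() instead of onProcessingTime(). Following the call chain upwards leads to the method of the same name in InternalTimeServiceManager.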
public void advanceWatermark(Watermark watermark) throws Exception {
for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
service.advanceWatermark(watermark.getTimestamp());
}
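}
One level further up we reach the end of the chain: processWatermark(), the watermark-handling method of the operator base class AbstractStreamOperator. When a watermark arrives, it travels down the call chain above into InternalTimerServiceImpl and fires every Timer whose timestamp is not later than the watermark.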
public void processWatermark(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
output.emitWatermark(mark);
}
With that, we have worked through the main implementation details of Flink's Timer mechanism.
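As a closing illustration, the event-time path can be replayed without Flink. This is a minimal sketch, assuming a single timer service and no deduplication: WatermarkTimerSketch, its Timer class, and its Triggerable interface are hypothetical stand-ins, and java.util.PriorityQueue replaces Flink's queue. The drain loop has the same shape as advanceWatermark() above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class WatermarkTimerSketch {
    static final class Timer {
        final String key;
        final long timestamp;
        Timer(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    /** Hypothetical callback target, invoked once per due timer. */
    interface Triggerable {
        void onEventTime(Timer timer);
    }

    private final PriorityQueue<Timer> eventTimeTimers =
            new PriorityQueue<Timer>(Comparator.comparingLong((Timer t) -> t.timestamp));
    private final Triggerable triggerTarget;
    private long currentWatermark = Long.MIN_VALUE;

    WatermarkTimerSketch(Triggerable triggerTarget) {
        this.triggerTarget = triggerTarget;
    }

    void registerEventTimeTimer(String key, long time) {
        eventTimeTimers.add(new Timer(key, time));
    }

    /** Fires, in timestamp order, every timer that is not later than the new watermark. */
    void advanceWatermark(long time) {
        currentWatermark = time;
        Timer timer;
        while ((timer = eventTimeTimers.peek()) != null && timer.timestamp <= time) {
            eventTimeTimers.poll();
            triggerTarget.onEventTime(timer);
        }
    }

    long currentWatermark() { return currentWatermark; }

    public static void main(String[] args) {
        List<String> fired = new ArrayList<>();
        WatermarkTimerSketch service =
                new WatermarkTimerSketch(t -> fired.add(t.key + "@" + t.timestamp));
        service.registerEventTimeTimer("a", 100L);
        service.registerEventTimeTimer("b", 50L);
        service.registerEventTimeTimer("c", 300L);
        service.advanceWatermark(100L);
        System.out.println(fired);
    }
}
```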





