Two-Phase Commit
What is EXACTLY_ONCE?
EXACTLY_ONCE (abbreviated EOS) means that every input message affects the final result exactly once. Note that this says affects once, not is processed once. Flink has long advertised EOS support, but strictly speaking that guarantee covers the inside of a Flink application; for an end-to-end guarantee, the external system being written to must satisfy one of two fairly strong requirements:
1. It supports idempotent writes, or
2. It supports writing in a transactional fashion.
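To make the first option concrete, here is a minimal plain-Java sketch (illustrative names only, not Flink API) contrasting an idempotent keyed upsert, where replaying the same record leaves the final state unchanged, with a plain append, where a replay produces a duplicate and breaks EOS:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: why idempotent writes tolerate replays while plain appends do not.
public class IdempotentVsAppend {
    // Idempotent: keyed upsert -- writing the same (key, value) twice leaves one entry.
    static Map<String, String> upsert(Map<String, String> store, String key, String value) {
        store.put(key, value);
        return store;
    }

    // Non-idempotent: append -- a replayed record produces a duplicate.
    static List<String> append(List<String> log, String value) {
        log.add(value);
        return log;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        upsert(store, "user-1", "clicked");
        upsert(store, "user-1", "clicked");   // replayed after a failure
        System.out.println(store.size());     // 1 -- final result affected once

        List<String> log = new ArrayList<>();
        append(log, "clicked");
        append(log, "clicked");               // replayed after a failure
        System.out.println(log.size());       // 2 -- duplicate, EOS broken
    }
}
```

A sink that can only append (like Kafka) therefore needs the second option, transactional writes, which is exactly what two-phase commit builds on.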
What, then, is two-phase commit? In short: a pre-commit phase followed by the actual commit.
Flink abstracts the two-phase commit logic into the TwoPhaseCommitSinkFunction class. Its key methods are implemented as follows:
```java
// TwoPhaseCommitSinkFunction
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    long checkpointId = context.getCheckpointId();
    // Pre-commit; under EXACTLY_ONCE semantics the Kafka implementation does
    // nothing here beyond flushing
    preCommit(currentTransactionHolder.handle);
    // Park the current transaction (which wraps a transactional producer under
    // EXACTLY_ONCE) as pending for this checkpoint
    pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
    // Open a fresh transaction (again wrapping a transactional producer under
    // EXACTLY_ONCE) for records arriving after this checkpoint
    currentTransactionHolder = beginTransactionInternal();
    // Replace the operator state with the new current transaction plus all
    // still-pending ones
    state.clear();
    state.add(new State<>(
        this.currentTransactionHolder,
        new ArrayList<>(pendingCommitTransactions.values()),
        userContext));
}

public void initializeState(FunctionInitializationContext context) throws Exception {
    state = context.getOperatorStateStore().getListState(stateDescriptor);
    boolean recoveredUserContext = false;
    if (context.isRestored()) {
        for (State<TXN, CONTEXT> operatorState : state.get()) {
            userContext = operatorState.getContext();
            List<TransactionHolder<TXN>> recoveredTransactions =
                operatorState.getPendingCommitTransactions();
            // Transactions that were pre-committed before the failure are
            // recovered and committed now
            for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
                recoverAndCommitInternal(recoveredTransaction);
            }
            // The transaction that was still open at failure time is aborted
            recoverAndAbort(operatorState.getPendingTransaction().handle);
            if (userContext.isPresent()) {
                finishRecoveringContext();
                recoveredUserContext = true;
            }
        }
    }
    if (!recoveredUserContext) {
        userContext = initializeUserContext();
    }
    this.pendingCommitTransactions.clear();
    // Begin a producer transaction and keep its handle
    currentTransactionHolder = beginTransactionInternal();
}

public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // Checkpoint ids and transaction handles still waiting to be committed
    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator =
        pendingCommitTransactions.entrySet().iterator();
    Throwable firstError = null;
    while (pendingTransactionIterator.hasNext()) {
        Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
        Long pendingTransactionCheckpointId = entry.getKey();
        TransactionHolder<TXN> pendingTransaction = entry.getValue();
        // Only commit transactions belonging to this checkpoint or earlier ones
        if (pendingTransactionCheckpointId > checkpointId) {
            continue;
        }
        try {
            // Commit the transaction (eventually calls commitTransaction)
            commit(pendingTransaction.handle);
        } catch (Throwable t) {
            // Remember the first failure but keep committing the remaining
            // transactions
            if (firstError == null) {
                firstError = t;
            }
        }
        pendingTransactionIterator.remove();
    }
}
```
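The commit loop in notifyCheckpointComplete can be simulated with plain Java (all names are illustrative; no Flink dependencies): pre-committed transactions are keyed by checkpoint id, and a completion notification for checkpoint N commits every pending transaction with id <= N, leaving later ones pending:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java simulation of the two-phase commit bookkeeping (not Flink API).
public class PendingCommitSimulation {
    final TreeMap<Long, String> pending = new TreeMap<>();  // checkpointId -> txn handle
    final List<String> committed = new ArrayList<>();

    // Phase 1: snapshotState pre-commits and parks the transaction as pending.
    void preCommit(long checkpointId, String txnHandle) {
        pending.put(checkpointId, txnHandle);
    }

    // Phase 2: commit everything up to and including the notified checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        SortedMap<Long, String> ready = pending.headMap(checkpointId, true);
        for (String txn : ready.values()) {
            committed.add(txn);  // the real sink calls commitTransaction(txn) here
        }
        ready.clear();           // the view removes the committed entries from 'pending'
    }

    public static void main(String[] args) {
        PendingCommitSimulation sink = new PendingCommitSimulation();
        sink.preCommit(1L, "txn-1");
        sink.preCommit(2L, "txn-2");
        sink.preCommit(3L, "txn-3");
        sink.notifyCheckpointComplete(2L);         // commits txn-1 and txn-2 only
        System.out.println(sink.committed);        // [txn-1, txn-2]
        System.out.println(sink.pending.keySet()); // [3]
    }
}
```

The sorted map mirrors why the real loop skips entries with `pendingTransactionCheckpointId > checkpointId`: a later checkpoint's transaction must stay open until its own completion notification arrives.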
```java
// FlinkKafkaProducer011
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // First run the two-phase-commit logic of the parent class
    super.snapshotState(context);
    // Reset nextTransactionalIdHintState before writing the new hint
    nextTransactionalIdHintState.clear();
    // Only subtask 0 records the hint, and only under EXACTLY_ONCE semantics
    if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {
        long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
        // If the job was scaled up, reserve a fresh id range for the new subtasks
        if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
            nextFreeTransactionalId +=
                getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
        }
        nextTransactionalIdHintState.add(new NextTransactionalIdHint(
            getRuntimeContext().getNumberOfParallelSubtasks(),
            nextFreeTransactionalId));
    }
}

public void initializeState(FunctionInitializationContext context) throws Exception {
    // Downgrade the semantic to NONE when checkpointing is disabled
    if (semantic != Semantic.NONE
            && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
        semantic = Semantic.NONE;
    }
    nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
        NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
    // Initialize the transactional-id generator
    transactionalIdsGenerator = new TransactionalIdsGenerator(
        getRuntimeContext().getTaskName() + "-"
            + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
        getRuntimeContext().getIndexOfThisSubtask(),
        getRuntimeContext().getNumberOfParallelSubtasks(),
        kafkaProducersPoolSize,
        SAFE_SCALE_DOWN_FACTOR);
    if (semantic != Semantic.EXACTLY_ONCE) {
        nextTransactionalIdHint = null;
    } else {
        // Under EXACTLY_ONCE semantics, initialize nextTransactionalIdHint
        // (lastParallelism and nextFreeTransactionalId start at 0); it is later
        // used to generate the pool of transactional ids
        ArrayList<NextTransactionalIdHint> transactionalIdHints =
            Lists.newArrayList(nextTransactionalIdHintState.get());
        if (transactionalIdHints.size() > 1) {
            // Only subtask 0 ever writes the hint, so more than one entry is illegal
            throw new IllegalStateException(
                "There should be at most one next transactional id hint written by the first subtask");
        } else if (transactionalIdHints.size() == 0) {
            nextTransactionalIdHint = new NextTransactionalIdHint(0, 0);
            // On a fresh start, abort any transactions a previous job instance
            // may have left open
            abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
        } else {
            nextTransactionalIdHint = transactionalIdHints.get(0);
        }
    }
    // Finally run the two-phase-commit initialization of the parent class
    super.initializeState(context);
}
```
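The id pooling that nextTransactionalIdHint and TransactionalIdsGenerator manage can be illustrated with a simplified scheme (a hypothetical helper, not Flink's actual generator): each subtask derives a disjoint range of transactional ids from its index, a base offset (what the hint's nextFreeTransactionalId bumps on scale-up), and the producer pool size, so parallel sink instances never collide on Kafka transactional ids:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of per-subtask transactional-id pooling
// (hypothetical helper, not Flink's real TransactionalIdsGenerator).
public class SimpleIdPool {
    // Subtask i owns ids [offset + i * poolSize, offset + (i + 1) * poolSize).
    static List<String> idsForSubtask(String prefix, long offset, int subtaskIndex, int poolSize) {
        List<String> ids = new ArrayList<>();
        long start = offset + (long) subtaskIndex * poolSize;
        for (long id = start; id < start + poolSize; id++) {
            ids.add(prefix + "-" + id);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Two subtasks, pool size 5: the ranges are disjoint.
        System.out.println(idsForSubtask("sink", 0, 0, 5)); // sink-0 .. sink-4
        System.out.println(idsForSubtask("sink", 0, 1, 5)); // sink-5 .. sink-9
    }
}
```

Disjoint ranges matter because Kafka fences producers by transactional id: if two live subtasks shared an id, one would constantly fence the other's transactions.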
Consider the following scenario: data is pulled from a Kafka source, aggregated once in a window, and finally written to a Kafka sink, as shown in the figure below:
1. The JobManager injects a barrier at the Source, which starts the pre-commit phase. When an operator has only internal state, the pre-commit phase needs no extra work beyond writing the already-defined state variables; Flink commits these writes when the checkpoint succeeds and discards them otherwise.
2. When the Source receives the barrier, it snapshots its own state to the configured state backend; here that state is the consumed offset of each partition. It then forwards the barrier to the next operator.
3. When the Window operator receives the barrier, it snapshots its state, i.e. the aggregation result (the running sum or count), and forwards the barrier to the Sink. The Sink snapshots its state as well, and then performs a pre-commit.
4. Once every pre-commit has succeeded, the JobManager notifies each operator that this checkpoint round is complete; at that point the second phase, the actual commit, takes place.
That is the complete two-phase flow. If a failure occurs during the protocol, there are two cases:
1. If the pre-commit fails, the job rolls back to the most recent successful checkpoint.
2. Once the pre-commit has completed, the commit must be guaranteed to eventually succeed.
Therefore all operators must reach consensus on the checkpoint's outcome: either every operator agrees that the data commit succeeded, or the checkpoint is aborted and everything rolls back.
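The steps and failure cases above can be walked through in plain Java (all event strings are illustrative, not Flink API): in phase one the barrier flows Source → Window → Sink and each operator snapshots, with the Sink additionally pre-committing; phase two runs only if every earlier step succeeded:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative walk-through of the two-phase commit protocol (not Flink API).
public class TwoPhaseCommitWalkthrough {
    static List<String> run(boolean preCommitFails) {
        List<String> events = new ArrayList<>();
        // Phase 1: barrier flows through the pipeline; each operator snapshots.
        events.add("source: snapshot partition offsets");
        events.add("window: snapshot aggregates");
        if (preCommitFails) {
            // Failure case 1: pre-commit fails -> abort the transaction and
            // restore from the most recent successful checkpoint.
            events.add("sink: abort txn, restore last checkpoint");
            return events;
        }
        events.add("sink: snapshot + pre-commit txn");
        // Failure case 2 has no rollback: once all pre-commits succeeded,
        // notifyCheckpointComplete drives the commit until it succeeds.
        events.add("sink: commit txn");
        return events;
    }

    public static void main(String[] args) {
        for (String e : run(false)) {
            System.out.println(e);
        }
    }
}
```

The asymmetry between the two branches is the heart of the protocol: before the checkpoint completes, abort is always safe; after it completes, commit is the only acceptable outcome.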