
Two-Phase Commit

What is EXACTLY_ONCE?
EXACTLY_ONCE (EOS for short) means that each input message affects the final result exactly once. Note that this is "affects once", not "is processed once". Flink has long advertised EOS support, but that claim mainly covers state inside the Flink application itself; end-to-end EOS involving external systems carries strong requirements:
1. The external system supports idempotent writes, or
2. The external system supports writing within a transaction.
What is two-phase commit? Pre-commit + final commit.
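The split between pre-commit and final commit can be sketched with a minimal transactional sink. This is a hypothetical model, not Flink's API: `TwoPhaseSink`, `staged`, and `committed` are illustrative names; the point is only that data written before the commit stays invisible to consumers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-phase-commit sink: records are first staged (pre-commit)
// and only become visible to readers on the final commit.
class TwoPhaseSink {
    private final List<String> staged = new ArrayList<>();    // written, not yet visible
    private final List<String> committed = new ArrayList<>(); // visible to consumers

    void write(String record) { staged.add(record); }

    // Phase 1: make the data durable enough that the commit cannot fail
    // for data reasons; staged records remain invisible.
    void preCommit() { /* e.g. flush buffers */ }

    // Phase 2: atomically publish everything staged since the last commit.
    void commit() { committed.addAll(staged); staged.clear(); }

    // On failure before commit: discard the staged data.
    void abort() { staged.clear(); }

    List<String> visible() { return committed; }
}
```

Rolling back after `preCommit()` but before `commit()` is always possible, which is what makes phase 1 safe to fail.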
Flink abstracts the two-phase commit logic into the TwoPhaseCommitSinkFunction class. The concrete implementation looks like this:
(Figure: Flink two-phase commit)

//TwoPhaseCommitSinkFunction
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		long checkpointId = context.getCheckpointId();
		// Pre-commit hook; the Kafka producer implementation flushes pending records here
		preCommit(currentTransactionHolder.handle);
		// Park the current transaction holder (wrapping the transactional producer
		// instance in EXACTLY_ONCE mode) under this checkpoint's ID
		pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
		// Open a fresh transaction (a new transactional producer instance in
		// EXACTLY_ONCE mode) for the records that follow the barrier
		currentTransactionHolder = beginTransactionInternal();
		// Rewrite the operator state: the newly opened transaction plus all
		// pre-committed but not-yet-committed transactions
		state.clear();
		state.add(new State<>(
			this.currentTransactionHolder,
			new ArrayList<>(pendingCommitTransactions.values()),
			userContext));
	}

	public void initializeState(FunctionInitializationContext context) throws Exception {
		state = context.getOperatorStateStore().getListState(stateDescriptor);
		boolean recoveredUserContext = false;
		if (context.isRestored()) {
			for (State<TXN, CONTEXT> operatorState : state.get()) {
				userContext = operatorState.getContext();
				List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
				// Re-commit every transaction that was already pre-committed before the failure
				for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
					recoverAndCommitInternal(recoveredTransaction);
				}
				// Abort the transaction that was still open when the job went down
				recoverAndAbort(operatorState.getPendingTransaction().handle);
				if (userContext.isPresent()) {
					finishRecoveringContext();
					recoveredUserContext = true;
				}
			}
		}
		if (!recoveredUserContext) {
			userContext = initializeUserContext();
		}
		this.pendingCommitTransactions.clear();
		// Open a producer transaction and keep its handle
		currentTransactionHolder = beginTransactionInternal();
	}
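The recovery rule in initializeState is: re-commit everything that was pre-committed at checkpoint time, and abort the transaction that was still open. A small model of that decision (hypothetical `RecoveryModel` class, not Flink code) makes the asymmetry explicit:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical recovery model: transactions recorded in a completed checkpoint's
// state were pre-committed and must be committed again after a restart; the
// transaction that was open when the job died never reached a checkpoint and
// must be aborted.
class RecoveryModel {
    static List<String> recover(List<String> pendingCommit, String openTxn) {
        List<String> actions = new ArrayList<>();
        for (String txn : pendingCommit) {
            actions.add("commit:" + txn); // recoverAndCommit path
        }
        actions.add("abort:" + openTxn);  // recoverAndAbort path
        return actions;
    }
}
```

Committing again must be safe even if the original commit already went through before the crash, which is why commitTransaction implementations are expected to be idempotent.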

	public final void notifyCheckpointComplete(long checkpointId) throws Exception {
		// Iterate over the pre-committed transactions, keyed by checkpoint ID
		Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
		Throwable firstError = null;
		while (pendingTransactionIterator.hasNext()) {
			Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
			Long pendingTransactionCheckpointId = entry.getKey();
			TransactionHolder<TXN> pendingTransaction = entry.getValue();
			// Skip transactions that belong to a later, not-yet-completed checkpoint
			if (pendingTransactionCheckpointId > checkpointId) {
				continue;
			}
			try {
				// Commit the transaction (ultimately calls commitTransaction)
				commit(pendingTransaction.handle);
			} catch (Throwable t) {
				// Remember the first failure and keep committing the remaining transactions
				if (firstError == null) {
					firstError = t;
				}
			}
			pendingTransactionIterator.remove();
		}
	}
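The key detail in notifyCheckpointComplete is that only transactions whose checkpoint ID is at or below the notified one are committed; transactions pre-committed for later checkpoints stay pending. A simplified, non-Flink version of that loop (names like `PendingCommits` are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of the notifyCheckpointComplete loop: pending transactions
// are keyed by checkpoint ID; everything up to the completed checkpoint is
// committed and removed, later transactions remain pending.
class PendingCommits {
    final TreeMap<Long, String> pending = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    void preCommit(long checkpointId, String txn) { pending.put(checkpointId, txn); }

    void onCheckpointComplete(long checkpointId) {
        var it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> e = it.next();
            if (e.getKey() > checkpointId) {
                continue; // belongs to a later, not-yet-completed checkpoint
            }
            committed.add(e.getValue()); // stands in for commitTransaction(...)
            it.remove();
        }
    }
}
```

This is what allows several checkpoints to be in flight at once: each barrier parks one transaction, and a single completion notification can flush every transaction up to that checkpoint.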
//FlinkKafkaProducer011
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// Run the parent's snapshotState first (pre-commit + open a new transaction)
		super.snapshotState(context);
		// Rewrite nextTransactionalIdHintState; only subtask 0 stores the hint
		nextTransactionalIdHintState.clear();
		if (getRuntimeContext().getIndexOfThisSubtask() == 0 && semantic == Semantic.EXACTLY_ONCE) {
			long nextFreeTransactionalId = nextTransactionalIdHint.nextFreeTransactionalId;
			// On scale-up, skip past the ID ranges the new subtasks may already be using
			if (getRuntimeContext().getNumberOfParallelSubtasks() > nextTransactionalIdHint.lastParallelism) {
				nextFreeTransactionalId += getRuntimeContext().getNumberOfParallelSubtasks() * kafkaProducersPoolSize;
			}
			nextTransactionalIdHintState.add(new NextTransactionalIdHint(
				getRuntimeContext().getNumberOfParallelSubtasks(),
				nextFreeTransactionalId));
		}
	}

	public void initializeState(FunctionInitializationContext context) throws Exception {
		// If checkpointing is disabled, downgrade the semantic to NONE
		if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
			semantic = Semantic.NONE;
		}
		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
		// Initialize the transactional ID generator (each subtask gets a disjoint ID range)
		transactionalIdsGenerator = new TransactionalIdsGenerator(
			getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
			getRuntimeContext().getIndexOfThisSubtask(),
			getRuntimeContext().getNumberOfParallelSubtasks(),
			kafkaProducersPoolSize,
			SAFE_SCALE_DOWN_FACTOR);
		if (semantic != Semantic.EXACTLY_ONCE) {
			nextTransactionalIdHint = null;
		} else {
			// For EXACTLY_ONCE, initialize nextTransactionalIdHint (lastParallelism and
			// nextFreeTransactionalId start at 0); it is later used to generate the transactional IDs
			ArrayList<NextTransactionalIdHint> transactionalIdHints = Lists.newArrayList(nextTransactionalIdHintState.get());
			if (transactionalIdHints.size() > 1) {
				throw new IllegalStateException(
					"There should be at most one next transactional id hint written by the first subtask");
			} else if (transactionalIdHints.size() == 0) {
				nextTransactionalIdHint = new NextTransactionalIdHint(0, 0);
				// First run: abort any transactions a previous (crashed) attempt may have left behind
				abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
			} else {
				nextTransactionalIdHint = transactionalIdHints.get(0);
			}
		}
		// Run the parent's initializeState (recovery + open the first transaction)
		super.initializeState(context);
	}

Consider the following scenario: data is pulled from a Kafka source, goes through one window aggregation, and is finally written to a Kafka sink, as shown below:
1. The JobManager sends a barrier to the Source, entering the pre-commit phase. For operators with only internal state, pre-commit needs no extra work; they simply write their defined state variables. Flink commits those writes when the checkpoint succeeds, and discards them otherwise.

(Figure: Flink two-phase commit)
2. When the Source receives the barrier, it saves its own state (the consumed offset of each partition) to the configured state backend, then forwards the barrier to the next operator.
(Figure: Flink two-phase commit)
3. When the Window operator receives the barrier, it saves its state, i.e. the aggregation result (a sum or count), and forwards the barrier to the Sink. The Sink saves its state as well and then performs a pre-commit.
(Figure: Flink two-phase commit)
4. After every pre-commit succeeds, the JobManager notifies each operator that this checkpoint round is complete; at that point the second phase, the actual commit, is carried out.
(Figure: Flink two-phase commit)
That is the full two-phase flow. If a failure occurs during the process, there are two cases:
1. A pre-commit fails: the job rolls back to the most recent successful checkpoint.
2. Once pre-commit has completed, the commit must eventually succeed as well.
All operators must therefore reach consensus on the checkpoint's outcome: either every operator's data is committed, or the whole round is aborted and rolled back.
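That consensus rule, commit only if every participant pre-committed and otherwise roll everything back, is the classic two-phase-commit coordinator decision. A minimal sketch (hypothetical `Coordinator` class, not Flink code):

```java
import java.util.List;

// Minimal coordinator decision: the global outcome is COMMIT only if every
// participant's phase-1 pre-commit succeeded; any single failure aborts all.
class Coordinator {
    static String decide(List<Boolean> preCommitResults) {
        for (boolean ok : preCommitResults) {
            if (!ok) {
                return "ABORT"; // one failure rolls back every participant
            }
        }
        return "COMMIT";
    }
}
```

In Flink the JobManager plays the coordinator: a checkpoint completes only when every operator has acknowledged its snapshot, and only then does notifyCheckpointComplete trigger phase 2.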
