【发布时间】:2022-10-04 23:52:27
【问题描述】:
版本控制 API 功能强大。然而,随着pattern 的使用,代码很快就会变得混乱,难以阅读和维护。
随着时间的推移,产品需要快速发展以引入新的业务/需求。有什么建议可以明智地使用这个 API。
【问题讨论】:
标签: cadence-workflow temporal-workflow
版本控制 API 功能强大。然而,随着pattern 的使用,代码很快就会变得混乱,难以阅读和维护。
随着时间的推移,产品需要快速发展以引入新的业务/需求。有什么建议可以明智地使用这个 API。
【问题讨论】:
标签: cadence-workflow temporal-workflow
我建议使用全局版本提供者设计模式如果可能,在节奏/时间工作流程中。
版本控制 API 非常强大,可以让您以确定的方式(向后兼容)更改现有工作流执行的行为。在现实世界中,您可能只关心添加新行为,并且可以只将此新行为引入新启动的工作流执行。在这种情况下,您使用全局版本提供程序来统一整个工作流程的版本控制。
关键思想是我们正在对整个工作流程进行版本控制(这就是为什么它被称为GlobalVersionProvider)。每次添加新版本时,我们都会更新版本提供者并提供新版本。
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.temporal.workflow.Workflow;
import java.util.HashMap;
import java.util.Map;
public class GlobalVersionProvider {
private static final String WORKFLOW_VERSION_CHANGE_ID = "global";
private static final int STARTING_VERSION_USING_GLOBAL_VERSION = 1;
private static final int STARTING_VERSION_DOING_X = 2;
private static final int STARTING_VERSION_DOING_Y = 3;
private static final int MAX_STARTING_VERSION_OF_ALL =
STARTING_VERSION_DOING_Y;
// Workflow.getVersion can release a thread and subsequently cause a non-deterministic error.
// We're introducing this map in order to cache our versions on the first call, which should
// always occur at the beginning of an workflow
private static final Map<String, GlobalVersionProvider> RUN_ID_TO_INSTANCE_MAP =
new HashMap<>();
private final int versionOnInstantiation;
private GlobalVersionProvider() {
versionOnInstantiation =
Workflow.getVersion(
WORKFLOW_VERSION_CHANGE_ID,
Workflow.DEFAULT_VERSION,
MAX_STARTING_VERSION_OF_ALL);
}
private int getVersion() {
return versionOnInstantiation;
}
public boolean isAfterVersionOfUsingGlobalVersion() {
return getVersion() >= STARTING_VERSION_USING_GLOBAL_VERSION;
}
public boolean isAfterVersionOfDoingX() {
return getVersion() >= STARTING_VERSION_DOING_X;
}
public boolean isAfterVersionOfDoingY() {
return getVersion() >= STARTING_VERSION_DOING_Y;
}
public static GlobalVersionProvider get() {
String runId = Workflow.getInfo().getRunId();
GlobalVersionProvider instance;
if (RUN_ID_TO_INSTANCE_MAP.containsKey(runId)) {
instance = RUN_ID_TO_INSTANCE_MAP.get(runId);
} else {
instance = new GlobalVersionProvider();
RUN_ID_TO_INSTANCE_MAP.put(runId, instance);
}
return instance;
}
// NOTE: this should be called at the beginning of the workflow method
public static void upsertGlobalVersionSearchAttribute() {
int workflowVersion = get().getVersion();
Workflow.upsertSearchAttributes(
ImmutableMap.of(
WorkflowSearchAttribute.TEMPORAL_WORKFLOW_GLOBAL_VERSION.getValue(),
workflowVersion));
}
// Call this API on each replay tests to clear up the cache
@VisibleForTesting
public static void clearInstances() {
RUN_ID_TO_INSTANCE_MAP.clear();
}
}
请注意,由于a bug in Temporal/Cadence Java SDK,Workflow.getVersion 可以释放线程并随后导致非确定性错误。 我们引入这张地图是为了在第一次调用时缓存我们的版本,这应该 总是发生在工作流执行的开始。
在每个重放测试上调用clearInstances API 以清除缓存。
因此在工作流代码中:
public class HelloWorldImpl{
private GlovalVersionProvider globalVersionProvider;
@VisibleForTesting
public HelloWorldImpl(final GlovalVersionProvider versionProvider){
this.globalVersionProvider = versionProvider;
}
public HelloWorldImpl(){
this.globalVersionProvider = GlobalVersionProvider.get();
}
@Override
public void start(final Request request) {
if (globalVersionProvider.isAfterVersionOfUsingGlobalVersion()) {
GlobalVersionProvider.upsertGlobalVersionSearchAttribute();
}
...
...
if (globalVersionProvider.isAfterVersionOfDoingX()) {
// doing X here
...
}
...
if (globalVersionProvider.isAfterVersionOfDoingY()) {
// doing Y here
...
}
...
}
对于每个新版本
STARTING_VERSION_XXXX
MAX_STARTING_VERSION_OF_ALL
要删除旧代码路径(版本),请添加一个新版本以不执行旧代码路径,然后稍后使用搜索属性查询,如
GlobalVersion>=STARTING_VERSION_DOING_X AND GlobalVersion<STARTING_VERSION_NOT_DOING_X 了解是否有现有的工作流执行仍在运行某些版本。
弃用代码路径DoingX 的示例:
因此在工作流代码中:
public class HelloWorldImpl implements Helloworld{
...
@Override
public void start(final Request request) {
...
...
if (globalVersionProvider.isAfterVersionOfDoingX() && GlobalVersionProvider.get().isAfterVersionOfNotDoingX()) {
// doing X here
...
}
}
###TODO 在 Golang 中的示例
应该没有什么坏处。使用此模式不会阻止您直接在工作流中使用原始版本控制 API。您可以将此模式与其他模式结合在一起。
【讨论】: