【问题标题】:Oozie custom asynchronous actionOozie 自定义异步操作
【发布时间】:2014-10-03 15:01:58
【问题描述】:

我在 Oozie 中实现自定义异步操作时遇到问题。我的类从 ActionExecutor 扩展而来,并覆盖了 initActionType、start、end、check、kill 和 isCompleted 方法。

在启动方法中,我想启动一个 YARN 作业,它是通过我的 BiohadoopClient 类实现的。为了使调用异步,我将 client.run() 方法包装在 Callable 中:

public void start(final Context context, final WorkflowAction action) {
...
  Callable<String> biohadoop = new Callable<String>() {
    BiohadoopClient client = new BiohadoopClient();
    client.run();
  }

  // submit callable to executor
  executor.submit(biohadoop);

  // set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html
  context.setStartData(externalId, callBackUrl, callBackUrl);
...
}

这很好用,例如,当我以 fork/join 方式使用自定义操作时,操作的执行是并行运行的。

现在,问题是,Oozie 在执行此操作时仍处于 RUNNING 状态。似乎不可能将其更改为已完成状态。 Oozie 从不调用 check() 方法,end() 方法也是如此。在 Callable 中手动设置 context.setExternalStatus()、context.setExecutionData() 和 context.setEndData() 没有帮助(在 client.run() 完成之后)。我也尝试手动排队 ActionEndXCommand,但没有运气。

当我在 start() 方法中等待 Callable 完成时,状态会正确更新,但 fork/join 中的执行不再是并行的(这似乎是逻辑,因为执行等待 Callable 完成)。

How external clients notify Oozie workflow with HTTP callback 没有帮助,因为使用回调似乎没有任何改变(好吧,我可以看到它发生在日志文件中,但除此之外,什么都没有......)。此外,答案提到,SSH 操作异步运行,但我还没有发现这是如何完成的。 Callable 内部有一些包装,但最后直接调用 Callable 的 call() 方法(不提交给 Executor)。

到目前为止,我还没有找到任何如何编写异步自定义操作的示例。有人可以帮帮我吗?

谢谢

编辑

这里是initActionType()、start()、check()、end()的实现,可调用的实现可以在start()动作里面找到。

可调用对象在 start() 操作中提交给执行程序,之后调用其 shutdown() 方法 - 因此执行程序在可调用对象完成后关闭。作为下一步,调用 context.setStartData(externalId, callBackUrl, callBackUrl)。

private final AtomicBoolean finished = new AtomicBoolean(false);

public void initActionType() {
    super.initActionType();
    log.info("initActionType() invoked");
}

public void start(final Context context, final WorkflowAction action)
        throws ActionExecutorException {
    log.info("start() invoked");

    // Get parameters from Node configuration
    final String parameter = getParameters(action.getConf());

    Callable<String> biohadoop = new Callable<String>() {
        @Override
        public String call() throws Exception {
            log.info("Starting Biohadoop");

            // No difference if check() is called manually
            // or if the next line is commented out
            check(context, action);

            BiohadoopClient client = new BiohadoopClient();
            client.run(parameter);
            log.info("Biohadoop finished");             

            finished.set(true);
            // No difference if check() is called manually
            // or if the next line is commented out
            check(context, action);

            return null;
        }
    };

    ExecutorService executor = Executors.newCachedThreadPool();
    biohadoopResult = executor.submit(biohadoop);
    executor.shutdown();

    String externalId = action.getId();
    String callBackUrl = context.getCallbackUrl("finished");
    context.setStartData(externalId, callBackUrl, callBackUrl);
}

public void check(final Context context, final WorkflowAction action)
        throws ActionExecutorException {
    // finished is an AtomicBoolean, that is set to true,
    // after Biohadoop has finished (see implementation of Callable)
    if (finished.get()) {
        log.info("check(Context, WorkflowAction) invoked - 
            Callable has finished");
        context.setExternalStatus(Status.OK.toString());
        context.setExecutionData(Status.OK.toString(), null);
    } else {
        log.info("check(Context, WorkflowAction) invoked");
        context.setExternalStatus(Status.RUNNING.toString());
    }
}

public void end(Context context, WorkflowAction action)
        throws ActionExecutorException {
    log.info("end(Context, WorkflowAction) invoked");
    context.setEndData(Status.OK, Status.OK.toString());
}

【问题讨论】:

  • 您能否展示一下您是如何实现 check() 和 initActionType() 方法以及如何在 Callable 中实现 call() 方法的?
  • @SSaikia_JtheRocker:我已经添加了实现

标签: hadoop asynchronous oozie


【解决方案1】:

有一件事-我可以看到您在提交作业后立即关闭了执行程序-executor.shutdown();。这可能是导致问题的原因。您能否尝试将此语句移至 end() 方法?

【讨论】:

  • 感谢您的想法。我试过了,但没有任何区别。 JavaDoc 对使用关闭非常清楚:Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down. This method does not wait for previously submitted tasks to complete execution 也许您有其他想法?
  • @gappc:我怀疑 AtomicBoolean 变量不知何故没有更新。请从 start() 方法中删除 check 方法语句,然后检查您在 check() 中执行的日志消息是否可见。此外,记录finished.get() 的值。如果您可以使用 JUnit 测试用例对其进行测试,那就更好了。
  • AtomicBoolean 设置正确,如果我手动调用 check() (不同的日志输出),我可以从日志文件中看到。如果我删除手动 check() 调用,则根本不会调用 check() 和 end()。对于 JUnit 测试的情况:你是对的 :) 但是在当前阶段,所需的解决方案根本不起作用,额外的努力是不值得的 - 至少在我看来。我从 oozie 邮件列表中得到了一些答案,我现在正在尝试,我会告诉你进展情况
【解决方案2】:

最后我没有找到解决问题的“真正”解决方案。对我有用的解决方案是实现一个动作,使用 Java Executor 框架并行调用 Biohadoop 实例。调用后,我等待(仍在操作中)线程完成

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-12-20
    • 2018-04-11
    • 1970-01-01
    • 2021-09-02
    • 2017-12-22
    • 2018-05-07
    • 1970-01-01
    相关资源
    最近更新 更多