【发布时间】:2019-03-25 22:44:19
【问题描述】:
我正在使用 Apache Camel File 组件从本地目录读取并上传到 AWS S3 存储桶。这条路线在过去完美无缺,但现在某些文件存在问题。
在我到目前为止的调试调查中(3天的痛苦),我发现路由到达DelegateSyncProcessor类并且没有在交换上设置任何异常(参考见前面提到的类的相关代码以下)。考虑到没有任何例外,我似乎无法弄清楚为什么 onCompletion() 方法永远不会针对特定文件触发。有问题的文件没有/没有任何例外;但是,在所有逻辑执行后(包括最终将消息传输到.to() 端点),文件和锁仍然存在,这表明存在一些内部 Camel 问题。我怀疑这是因为除了文件和锁删除之外,路由在每个方面都表现正常。
为 Camel 启用调试日志后,我很沮丧地看到在路由执行期间没有与错误相关的日志。我很想听听任何关于幕后可能发生的事情的建议。
需要注意的一些额外事项:
- 我正在运行骆驼 2.16.0
- 通过骆驼代码进行调试时没有发现容易检测到的问题。
-
.process()部分中的逻辑一直运行(发生了一些异常,但处理得当。
更新:
为骆驼启用调试日志进行更多调试没有产生任何结果。我只发现我的.process() 代码中的逻辑是问题所在。当代码执行时间过长(即使没有异常),文件删除也会失败。
更新 2:
我发现了骆驼路线实际上分崩离析的地方。 CamelEventLogger.java 类在尝试“logEvent”时完全失败。下面添加了该类的相关代码。当代码到达matcher.find()时,超时。
MyRouteClass.java:
from(importProcessingEndpoint)
.convertBodyTo(byte[].class)
.process((exchange) -> {
// some logic here
})
.to(outgoingEndpoint)
.threads(MAX_NUMBER_OF_CAMEL_THREADS)
.process((exchange) -> {
// some logic here
})
.log("Finished processing import file.")
.to(outgoingEndpoint);
DelegateSyncProcessor.java:
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// force calling the sync method
try {
processor.process(exchange);
} catch (Throwable e) {
// must catch throwable so we catch all
exchange.setException(e);
} finally {
// we are bridging a sync processor as async so callback with true
callback.done(true);
}
return true;
}
CamelEventLogger.java
private void logEvent(final String label, final Exchange exchange, final Endpoint endpoint,
final long elapsedTime, final boolean logTID) {
Matcher matcher = PATTERN.matcher(extractMessage(exchange));
if (matcher.find()) {
//CHECKSTYLE:OFF
String evtType = matcher.group(1);
String evtName = matcher.group(2);
String guid = matcher.group(3);
String tid = matcher.group(4);
//CHECKSTYLE:ON
if (tid == null || !logTID) {
tid = "";
} else {
tid = "intuit_tid=" + tid;
}
log.info(LOG_FORMAT, label, exchange.getExchangeId(), guid, evtName, evtType, endpoint, elapsedTime, tid);
} else { // the message is not parseable, fall back to minimum log entry
log.info("Event {} id={} {} elapsedTimeMs={}", label, exchange.getExchangeId(), endpoint, elapsedTime);
}
}
【问题讨论】:
-
1.您是否发现有问题的文件有任何可疑之处?它们是不是特别大,它们是否都具有 ok 文件所没有的一些特征?和 2. 您是否尝试过调试到 DelegateSyncProcessor 类本身?和 3. 你跑的是什么版本的骆驼?
-
对于 S&G,您能否添加一个
onException()处理程序,并在它命中时中断处理器内部? -
@RoddyoftheFrozenPeas,我添加了更多信息来解决您的问题。我还想指出,我已经使用调试器逐步完成,但没有发现任何相关内容。
-
@NotaJD,我不确定我是否完全理解您要说的内容。我尝试使用在路由顶部定义的
onException()来查看是否抛出了任何异常并且调试器从未中断(在我在添加的 onException 代码上设置断点之后)。这让我更加怀疑根本没有异常(没有记录,调试时也没有发现)。 -
@ChadVanDeHey 这就是我的意思。我绝对感受到你的痛苦,骆驼对我来说也是一个非常不愉快的原因。这听起来很荒谬,但值得一试:当您移除处理器并让它通过路由运行时,文件是否被移动并锁定文件被删除?
标签: java apache-camel