【问题标题】:Reactor Framework (Flux) - java.lang.OutOfMemoryError: Direct buffer memoryReactor 框架 (Flux) - java.lang.OutOfMemoryError: 直接缓冲内存
【发布时间】:2020-10-16 22:17:22
【问题描述】:

我正在尝试使用 java reactor 框架获取所有 cf 应用程序

Flux<ApplicationSummary> appFlux= _cloudFoundryOperations.applications().list();
List<ApplicationSummary> result = appFlux.collectList().block();

此调用每 5 分钟进行一次,以获取最近部署的应用程序。

它运行了几个小时,然后它返回“java.lang.OutOfMemoryError”。堆栈跟踪如下,如何有效地进行此调用?

"stacktrace": [
  "java.lang.OutOfMemoryError: Direct buffer memory",
  "\tat java.nio.Bits.reserveMemory(Bits.java:711)",
  "\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nAssembly trace from producer [reactor.core.publisher.MonoFlatMap] :\n\treactor.core.publisher.Mono.checkpoint(Mono.java:1877)\n\torg.cloudfoundry.reactor.client.v2.organizations.ReactorOrganizations.list(ReactorOrganizations.java:202)\nError has been observed at the following site(s):\n\t|_ Mono.checkpoint ⇢ at org.cloudfoundry.reactor.client.v2.organizations.ReactorOrganizations.list(ReactorOrganizations.java:202)\n\t|_ Flux.checkpoint ⇢ at org.cloudfoundry.operations.applications.DefaultApplications.list(DefaultApplications.java:330)\nStack trace:",
  "\t\tat java.nio.Bits.reserveMemory(Bits.java:711)",
  "\t\tat java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)",
  "\t\tat java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)",
  "\t\tat io.netty.channel.unix.Buffer.allocateDirectWithNativeOrder(Buffer.java:40)",
  "\t\tat io.netty.channel.epoll.EpollEventArray.<init>(EpollEventArray.java:56)",
  "\t\tat io.netty.channel.epoll.EpollEventLoop.<init>(EpollEventLoop.java:95)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:151)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:35)",
  "\t\tat io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:84)",
  "\t\tat io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:58)",
  "\t\tat io.netty.util.concurrent.MultithreadEventExecutorGroup.<init>(MultithreadEventExecutorGroup.java:47)",
  "\t\tat io.netty.channel.MultithreadEventLoopGroup.<init>(MultithreadEventLoopGroup.java:59)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:112)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:99)",
  "\t\tat io.netty.channel.epoll.EpollEventLoopGroup.<init>(EpollEventLoopGroup.java:76)",
  "\t\tat reactor.netty.resources.DefaultLoopEpoll.newEventLoopGroup(DefaultLoopEpoll.java:64)",
  "\t\tat reactor.netty.resources.DefaultLoopResources.cacheNativeServerLoops(DefaultLoopResources.java:252)",
  "\t\tat reactor.netty.resources.DefaultLoopResources.cacheNativeClientLoops(DefaultLoopResources.java:267)",
  "\t\tat reactor.netty.resources.DefaultLoopResources.onClient(DefaultLoopResources.java:199)",
  "\t\tat reactor.netty.tcp.TcpClientRunOn.configure(TcpClientRunOn.java:51)",
  "\t\tat reactor.netty.tcp.TcpClientRunOn.configure(TcpClientRunOn.java:43)",
  "\t\tat reactor.netty.tcp.TcpClientBootstrap.configure(TcpClientBootstrap.java:39)",
  "\t\tat reactor.netty.tcp.TcpClientBootstrap.configure(TcpClientBootstrap.java:39)",
  "\t\tat reactor.netty.tcp.TcpClientSecure.configure(TcpClientSecure.java:53)",
  "\t\tat reactor.netty.tcp.TcpClientDoOn.configure(TcpClientDoOn.java:48)",
  "\t\tat reactor.netty.tcp.TcpClient.connect(TcpClient.java:196)",
  "\t\tat org.cloudfoundry.reactor.util.DefaultSslCertificateTruster.getUntrustedCertificates(DefaultSslCertificateTruster.java:166)",
  "\t\tat org.cloudfoundry.reactor.util.DefaultSslCertificateTruster.trust(DefaultSslCertificateTruster.java:91)",
  "\t\tat org.cloudfoundry.reactor._DefaultConnectionContext.lambda$trust$1(_DefaultConnectionContext.java:155)",
  "\t\tat java.util.Optional.map(Optional.java:215)",
  "\t\tat org.cloudfoundry.reactor._DefaultConnectionContext.trust(_DefaultConnectionContext.java:155)",
  "\t\tat org.cloudfoundry.reactor.DefaultConnectionContext.trust(DefaultConnectionContext.java:23)",
  "\t\tat org.cloudfoundry.reactor.AbstractRootProvider.trust(AbstractRootProvider.java:136)",
  "\t\tat org.cloudfoundry.reactor.AbstractRootProvider.lambda$getRoot$3(AbstractRootProvider.java:70)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.subscribeNextTrigger(MonoDelayUntil.java:210)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onNext(MonoDelayUntil.java:169)",
  "\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)",
  "\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2317)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)",
  "\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2125)",
  "\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:68)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)",
  "\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)",
  "\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4218)",
  "\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:97)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134)",
  "\t\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:1994)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134)",
  "\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:134)",
  "\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:165)",
  "\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)",
  "\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)",
  "\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)",
  "\t\tat reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1802)",
  "\t\tat reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.signalCached(MonoCacheTime.java:323)",
  "\t\tat reactor.core.publisher.MonoCacheTime$CoordinatorSubscriber.onError(MonoCacheTime.java:346)",
  "\t\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:126)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onError(MonoDelayUntil.java:175)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.subscribeNextTrigger(MonoDelayUntil.java:213)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onNext(MonoDelayUntil.java:169)",
  "\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2317)",
  "\t\tat reactor.core.publisher.MonoDelayUntil$DelayUntilCoordinator.onSubscribe(MonoDelayUntil.java:159)",
  "\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)",
  "\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)",
  "\t\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)",
  "\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)",
  "\t\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)",
  "\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8311)",
  "\t\tat reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)",
  "\t\tat reactor.core.publisher.MonoFlatMapMany.subscribeOrReturn(MonoFlatMapMany.java:49)",
  "\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)",
  "\t\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)",
  "\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57)",
  "\t\tat reactor.core.publisher.MonoCacheTime.subscribeOrReturn(MonoCacheTime.java:132)",
  "\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4203)",
  "\t\tat reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)",
  "\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4218)",
  "\t\tat reactor.core.publisher.Mono.block(Mono.java:1677)",
  "\t\tat com.sap.crun.healthapi.cf.CfApi.getApplications(CfApi.java:124)",
  "\t\tat com.sap.crun.healthapi.cf.CfApi.getCloudFoundryMetrics(CfApi.java:92)",
  "\t\tat com.sap.crun.healthapi.cf.GetCloudFoundryMetricsCommand.getCloudFoundryMetrics(GetCloudFoundryMetricsCommand.java:35)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthServiceClient.readCloudFoundryMetrics(HealthServiceClient.java:170)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.FetchHealthDataCommand.processLandscapeHealthData(FetchHealthDataCommand.java:51)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthFactsImporter.run(HealthFactsImporter.java:39)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthFactsImporterAsJob.lambda$run$0(HealthFactsImporterAsJob.java:46)",
  "\t\tat com.sap.cds.services.impl.runtime.CdsRuntimeImpl.runInRequestContext(CdsRuntimeImpl.java:154)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthFactsImporterAsJob.run(HealthFactsImporterAsJob.java:44)",
  "\t\tat com.sap.crun.healthapi.services.healthdata.HealthFactsImporterAsJob$$FastClassBySpringCGLIB$$211f9f83.invoke(<generated>)",
  "\t\tat org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)",
  "\t\tat org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)",
  "\t\tat org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)",
  "\t\tat org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)",
  "\t\tat org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)",
  "\t\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)",
  "\t\tat com.sap.dwc.util.headers.DwcContextTaskDecorator.lambda$decorate$0(DwcContextTaskDecorator.java:33)",
  "\t\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)",
  "\t\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)",
  "\t\tat java.lang.Thread.run(Thread.java:836)"
]

有人可以帮忙吗?

【问题讨论】:

  • 你对收集到的应用列表做了什么?您需要将它们全部保存在内存中吗?
  • 我不需要将它保存在内存中。收到申请表后,我会坚持自己的表。 .

标签: project-reactor flux reactor


【解决方案1】:
Flux<ApplicationSummary> appFlux= _cloudFoundryOperations.applications().list();
List<ApplicationSummary> result = appFlux.collectList().block();

在不直接了解更多上下文的情况下,这看起来很像它会得到一个所有应用程序的列表。这与您所说的要求相反:

此调用每 5 分钟进行一次,以获取最近部署的应用程序。

...这可能也是您内存不足的原因,因为列表可能会变得太大而无法一次全部放入内存中。如果您真正每 5 分钟需要一次完整的列表并且它无法放入内存中,那么您无能为力 - 您要么需要重新构建应用程序,要么添加更多内存(尽管在某一点之后,后者可能不是一个可行的选择!)

您没有详细说明您要在此处实现的具体目标,但有两个选项可能很适合您的用例:

  • 理想情况下,您根本不会阻塞 - 但如果您只需要每 5 分钟将 5 分钟的值作为列表处理一次,那么您可以简单地将通量窗口化为该时间跨度,然后按照您的意愿处理结果:
_cloudFoundryOperations.applications().list()
        .window(Duration.ofMinutes(5))
        .flatMap(Flux::collectList)
        .doOnNext(list -> {
            //process list
        })
  • 作为“最少更改”的方法如果您想像现在这样每次都阻塞,但又想确保不会耗尽内存,您可以使用具有某种合理值的takeLast()获取最后一个 n 元素(如果您对部署的最后一个 n 应用程序感到满意,这将起作用):
_cloudFoundryOperations.applications().list()
        .takeLast(10_000)
        .collectList().block()

...但是,请记住,阻塞通常会破坏使用 reactor 的目的(除非您专门将其作为正在进行的迁移的一部分或因为没有其他选择) - 所以我最好的建议是尝试设计将所有阻塞调用完全排除在您的系统之外。

【讨论】:

  • 你好迈克尔,我将使用第一种方法。在这种情况下,当我使用 window() 时,我希望接收最近 5 分钟内部署的最新应用程序。除此之外,我遇到了 Mono block(), response = _cloudFoundryOperations.getCloudFoundryClient().applicationsV2() .statistics(request).block(); 的同样问题。我们没有用于单声道的 window()。有什么想法吗?
  • 在这种情况下,任何时候您收到OutOfMemoryError 都可能是因为您尝试收集(或阻止)的值太大。在 Flux 中更容易解决,因为您可以只收集较小的块,或者单独收集 - 如果有一个 Mono 值导致该问题,则不太清楚,因为没有一种固有的明显方法可以将其拆分。你要么“得到”Mono,要么没有。我唯一建议的是查看 API,看看是否有更好的方法来获取统计信息。
猜你喜欢
  • 2017-02-20
  • 1970-01-01
  • 2017-08-11
  • 1970-01-01
  • 1970-01-01
  • 2015-12-03
  • 2019-06-23
  • 2020-04-25
  • 2017-11-21
相关资源
最近更新 更多