【问题标题】:Reading from Infinispan cache goes into infinite loop从 Infinispan 缓存读取进入无限循环
【发布时间】:2019-09-23 05:54:29
【问题描述】:

我正在使用 Nifi 创建数据流管道,其中我使用 Infinispan 缓存服务器但是当我使用带有 Groovy 脚本的 executescript 时,它会进入无限循环并打开许多套接字连接。我试图关闭它,但它仍然打开了许多连接,然后它抛出了

java.net.SocketException: No buffer space available (maximum connections reached?): connect

通过以下链接,我更改了注册表 https://support.pitneybowes.com/VFP06_KnowledgeWithSidebarTroubleshoot?id=kA280000000PEE1CAO&popup=false&lang=en_US

然后用netstat -n检查打开的连接我打开了65534,因为上面的设置。

下面是从 Infinispan 缓存中读取的 groovy 脚本

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;

def cacheName = "mycache"

def configuration = new ConfigurationBuilder()
.addServer().host("localhost").port(11322).build();

def cacheManager = new RemoteCacheManager(configuration)

RemoteCache cacheA = cacheManager.getCache(cacheName)

flowFile = session.get()
if(!flowFile) return
key = flowFile.getAttribute('key')
id = flowFile.getAttribute('id')
jsonFromCache = cacheA.get(key + "_" + id);
if(cacheA != null) {
cacheA.stop()
}
if(cacheManager != null) {
cacheManager.stop()
}

flowFile = session.write(flowFile, {outputStream ->
  outputStream.write(jsonFromCache.getBytes(StandardCharsets.UTF_8))
} as OutputStreamCallback)
session.transfer(flowFile, REL_SUCCESS)

【问题讨论】:

  • 你试过finispan吗? (对不起......我无法抗拒。)
  • @user2478398 我的问题有什么问题吗?
  • 绝对不是。我只是不成熟(因此很抱歉)。如果我知道这里的任何图书馆足以提供帮助,我会知道的,但恐怕我不知道。
  • 在从会话中获取文件之前,您正在打开与缓存的连接。您正在打开连接,在这一行只退出脚本而不关闭它:if(!flowFile) return
  • 顺便说一句,可以处理处理器启动和停止以使每个处理器建立一次缓存连接。

标签: java groovy apache-nifi infinispan


【解决方案1】:

在从会话中获取文件之前,您正在打开与缓存的连接。

因此,您正在打开连接,并且在以下行中只是退出脚本而不关闭它:

if(!flowFile) return

还有一点: 您可以使用 ExecuteGroovyScript 处理器。然后可以管理处理器的启动和停止。您可以在此处找到示例:https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-groovyx-nar/1.9.2/org.apache.nifi.processors.groovyx.ExecuteGroovyScript/additionalDetails.html

import org.apache.nifi.processor.ProcessContext
import java.util.concurrent.atomic.AtomicLong

class Const{
  static Date startTime = null;
  static AtomicLong triggerCount = null;
}

static onStart(ProcessContext context){
  Const.startTime = new Date()
  Const.triggerCount = new AtomicLong(0)
  println "onStart $context ${Const.startTime}"
}

static onStop(ProcessContext context){
  def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
  println "onStop $context executed ${ Const.triggerCount } times during ${ alive } seconds"
}

def flowFile = session.get()
if(!flowFile)return
flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
REL_SUCCESS << flowFile

【讨论】:

    猜你喜欢
    • 2020-01-17
    • 1970-01-01
    • 2013-08-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-25
    • 1970-01-01
    相关资源
    最近更新 更多