【问题标题】:java.lang.IllegalStateException: Connection pool shut down. How to handle a connection pool exception inside multithread application?java.lang.IllegalStateException:连接池关闭。如何处理多线程应用程序中的连接池异常?
【发布时间】:2020-08-25 09:36:49
【问题描述】:

我的应用程序正在创建多线程以从此处的 SQS 读取消息


new Thread(() -> {
            while (true) {
                readMessages();
            }
        });

readMessage 方法具有这样的功能

 public void readMessages() {
       ........

       Object messages = new ArrayList();
       try {
          slingshotMessage = (new ReceiveMessageRequest()).withQueueUrl(this.queueUrl)
                            .withWaitTimeSeconds(this.subProps.getWaitTimeSeconds())
                          .withVisibilityTimeout(this.subProps.getVisibilityTimeoutSeconds())
                         .withMaxNumberOfMessages(this.subProps.getMaxNumberMessages());
           messages = this.sqs.receiveMessage(slingshotMessage).getMessages();
           
        } catch (Exception var6) {
            log.error("An error occurred while reading messages for subscriber: '" + this.subProps.getSubscriberName() + "' queueUrl: '" + this.queueUrl + "'", var6);
        }

}

我看到异常在 this.sqs.receiveMessage 内部 read message() 函数中引发。我在这里捕获了异常,但是当抛出错误时,日志堆积起来,重复的异常说


java.lang.IllegalStateException: Connection pool shut down
    at org.apache.http.util.Asserts.check(Asserts.java:34) ~[httpcore-4.4.12.jar:4.4.12]
    at org.apache.http.pool.AbstractConnPool.lease(AbstractConnPool.java:196) ~[httpcore-4.4.12.jar:4.4.12]
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:268) ~[httpclient-4.5.10.jar:4.5.10]
    at jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.conn.$Proxy37.requestConnection(Unknown Source) ~[na:na]
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[httpclient-4.5.10.jar:4.5.10]
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[httpclient-4.5.10.jar:4.5.10]
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.10.jar:4.5.10]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.10.jar:4.5.10]
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[httpclient-4.5.10.jar:4.5.10]
    at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1256) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1072) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:745) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:719) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:701) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:669) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:651) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:515) ~[aws-java-sdk-core-1.11.408.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2147) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2116) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2105) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.executeReceiveMessage(AmazonSQSClient.java:1559) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1530) ~[aws-java-sdk-sqs-1.11.415.jar:na]
    at com.lmig.global.reuse.slingshot.subscriber.aws.AwsSubscriber.readMessages(AwsSubscriber.java:63) ~[slingshot-1.3.37-B3-RELEASE.jar:1.3.37-B3-RELEASE]
    at com.lmig.global.reuse.slingshot.subscriber.Subscriber.lambda$newSubThread$0(Subscriber.java:61) ~[slingshot-1.3.37-B3-RELEASE.jar:1.3.37-B3-RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]```

所以我的问题是

  1. 有没有办法解决连接池异常?
  2. 由于这是在 newthread lambda 中,它会继续创建线程,有没有一种方法可以让我以所需的堆栈跟踪静默退出,而不是在日志中堆积相同的错误。

【问题讨论】:

    标签: java multithreading connection-pooling amazon-sqs


    【解决方案1】:

    首先,不要为每条消息都创建新线程,这是个坏主意。 创建一个线程来不断轮询您的 SQS,然后使用工作窃取线程池或其他一些池将检索到的数据发送到另一个线程以进行进一步处理。

    这是伪代码,

    public final class AsyncThreadUtils {
    
        private static final ExecutorService WORKER_SERVICE = Executors.newWorkStealingPool();
    
        private AsyncThreadUtils() {
        }
    
        public static void addTask(Runnable asyncThread) {
            WORKER_SERVICE.submit(asyncThread);
        }
    
        public static void stopAllThreads() throws InterruptedException {
            if (!WORKER_SERVICE.isShutdown() || !WORKER_SERVICE.isTerminated()) {
                WORKER_SERVICE.awaitTermination(0, TimeUnit.NANOSECONDS);
                WORKER_SERVICE.shutdownNow();
            }
        }
    }        
    class DataConsumer implements Runnable {
            while(true) {
                // Get the message from SQS
                // If data received then give it to another thread for processing
                AsyncThreadUtils.addTask(data -> { 
                       // Process goes here.
    });
                // sleep for some time like 400ms
            }
        }
         
            // Initiate above thread during the bootup of your program.
            Thread sqsDataConsumer = new Thread(new DataConsumer())
           sqsDataConsumer.start();
    

    在你的java进程关闭之前调用线程池的shutdown方法。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-10-23
      • 2018-12-04
      • 2011-03-09
      • 1970-01-01
      • 2013-05-23
      • 1970-01-01
      • 1970-01-01
      • 2019-07-07
      相关资源
      最近更新 更多