【问题标题】:Inconsistent results Java Multi-threading with REST calls不一致的结果 Java 多线程与 REST 调用
【发布时间】:2018-12-19 18:42:18
【问题描述】:

目前我正在使用他们内置的 REST API 从服务中提取大量数据。目前,服务返回 JSON 格式文件大约需要 600 毫秒,我需要返回 495 个 JSON 格式文件。

作为我最初的 POC,我只是在主线程中线性调用它们(不希望程序在所有查询都完成之前继续前进),这大约需要 300 秒才能完成。现在我已经展示了 POC,我需要对其进行相当多的优化,因为 5 分钟的查询不是很理想。目前我正在做的是使用带有固定线程池的 Executor Service 并向该服务添加 495 个任务并调用它们。

我唯一的问题是,现在我得到了错误的数据值。从逻辑上讲,什么都不应该改变,查询一次只返回 50 个元素,所以我所做的只是改变起点(我已经检查过,并且 URL 中没有重叠)。出于某种原因,我的结果丢失了,并且我有现有结果的副本。处理 JSON 的代码没有改变,唯一改变的是获取结果的方法。

我最初认为我在遍历线程时遇到了一个问题,它不是原子的,但是在我得到 JSON 之后真正发生的事情是我正在解析它,创建一个 Requirement 对象,然后将它添加到一个 Set .由于 Set 从未重新定义,只是添加了我的印象,它是 Atomic 不会有所作为(但是我可能 100% 错了)。

代码的第一个 sn-p 下面是我在主线程上线性运行 ti 的方式,而第二个 sn-p 是我的包含多线程的版本。我确实知道这有点混乱,这是我目前的 POC,用于确定多线程的速度有多快(目前从 ~300 秒到 ~45 秒),以及它是否值得应用于我程序中的其他调用。

我只需要弄清楚为什么在使用多个线程时值会重复和丢失(线性调用时没有重复或丢失的值)。 URL 决定了起点,大小永远不会改变或任何东西,我不知道为什么我有 2000 个短的要求和 224 个重复的条目,根本不应该有任何。

唯一改变的是执行器服务和我获得起始点的循环(也就是我只计算我需要多少个循环,而不是依赖于返回的当前位置)。 creatRequirement(obj) 函数所做的只是进一步解析 JSON 文件,并使用从 JSON 传递到构造函数的数据创建一个需求对象。

private void obtainAllRequirements() {
    int startingLocation = 0;
    boolean continueQueries = true;
    String output = null;
    do {
        output = executeRESTCall(baseUrl + "/abstractitems?maxResults=50&itemType=43&startAt=" + startingLocation);
        JSONObject obj = new JSONObject(output);
        if ((obj.getJSONObject("meta").getJSONObject("pageInfo").getInt("totalResults") - startingLocation) <= 50) {
            continueQueries = false;
        }
        createRequirements(obj);
        startingLocation += 50;

    } while (continueQueries);
}


private void obtainAllRequirements() {
    String output = executeRESTCall(baseUrl + "/abstractitems?itemType=43&startAt=0");
    int totalResults = new JSONObject(output).getJSONObject("meta").getJSONObject("pageInfo").getInt("totalResults");
    ExecutorService service = Executors.newFixedThreadPool(THREADS);
    List<Callable<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < Math.ceil(totalResults/MAX_RESULTS); i++){
        final int iteration = i;
        tasks.add(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                System.out.println(baseUrl + "/abstractitems?maxResults="+MAX_RESULTS+"&itemType=43&startAt=" + (iteration*MAX_RESULTS));
                String o = executeRESTCall(baseUrl + "/abstractitems?maxResults="+MAX_RESULTS+"&itemType=43&startAt=" + (iteration*MAX_RESULTS));
                JSONObject obj = new JSONObject(o);
                createRequirements(obj);
                return null;
            }
        });
    }
    try {
        service.invokeAll(tasks);
        service.shutdown();
    }catch (InterruptedException e){
        e.printStackTrace();
    }

}

编辑:这是创建需求内部发生的情况,需求的构造函数只获取 JSON 数据并将值分配给特定的私有变量成员。

private void createRequirements(JSONObject json) {
    JSONArray dataArray = json.getJSONArray("data"); // Gets the data array in the JSON file
    for (int i = 0; i < dataArray.length(); i++) {
        JSONObject req = dataArray.getJSONObject(i);
        Requirement requirement = new Requirement(req);
        if (!requirement.INVALID_PROJECT) {
            requirements.add(requirement);
        }
    }
}

编辑:将需求集添加为 ConcurrentSet 但没有更改。

this.requirements = new ConcurrentHashMap&lt;&gt;().newKeySet();

编辑:添加了执行 REST 调用

public String executeRESTCall(String urlValue) {
    try {
        URL url = new URL(urlValue);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");
        String encoding = Base64.getEncoder()
                .encodeToString((Credentials.XXX + ":" + Credentials.XXX).getBytes("UTF-8"));
        conn.setRequestProperty("Authorization", "Basic " + encoding);
        if (conn.getResponseCode() != 200) {
            throw new RuntimeException("Failed : HTTP error code : " + conn.getResponseCode());
        }

        BufferedReader br = new BufferedReader(new InputStreamReader((conn.getInputStream())));
        return br.readLine();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return "";
}

【问题讨论】:

  • “由于 Set 从未重新定义,只是添加了......” 线程安全不仅仅与对象分配安全有关。对象的操作同样重要。所以,不,你不能期望从多个线程向任何Set 添加数据,并让它可靠地工作。所以绝对不要那样做,从并发集实现开始。你能展示一下createRequirements里面发生了什么吗?
  • @GPI 我添加了 createRequirements 的代码。您能否更详细地解释一下为什么它不能可靠地工作。数据值不应该重叠,因为每个 REST 调用都应该返回唯一的信息,并且它只是添加到集合中,因为我不关心顺序我不关心是否最后添加第一个查询和签证 verca。 ConcurrentSet 不是一个有效的对象类型,或者你的意思是自己实现一个(使用这个)? javarevisited.blogspot.com/2017/08/…
  • @GPI 我实现了由 ConcurrentHashMap 支持的 Set。所有代码仍然有效并且正确生成(在我的错误之外),因此它接受了更改。但是我仍然遇到重复查询结果的问题。这很奇怪,因为我正在执行多个其他查询,以完全相同的方式线程化(但少于 10 次调用),根本没有问题,只是要求有 495 次以上的调用才是问题。
  • @97WaterPolo 如果线程数影响您的结果,那么您不能信任您的线程,即使它看起来使用的线程数较少。使用适当的多线程,线程数不应影响您的结果;只有速度会受到线程数的影响,您应该调整线程数以获得最佳性能。
  • 你能展示一下executeRESTCall函数的主体吗?它是线程安全的吗?因为其他一切乍一看都还可以。

标签: java json multithreading rest optimization


【解决方案1】:

不要忘记异常

此外,您可能还想等待任务真正完成。

您需要更好的异常处理,但现在请用这个(使用许多线程)进行测试并发布输出:

private void obtainAllRequirements() {

    String output = executeRESTCall(baseUrl + "/abstractitems?itemType=43&startAt=0");
    int totalResults = new JSONObject(output).getJSONObject("meta").getJSONObject("pageInfo")
            .getInt("totalResults");

    ExecutorService service = Executors.newFixedThreadPool(THREADS);
    List<Callable<Void>> tasks = new ArrayList<>();
    for (int i = 0; i < Math.ceil(totalResults / MAX_RESULTS); i++) {
        final int iteration = i;
        tasks.add(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                try
                    final String request = baseUrl + "/abstractitems?maxResults=" + MAX_RESULTS
                        + "&itemType=43&startAt=" + (iteration * MAX_RESULTS);
                    // hash codes to tie the request and responses together,
                    // since multithreading will have them printing interleaved
                    System.out.println(hashCode() + ":request: " + request);
                    String response = executeRESTCall(request);
                    System.out.println(hashCode() + ":response: " + response);
                    JSONObject obj = new JSONObject(response);
                    createRequirements(obj);
                    return null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    try {
        service.invokeAll(tasks);

        service.shutdown();
        // you might want to await termination ?
        service.awaitTermination(1, TimeUnit.MINUTES);

        // catch all exceptions ?
        // you'll need some better error handling
    } catch (Exception e) {
        e.printStackTrace();
    }
}

【讨论】:

  • 运行给定代码时,invokeAll() 出现以下错误; bitbucket.org/snippets/97WaterPolo/jer78q
  • @97WaterPolo 我猜,你需要 shutdownthen awaitTermination。异常已经在invokeAll 中发生,即执行者拒绝了那么多工作。您可能需要增加其workQueue 的大小,请参阅here 以了解拒绝原因。在任何情况下:确保所有异常至少都记录在某处。
  • @maaartinus 感谢您的帮助!是的,我的主要目的是不丢失异常并正确调试多线程代码。
【解决方案2】:

我所做的只是将固定线程的数量从 400 减少到 10。不知道为什么我选择了 400,我没有考虑到这一点,并且认为 JVM 会处理线程,而我不会真的不得不担心它。将其减少到 10 可以解决我遇到的数据丢失和重复的问题,我不知道为什么会出现这种情况,我很想了解原因。

【讨论】:

  • 如果线程数影响您的结果,那么您不能信任您的线程,即使它看起来使用较少的线程。使用适当的多线程,线程数不应影响您的结果;只有速度会受到线程数的影响,您应该调整线程数以获得最佳性能。
  • @xtratic 那么我有这个数据重复/丢失错误还有其他可能的原因吗?调用不应两次返回相同的数据,返回的结果取决于第 3 方应用程序,该应用程序在查询时具有一致的值。
  • 您可以验证来自服务器的响应,因为您现在记录它们。如果响应是有效的,那么它必须在客户端,如果串行执行工作正常但并发不是,那么很可能是线程代码导致了问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多