【问题标题】:How to synchronize between executors of an ExecutorService如何在 ExecutorService 的 executor 之间进行同步
【发布时间】:2018-08-14 20:26:16
【问题描述】:

我有一个客户端套接字列表,通常大小约为 2000。这些客户端是动态的,它们来来去去。

我有一个ExecutorService,它有一个由 32 个线程组成的固定线程池来处理这些线程。这个执行器服务负责解码和发送要发送给这 2000 个客户端的消息。

我想防止执行器服务的两个(或更多)线程同时处理同一个客户端。

一种方法可能是引入另一个簿记线程(所以我最终得到了 32 + 1 个线程),它负责在对应于同一客户端的上一条消息完成时调用 ExecutorService.submit(mesage)。但我不确定这是否会引入瓶颈,这意味着这个新引入的 bookkeeping 线程 无法跟上提交消息的速度。

理想情况下,我不想提前为一组客户端预分配线程,因为消息负载在客户端之间分布不均。也是事先不知道的。

有什么方法可以解决这个问题?它们是由java.util.concurrent 功能提供的吗?

更新

这是一个简短的总结,因为 cmets 指出存在一些误解:

  • 我不希望每个客户端有一个线程,因为我最终会得到 2000 个线程。

  • 理想情况下,我不想为一组客户端预先分配一个线程,因为消息速率不是在所有客户端之间均匀分布,而且事先不知道。

  • 必须保留消息顺序。

  • 我认为线程 A 正在等待线程 B 并不好,因为 B 已经在向同一个客户端发送消息。换句话说,始终只有一个线程在处理一个客户端。

【问题讨论】:

  • “我想防止执行器服务的两个(或多个)线程同时处理同一个客户端。”不明白这个要求。您是否会同时为同一个客户提供 2 个工作?为什么会有这样的问题?
  • 我认为他想序列化来自同一来源的处理请求。不知道为什么
  • 客户端socket只能被一个线程同时使用,不是线程安全的。此外,客户端的消息必须按正确的顺序发送。
  • 不是序列化线程,而是序列化套接字写入。
  • 这不是最理想的吗,因为许多线程可能正在等待保护一个线程持有的套接字的锁?另外,我们如何在这种方法中保留消息顺序?

标签: java multithreading java.util.concurrent


【解决方案1】:

当线程 (A) 开始处理消息 (#1) 时,它需要向共享管理器对象注册客户端 ID。每个注册的客户都有一个队列。

当另一个线程 (B) 开始为同一个客户端处理消息 (#2) 时,注册将检测到线程 A 已经在处理,并将消息 #2 添加到客户端的队列中。然后线程 B 将停止并处理下一条消息。

当线程 A 处理完消息 #1 后,它将尝试取消注册,但由于消息 #2 是队列,线程 A 将改为开始处理该消息。之后,当它再次尝试注销时,队列中没有消息,线程将停止并处理下一条消息。

正确同步访问取决于管理器代码,因此第二条消息要么由线程 B 处理,要么移交给线程 A,而不会丢失。

上述逻辑确保线程 B 不会等待线程 A,即没有空闲时间,并且消息 #2 会尽快处理,即以最小的延迟处理,而不会为同一客户端处理两条消息时间。

每个客户端的消息顺序被保留。在全局范围内,当然不会保留消息顺序,因为消息#2 的处理有延迟。

注意,每个线程只有一个队列,所以只有 32 个队列,而且只有“重复”消息是队列,所以所有队列通常都保持为空。


更新

示例:为了在此处进行标识,消息被命名为clientId.messageId,其中messageId 是全局的。

消息按此顺序提交给 Executor(3 个线程):

1.1, 2.2, 1.3, 2.4, 3.5, 1.6

  1. 线程 A 获取 1.1 并开始处理。

  2. 线程 B 获取 2.2 并开始处理。

  3. 线程 C 获取 1.3,将其添加到线程 A 的队列中,然后返回。

  4. 线程 C 获取 2.4,将其添加到线程 B 的队列中,然后返回。

  5. 线程 C 获取 3.5 并开始处理。

  6. 线程 A 处理完消息 1.1 并开始处理 1.3

  7. 线程 C 处理完消息 3.5 并返回。

  8. 线程 C 获取 1.6,将其添加到线程 A 的队列中,然后返回。
    线程 C 现在处于空闲状态。

  9. 线程 B 处理完消息 2.2 并开始处理 2.4

  10. 线程 A 处理完消息 1.3 并开始处理 1.6

  11. 线程 B 处理完消息 2.4 并返回。
    线程 B 现在处于空闲状态。

  12. 线程 A 处理完消息 1.6 并返回。
    线程 A 现在处于空闲状态。

【讨论】:

  • 我认为这种方法有一个缺点:如果消息率很高,我们最终可能会遇到这样的情况,即 32 个线程一遍又一遍地优先考虑相同的 32 个客户端(就像在每个消息,另一条消息已准备好发送),这意味着其他 2000 - 32 个客户端实际上正在挨饿,如果这有意义的话。
  • @IsaacVero 这就是它应该的方式,因为原始的全局消息顺序表明这些消息应该已经被处理,所以它们得到优先级。如果客户端无法跟上(即客户端的注册队列中的消息过多),可以添加额外的逻辑来开始丢弃消息,但如果需要,则需要由 OP 定义规则。
  • 让我改写一下,也许我理解错了。线程A 正在向客户端1 发送消息。在此期间,添加了两条消息:一条消息给客户端2,另一条消息给客户端1。如果我理解正确,您的方法是线程A 在第一条消息之后立即处理1 的第二条消息,而不是客户端2 的消息。现在假设消息率很高。我相信这意味着我们一遍又一遍地优先考虑相同的客户,而让其他客户挨饿?
  • 另外,我知道有32个线程和32个队列。所以每个线程都在做while(true) { process(queue.take()); }?那么决定应该在哪个队列中添加一条新消息?
  • @IsaacVero 没有。有 33 个队列:Executor 队列和 32 个“延迟消息”队列,每个线程一个。消息被提交给执行器,执行器将它们委托给线程以在线程可用时进行处理。如果执行者遵守提交顺序,这将确保保留全局消息顺序。如果线程拾取“重复”消息,则将该消息添加到当前为给定客户端处理消息的线程的“延迟消息”队列中。 那个线程会在处理完第一条消息后立即处理延迟的消息。
【解决方案2】:

让每个线程服务于它自己的队列。给插座编号。将每个请求放入队列[socket num % num of threads]。

这将确保来自特定套接字的请求按顺序处理。

很遗憾,您不会以这种方式获得负载平衡。

或者,使用并发哈希映射来存储正在服务的套接字。如果一个线程为当前正在处理的套接字请求提供服务,只需将请求放回队列中即可。

【讨论】:

  • 为什么投反对票,你能解释为什么这个答案不正确吗?
  • 每个客户端没有线程,消息顺序被保留并且没有显式阻塞。
  • 我没有否决您的问题(我对此没有足够的声誉),但我很难理解您在最后一句话中的意思。也许你可以详细说明?
  • @IsaacVero 我想象中的样子:gist.github.com/zapl/62d6af5e7dde5cc209c058313ca7ed16
  • @IsaacVero 不这么认为。 Downvote 最有可能得到更短、看起来不太好看的答案(最初)。如果只有 1 个客户端处于活动状态,由于您的订购要求,活动线程不能超过 1 个。发生在任一解决方案中。当涉及到几个客户端之间的不均匀负载时,另一个更具动态性,但我怀疑这实际上是一个现实世界的问题。 2k 个客户端之间的分布应该相当均匀地超过 32 个线程。这也是 netty 所做的,线程和客户端之间的固定绑定,它们是 java 的#1 网络框架。
【解决方案3】:

您希望按顺序处理每个客户端的消息,同时您不想为每个客户端分配单独的线程。这是使用Actor model 的确切用例。 Actor 就像轻量级线程。它们不像通常的线程那么强大,但非常适合像您这样的可重复任务。 如果你觉得 Google 发现的 java actor libraries 太重了,你可以使用我的 Github 仓库中的 most compact actor implementation,或者查看我的异步库 df4j 中包含的扩展 actor 实现。

【讨论】:

    猜你喜欢
    • 2011-04-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-05
    • 1970-01-01
    • 2018-12-04
    • 2021-04-25
    相关资源
    最近更新 更多