【问题标题】:Synchronized List - add multiple threads and remove one thread同步列表 - 添加多个线程并删除一个线程
【发布时间】:2022-01-31 12:34:58
【问题描述】:

我有这个代码:

private static boolean flagStop = false;

//synchronized list
private static List<Object> queue = Collections.synchronizedList(new ArrayList<>());

//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
    while (!flagStop) {
        if (!queue.isEmpty()) {
            int size = queue.size();
            List<Object> copyList = queue.subList(0, size);
            for (int i = 0; i < size; i++) {
                queue.remove(0);
            }
            copyList.forEach(System.out::println);
        }
        try {
            Thread.sleep(1_000);
        }
        catch (Exception ignored) { }
    }
});

static {
    printQueueThread.start();
}

//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj) {
    queue.add(obj);
}

在这个类中,我有一个同步列表,用于添加来自不同线程的元素,并在另一个线程中打印和删除每个元素。

我的意思是不要停止addElement 处的线程并安全地处理(打印和删除)元素。

我在这里发帖询问我的代码是否是线程安全的或如何检查它。

【问题讨论】:

  • 您可以将addElement 方法设为synchronized,因为该方法将被多个线程访问,因此一次只有一个线程可以访问该方法
  • @YashRami,这是我尽量避免的。不停止add中的其他线程
  • @YashRami,是的。有多个线程同时访问(每秒多次)addElement
  • subList()不会创建副本,只是原始列表的视图。如果原始列表在结构上发生了变化(例如删除了元素),则子列表的语义是未定义的。另请注意,您可以使用clear() 删除列表(子列表)的所有元素——不,发布的代码不是线程安全的copyList.forEach 将抛出ConcurrentModificationException

标签: java multithreading


【解决方案1】:

在我看来,List 不是您要查找的课程。 Java 实际上有一个Queue 类(以及它的线程安全实现)。

然后,进一步查看您的代码,我发现它非常浪费; 您正在创建列表的副本,从原始对象中删除对象,然后打印副本。 这似乎是一个 CopyOnWriteArrayList 但对我来说线程安全性较低。这可能很有用,但队列解决方案最适合这种情况并且性能更高。

这是您的代码在更改为使用BlockingQueue 后的样子:

private static boolean flagStop = false;

//synchronized list
private static Queue<Object> queue = new BlockingQueue<Object>();

//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {
    Object currentObject;
    while(!flagStop) {
            while((object = queue.poll()) != null) {
                System.out.println(currentObject);
            }
        }
        try { 
            Thread.sleep(1_000); 
        } catch (Exception ignored) {
        }
    }
});

static{
    printQueueThread.start();
}

//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj){
    queue.offer(obj);
}

另一方面,您还使用布尔值来告诉您的线程是否应该停止。这可能会导致同步问题。你真的应该用 AtomicBoolean 代替它 - 布尔的线程安全实现。 将boolean flagStop = false 更改为AtomicBoolean flagStop = new AtomicBoolean() 并将while (!flagStop) 更改为while(!flagStop.get())

我不太确定为什么您的课程也是静态的,但这通常是不好的做法。我会避免这种情况并使用实例。 我还建议实现 Runnable,使代码更清晰,并在 addElement 中添加一个空检查。 完成所有更正后,您的最终课程将如下所示:

public class MyPrintQueue implements Runnable {
    private AtomicBoolean shouldStop;

    //synchronized list
    private Queue<Object> queue;
    
    //thread which check once per second the size of queue
    //and print elements from list and try to remove them safely
    private Thread printQueueThread;
    
    public MyPrintQueue() {
        this.queue = new BlockingQueue<>();
        this.printQueueThread = new Thread(this);
    }

    @Override
    public void run() {
        Object currentObject;
        while(!flagStop.get()) {
                while((object = queue.poll()) != null) {
                    System.out.println(currentObject);
                }
            }
            try { 
                Thread.sleep(1_000); 
            } catch (Exception ignored) {
                // Ignored
            }
        }
    }

    public void startThread() {
        printQueueThread.start();
    }

    public void stop() {
        flagStop.set(true);
    }

    //add elements in queue
    //this is accessed by multiple threads, by few times per second.
    public void addElement(Object obj){
        if (obj == null)
            return;
        queue.offer(obj);
    }
}

话虽如此,您的代码对我来说似乎并不完全是线程安全的。我可能是错的,但您可以删除 Thread.sleep 调用并创建一个 TestClass,它在 while true 中调用 addElement() 以检查是否抛出了任何异常。 如果您不想使用 Queue 并且必须使用 List,则应将 Collections.synchronizedList 替换为 CopyOnWriteArrayList(),从而有效地将代码更改为此

private static boolean flagStop = false;

//synchronized list
private static List<Object> queue = new CopyOnWriteArrayList<>();

//thread which check once per second the size of queue
//and print elements from list and try to remove them safely
private static Thread printQueueThread = new Thread(() -> {

    while(!flagStop){

        if(!queue.isEmpty()){
            while (queue.size() > 0) {
                System.out.println(queue.get(0));
                queue.remove(0);
            }
        }

        try{ Thread.sleep(1_000); }catch(Exception ignored){}

    }

});

static{
    printQueueThread.start();
}

//add elements in queue
//this is accessed by multiple threads, by few times per second.
public static void addElement(Object obj){
    queue.add(obj);
}

请注意,上面的代码仍然是静态的 (!) 并且没有实现 AtomicBoolean。 还请查看以下似乎与您的问题相关的问题及其答案:Is externally synchronized ArrayList thread safe if its fields are not volatile?

【讨论】:

  • 谢谢。 flagStop 和写代码的方式,就是为了这个问题,尽量简短。只是为了提出我的问题。关于static 是因为我想直接从任何线程调用Foo.addElement 而不在它们之间共享实例。我认为这更容易管理。在我看来。
猜你喜欢
  • 2015-08-23
  • 2014-10-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-12-15
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多