【问题标题】:Java stream parallelStream How to map multple functions to stream mapJava流并行Stream如何将多个函数映射到流映射
【发布时间】:2021-08-28 08:59:09
【问题描述】:

我有这个适用于 Arraylist 的代码 数组列表中的每个元素都需要由 ArithmeticBBB 和 ArithmeticCCC 处理 并创建 2 个返回到流中的元素 我正在使用 stream = stream.map(a::work);在循环 。 或使用 flatMap

class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
        @Override
        public String arithmetic(String n) {
            String ss  = n+" 2" + " ,Thread ID:" +Thread.currentThread().getId();
            return ss;
        }
    }
    class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
        @Override
        public String arithmetic(String n) {
            String ss  = n+" 3" + " ,Thread ID:" +Thread.currentThread().getId();
            return ss;
        }
    }
    
    public class ArithmeticManagerStream {
        private List<String> integerList = null;
        private List<String> resultArray = null;
    
    
        public ArithmeticManagerStream(List<String> dataFromUser) {
            this.integerList =  dataFromUser;
        }
    
        public List<String> invokerActions(List<ArithmeticAction> actions) throws
                InterruptedException {
    
            Stream<String> stream = integerList.parallelStream();
            for (final ArithmeticManagerStream.ArithmeticAction a : actions) {
                stream = stream.flatMap(str -> actions.stream().map(b -> b.arithmetic(str)));
             }
            return resultArray = stream.collect(Collectors.toList());
        }
        public interface ArithmeticAction {
            String arithmetic(String n);
        }
    
        public static void main(String[] args) {
            List<ArithmeticAction> actions = new ArrayList();
            actions.add(new ArithmeticBBB());
            actions.add(new ArithmeticCCC());
            List<String> intData = new ArrayList<>();
            intData.add("1");
            intData.add("2");
            intData.add("3");
    
            ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
    
            try {
                List<String> result = arithmeticManagerStream.invokerActions(actions);
                System.out.println("***********************************************");
                for(String i : result) {
                    System.out.println(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

目前,如果你运行这个程序,你会看到结果是连接到字符串的。

1 2 ,Thread ID:12 2 ,Thread ID:12
1 2 ,Thread ID:12 3 ,Thread ID:12
1 3 ,Thread ID:12 2 ,Thread ID:12
1 3 ,Thread ID:12 3 ,Thread ID:12
2 2 ,Thread ID:1 2 ,Thread ID:1
2 2 ,Thread ID:1 3 ,Thread ID:1
2 3 ,Thread ID:1 2 ,Thread ID:1
2 3 ,Thread ID:1 3 ,Thread ID:1
3 2 ,Thread ID:13 2 ,Thread ID:13
3 2 ,Thread ID:13 3 ,Thread ID:13
3 3 ,Thread ID:13 2 ,Thread ID:13
3 3 ,Thread ID:13 3 ,Thread ID:13

我需要每个元素都在自己的元素中,例如:

1 2 ,Thread ID:12 
1 2 ,Thread ID:12  
1 3 ,Thread ID:12  
1 3 ,Thread ID:12 
2 2 ,Thread ID:1  
2 2 ,Thread ID:1 
2 3 ,Thread ID:1 
2 3 ,Thread ID:1 
3 2 ,Thread ID:13 
3 2 ,Thread ID:13  
3 3 ,Thread ID:13  
3 3 ,Thread ID:13 

【问题讨论】:

  • 您是否尝试生成与Iwork 中的Iwork 对象一样多的列表?如果没有,您将如何处理多个 map 结果?如果您希望通过应用这些操作产生多个列表,则需要多个流。
  • @ernest_k 我用最终目标更新了问题
  • @Holger 我更新了问题
  • 那么您只需要flatMap。这有点像笛卡尔积。
  • @ernest_k 你能举个例子吗?感谢重播

标签: java multithreading parallel-processing java-stream


【解决方案1】:

显然,由于拼写错误和其他问题的数量,已发布问题中的代码并未从实际运行的代码中删除。所以不可能说出实际的问题是什么。

但是,以下代码会运行:

public class ParallelStreams {

    public static void main(String[] args) {

        List<String> strList = List.of("a1", "a2", "a3", "a4", "a5");
        List<IWork> w = List.of(new Work1(), new Work2());

        Stream<String> stream = strList.parallelStream();
        for (final IWork a : w) {
            stream = stream.map(a::work);
        }
        List<String> finalArray = stream.collect(Collectors.toList());

        for (String s : finalArray) {
            System.out.println(s);
        }
    }
}


interface IWork {
    String work(String s);
}

class Work1 implements IWork {
    @Override
    public String work(String s) {
        return s + " * " + Thread.currentThread().getId();
    }
}

class Work2 implements IWork {
    @Override
    public String work(String s) {
        return s + " ! " + Thread.currentThread().getId();
    }
}

并产生以下输出:

a1 * 13 ! 13
a2 * 13 ! 13
a3 * 1 ! 1
a4 * 1 ! 1
a5 * 1 ! 1

为了更清楚地了解运行的是哪个 IWork 实现,我添加了一个“*”或“!”来代替原始实现中的空格。

【讨论】:

  • 感谢您的回答,但这就是我所做的,除了我没有做 LIst.of 而是在循环中添加了 IWork,因为我使用 java 8
【解决方案2】:

首先,感谢您将其更改为可运行代码,以便我们了解发生了什么。

由于数据只是个位数,而您的操作只是个位数,线程 ID 可以是个位数,因此您的理解有点困难。所以我将数据更改为“A”、“B”和“C”以使其更容易。

在您的for 循环中,您正在循环actions,但实际上不要将action 用于循环中的任何内容。所以它本质上只是一个循环,运行次数与您的操作一样多,但在每次迭代中都没有不同。因为您调用了循环变量a,所以您让这变得更难捕捉。尝试将其命名为更有意义的名称。

真正令人困惑的是,您正在循环通过 actions,然后在循环内流式传输操作。因此,这将应用的操作数平方。我不敢相信这就是您要寻找的东西,但您似乎并不对输出的这方面感到不安,只是因为它已针对每个输入进行合并。

此外,目前尚不清楚您对线程本身的关心程度。你在输出中有它们,但为什么呢?为什么使用ParallelStream()

您还通过将 Stream 视为变量并逐步移动它来使其变得更加复杂。在我发布的第一个答案中,我发现将它们组合在一起会导致整个流通过单个线程运行,所以我没有在那个答案中这样做,因为我不确定线程​​对你有多重要(并且他们不应该)。

最后,如果您转储 for 循环,然后将其全部合并到一个 Stream 语句中,它会完美运行:

class ArithmeticBBB implements ArithmeticManagerStream.ArithmeticAction {
    @Override
    public String arithmetic(String n) {
        String ss = n + " 2" + " ,Thread ID:" + Thread.currentThread().getId();
        return ss;
    }
}

class ArithmeticCCC implements ArithmeticManagerStream.ArithmeticAction {
    @Override
    public String arithmetic(String n) {
        String ss = n + " 3" + " ,Thread ID:" + Thread.currentThread().getId();
        return ss;
    }
}

public class ArithmeticManagerStream {
    private List<String> integerList = null;
    private List<String> resultArray = null;


    public ArithmeticManagerStream(List<String> dataFromUser) {
        this.integerList = dataFromUser;
    }

    public List<String> invokerActions(List<ArithmeticAction> actions) throws InterruptedException {
        return integerList.parallelStream()
                          .flatMap(str -> actions.stream().map(action -> action.arithmetic(str)))
                          .collect(Collectors.toList());
    }

    public interface ArithmeticAction {
        String arithmetic(String n);
    }

    public static void main(String[] args) {
        List<ArithmeticAction> actions = new ArrayList();
        actions.add(new ArithmeticBBB());
        actions.add(new ArithmeticCCC());
        List<String> intData = new ArrayList<>();
        intData.add("A");
        intData.add("B");
        intData.add("C");

        ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);

        try {
            List<String> result = arithmeticManagerStream.invokerActions(actions);
            System.out.println("***********************************************");
            for (String i : result) {
                System.out.println(i);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这会导致:

***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
C 2 ,Thread ID:1
C 3 ,Thread ID:1

这对我来说似乎是正确的。每个源项都会通过每个动作,生成自己的输出,然后将其平面映射回原始流。

最后一点。我希望简化代码,并注意到您的界面是Functional 构造。因此,您实际上可以放弃实现类,并在加载 actions 数组时使用 lambda 声明它们。然后,当然,很明显您的界面基本上是Function&lt;String, String&gt;。所以我完全放弃了接口,然后清理了代码,使用 Stream 作为最终输出。

结果基本上是微不足道的,我添加了更多动作只是为了说明一点:

public class ArithmeticManagerStream {
    private List<String> integerList = null;

    public ArithmeticManagerStream(List<String> dataFromUser) {
        this.integerList = dataFromUser;
    }

    public List<String> invokerActions(List<Function<String, String>> actions) {
        return integerList.parallelStream().flatMap(str -> actions.stream().map(action -> action.apply(str))).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Function<String, String>> actions = new ArrayList();
        actions.add(n -> n + " 2" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 3" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 4" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 5" + " ,Thread ID:" + Thread.currentThread().getId());
        actions.add(n -> n + " 6" + " ,Thread ID:" + Thread.currentThread().getId());
        List<String> intData = new ArrayList<>();
        intData.add("A");
        intData.add("B");
        intData.add("C");

        ArithmeticManagerStream arithmeticManagerStream = new ArithmeticManagerStream(intData);
        List<String> result = arithmeticManagerStream.invokerActions(actions);
        System.out.println("***********************************************");
        result.forEach(System.out::println);
    }
}

输出是:

***********************************************
A 2 ,Thread ID:13
A 3 ,Thread ID:13
A 4 ,Thread ID:13
A 5 ,Thread ID:13
A 6 ,Thread ID:13
B 2 ,Thread ID:1
B 3 ,Thread ID:1
B 4 ,Thread ID:1
B 5 ,Thread ID:1
B 6 ,Thread ID:1
C 2 ,Thread ID:14
C 3 ,Thread ID:14
C 4 ,Thread ID:14
C 5 ,Thread ID:14
C 6 ,Thread ID:14

【讨论】:

    猜你喜欢
    • 2018-12-22
    • 2015-09-12
    • 2023-03-19
    • 1970-01-01
    • 1970-01-01
    • 2022-11-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多