【问题标题】:synchronize and merge messaging/data flow同步和合并消息/数据流
【发布时间】:2011-09-30 09:18:57
【问题描述】:

这是关于非常常见的传感器数据处理问题。

为了同步和合并来自不同来源的传感器数据,我想用 Java 实现它,而不需要太复杂的第三个库或框架。

比如说,我定义了一个对象 (O),它由例如 4 个属性 (A1,..A4) 组成。这 4 个属性来自不同的数据通道,例如套接字通道。

这4个属性一般以1.0~2.0Hz的速率到达,并且它们的到达是相互独立的。 一旦有 4 个属性(A1、..A4)同时出现(在一个小的时间窗口内,例如 100 毫秒),然后我从这 4 个属性构造一个新对象 (O)。

描述性场景如下。 A1~A4到达时间点标有*号。

对象O1~U3分别在t1、t2、t3时间点构建。 一些属性在 t2 和 t3 之间到达,但对于构​​造一个对象来说是不完整的,因此它们 将被删除和忽略。

  A1     *          *         *         *
  A2      *           *         *        *
  A3     *            *                  * 
  A4      *            *       *         * 
  --------|------------|-----------------|----------> time
          t1           t2                t3
          O1           O2                O3  

一些要求:

  1. 确定时间点 a.s.a.p. 以根据最后传入的 4 个属性构造对象。
  2. FIFO,O1 必须在 O2 之前构建,以此类推。
  3. Java 中的较少锁定
  4. 如果数据未完成构造对象,则最终删除数据。

关于实施的一些快速想法是:

  • 将任何传入属性存储在时间离散存储桶的 FIFO 队列中(每个存储桶包含 4 个不同的属性)。
  • 并发运行一个无限线程以检查 FIFO 队列(从队列的头部)是否有任何存储桶已被 4 个不同的属性填充。如果是,则构造一个对象并将存储桶从队列中移除。如果存储桶在特定时间窗口内未完全填充,它将被丢弃。

欢迎任何建议和指正!

【问题讨论】:

  • 只是为了确保我清楚数据的删除,你是说,例如,如果 A2-A4 没有在大约 100 毫秒内填充,则应该删除 A1?因此,如果 A1 到达,然后 150 毫秒内没有任何反应,然后 A2、A3 和 A4 在大约 50 毫秒内到达,则不会创建 Object,因为没有有效的 A1。如果新的 A1 在 A2、A3 和 A4 中最早的 100 毫秒内到达,则可以创建一个 Object,如果没有,则丢弃 A2、A3 和 A4。
  • 数据是否应该以一定的固定速率到达,以便您知道它们应该在什么时间到达?
  • 再想一想,你所描述的另一种方式是一个 100 毫秒长的滑动窗口,如果该窗口中所有四个点的数据都到达,则可以创建一个对象。
  • @Kaj:产生数据的传感器(这里的属性)可能不可靠。
  • @elgcom。在这种情况下,您可以查看我的示例。我认为它可以满足您的需求,并且可以处理可配置数量的审查器。

标签: java concurrency merge messaging sensors


【解决方案1】:

你可以做这样的事情,get 操作是阻塞的,直到数据到达,add 操作不是阻塞的。可以对 get 操作进行一些优化,以便您将候选对象保持在并行结构中,这样您在过滤掉旧项目时就不需要遍历所有候选对象。然而,迭代 4 个项目应该足够快。

import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

public class Filter<V> {

    private static final long MAX_AGE_IN_MS = 100;

    private final int numberOfSources;

    private final LinkedBlockingQueue<Item> values = new LinkedBlockingQueue<Item>();

    public Filter(int numberOfSources) {
        this.numberOfSources = numberOfSources;
    }

    public void add(String source, V data) {
        values.add(new Item(source, data));
    }

    public void get() throws InterruptedException {
        HashMap<String, Item> result = new HashMap<String, Item>();
        while (true) {
            while (result.size() < numberOfSources) {
                Item i = values.take();
                result.put(i.source, i);
                if (result.size() == numberOfSources) {
                    break;
                }
            }
            //We got candidates from each source now, check if some are too old.
            long now = System.currentTimeMillis();
            Iterator<Item> it = result.values().iterator();
            while (it.hasNext()) {
                Item item = it.next();
                if (now - item.creationTime > MAX_AGE_IN_MS) {
                    it.remove();
                }
            }
            if (result.size() == numberOfSources) {
                System.out.println("Got result, create a result object and return the items " + result.values());
                break;
            }
        }
    }

    private class Item {
        final String source;
        final V value;
        final long creationTime;

        public Item(String source, V value) {
            this.source = source;
            this.value = value;
            this.creationTime = System.currentTimeMillis();
        }

        public String toString() {
            return String.valueOf(value);
        }
    }


    public static void main(String[] args) throws Exception {
        final Filter<String> filter = new Filter<String>(4);
        new Thread(new Runnable() {
            public void run() {
                try {
                    filter.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        filter.add("a0", "va0.1");
        filter.add("a0", "va0.2");
        Thread.sleep(2000);
        filter.add("a0", "va0.3");
        Thread.sleep(100);
        filter.add("a1", "va1.1");
        filter.add("a2", "va2.1");
        filter.add("a0", "va0.4");
        Thread.sleep(100);
        filter.add("a3", "va3.1");
        Thread.sleep(10);
        filter.add("a1", "va1.2");
        filter.add("a2", "va2.2");
        filter.add("a0", "va0.5");

    }


}

【讨论】:

    【解决方案2】:

    这是另一种方法——不过它只是伪代码,你需要自己编写:)

    class SlidingWindow {
        AtomicReference<Object> a1;
        AtomicReference<Object> a2;
        AtomicReference<Object> a3;
        AtomicReference<Object> a4;
    
        Queue<Long> arrivalTimes = new Queue(4);
    
        public Bucket setA1(Object data) {
            a1.set(data);
            now = System.currentTimeInMillis()
            long oldestArrivalTime = arrivalTimes.pop();
            arrivalTimes.push(now);
            if (now - oldestArrivalTime < 100) {
                return buildBucket();
            }
            return null;
        }
    
        public Bucket setA2(Object data) { ...
    
        ...
    
        private Bucket buildBucket() {
            Bucket b = new Bucket(a1, a2, a3, a4);
            a1.clear();
            a2.clear();
            a3.clear();
            a4.clear();
            return b;
        }
    
    }
    

    【讨论】:

      【解决方案3】:

      这是另一个疯狂的想法:

      使用单个 LinkedBlockingQueue 将值写入所有传感器 A1-A4

      将此队列分配给AtomicReference变量

      创建一个定时器任务,它会以指定的时间间隔(100 毫秒)用一个新队列切换这个队列

      从旧队列中取出所有数据,看看是否有所有数据 A1-A4

      如果是,则创建对象,否则删除所有内容

      【讨论】:

      • 我认为问题在于您可能会丢失数据 - A1 可能在第一个队列的生命周期结束时到达,而其他队列可能在下一个队列的生命周期开始时到达.因此,四个数据点在时限内到达,但不会创建对象,因为它们不共享同一个队列。
      • 是的,那是真的..会想别的:)
      【解决方案4】:

      这不太可能解决您的问题,但它可能会为您指明正确的方向。

      我会使用 Google Guava 的 MapMaker 进行第一次尝试:

      ConcurrentMap<Key, Bucket> graphs = new MapMaker()
                                         .expireAfterAccess(100, TimeUnit.MILLISECOND)
                                         .makeComputingMap(new Function<Key, Bucket>() {
                                                           public Bucket apply(Key key) {
                                                               return new Bucket(key);
                                                           }
                                          });
      

      这将创建一个映射,如果 100 毫秒未访问其条目将消失,并在请求时创建一个新存储桶。

      我无法弄清楚 Key 到底是什么:S 你真正追求的是队列形式的相同功能。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2019-09-17
        • 1970-01-01
        • 1970-01-01
        • 2016-02-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2011-12-09
        相关资源
        最近更新 更多