【Title】: Facing a race condition in a Flink connected stream (Apache Flink)
【Posted】: 2018-08-01 07:49:36
【Question】:

I am facing a race condition while implementing a process function on a connected stream in Flink. I have a cache Map that is shared between the two functions processElement1 & processElement2, which are invoked in parallel by 2 different threads.

Streams1 ---> (sends offer data)

Streams2 ---> (sends LMS (Loyalty Management System) data)

connect = Streams1.connect(Streams2);

connect.process(new TriggerStream());

In the TriggerStream class I use the unique id MemberId as the key to store and look up data in the cache. I am not getting consistent results as data flows in:

import java.util.LinkedHashMap;
import java.util.Map;

class LRUConcurrentCache<K,V>{
    private final Map<K,V> cache;
    private final int maxEntries;
    public LRUConcurrentCache(final int maxEntries) {
        this.maxEntries = maxEntries;
        this.cache = new LinkedHashMap<K,V>(maxEntries, 0.75F, true) {
            private static final long serialVersionUID = -1236481390177598762L;
            @Override
            protected boolean removeEldestEntry(Map.Entry<K,V> eldest){
                return size() > maxEntries;
            }
        };
    }

    // Why can't we lock on the key?
    public void put(K key, V value) {
        synchronized(key) {
            cache.put(key, value);
        }
    }

    // get method
    public V get(K key) {
        synchronized(key) {
            return cache.get(key);
        }
    }
}
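As an aside to the comment in the cache above: `synchronized(key)` only excludes another thread if both threads lock the *same object instance*. Two keys that are `equals()` (and therefore address the same map entry) can still be distinct objects with distinct monitors, so the lock guards nothing. A minimal standalone demonstration (the `member-42` key and `sameMonitor` helper are hypothetical, not part of the question's code):

```java
public class KeyLockDemo {
    // True only if both keys are the very same object, i.e. the same monitor.
    static boolean sameMonitor(Object k1, Object k2) {
        return k1 == k2;
    }

    public static void main(String[] args) {
        String k1 = new String("member-42");
        String k2 = new String("member-42");

        // Equal keys -> both threads hit the SAME map entry...
        System.out.println(k1.equals(k2));        // true
        // ...but distinct objects -> DIFFERENT monitors, so
        // synchronized(k1) and synchronized(k2) do not exclude each other.
        System.out.println(sameMonitor(k1, k2));  // false
    }
}
```

This is why key-based locking is unreliable here unless keys are canonicalized (e.g. interned) first, and even then it would not make the surrounding get/modify/put sequence atomic.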



public class TriggerStream extends CoProcessFunction<IOffer, LMSData, String> {

    private static final long serialVersionUID = 1L;
    LRUConcurrentCache<String, String> cache;
    private String offerNode;
    String updatedValue, retrievedValue;
    Subscriber subscriber;

    TriggerStream(){
        this.cache = new LRUConcurrentCache<>(10);
    }



    @Override
    public void processElement1(IOffer offer, Context ctx, Collector<String> out) throws Exception {
        try {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
            mapper.enableDefaultTyping();
            IOffer latestOffer = offer;

            // Check whether the subscriber is already in the cache
            retrievedValue = cache.get(latestOffer.getMemberId().toString());
            if (retrievedValue == null) {
                // Subscriber is converted to a JSON string and stored in the map
                Subscriber subscriber = new Subscriber();
                subscriber.setMemberId(latestOffer.getMemberId());
                ArrayList<IOffer> offerList = new ArrayList<IOffer>();
                offerList.add(latestOffer);
                subscriber.setOffers(offerList);
                updatedValue = mapper.writeValueAsString(subscriber);
                cache.put(subscriber.getMemberId().toString(), updatedValue);
            } else {
                Subscriber subscriber = mapper.readValue(retrievedValue, Subscriber.class);
                List<IOffer> offers = subscriber.getOffers();
                offers.add(latestOffer);
                updatedValue = mapper.writeValueAsString(subscriber);
                cache.put(subscriber.getMemberId().toString(), updatedValue);
            }
        } catch (Exception pb) {
            applicationlogger.error("Exception in Offer Loading:" + pb);
        }
        applicationlogger.debug("*************************FINISHED OFFER LOADING*******************************");
    }

    @Override
    public void processElement2(LMSData lms, Context ctx, Collector<String> out) throws Exception {
        try {
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
            mapper.enableDefaultTyping();

            // Check whether the subscriber is already in the cache
            retrievedValue = cache.get(lms.getMemberId().toString());
            if (retrievedValue != null) {
                Subscriber subscriber = mapper.readValue(retrievedValue, Subscriber.class);
                // do some calculations
                String updatedValue = mapper.writeValueAsString(subscriber);
                // Update value
                cache.put(subscriber.getMemberId().toString(), updatedValue);
            }

        } catch (Exception pb) {
            applicationlogger.error("Exception in LMS Loading:" + pb);
        }
        applicationlogger.debug("*************************FINISHED LMS LOADING*******************************");
    }

}   
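Independently of where the cache lives, the pattern in both `processElement` methods (get, deserialize, mutate, put) is a non-atomic read-modify-write: even if `get` and `put` are each synchronized, another thread can write between them and its update is silently overwritten. A minimal standalone sketch (hypothetical `member-42` key and plain `String` offers, not the question's `Subscriber` type) showing how `ConcurrentHashMap.compute` makes the whole update atomic per key:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class AtomicUpdateDemo {

    // Appends `perThread` offers for one member from `threads` threads.
    // compute() runs the whole read-modify-write atomically per key,
    // so no append is lost.
    static int run(int threads, int perThread) throws InterruptedException {
        ConcurrentHashMap<String, List<String>> cache = new ConcurrentHashMap<>();
        Runnable writer = () -> {
            for (int i = 0; i < perThread; i++) {
                cache.compute("member-42", (key, offers) -> {
                    if (offers == null) offers = new ArrayList<>();
                    offers.add("offer");
                    return offers;
                });
            }
        };
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) { ts[i] = new Thread(writer); ts[i].start(); }
        for (Thread t : ts) t.join();
        return cache.get("member-42").size();
    }

    public static void main(String[] args) throws InterruptedException {
        // With a separate get() and put() instead of compute(),
        // this count would often come out below 2000.
        System.out.println(run(2, 1000)); // 2000
    }
}
```

This fixes the lost-update symptom for an in-JVM cache, though as the answer below notes, Flink state is the more appropriate tool in a CoProcessFunction.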

【Discussion】:

    Tags: java concurrency apache-flink java.util.concurrent flink-cep


    【Answer 1】:

    Flink gives no guarantee about the order in which a CoProcessFunction (or any other Co*Function) ingests its data. Maintaining some kind of deterministic order across distributed, parallel tasks would be far too expensive.

    Instead, you have to work around this in your function with state and possibly timers. The LRUCache in your function should be maintained as state (probably keyed state). Otherwise, it will be lost in case of a failure. You can add another state for the first stream and buffer its records until the lookup value from the second stream arrives.
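    The keyed-state approach could look roughly like the sketch below. This is only an illustration, assuming Flink's `KeyedCoProcessFunction`/`ValueState` APIs and reusing the asker's `IOffer`, `LMSData`, and `Subscriber` types; it is not tested against their codebase. Because Flink scopes the state to the current key (MemberId), checkpoints it, and never processes two elements for the same parallel task concurrently, no cache or locking is needed:

```java
// Sketch only: assumes offerStream / lmsStream exist and that
// both getMemberId() methods return a String.
offerStream
    .connect(lmsStream)
    .keyBy(o -> o.getMemberId(), l -> l.getMemberId())   // same key on both sides
    .process(new KeyedCoProcessFunction<String, IOffer, LMSData, String>() {

        // One Subscriber per MemberId, managed and checkpointed by Flink.
        private transient ValueState<Subscriber> subscriberState;

        @Override
        public void open(Configuration parameters) {
            subscriberState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("subscriber", Subscriber.class));
        }

        @Override
        public void processElement1(IOffer offer, Context ctx, Collector<String> out)
                throws Exception {
            Subscriber subscriber = subscriberState.value();
            if (subscriber == null) {
                subscriber = new Subscriber();
                subscriber.setMemberId(offer.getMemberId());
                subscriber.setOffers(new ArrayList<>());
            }
            subscriber.getOffers().add(offer);
            subscriberState.update(subscriber);
        }

        @Override
        public void processElement2(LMSData lms, Context ctx, Collector<String> out)
                throws Exception {
            Subscriber subscriber = subscriberState.value();
            if (subscriber != null) {
                // do some calculations, then:
                subscriberState.update(subscriber);
            }
            // else: optionally buffer lms in another state until the offer arrives
        }
    });
```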

    【Comments】:

    • Yes, I am maintaining the LRUCache state in Couchbase, so the state is not lost. I am doing exactly that: adding another state for the first stream and buffering its records until the lookup value from the second stream arrives.
    • I would recommend storing the state in Flink instead. Otherwise, you make a remote call for every lookup and/or update. Moreover, external state is not reset in case of a failure, which means you do not get exactly-once state consistency.