【问题标题】:Prevent race condition with RxJava asynchronous thread implementation使用 RxJava 异步线程实现防止竞争条件
【发布时间】:2018-04-17 11:45:45
【问题描述】:

我正在使用 Spring 框架 StringRedisTemplate 来更新一个多线程发生的条目。

public void processSubmission(final String key, final Map<String, String> submissionDTO) {
    final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
    this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
    final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
    Map<String, String> data = findByKey(key);
    String json;
    if (data != null) {
        data.putAll(submissionDTO);
        json = convertSubmission(data);
    } else {
        json = convertSubmission(submissionDTO);
    }
    ops.put(key, hashKey, json);
}

在这个 json 条目中如下所示,

key (assignmentId) -> value (submissionId, status) 

如代码所示,在更新缓存条目之前,我获取当前条目并添加新条目并将它们全部放入。但由于此操作可以在多个线程中进行,因此可能会出现竞争条件导致数据丢失的情况。我可以同步上述方法,但这将成为 RxJava 实现的并行处理能力的瓶颈,其中 processSubmission 方法是通过 RxJava 在两个异步线程上调用的。

class ProcessSubmission{

@Override
    public Observable<Boolean> processSubmissionSet1(List<Submission> submissionList, HttpHeaders requestHeaders) {
        return Observable.create(observer -> {
            for (final Submission submission : submissionList) {            
                //Cache entry insert method invoke via this call
                final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
                observer.onNext(status);
            }
            observer.onCompleted();
        });
    }

    @Override
    public Observable<Boolean> processSubmissionSet2(List<Submission> submissionList, HttpHeaders requestHeaders) {
        return Observable.create(observer -> {
            for (final Submission submission : submissionList) {
                //Cache entry insert method invoke via this call
                final Boolean status = processSubmissionExecutor.processSubmission(submission, requestHeaders);
                observer.onNext(status);
            }
            observer.onCompleted();
        });
    }

} 

上面将从下面的服务API调用。

class MyService{    
public void handleSubmissions(){
    final Observable<Boolean> statusObser1 = processSubmission.processSubmissionSet1(subListDtos.get(0), requestHeaders)
                .subscribeOn(Schedulers.newThread());
    final Observable<Boolean> statusObser2 = processSubmission.processSubmissionSet2(subListDtos.get(1), requestHeaders)
                    .subscribeOn(Schedulers.newThread());                   
    statusObser1.subscribe();
    statusObser2.subscribe();
    }       
}

因此,handleSubmissions 调用每个分配 ID 的多个线程。但随后每个主线程创建并调用两个反应式 Java 线程并处理与每个分配关联的提交列表。

在保持 RxJava 实现的性能的同时,防止 redis 进入竞争条件的最佳方法是什么?有没有办法可以更有效地执行此 redis 操作?

【问题讨论】:

    标签: java spring multithreading redis rx-java


    【解决方案1】:

    看起来您最后只使用ops 变量来执行put 操作,您可以隔离需要同步的那个点。

    在我所做的简短研究中,我找不到 HashOperations 是否还不是线程安全的)。

    但是,您可以如何隔离您关注的部分的一个示例是执行以下操作:

    public void processSubmission(final String key, final Map<String, String> submissionDTO) {
        final String hashKey = String.valueOf(Hashing.MURMUR_HASH.hash(key));
        this.stringRedisTemplate.expire(key, 60, TimeUnit.MINUTES);
        Map<String, String> data = findByKey(key);
        String json;
        if (data != null) {
            data.putAll(submissionDTO);
            json = convertSubmission(data);
        } else {
            json = convertSubmission(submissionDTO);
        }
        putThreadSafeValue(key, hashKey, json);
    }
    

    并且有一个只为put 操作同步的方法:

    private synchronized void putThreadSafeValue(key, hashKey, json) {
        final HashOperations<String, String, String> ops = this.stringRedisTemplate.opsForHash();
        ops.put(key, hashKey, json);
    }
    

    有很多方法可以做到这一点,但看起来您可以将线程争用限制为 put 操作。

    【讨论】:

      猜你喜欢
      • 2021-06-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-05-27
      • 2022-12-14
      • 2010-11-27
      • 2016-03-21
      • 1970-01-01
      相关资源
      最近更新 更多