【问题标题】:Creating a queue that operates a job创建操作作业的队列
【发布时间】:2018-01-26 10:40:55
【问题描述】:

我想构建一个线程来侦听队列并在我将项目添加到队列时执行一项工作。

但我不太了解如何开发它。我已经尝试了一些来自 RxJava2 的 Flowable 示例,但不知道该怎么做。

我对 Android 和 Java 中的所有示例持开放态度,也许消息处理程序或执行程序将是一个简单的解决方案。可惜没有专业知识。尤其是 RxJava2 会很棒。

更新

换句话说,我想在其上构建一个队列机制,因为长日志显示为单独的,并且定时使它们混合在一起,只要它们中的 2 个在近时间调用。

public final class Logcat {

   private static final String TAG = "HOWDY";

   public static void v(String message) {
       Log.v(TAG, message);
   }

   public static void d(String message) {
       Log.d(TAG, message); 
       //TODO I will add a for-loop later for long messages to make sure to show all of them for each method.
   }

   public static void e(Throwable throwable) {
       Log.e(TAG, throwable.getMessage());
   }

   public static void e(String message) {
       Log.e(TAG, message);
   }

   public static void e(ApiError error) {
       Log.e(TAG, error.message);
   }
}

【问题讨论】:

  • 这就是java.util.concurrent.Executors 的用途。为什么要用 RxJava 来做呢?
  • 好点。我忘记添加了。但是,从来没有经历过。我只对 RxJava2 感到好奇。
  • 这些项目是什么,你想做什么工作?项目是否可以相互并行处理。是否要为每个项目生成结果?您要等待处理每个或所有项目吗?项目的数量是有限的吗?
  • 我只需要单 io 线程。每个工作都需要互相等待。没有并行处理。我已更新问题以获取更多详细信息。

标签: java android queue rx-java2


【解决方案1】:

好的,这就是我的做法..

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

public class DemoRxJava2 {

    public static void testWithQueue() {

        CompletableFuture<String> allDone = new CompletableFuture<>();
        AtomicBoolean submitDone = new AtomicBoolean(Boolean.FALSE);
        final Queue<Long> queue = new ConcurrentLinkedQueue<>();

        Observable.interval(2, TimeUnit.SECONDS)
        .takeWhile(tick -> !queue.isEmpty() || !submitDone.get())
        .flatMap(tick -> {
            return Observable.create(sub -> {
                while (!queue.isEmpty()) {
                    sub.onNext(queue.poll());
                }
                sub.onComplete();
            });
        })
        .subscribeOn(Schedulers.single())
        .doOnSubscribe(dis -> System.out.println("Queue processing active"))
        .doOnComplete(() -> {
            System.out.println("Queue processing done");
            allDone.complete("DONE");
        })
        .subscribe(nextTs -> System.out.printf("[%s] : Processing tx : %d\n", Thread.currentThread().getName(), nextTs));

        Observable.interval(1,TimeUnit.SECONDS)
        .take(10)
        .doOnSubscribe(dis -> System.out.println("Job submitter start"))
        .doOnNext(tick -> {
            long ms = System.currentTimeMillis() / 1000;
            System.out.printf("[%s] : Submitting tx : %d\n", Thread.currentThread().getName(), ms);
            queue.add(ms);
        })
        .doOnComplete(() -> submitDone.set(Boolean.TRUE))
        .blockingSubscribe();

        try {
            allDone.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void testWithSubject() {

        CompletableFuture<String> allDone = new CompletableFuture<>();

        PublishSubject<Long> queue = PublishSubject.create();

        queue.observeOn(Schedulers.single())
        .flatMap(tx -> Observable.just(tx).delay(2, TimeUnit.SECONDS))
        .doOnSubscribe(dis -> System.out.println("Queue processing active"))
        .doOnComplete(() -> allDone.complete("DONE"))
        .subscribe(nextTs -> System.out.printf("[%s] : Processing tx : %d\n", Thread.currentThread().getName(), nextTs));

        Observable.interval(1, TimeUnit.SECONDS)
        .take(10)
        .doOnSubscribe(dis -> System.out.println("Job submitter start"))
        .doOnNext(tick -> {
            long ms = System.currentTimeMillis() / 1000;
            System.out.printf("[%s] : Submitting tx : %d\n", Thread.currentThread().getName(), ms);
            queue.onNext(ms);
        })
        .doOnComplete(() -> queue.onComplete())
        .blockingSubscribe();

        //wait until all done
        try {
            allDone.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        testWithQueue();
        testWithSubject();
    }
}

这只是演示如何使用 RxJava 在单独的线程中处理对象队列,根据您的需要进行调整

【讨论】:

    【解决方案2】:

    我就是这样做的。

    interface ILog {
    
       String TAG = "HOWDY";
    
       void display();
    }
    

    还有我的抽象类;

    abstract class AbstractLog implements ILog {
    
       String mLog;
    
       AbstractLog(@NonNull String log) {
           mLog = log;
       }
    }
    

    这是我的具体课程;还有另一个类,如 Verbose 等。

    public class ErrorLog extends AbstractLog {
    
       ErrorLog(@NonNull String log) {
           super(log);
       }
    
       ErrorLog(@NonNull Throwable throwable) {
           super(throwable.getMessage());
       }
    
       ErrorLog(@NonNull ApiError error) {
           super(error.message);
       }
    
       @Override
       public void display() {
          Log.e(TAG, mLog);
       }
    }
    

    这是要与之交互的类开发人员。

    public final class Logcat {
    
       private static LogQueue sQueue = new LogQueue();
    
       public static void v(String log) {
           Message message = new Message();
           message.obj = new VerboseLog(log);
    
           sQueue.sendMessage(message);
       }
    
       public static void d(String log) {
           Message message = new Message();
           message.obj = new DebugLog(log);
    
           sQueue.sendMessage(message);
       }
    
       public static void e(Throwable throwable) {
           Message message = new Message();
           message.obj = new ErrorLog(throwable);
    
           sQueue.sendMessage(message);
       }
    
       public static void e(String log) {
           Message message = new Message();
           message.obj = new ErrorLog(log);
    
           sQueue.sendMessage(message);
       }
    
       public static void e(ApiError error) {
           Message message = new Message();
           message.obj = new ErrorLog(error);
    
           sQueue.sendMessage(message);
       }
    
       private static class LogQueue extends Handler {
    
           @Override
           public void handleMessage(Message msg) {
               super.handleMessage(msg);
    
               ILog log = (ILog) msg.obj;
               log.display();
           }
       }
    }
    

    希望对其他人有所帮助。

    【讨论】:

    • 我在您的回答中没有看到任何 RxJava2。您似乎不是在寻找消息的异步处理。我以为您正在寻找一种方法来通过专用的单独线程处理队列中的对象(由其他线程提交)?
    • 是的。它是我想要的;每个日志必须等待完成队列中的当前日志。否则(如果它是并行的)它们分开输出。我希望 RxJava2 使用 Flowable 来实现这个功能但不知道该怎么做的原因。
    • 看我的回答,希望对你有帮助
    猜你喜欢
    • 1970-01-01
    • 2015-12-02
    • 1970-01-01
    • 2018-09-11
    • 2019-01-23
    • 1970-01-01
    • 2013-05-08
    • 1970-01-01
    • 2015-07-07
    相关资源
    最近更新 更多