【问题标题】:RxJava - Flowable.map not getting called after flatMapRxJava - 在 flatMap 之后没有调用 Flowable.map
【发布时间】:2017-04-28 16:35:48
【问题描述】:

我在 rxjava (io.reactivex.rxjava2 - v2.0.8) 中的 Flowable vs Observable 遇到了奇怪的问题。这里我的代码如下所示,其中 ma​​p(...).subscribe(...) 函数没有被调用/执行。

flowable.flatMap(...return new flowable...).map(...).subscribe(...)

令人惊讶的是,如果我翻转代码以使用 Observable 而不是 Flowable,则 ma​​p(...).subscribe(...) 会被调用 /按预期执行。我可能遗漏了一些简单的东西,请告诉我可能是什么?

谢谢

`

public class Application {
    public static void main(String[] args) {      
        // Note: Working; get the list of databases
        DatabaseFlowable databases = new DatabaseFlowable(sourceClient);
        // Note: Working; for each database get the list of collections in it
        Flowable<Resource> resources = databases
            .flatMap(db -> {
                logger.info(" ==> found database {}", db.getString("name"));
                return new CollectionFlowable(sourceClient, db.getString("name"));
                // Note: Working; CollectionFlowable::subscribeActual works as well
            });
        resources
            .map(resource -> {
                // Note: Nothing in here gets executed
                logger.info(" ====> found resource {}", resource.toString());
                return resource;
            })
            .subscribe(m -> {
                // Note: Nothing in here gets executed
                logger.info(m.toString());
            });
    }
}



public class DatabaseFlowable extends Flowable<Document> {
    private final static Logger logger = LoggerFactory.getLogger(DatabaseFlowable.class);
    private final MongoClient client;

    public DatabaseFlowable(MongoClient client) {
        this.client = client;
    }

    @Override
    protected void subscribeActual(Subscriber<? super Document> subscriber) {
        ListDatabasesIterable<Document> cursor = client.listDatabases();
        MongoCursor<Document> iterator = cursor.iterator();

        while(iterator.hasNext()) {
            Document item = iterator.next();
            if (!item.isEmpty()) {
                    String message = String.format(" found database name: %s, sizeOnDisk: %s",
                            item.getString("name"), item.get("sizeOnDisk"));
                    logger.info(message);
                    subscriber.onNext(item);
            }
        }
        subscriber.onComplete();
    }
}

public class CollectionFlowable extends Flowable<Resource> {

    private final static Logger logger = LoggerFactory.getLogger(CollectionFlowable.class);
    private final MongoClient client;
    private final String databaseName;

    public CollectionFlowable(MongoClient client, String databaseName) {
        this.databaseName = databaseName;
        this.client = client;
    }


    @Override
    protected void subscribeActual(Subscriber<? super Resource> subscriber) {
        MongoDatabase database = client.getDatabase(databaseName);

        ListCollectionsIterable<Document> cursor = database.listCollections();
        MongoCursor<Document> iterator = cursor.iterator();

        while(iterator.hasNext()) {
            Document item = iterator.next();
            if (!item.isEmpty()) {
                logger.info(" ... found collection: {}.{}", database.getName(), item.getString("name"));
                Resource resource = new Resource(databaseName,
                        item.getString("name"),
                        (Document) item.get("options"));
                subscriber.onNext(resource);
            }
        }
        subscriber.onComplete();
    }
}

`

【问题讨论】:

    标签: java rx-java2


    【解决方案1】:

    那是因为您没有正确遵循Flowable 协议。没有对subscriber.onSubscribe(...) 的调用,并且它不遵守subscriber.request(...) 施加的限制。

    由于您的实现绝不会观察到背压,因此请使用Flowable.create 创建一个缓冲版本,或者将您的实现移至没有背压的Observable

    您看到该行为的原因是下游观察者没有请求任何项目,因此您对 onNext 的调用被丢弃。

    【讨论】:

    • 我同意我可能没有正确使用 Flowables。您能否帮助建议我应该更改什么或提供一些可能会更清楚的示例。谢谢
    • 您所有的自定义 Flowables 基本上都可以替换为 Flowable.create(...) 实现。如果您使用 BUFFER 背压策略,则不会丢失排放。
    猜你喜欢
    • 1970-01-01
    • 2018-02-20
    • 2020-04-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-22
    • 2021-03-22
    • 2018-06-29
    相关资源
    最近更新 更多