【Question title】: Is there any way to pass Observable<String> into AbstractInputStreamContent?
【Posted】: 2018-03-21 16:45:36
【Question】:

I am uploading a text file to Google Drive:

ByteArrayContent content = new ByteArrayContent("text/csv", fileContent.getBytes(Charset.forName("UTF-8")));
Drive.Files.Insert request = drive.files().insert(file, content);

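where type(fileContent) = String.

I would like to refactor this and change the type of fileContent to Observable<String>. Is there a good workaround for passing it to the insert() function (which takes AbstractInputStreamContent as its second argument)?

Thanks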

【Comments】:

  • You may need to write a complete blocking adapter as an AbstractInputStreamContent, or look for an adapter written with one of Google's own reactive APIs (gRPC, Agera, etc.).

Tags: arrays google-drive-api rx-java rx-java2 google-drive-realtime-api


【Solution 1】:

Here is a generic Flowable -> InputStream adapter you can delegate to:

import java.io.*;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.*;

import io.reactivex.FlowableSubscriber;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

public final class FlowableStringInputStream {

    private FlowableStringInputStream() {
        throw new IllegalStateException("No instances!");
    }

    public static InputStream createInputStream(
            Publisher<String> source, Charset charset) {
        StringInputStream parent = new StringInputStream(charset);
        source.subscribe(parent);
        return parent;
    }

    static final class StringInputStream extends InputStream
    implements FlowableSubscriber<String> {

        final AtomicReference<Subscription> upstream;

        final Charset charset;

        volatile byte[] bytes;

        int index;

        volatile boolean done;
        Throwable error;

        StringInputStream(Charset charset) {
            this.charset = charset;
            upstream = new AtomicReference<>();
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.setOnce(upstream, s)) {
                s.request(1);
            }
        }

        @Override
        public void onNext(String t) {
            bytes = t.getBytes(charset);
            synchronized (this) {
                notifyAll();
            }
        }

        @Override
        public void onError(Throwable t) {
            error = t;
            done = true;
            synchronized (this) {
                notifyAll();
            }
        }

        @Override
        public void onComplete() {
            done = true;
            synchronized (this) {
                notifyAll();
            }
        }

        @Override
        public int read() throws IOException {
            for (;;) {
                byte[] a = awaitBufferIfNecessary();
                if (a == null) {
                    Throwable ex = error;
                    if (ex != null) {
                        if (ex instanceof IOException) {
                            throw (IOException)ex;
                        }
                        throw new IOException(ex);
                    }
                    return -1;
                }
                int idx = index;
                if (idx == a.length) {
                    index = 0;
                    bytes = null;
                    upstream.get().request(1);
                } else {
                    int result = a[idx] & 0xFF;
                    index = idx + 1;
                    return result;
                }
            }
        }

        byte[] awaitBufferIfNecessary() throws IOException {
            byte[] a = bytes;
            if (a == null) {
                synchronized (this) {
                    for (;;) {
                        boolean d = done;
                        a = bytes;
                        if (a != null) {
                            break;
                        }
                        if (d || upstream.get() == SubscriptionHelper.CANCELLED) {
                            break;
                        }
                        try {
                            wait();
                        } catch (InterruptedException ex) {
                            if (upstream.get() != SubscriptionHelper.CANCELLED) {
                                InterruptedIOException exc = new InterruptedIOException();
                                exc.initCause(ex);
                                throw exc;
                            }
                            break;
                        }
                    } 
                }
            }
            return a;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
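            // Overflow-safe bounds check; off == b.length is legal when len == 0
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException(
                    "b.length=" + b.length + ", off=" + off + ", len=" + len);
            }
            if (len == 0) {
                return 0; // InputStream contract: zero-length reads return 0
            }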
            for (;;) {
                byte[] a = awaitBufferIfNecessary();
                if (a == null) {
                    Throwable ex = error;
                    if (ex != null) {
                        if (ex instanceof IOException) {
                            throw (IOException)ex;
                        }
                        throw new IOException(ex);
                    }
                    return -1;
                }
                int idx = index;
                if (idx == a.length) {
                    index = 0;
                    bytes = null;
                    upstream.get().request(1);
                } else {
                    int r = 0;
                    while (idx < a.length && len > 0) {
                        b[off] = a[idx];
                        idx++;
                        off++;
                        r++;
                        len--;
                    }
                    index = idx;
                    return r;
                }
            }
        }

        @Override
        public int available() throws IOException {
            byte[] a = bytes;
            int idx = index;
            return a != null ? Math.max(0, a.length - idx) : 0;
        }

        @Override
        public void close() throws IOException {
            SubscriptionHelper.cancel(upstream);
            synchronized (this) {
                notifyAll();
            }
        }
    }
}

Usage:

@Test(timeout = 10000)
public void async() throws Exception {
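    // utf8 is not declared in the snippet; assumed to be the UTF-8 charset
    Charset utf8 = Charset.forName("UTF-8");
    AtomicInteger calls = new AtomicInteger();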

    Flowable<String> f = Flowable.range(100, 10).map(Object::toString)
            .doOnCancel(() -> calls.incrementAndGet())
            .subscribeOn(Schedulers.computation())
            .delay(10, TimeUnit.MILLISECONDS);

    try (InputStream is = FlowableStringInputStream.createInputStream(f, utf8)) {
        assertEquals('1', is.read());
        assertEquals('0', is.read());
        assertEquals('0', is.read());

        byte[] buf = new byte[3];
        assertEquals(3, is.read(buf));

        assertArrayEquals("101".getBytes(utf8), buf);
    }

    assertEquals(1, calls.get());
}

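【Discussion】:

  • Looks good. Could you help me integrate this code (I didn't get the part about delegating)? I tried the following, but it doesn't work because insert() expects an AbstractInputStreamContent as its second argument. InputStream in = FlowableStringInputStream.createInputStream(Flowable.just(""), Charset.forName("UTF-8")); Drive.Files.Insert req = drive.files().insert(file, in);
  • Create an instance of AbstractInputStreamContent and delegate to the corresponding methods of in.
  • Thanks. So basically there are 3 methods: getInputStream() should return the instance I created earlier (InputStream inputStream = FlowableStringInputStream.createInputStream(textFlowable, Charset.forName("UTF-8"));), and retrySupported -> false. How do I implement getLength in a nice Rx fashion? I couldn't find a corresponding method in FlowableStringInputStream. I have updated the question.
  • Since streams are usually of unknown length, you can't report it up front unless you are willing to buffer all the data, convert it ahead of time and take its length. You can also report the length as unknown: see getLength().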
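
To make the two length options from the last comment concrete, here is a minimal sketch that uses only the JDK. The StreamContent interface is a hypothetical stand-in for the three methods discussed above (getInputStream, getLength, retrySupported), so the snippet compiles without the Google client library; in real code you would extend AbstractInputStreamContent instead. ContentSketch and its method names are also made up for illustration. As far as I know, a negative getLength() means "unknown" in the Google HTTP client, and the library's InputStreamContent class wraps a plain InputStream in much the same way as the first option, but check both against the version you use.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the three methods from the discussion;
// the real class to extend is AbstractInputStreamContent.
interface StreamContent {
    InputStream getInputStream() throws IOException;
    long getLength() throws IOException;
    boolean retrySupported();
}

final class ContentSketch {

    private ContentSketch() { }

    // Option 1: report the length as unknown. The stream is one-shot,
    // so the content cannot be replayed for a retry.
    static StreamContent unknownLength(InputStream in) {
        return new StreamContent() {
            @Override public InputStream getInputStream() { return in; }
            @Override public long getLength() { return -1; } // negative = unknown
            @Override public boolean retrySupported() { return false; }
        };
    }

    // Option 2: drain the stream up front so the exact byte count is known.
    // This blocks until the source completes and holds all bytes in memory.
    static StreamContent buffered(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            out.write(chunk, 0, n);
        }
        byte[] all = out.toByteArray();
        return new StreamContent() {
            @Override public InputStream getInputStream() {
                return new ByteArrayInputStream(all); // fresh stream per call
            }
            @Override public long getLength() { return all.length; }
            @Override public boolean retrySupported() { return true; }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] csv = "id,name\n1,Ada\n".getBytes(StandardCharsets.UTF_8);

        // In the real setup this stream would come from
        // FlowableStringInputStream.createInputStream(...).
        StreamContent streaming = unknownLength(new ByteArrayInputStream(csv));
        StreamContent inMemory = buffered(new ByteArrayInputStream(csv));

        System.out.println("streaming length: " + streaming.getLength());
        System.out.println("buffered length:  " + inMemory.getLength());
    }
}
```

The trade-off is the one described in the comment: the first option keeps the upload streaming but gives up a known length and retries, while the second gives both at the cost of buffering the whole file before the request starts.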