【问题标题】:Using RxJava to build asynchronous REST API使用 RxJava 构建异步 REST API
【发布时间】:2014-05-10 08:56:24
【问题描述】:

着眼于 RxJava 为我们的 API 构建异步支持。 现在我们使用 jetty + JAX-RS @Path 注释并且不确定 将传入的 REST api 调用绑定到 RxJava API 的正确方法是什么。

基本上这是在释放请求线程的上下文中,直到 来自 DB 的响应已准备就绪。

查看了 Vert.x,但这需要 java 7,我们现在与 java 6 绑定。

寻找关于上述内容的建议。典型的做法是什么 需要将传入的 http 请求绑定到 RxJava API。

【问题讨论】:

  • 您在寻找客户端还是服务器?

标签: rx-java


【解决方案1】:

下面是一个为 JAX-RS 创建 Customer Observable 的示例:

public class ApiService {
    Client client;

    public ApiService() {
        client = ClientBuilder.newClient();
    }

    public Observable<Customer> createCustomerObservable(final int customerId) {
        return Observable.create(new Observable.OnSubscribe<Customer>() {
            @Override
            public void call(final Subscriber<? super Customer> subscriber) {
                client
                        .target("http://domain.com/customers/{id}")
                        .resolveTemplate("id", customerId)
                        .request()
                        .async()
                        .get(new InvocationCallback<Customer>() {
                            @Override
                            public void completed(Customer customer) {
                                // Do something
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onNext(customer);
                                    subscriber.onCompleted();
                                }
                            }

                            @Override
                            public void failed(Throwable throwable) {
                                // Process error
                                if (!subscriber.isUnsubscribed()) {
                                    subscriber.onError(throwable);
                                }
                            }
                        });
            }
        });
    }
}

【讨论】:

    【解决方案2】:

    以下内容应该适用于 Jetty

    public class ApiService {
        HttpClient httpClient;
    
        public ApiService(HttpClient httpClient,) {
            this.httpClient = httpClient;
        }
    
        public <RequestType, ResultType> Observable<ResultType> createApiObservable(final RequestType requestContent) {
            return Observable.create(new Observable.OnSubscribe<ResultType>() {
                @Override
                public void call(final Subscriber<? super ResultType> subscriber) {
                    // Create the request content for your API. Your logic here...
                    ContentProvider contentProvider = serializeRequest(requestContent);
    
                    httpClient
                            .newRequest("http://domain.com/path")
                            .content(contentProvider)
                            .send(new Response.CompleteListener() {
                                @Override
                                void onComplete(Result result) {
                                    // Pass along the error if one occurred.
                                    if (result.isFailed()) {
                                        subscriber.onError(result.getFailure());
                                        return;
                                    }
    
                                    // Convert the response data to the ResultType. Your logic here...
                                    ResultType resultContent = parseResponse(result.getResponse());
    
                                    // Send the result to the subscriber.
                                    subscriber.onNext(responseBytes);
                                    subscriber.onCompleted();
                                }
                            });
                }
            });
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-12-11
      • 1970-01-01
      • 2018-08-22
      • 1970-01-01
      • 2018-03-31
      • 1970-01-01
      • 1970-01-01
      • 2022-08-19
      相关资源
      最近更新 更多