【问题标题】:Issue with streaming formatted input stream to server将格式化的输入流流式传输到服务器的问题
【发布时间】:2017-03-15 09:53:40
【问题描述】:

我正在尝试将“格式化”输入流写入 tomcat servlet(使用 Guice)。

根本问题如下:我想将数据从数据库直接流式传输到服务器。因此我加载数据,将其转换为 JSON 并将其上传到服务器。我不想先将 JSON 写入临时文件,这是由于性能问题,所以我想通过直接流式传输到服务器来规避使用硬盘驱动器。

编辑:类似于Sending a stream of documents to a Jersey @POST endpoint

但答案中的评论说它正在丢失数据,我似乎有同样的问题。

我写了一个“ModelInputStream”

  1. 在流​​式传输上一个模型时从数据库中加载下一个模型
  2. 为类型(枚举序数)写入一个字节
  3. 为下一个字节数组 (int) 的长度写入 4 个字节
  4. 写入字符串 (refId)
  5. 为下一个字节数组 (int) 的长度写入 4 个字节
  6. 写入实际的 json
  7. 重复直到所有模型都流式传输

我还编写了一个“ModelStreamReader”,它知道该逻辑并进行相应的读取。

当我直接测试它时它工作正常,但是一旦我在客户端创建 ModelInputStream 并将服务器上的传入输入流与 ModelStreamReader 一起使用,实际的 json 字节小于定义长度的 4 个字节中指定的字节。我猜这是由于放气或压缩造成的。

我尝试了不同的内容标题来尝试禁用压缩等,但没有任何效果。

java.io.IOException: Unexpected length, expected 8586, received 7905

所以在客户端,JSON 字节数组是 8586 字节长,而当它到达服务器时,它是 7905 字节长,这打破了整个概念。

此外,它似乎并没有真正流式传输,而是首先缓存从输入流返回的全部内容。

我需要如何调整调用代码以获得我描述的结果?

模型输入流

package *;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

import ***.Daos;
import ***.IDatabase;
import ***.CategorizedEntity;
import ***.CategorizedDescriptor;
import ***.JsonExport;

import com.google.gson.Gson;
import com.google.gson.JsonObject;

public class ModelInputStream extends InputStream {

    private final Gson gson = new Gson();
    private final IDatabase db;
    private final Queue<CategorizedDescriptor> descriptors;
    private byte[] buffer = new byte[0];
    private int position = 0;

    public ModelInputStream(IDatabase db, List<CategorizedDescriptor> descriptors) {
        this.db = db;
        this.descriptors = new LinkedList<>();
        this.descriptors.addAll(descriptors);
    }

    @Override
    public int read() throws IOException {
        if (position == buffer.length) {
            if (descriptors.size() == 0)
                return -1;
            loadNext();
            position = 0;
        }
        return buffer[position++];
    }

    private void loadNext() throws IOException {
        CategorizedDescriptor descriptor = descriptors.poll();
        byte type = (byte) descriptor.getModelType().ordinal();
        byte[] refId = descriptor.getRefId().getBytes();
        byte[] json = getData(descriptor);
        buildBuffer(type, refId, json);
    }

    private byte[] getData(CategorizedDescriptor d) {
        CategorizedEntity entity = Daos.createCategorizedDao(db, d.getModelType()).getForId(d.getId());
        JsonObject object = JsonExport.toJson(entity);
        String json = gson.toJson(object);
        return json.getBytes();
    }

    private void buildBuffer(byte type, byte[] refId, byte[] json) throws IOException {
        buffer = new byte[1 + 4 + refId.length + 4 + json.length];
        int index = put(buffer, 0, type);
        index = put(buffer, index, asByteArray(refId.length));
        index = put(buffer, index, refId);
        index = put(buffer, index, asByteArray(json.length));
        put(buffer, index, json);
    }

    private byte[] asByteArray(int i) {
        return ByteBuffer.allocate(4).putInt(i).array();
    }

    private int put(byte[] array, int index, byte... bytes) {
        for (int i = 0; i < bytes.length; i++) {
            array[index + i] = bytes[i];
        }
        return index + bytes.length;
    }

}

ModelStreamReader

package *;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import *.ModelType;

public class ModelStreamReader {

    private InputStream stream;

    public ModelStreamReader(InputStream stream) {
        this.stream = stream;
    }

    public Model next() throws IOException {
        int modelType = stream.read();
        if (modelType == -1)
            return null;
        Model next = new Model();
        next.type = ModelType.values()[modelType];
        next.refId = readNextPart();
        next.data = readNextPart();
        return next;
    }

    private String readNextPart() throws IOException {
        int length = readInt();
        byte[] bytes = readBytes(length);
        return new String(bytes);
    }

    private int readInt() throws IOException {
        byte[] bytes = readBytes(4);
        return ByteBuffer.wrap(bytes).getInt();
    }

    private byte[] readBytes(int length) throws IOException {
        byte[] buffer = new byte[length];
        int read = stream.read(buffer);
        if (read != length)
            throw new IOException("Unexpected length, expected " + length + ", received " + read);
        return buffer;
    }

    public class Model {

        public ModelType type;
        public String refId;
        public String data;

    }

}

调用代码

ModelInputStream stream = new ModelInputStream(db, getAll(db));
URL url = new URL("http://localhost:8080/ws/test/streamed");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setDoOutput(true);
con.setRequestMethod("POST");
con.connect();
int read = -1;
while ((read = stream.read()) != -1) {
    con.getOutputStream().write(read);
}
con.getOutputStream().flush();
System.out.println(con.getResponseCode());
System.out.println(con.getResponseMessage());
con.disconnect();

服务器部分(Jersey WebResource)

package *.webservice;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;

import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

import *.ModelStreamReader;
import *.ModelStreamReader.Model;

@Path("test")
public class TestResource {

    @POST
    @Path("streamed")
    public Response streamed(InputStream modelStream) throws IOException {
        ModelStreamReader reader = new ModelStreamReader(modelStream);
        writeDatasets(reader);
        return Response.ok(new HashMap<>()).build();
    }

    private void writeDatasets(ModelStreamReader reader) throws IOException {
        String commitId = UUID.randomUUID().toString();
        File dir = new File("/opt/tests/streamed/" + commitId);
        dir.mkdirs();
        Model dataset = null;
        while ((dataset = reader.next()) != null) {
            File file = new File(dir, dataset.refId);
            writeDataset(file, dataset.data);
        }
    }

    private void writeDataset(File file, String data) {
        try {
            if (data == null)
                file.createNewFile();
            else
                Files.write(file.toPath(), data.getBytes(Charset.forName("utf-8")));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

【问题讨论】:

    标签: java tomcat servlets binary inputstream


    【解决方案1】:

    读取的字节必须移到 (0, 255) 范围内(请参阅ByteArrayInputStream)。

    模型输入流

    @Override
    public int read() throws IOException {
        ...
        return buffer[position++] & 0xff;
    } 
    

    最后这一行必须添加到调用代码中(用于分块):

    ...
    HttpURLConnection con = (HttpURLConnection) url.openConnection();
    con.setChunkedStreamingMode(1024 * 1024);
    ...
    

    【讨论】:

      【解决方案2】:

      我发现了一个完全不同性质的问题。

      首先输入流没有被压缩或任何东西。读取的字节必须移入 (0, 255) 范围而不是 (-128, 127)。因此流读取被 -1 字节值中断。

      模型输入流

      @Override
      public int read() throws IOException {
          ...
          return buffer[position++] + 128;
      } 
      

      其次,数据必须分块传输才能真正“流式传输”。因此必须将 ModelStreamReader.readBytes(int) 方法额外调整为:

      ModelStreamReader

      private byte[] readBytes(int length) throws IOException {
          byte[] result = new byte[length];
          int totalRead = 0;
          int position = 0;
          int previous = -1;
          while (totalRead != length) {
              int read = stream.read();
              if (read != -1) {
                  result[position++] = (byte) read - 128;
                  totalRead++;
              } else if (previous == -1) {
                  break;
              }
              previous = read;
          }
          return result;
      }
      

      最后这一行必须添加到调用代码中:

      ...
      HttpURLConnection con = (HttpURLConnection) url.openConnection();
      con.setChunkedStreamingMode(1024 * 1024);
      ...
      

      【讨论】:

      • 您不必移动字节或将它们移回。明显地。这样做完全没有任何效果。您通过这个冗余过程解决了其他一些问题。您似乎在寻找DataInputStream.readFully() 和朋友。
      • 字节的值介于 -128 和 127 之间,所以如果我不移动值,它将返回负值,如果值为 -1,它将终止读取流过早。这就是导致问题的原因,它正在使用我在答案中发布的代码(包括班次)
      • 您能解释一下为什么您对问题和答案投反对票吗?为什么这个问题没有显示出任何研究成果,为什么答案没有用。并请详细说明为什么它如此明显,显然对我来说并不明显。
      • 我检查了 ByteArrayInputStream 实现,我移动字节的方式是错误的(它适用于我的情况,但一般来说是错误的),所以我相应地更新了答案。
      猜你喜欢
      • 2013-05-11
      • 1970-01-01
      • 2012-08-12
      • 2012-03-06
      • 2010-12-09
      • 2014-06-06
      • 1970-01-01
      • 2012-01-24
      • 2011-07-01
      相关资源
      最近更新 更多