【问题标题】:Understanding and Resolving File Transferring issue using Netty使用 Netty 理解和解决文件传输问题
【发布时间】:2017-02-02 03:26:08
【问题描述】:


大家好,
我需要帮助解决文件传输问题。我实现了 Netty 代码,将单个 10MB 二进制文件从一台主机(节点 0)传输到另一台主机(节点 1),但只有 8.5KB 的文件被传输,我很难弄清楚原因。我正在使用 ChunkWriteHandler 通过 ChunkedNioFile 一次发送 1MB 的文件块(请参阅下面的代码)。此外,我尝试传输大于 1MB 的文件,例如 100MB、500MB 和 1GB,但仅传输了 8.5KB 的文件。如果我将 ChunkedNioFile 中指定的块大小从 1MB 减少到 512KB 或更低,则传输 17 KB,这是先前文件传输大小的两倍。另外,我尝试只使用 ChunkedFile,但我收到了相同的传输结果。我可以成功传输和接收文件头:文件名、文件大小(长度)和文件偏移量(从哪里开始读取或写入),但实际文件只有几 KB。 谁能告诉我发生了什么以及如何解决这个问题? (下面是代码)。

谢谢,

代码设置:

  • FileSenderInitializer.java
  • FileSenderHandler.java
  • FileSender.java
  • FileReceiverInitializer.java
  • FileReceiverHandler.java
  • FileReceiver.java

FileSenderInitializer.java - 使用通道处理程序初始化通道管道

公共类 FileSenderInitializer 扩展 ChannelInitializer {

        @Override
        public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(
        //new LengthFieldPrepender(8),
        new ChunkedWriteHandler(),
        new FileSenderHandler());
        }
       }

FileSenderHandler.java - 发送文件头信息 - 文件名、偏移量、长度,然后是实际文件

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
 try {
 String fileRequest = "ftp Node0/root/10MB_File.dat Node1/tmp/10MB_File_Copy.dat";

 //Source File to send / transfer to the Destination Node
 String theSrcFilePath =  "/root/10MB_File.dat";

 //File Name to write on the destination node, once the file is received  
 String theDestFilePath = "/tmp/10MB_File_Copy.dat";

//Get the source file to send
 File theFile = new File(theSrcFilePath);
 FileChannel theFileChannel = new RandomAccessFile(theFile, "r").getChannel();

//Get the length of the file
 long fileLength = theFileChannel.size();
 //Get the offset
 long offSet = 0;

 //Copy the offset to the ByteBuf
 ByteBuf offSetBuf = Unpooled.copyLong(offSet);
 //Copy the file length to the ByteBuf
 ByteBuf fileLengthBuf = Unpooled.copyLong(fileLength);

 //Get the Destination Filename (including the file path) in Bytes
 byte[] theDestFilePathInBytes = theDestFilePath.getBytes();
 //Get the length of theFilePath
 int theDestSize = theDestFilePathInBytes.length;
 //Copy the Dest File Path length to the ByteBuf
 ByteBuf theDestSizeBuf = Unpooled.copyInt(theDestSize);
 //Copy the theDestFilePathInBytes to the Byte Buf
 ByteBuf theDestFileBuf = Unpooled.copiedBuffer(theDestFilePathInBytes);

 //Send the file Headers: FileName Length, the FileName, the Offset and the file length
 ctx.write(theDestSizeBuf);
 ctx.write(theDestFileBuf);
 ctx.write(offSetBuf);
 ctx.write(fileLengthBuf);
 ctx.flush();

 //Send the 10MB File in 1MB chunks as specified by the following chunk size (1024*1024*1)
 ctx.write(new ChunkedNioFile(theFileChannel, offSet, fileLength, 1024 * 1024 * 1));
 ctx.flush();

 }catch(Exception e){
 System.err.printf("FileSenderHandler: Channel Active: Error: "+e.getMessage());
 e.printStackTrace();
 }
} //End channelActive

FileSender.java - 引导通道并将此客户端/主机连接到另一台主机

  public static void main(String[] args) throws Exception {
     // Configure the client/ File Sender
     EventLoopGroup group = new NioEventLoopGroup();
     try {
     Bootstrap b = new Bootstrap();
     b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new FileSenderInitializer());

     // Start the client.
     ChannelFuture f = b.connect(HOST, PORT).sync();

     // Wait until the connection is closed.
     //f.channel().closeFuture().sync();
     } finally {
     // Shut down the event loop to terminate all threads.
     group.shutdownGracefully();
     }
     }
}

FileReceiverInitializer.java - 使用通道处理程序初始化通道管道

public class FileReceiverInitializer extends ChannelInitializer<SocketChannel> {

 public FileReceiverInitializer(){

 }

@Override
 public void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast( 
  //Read in 1MB data at a time (which is the max frame length), length field offset starts at 0, length of the length field is 8 bits, length adjustment is 0, strip the 8 bits representing the length field from the frame
 //new LengthFieldBasedFrameDecoder(1024*1024*1, 0, 8, 0, 8),
 new FileReceiverHandler());
 }
}

FileReceiverHandler.java - 接收文件头信息 - 文件名、偏移量、长度和实际文件

public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
 while (msg.readableBytes() >= 1){
   //Read in the size of the File Name and it's directory path
   if (!fileNameStringSizeSet) {
     fileNameStringSizeBuf.writeBytes(msg, ((fileNameStringSizeBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringSizeBuf.writableBytes())); //INT_SIZE = 4 & LONG_SIZE = 8 (the byte size of an int and long)
     if (fileNameStringSizeBuf.readableBytes() >= INT_SIZE) {
       fileNameStringSize = fileNameStringSizeBuf.getInt(fileNameStringSizeBuf.readerIndex());//Get Size at index = 0;
       fileNameStringSizeSet = true;
   //Allocate a byteBuf to read in the actual file name and it's directory path
       fileNameStringBuf = ctx.alloc().buffer(fileNameStringSize);
    }
   } else if (!readInFileNameString) {
     //Read in the actual file name and it's corresponding directory path
     fileNameStringBuf.writeBytes(msg, ((fileNameStringBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringBuf.writableBytes()));
     if (fileNameStringBuf.readableBytes() >= fileNameStringSize) {
       readInFileNameString = true;
       //convert the data in the fileNameStringBuf to an ascii string
       thefileName = fileNameStringBuf.toString(Charset.forName("US-ASCII"));

       //Create file
       emptyFile = new File(thefileName); //file Name includes the directory path
       f = new RandomAccessFile(emptyFile, "rw");
       fc = f.getChannel();
    }
 }else if (!readInOffset) {
   offSetBuf.writeBytes(msg, ((offSetBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : offSetBuf.writableBytes()));
   if (offSetBuf.readableBytes() >= LONG_SIZE) {
     currentOffset = offSetBuf.getLong(offSetBuf.readerIndex());//Get Size at index = 0;
     readInOffset = true;
   }

 } else if (!readInFileLength) {
   fileLengthBuf.writeBytes(msg, ((fileLengthBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileLengthBuf.writableBytes()));
   //LONG_SIZE = 8
   if (fileLengthBuf.readableBytes() >= LONG_SIZE) {
   fileLength = fileLengthBuf.getLong(fileLengthBuf.readerIndex());//Get Size at index = 0;
   remainingFileLength = fileLength;
   readInFragmentLength = true;
  }
 } else {
   if (!readInCompleteFile) {
     if (msg.readableBytes() < remainingFileLength) {
       if (msg.readableBytes() > 0) {
         currentFileBytesWrote = 0
         while ( msg.readableBytes >= 1 ){
           int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset);
           currentOffset += fileBytesWrote;
           remainingFileLength -= fileBytesWrote;
           msg.readerIndex(msg.readerIndex + fileBytesWrote);
         }
       }
     } else {
       int remainingFileLengthInt = (int) remainingFileLength;
       while (remainingFileLength >= 1){
         int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), remainingFileLengthInt), currentOffset);

         currentOffset += fileBytesWrote;
         remainingFileLength -= fileBytesWrote;
         remainingFileLengthInt-= fileBytesWrote;
         msg.readerIndex(msg.readerIndex + fileBytesWrote );
       }

      //Set readInCompleteFile to true
      readInCompleteFile = true;

    }
   }//End else if file chunk
  }//End Else
 }//End While
}//End Read Method

FileReceiver.java - 引导服务器并接受连接

public static void main(String[] args) throws Exception {
 // Configure the server
 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
 ServerBootstrap b = new ServerBootstrap();
 b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new FileReceiverInitializer())
 .childOption(ChannelOption.AUTO_READ, true) 
 .bind(LOCAL_PORT).sync().channel().closeFuture().sync();
 } finally {
 bossGroup.shutdownGracefully();
 workerGroup.shutdownGracefully();
 }
}

-- 

【问题讨论】:

    标签: java sockets netty


    【解决方案1】:

    问题在于,当主 FileSender.java 应用程序完成执行其代码时,它会终止,从而导致 FileSenderHandler 终止。但是,为了阻止主 FileSender.java 应用程序终止,我使用了以下语句:f.channel().closeFuture().sync();。其中 f 是通过调用连接到服务器而呈现的 ChannelFuture:b.connect(HOST, PORT).sync();这将使 FileSender 保持正常运行,并允许 fileSenderHandler 发送所有信息而不会提前终止。

    但是,我的新问题是:应用程序如何关闭通道并导致主应用程序在所有数据发送并确认后解除阻塞?目前它被阻止调用 f.channel() .closeFuture().sync();。但是在我发送所有数据并收到确认之后,我该如何解除对主应用程序的阻止。我想如果我关闭通道,closeFuture 将返回为 true,从而解除对主应用程序的阻塞。此外,我尝试使用 ctx.channel().close() 从 FileSenderHandler 和 FileReceiverHandler 中关闭通道,但通道没有关闭并取消阻塞主应用程序。

    我需要解除对应用程序的阻塞的原因是,在发送并确认所有数据后,我可以将吞吐量打印到控制台。如果我有多个数据通道并且程序被阻塞,则只会打印第一个数据通道处理程序吞吐量。所以 FileSender.java 如下所示。但即使我有一个数据通道并且我尝试在 FileSenderHandler 中关闭通道,主应用程序 (FileSender.java) 仍然阻塞并挂在 ChannelFuture.channel().closeFuture().sync();要退出,我必须在终端输入 control C。 关于在发送和接收所有数据后如何解锁主应用程序有什么想法吗?

    FileSender.java - 引导通道并将此客户端/主机连接到另一台主机

    public static void main(String[] args) throws Exception {
     // Configure the client/ File Sender
     EventLoopGroup group = new NioEventLoopGroup();
     try {
    for (int i =0; i<numOfDataChannels; i++) {
     Bootstrap b = new Bootstrap();
     b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new FileSenderInitializer());
    
     // Start the client.
     ChannelFuture f = b.connect(HOST, PORT).sync();
    
      addChannelFutureToList(f);
    }
    
     // Wait until the connection is closed for each data channel, but also who can actually close the channel
     for ( ChannelFuture f: channelFutureList){
       f.channel().closeFuture().sync();
    }
    
    //When Channel is closed PRINT THROUGHPUT  OF ALL THE DATA CHANNELS
    printThroughput();
     } finally {
     // Shut down the event loop to terminate all threads.
     group.shutdownGracefully();
     }
     }
    }
    

    FileSenderHandler.java - 处理 I/O 通道事件,例如读/写

    public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
    try {
     .
     .
     //After received msg Ack, close the channel, this should unblock the main application (FileSender.java) since after closing the channel closeFuture will be fulfilled
     ctx.channel().close();
    
    }catch(Exception e){
        System.err.printf("ChannelRead Error Msg: " + e.getMessage());
        e.printStackTrace();
    
    }
    

    【讨论】:

      【解决方案2】:

      也许我错了,但以下内容对我来说很奇怪:

             int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset);
             currentOffset += fileBytesWrote;
             remainingFileLength -= fileBytesWrote;
             msg.readerIndex(msg.readerIndex + fileBytesWrote); 
             // msg.readerIndex (or msg.readerIndex() ?) changed already
      

      您可能希望在执行此分配之前备份 readerIndex() 值。

      有几个 KB 似乎与以下任一有关: - 您不会按照自己的意愿消耗所有数据包(仅第一个?) - 你读错了(我在前面显示的代码中怀疑跳过了一些字节)

      您能否跟踪每个读取操作(服务器端)?它可能会对您有所帮助(例如,知道您收到了多少字节,写了多少,readerIndex/readableBytes/offset 是多少)。

      【讨论】:

        【解决方案3】:

        另一个原因可能是:客户端,一旦连接,您立即关闭该组。这可能是原因,因为客户端可以“中止”传输,因此服务器不会进行完整传输?

        【讨论】:

        • 谢谢弗雷德里克
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-11-28
        • 1970-01-01
        • 2020-10-25
        • 2011-08-20
        相关资源
        最近更新 更多