【问题标题】:Data transfer rate getting slower after 15GB for larger file transfer15GB 后数据传输速率变慢以传输更大的文件
【发布时间】:2025-12-30 12:00:16
【问题描述】:

发送大于 15GB 的文件时,我遇到了数据传输速率问题。我有 3 台服务器和一个客户端。当从客户端向服务器发送文件时,我将文件拆分为块(每个块通常为 256MB),每个块在 2 个服务器上复制。复制发生在管道方法中。发送块时,每个块被分割成更小的数据包(每个数据包通常为 128 KB),发送到服务器,并在服务器端合并以存储在硬盘中。这里一切都很好。我对系统进行了 5GB 到 50GB 文件的测试,增量为 5GB。所有文件的平均写入速度约为 600MB/秒。见下图。这里我是和HDFS比较的。

从服务器读取相同文件时会出现此问题。文件分布在多个服务器上。例如,我可以从 server1 中读取 block1,从 server2 中读取 block2,依此类推。直观地说,读必须比写快,因为客户端从 3 个服务器并行读取。读取小于 15GB {5GB, 10GB, 15GG} 的文件时,性能在 1.1GB/sec 左右。读取大于 20GB {20GB, 25GB, ...., 50GB} 的文件时会出现问题。性能会随着文件大小的增加而降低。

上图是读取 50GB 文件的基准测试。每个黑点显示一个单独的块读取时间。如您所见,在第 60 到 70 个块之后性能开始下降。有趣的是,所有大于 15GB 的文件都会发生这种情况,在同一位置(大约 65 个块)附近放慢速度。随着文件大小的增加,慢速部分占主导地位,性能越来越差。我觉得16GB左右有一些障碍。我看到的唯一可能有帮助的提示是 3 台服务器并行随机发送块,直到 65 号左右。所以块的转移是重叠的。之后,一台服务器以循环顺序一次发送。我可以从日志输出中看到这一点。这里仍有一些重叠,但没有第 65 块之前那么多。

我在这个项目中使用 java 1.8 和 netty 4.1.8。作为 tcp 服务器。 操作系统为 CentOS 7。 每台服务器有两个 CPU(Intel(R) Xeon(R) CPU E5-2650 v3 @ 2.30GHz)= 40 个内核 64GB 内存 10 GBit 以太网。

我花了很多时间,找不到问题的根本原因。 问题可能由 Java VM、Netty、OS、OS TCP 默认值或其他原因引起。

服务器端 BlockSenderManager

@Override
    public void run(){

        while(nodeManager.isRunning()){
            try
            {
                BlockRequest br = blockSenders.take();
                if(br != null){
                    executor.execute(new BlockSender( br, this));
                }

                if(wait.take())
                    System.out.println(br.getBlockId()+" Delivered");
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }

服务器端的 BlockSender:

    @Override
        public void run()
        {
            FileInputStream fis = null;

            try
            {
                java.io.File file = new java.io.File(path+"/" + blockRequest.getBlockId());

                fis = new FileInputStream(file);
                fSize = file.length();
                long rem = fSize;

                sendBlockInfo();
                int bufSize;
                if (fSize < (long) packetSize)
                    bufSize = (int) fSize;
                else
                    bufSize = packetSize;
                int read = 0, packetOrder = 1;

                byte[] data;
                if(bufSize <= rem)
                    data = new byte[bufSize];
                else
                    data = new byte[(int)rem];
                while ((read = (fis.read(data))) > 0)
                {
                    if (read < 1)
                        break;

                    BlockPacket bp = new BlockPacket();

                    bp.setRequestId(blockRequest.getRequestId());
                    bp.setBlockId(blockRequest.getBlockId());
                    bp.setData(data);
                    bp.setPacketSeqNo(packetOrder);
                    if(read < bufSize)
                    {
                        bp.setIsLastPacket(true);
                    }

                    executor.execute(new Sender(bp));

                    packetOrder++;
                    if(rem > bufSize)
                        rem = rem - bufSize;

                    if(bufSize <= rem)
                        data = new byte[bufSize];
                    else
                    {
                        data = new byte[(int)rem];
                    }
                }

                fis.close();
                executor.shutdown();
            }
            catch (FileNotFoundException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

public class Sender implements Runnable
    {
        private final BlockPacket bp;
        private final FileBlock fb;
        private DataClient dc;

        public Sender(BlockPacket bp)
        {
            this.bp = bp;
            this.fb = null;
            dc = getDataClient(requestClient);
        }

        public Sender(FileBlock fb)
        {
            this.bp = null;
            this.fb = fb;
            dc = getDataClient(requestClient);
        }

        @Override
        public void run()
        {

            if (dc != null)
            {
                if (bp != null)
                {
                    dc.send(bp);
                }
                else if (fb != null)
                {
                    dc.send(fb);
                }
            }

        }
    }

客户端的ReceivedPacketProcessor

public void processBlockPacket(BlockPacket bp)
    {
        ByteBuffer buf = ByteBuffer.wrap(bp.getData());
        try
        {
            inChannel.write(buf);  
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
        public void run()
        {
            try
            {
                aFile = new RandomAccessFile(path+"/"+fileName, "rw");
                inChannel = aFile.getChannel();
                //java.io.File f = new java.io.File(path+"/"+fileName);
                //fop = new FileOutputStream(f);
                String reqId = file.getFileID();
                currentBlockId = reqId + "_" + currentBlockSeq;
                while (true)
                {
                    BlockPacket bp = null;
                    if (numberOfBlocks > 0)
                    {
                        try
                        {
                            bp = this.blockingQueue.take();
                        }
                        catch (InterruptedException e)
                        {
                            e.printStackTrace();
                        }
                        if (bp.getBlockId().equals(currentBlockId))
                        {
                            if (currentPacket == bp.getPacketSeqNo())
                            {

                                if(fileBlocks.containsKey(currentBlockId))
                                {
                                    processBlockPacket(bp);
                                    if(currentPacket < fileBlocks.get(currentBlockId).getNoOfPackets())
                                        currentPacket++;
                                    else
                                    {
                                        if (fileBlocks.get(currentBlockId).getPackets().size() < 1)
                                        {
                                            removeFileBlock(currentBlockId);
                                            currentBlockSeq++;
                                            currentBlockId = reqId + "_" + currentBlockSeq;
                                            currentPacket = 1;
                                            numberOfBlocks--;
                                        } 
                                    } 
                                }
                                else
                                {
                                    tempList.add(bp); 
                                }

                                for(int k =numberOfBlocks; k>0; k--)
                                {
                                    if(fileBlocks.containsKey(currentBlockId))
                                    {
                                        int pCount = fileBlocks.get(currentBlockId).getNoOfPackets();
                                        int i;
                                        for (i = currentPacket; i <= pCount; i++)
                                        {
                                            if (fileBlocks.get(currentBlockId).getPackets().containsKey(i))
                                            {
                                                processBlockPacket(fileBlocks.get(currentBlockId).getPackets().remove(i));
                                                currentPacket++;
                                            }
                                            else
                                            {
                                                break;
                                            }
                                        }
                                        if(i <= pCount)
                                        {
                                            break;
                                        }
                                        else
                                        {
                                            if (fileBlocks.get(currentBlockId).getPackets().size()  < 1)
                                            {
                                                removeFileBlock(currentBlockId);
                                                currentBlockSeq++;
                                                currentBlockId = reqId + "_" + currentBlockSeq;
                                                currentPacket = 1;
                                                numberOfBlocks--;
                                            } 
                                        }
                                    }
                                }
                            }
                        }
                        else
                        {
                            if(fileBlocks.containsKey(bp.getBlockId())){
                                fileBlocks.get(bp.getBlockId()).getPackets().put(bp.getPacketSeqNo(), bp);
                            }else{
                                tempList.add(bp);
                            }

                        }
                    }
                    else{
                        break;
                    }
                    for(int i=0; i<tempList.size(); i++){
                        if(fileBlocks.containsKey(tempList.get(i).getBlockId())){
                            BlockPacket temp = tempList.remove(i);
                            fileBlocks.get(temp.getBlockId()).getPackets().put(temp.getPacketSeqNo(), temp); 
                        }
                    }
                }
                System.out.println("CLOSING FILE....");
                this.isCompleted.put(true);
                inChannel.force(true);
                inChannel.close();
            }
            catch (FileNotFoundException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch (InterruptedException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

启用 -XX:+PrintGCDetails,here is a sample log

感谢任何评论/帮助。

【问题讨论】:

  • 我会检查 JVM GC、网络带宽利用率、读取服务器上的 IO。服务器上的文件系统缓存利用率。
  • @JigarJoshi 感谢 cmets。我根本没有接触 JVM GC。它以默认值运行。如果网络带宽是问题,我认为,对于较小的 15GB 文件也是问题。对于这种情况,从本地 IO 读取块不是瓶颈。我已经检查过了。
  • @celik 我认为 Jigar 想说的是打开 GC 日志并查看 might be 是否有问题。目前还不是很清楚你是如何阅读这些文件的......可能是一些代码或你实现它的方式? FileChannels?
  • 在如此复杂的系统中数据不足。
  • @the8472 此过程涉及的部件太多。我也会尝试放一些代码。

标签: java networking tcp jvm netty


【解决方案1】:

这是因为内存中的脏页配给。由于传入数据速率高于本地 IO 刷新吞吐量,因此数据在内存中累积。一旦达到允许的最大脏页比率,接收器就不再接受更多数据。因此,系统受限于本地 IO,在这种情况下不是网络。因此,收益递减发生在大约 15GB 处。您可以在

中更改一些设置

/etc/sysctl.conf

如:

vm.dirty_background_ratio = 2
vm.dirty_ratio = 80
vm.dirty_expire_centisecs = 3000
vm.dirty_writeback_centisecs = 500

This 可能是有用的阅读材料。

系统性能仍然受到本地 IO 和最大允许脏页比率的限制。您可以增加增加脏页比率以仅推迟返回时间的减少。如果文件/数据更大,它将再次到达这一点。新结果:

【讨论】: