【问题标题】:Read UInt32 from InputStream从 InputStream 中读取 UInt32
【发布时间】:2025-12-27 19:45:12
【问题描述】:

我需要与具有特殊消息格式的服务器进行通信:每条消息以 4 个字节(以及大端格式的 unsigned long / UInt32 一起)开始,这决定了以下消息的长度。在这 4 个字节之后,消息作为普通字符串发送

所以我首先需要将 4 个字节读入一个整数(32 位无符号)。在 Java 中,我这样做:

DataInputStream is;
...
int len = is.readInt();

如何在 Swift 4 中做到这一点?

我现在使用

var lengthbuffer = [UInt8](repeating: 0, count: 4)
let bytecount = istr.read(&lengthbuffer, maxLength: 4)
let lengthbytes = lengthbuffer[0...3]
let bigEndianValue = lengthbytes.withUnsafeBufferPointer {
            ($0.baseAddress!.withMemoryRebound(to: UInt32.self, capacity: 1) { $0 })
            }.pointee
let bytes_expected = Int(UInt32(bigEndian: bigEndianValue))

但这看起来并不是最优雅的方式。此外,有时(我无法可靠地复制它)读取错误的值(太大)。当我尝试为以下消息分配内存时,应用程序崩溃:

let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bytes_expected)
let bytes_read = istr.read(buffer, maxLength: bytes_expected)

那么从 InputStream 中读取 UInt32 的快捷方式是什么?


编辑:

我当前的代码(从 cmets 实现的东西。谢谢!)看起来像这样:

private let inputStreamAccessQueue  = DispatchQueue(label: "SynchronizedInputStreamAccess")  // NOT concurrent!!!

// This is called on Stream.Event.hasBytesAvailable
func handleInput() {
    self.inputStreamAccessQueue.sync(flags: .barrier) {            
        guard let istr = self.inputStream, istr.hasBytesAvailable else {
            log.error(self.buildLogMessage("handleInput() called when inputstream has no bytes available"))
            return
        }

        let lengthbuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: 4)
        defer { lengthbuffer.deallocate(capacity: 4) }
        let lenbytes_read = istr.read(lengthbuffer, maxLength: 4)

        guard lenbytes_read == 4 else {
            self.errorHandler(NetworkingError.InputError("Input Stream received \(lenbytes_read) (!=4) bytes"))
            return
        }

        let bytes_expected = Int(UnsafeRawPointer(lengthbuffer).load(as: UInt32.self).bigEndian)
        log.info(self.buildLogMessage("expect \(bytes_expected) bytes"))

        let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bytes_expected)
        let bytes_read = istr.read(buffer, maxLength: bytes_expected)
        guard bytes_read == bytes_expected else {
            print("Error: Expected \(bytes_expected) bytes, read \(bytes_read)")
            return
        }

        guard let message = String(bytesNoCopy: buffer, length: bytes_expected, encoding: .utf8, freeWhenDone: true) else {
            log.error("ERROR WHEN READING")
            return
        }

        self.handleMessage(message)
    }
}

这在大多数情况下都有效,但有时 istr.read() 不会读取 bytes_expected 字节而是 bytes_read 分配太多内存 => 崩溃

我认为这是对错误的解释。但是如何解决呢? 在 hasBytesAvailable = true 时在流上调用 read()?有没有更好的解决方案?

我会假设当我循环时,hasBytesAvailableEvent 仍然会在每次 read() => 之后再次发生,handleInput 仍然会过早地再次被调用......我该如何避免这种情况?


编辑 2:我现在已经实现了循环,不幸的是它仍然因相同的错误而崩溃(并且可能是相同的原因)。相关代码:

let bytes_expected = Int(UnsafeRawPointer(lengthbuffer).load(as: UInt32.self).bigEndian)

var message = ""
var bytes_missing = bytes_expected
while bytes_missing > 0 {
    print("missing", bytes_missing)
    let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bytes_missing)
    let bytes_read = istr.read(buffer, maxLength: bytes_missing)

    guard bytes_read > 0 else {
        print("bytes_read not <= 0: \(bytes_read)")
        return
    }

    guard bytes_read <= bytes_missing else {
        print("Read more bytes than expected. missing=\(bytes_missing), read=\(bytes_read)")
        return
    }

    guard let partial_message = String(bytesNoCopy: buffer, length: bytes_expected, encoding: .utf8, freeWhenDone: true) else {
        log.error("ERROR WHEN READING")
        return
    }

    message = message + partial_message
    bytes_missing -= bytes_read
}

崩溃时我的控制台输出:

缺少 1952807028 malloc:* mach_vm_map(size=1952808960) 失败 (错误代码=3) * 错误:无法分配区域 *** 在 malloc_error_break 中设置断点进行调试

所以看起来整个handleInput()方法调用得太早了,虽然我使用了屏障!我做错了什么?

【问题讨论】:

  • 这有点太复杂了,但我找不到明显的错误。因此,了解具体问题会很有帮助。对于哪些数据,您会得到什么“错误值”?
  • 有时(对我来说不可重现,因为我无权访问服务器代码)bytes_expected 包含非常大的值,例如194394543565 然后应用程序崩溃,因为无法分配那么多内存。我不期望这么大的值。通常我的消息小于 5000 字节并且服务器发送正确的长度(其他客户端工作,只有我的 iOS 客户端有时会崩溃)
  • 我唯一的解释是,对 istr.read 的两次调用可能是“重叠的”,因此第二次调用在消息中间的某处读取 4 个字节并将其解释为长度。这可能是因为第一次调用 read() 像往常一样需要更长的时间。我试图通过将消息读取包装到 self.inputStreamAccessQueue.sync(flags: .barrier) { ... } where private let inputStreamAccessQueue = DispatchQueue(label: "SynchronizedInputStreamAccess") // 不并发来避免这种情况!如果我是对的,这应该避免同时访问流。
  • 它又崩溃了。 bytes_expected contains 1886348902 控制台说---- malloc: *** mach_vm_map(size=1886351360) failed (error code=3) *** error: can't allocate region *** set a breakpoint in malloc_error_break to debug -- -- 并显示此错误:“线程 1:EXC_BREAKPOINT (code=EXC_ARM_BREAKPOINT, subcode=0xdefe)”在此行中:let buffer = UnsafeMutablePointer.allocate(capacity: bytes_expected)
  • 是因为你读错了数据,还是因为转换为UInt32错误? (我假设是前者。)如果实际读取了预期的字节数(它可能比您在套接字等通信通道上请求的字节数短),您应该检查 read() 的返回值。您还可以转储 4 个原始长度字节。

标签: ios swift sockets inputstream endianness


【解决方案1】:

我会这样做(准备粘贴到游乐场):

import Foundation

var stream = InputStream(data: Data([0,1,0,0]))
stream.open()
defer { stream.close() }

var buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: 4)
defer { buffer.deallocate(capacity: 4) }
guard stream.read(buffer, maxLength: 4) >= 4 else {
    // handle all cases: end of stream, error, waiting for more data to arrive...
    fatalError()
}
let number = UnsafeRawPointer(buffer).load(as: UInt32.self)
number // 256
number.littleEndian // 256
number.bigEndian // 65536

直接使用UnsafeRawPointer.load(无需显式重新绑定)对于琐碎类型according to the documentation 是安全的。 Trivial 类型通常是那些不需要 ARC 操作的类型。

或者,只要绑定类型和目标类型是普通类型,您就可以将相同的内存作为不同类型访问,而无需通过无类型内存访问重新绑定。

【讨论】:

  • 谢谢,这是一个优雅的解决方案!我也不知道 defer 但这似乎非常有用:) 从输入流中读取 UInt32 现在可以工作了。但是崩溃仍然发生,我想我找到了原因。有时(这只发生在“长”消息中,例如 5000 字节)长度被正确读取并存储到 bytes_expected 中。然后我调用 isr.read(maxLength: bytes_expected) 但这不会返回我预期的字节数(但是,可以说只有 2000)。所以我只阅读了消息的第一部分,然后返回。
  • 下次我从流中读取 bytes_expected 时,我会得到最后一条消息的其余部分并将其解释为下一条消息的长度(导致非常大的 bytes_expected 值),这会导致我的应用程序崩溃.所以我看到了两种可能的解决方案:1)找到一个不读取 MAXlength 字节但 EXACTlength 字节的方法。这样的方法存在吗? 2) 执行一个循环读取流,直到准确地读取 bytes_expected 字节。
  • 是的,您需要使用循环。这就是为什么该方法被命名为 read(:maxLength:) 而不是 read(:length:)。这是流 API 的一种非常常见的模式。该流在内部只有一个有限的缓冲区,因此如果您要求它提供比可用字节更多的字节,它将返回一个较小的数字。这也适用于我们在这里所做的较小的读取,因此您甚至应该使用循环来读取 4 个长度字节以确保。
  • 如何避免在每次 read() 后调用 InputStream.hasBytesAvailable 事件?如果我不能避免这种情况,那么我认为循环解决方案仍然不起作用,因为可以先处理其他调用。我不确定我的屏障在这里是否有帮助......
  • 我还编辑了我的问题并添加了我的完整当前代码
【解决方案2】:

我建议load(as:) 将缓冲区转换为UInt32,并且我会确保您明确说明字节顺序,例如

let value = try stream.read(type: UInt32.self, endianness: .little)

地点:

enum InputStreamError: Error {
    case readFailure
}

enum Endianness {
    case little
    case big
}

extension InputStream {
    func read<T: FixedWidthInteger>(type: T.Type, endianness: Endianness = .little) throws -> T {
        let size = MemoryLayout<T>.size
        var buffer = [UInt8](repeating: 0, count: size)
        let count = read(&buffer, maxLength: size)
        guard count == size else {
            throw InputStreamError.readFailure
        }

        return buffer.withUnsafeBytes { pointer -> T in
            switch endianness {
            case .little: return T(littleEndian: pointer.load(as: T.self))
            case .big:    return T(bigEndian: pointer.load(as: T.self))
            }
        }
    }

    func readFloat(endianness: Endianness) throws -> Float {
        return try Float(bitPattern: read(type: UInt32.self, with: endianness))
    }

    func readDouble(endianness: Endianness) throws -> Double {
        return try Double(bitPattern: read(type: UInt64.self, with: endianness))
    }
}

注意,我将read(type:endianness:) 设为泛型,因此它可以与任何标准整数类型一起重用。我还投了readFloatreadDouble 以作好衡量。

【讨论】: