【发布时间】:2020-03-02 08:56:31
【问题描述】:
编辑:非常感谢@oleksa,但下面发布的代码中也存在其他问题。使用 Socket.ReceiveFromAsync 和 Socket.SendToAsync 方法时,您传递的 SocketAsyncEventArgs 对象必须设置其 RemoteEndPoint 属性(最好是客户端套接字的 RemoteEndPoint)。尽管文档声明此属性被忽略,但仍必须设置它,否则会发生异常。
根据 Microsoft 的文档,socket.SendToAsync 和 socket.ReceiveFromAsync 方法可以分别与“面向连接的协议”(SendToAsync Docs)和“字节流式套接字”(ReceiveFromAsync Docs)一起使用。
我的问题是,这些方法是否支持 TCP,如果支持,需要哪些先决条件才能开始发送和接收数据?
我添加了代码示例,以便更好地解释我的问题。我无法让客户端处理程序回显从客户端接收到的字节。当使用同步 SendTo 方法和接受的客户端套接字时,它没有问题,我可以回显字节。当使用异步版本(与 UDP 完美配合)时,客户端处理程序永远不会收到任何字节。使用服务器侦听器套接字或接受的客户端套接字(来自 SocketOperations.AcceptAsync)时,此行为是相同的。
一些代码:
public async Task ClientHandler(object clientArgsObj)
{
SocketAsyncEventArgs clientArgs = (SocketAsyncEventArgs)clientArgsObj;
byte[] receiveBuffer = new byte[Constants.PacketSize];
Memory<byte> receiveBufferMemory = new Memory<byte>(receiveBuffer);
Socket clientSocket = clientArgs.AcceptSocket;
EndPoint remoteEndPoint = clientArgs.AcceptSocket.RemoteEndPoint;
while (true)
{
ReceiveResult result = await ReceiveAsync(clientArgs, SocketFlags.None, receiveBufferMemory);
Console.WriteLine($"[{result.RemoteEndPoint} > Server] : {Encoding.UTF8.GetString(result.Contents.Span)}");
int sentBytes = await SendAsync(result.ClientArgs, SocketFlags.None, receiveBufferMemory);
Console.WriteLine($"[Server > {result.RemoteEndPoint}] Sent {sentBytes} bytes to {result.RemoteEndPoint}");
// This bottom stuff works just fine
//int receivedBytes = clientSocket.ReceiveFrom(receiveBuffer, SocketFlags.None, ref remoteEndPoint);
//int sentBytes = clientSocket.SendTo(receiveBuffer, receivedBytes, SocketFlags.None, remoteEndPoint);
}
}
/// <inheritdoc />
public override async Task StartAsync()
{
socket.Listen(5);
while (true)
{
SocketAsyncEventArgs clientArgs = await SocketOperations.AcceptAsync(socket);
await Task.Factory.StartNew(ClientHandler, clientArgs, CancellationToken.None,
TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}
private Task<ReceiveResult> ReceiveAsync(SocketAsyncEventArgs args, SocketFlags socketFlags, Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
{
return SocketOperations.ReceiveAsync(args.AcceptSocket, args, socketFlags, outputBuffer, cancellationToken);
//return SocketOperations.ReceiveAsync(socket, args, socketFlags, outputBuffer, cancellationToken);
}
private Task<int> SendAsync(SocketAsyncEventArgs args, SocketFlags socketFlags,
Memory<byte> inputBuffer, CancellationToken cancellationToken = default)
{
return SocketOperations.SendAsync(args.AcceptSocket, args, socketFlags, inputBuffer, cancellationToken);
//return SocketOperations.SendAsync(socket, args, socketFlags, inputBuffer, cancellationToken);
}
public sealed class AsyncSocketOperations
{
private const int MaximumPooledObjects = 10;
private readonly ObjectPool<SocketAsyncEventArgs> connectAsyncArgsPool;
private readonly ArrayPool<byte> receiveBufferPool;
private readonly ArrayPool<byte> receiveFromBufferPool;
private readonly ArrayPool<byte> sendBufferPool;
private readonly ArrayPool<byte> sendToBufferPool;
private readonly ObjectPool<SocketAsyncEventArgs> socketAsyncArgsPool;
private void HandleIOCompleted(object? sender, SocketAsyncEventArgs eventArgs)
{
bool closed = false;
switch (eventArgs.LastOperation)
{
case SocketAsyncOperation.SendTo:
AsyncWriteToken asyncSendToToken = (AsyncWriteToken)eventArgs.UserToken;
if (asyncSendToToken.CancellationToken.IsCancellationRequested)
{
asyncSendToToken.CompletionSource.SetCanceled();
}
else
{
if (eventArgs.SocketError != SocketError.Success)
{
asyncSendToToken.CompletionSource.SetException(
new SocketException((int)eventArgs.SocketError));
}
else
{
asyncSendToToken.CompletionSource.SetResult(eventArgs.BytesTransferred);
}
}
sendToBufferPool.Return(asyncSendToToken.RentedBuffer, true);
break;
case SocketAsyncOperation.ReceiveFrom:
AsyncReadToken asyncReceiveFromToken = (AsyncReadToken)eventArgs.UserToken;
if (asyncReceiveFromToken.CancellationToken.IsCancellationRequested)
{
asyncReceiveFromToken.CompletionSource.SetCanceled();
}
else
{
if (eventArgs.SocketError != SocketError.Success)
{
asyncReceiveFromToken.CompletionSource.SetException(
new SocketException((int)eventArgs.SocketError));
}
else
{
eventArgs.MemoryBuffer.CopyTo(asyncReceiveFromToken.UserBuffer);
ReceiveResult result = new ReceiveResult(eventArgs, asyncReceiveFromToken.UserBuffer,
eventArgs.BytesTransferred, eventArgs.RemoteEndPoint);
asyncReceiveFromToken.CompletionSource.SetResult(result);
}
}
receiveFromBufferPool.Return(asyncReceiveFromToken.RentedBuffer, true);
break;
case SocketAsyncOperation.Disconnect:
closed = true;
break;
case SocketAsyncOperation.Accept:
AsyncAcceptToken asyncAcceptToken = (AsyncAcceptToken)eventArgs.UserToken;
if (asyncAcceptToken.CancellationToken.IsCancellationRequested)
{
asyncAcceptToken.CompletionSource.SetCanceled();
}
else
{
if (eventArgs.SocketError != SocketError.Success)
{
asyncAcceptToken.CompletionSource.SetException(
new SocketException((int)eventArgs.SocketError));
}
else
{
asyncAcceptToken.CompletionSource.SetResult(eventArgs);
}
}
connectAsyncArgsPool.Return(eventArgs);
break;
case SocketAsyncOperation.Connect:
case SocketAsyncOperation.ReceiveMessageFrom:
case SocketAsyncOperation.SendPackets:
case SocketAsyncOperation.None:
throw new NotImplementedException();
default:
throw new ArgumentOutOfRangeException();
}
if (closed)
{
// handle the client closing the connection on tcp servers at some point
}
}
private readonly struct AsyncAcceptToken
{
public readonly CancellationToken CancellationToken;
public readonly TaskCompletionSource<SocketAsyncEventArgs> CompletionSource;
public AsyncAcceptToken(TaskCompletionSource<SocketAsyncEventArgs> tcs, CancellationToken cancellationToken = default)
{
CompletionSource = tcs;
CancellationToken = cancellationToken;
}
}
private readonly struct AsyncReadToken
{
public readonly CancellationToken CancellationToken;
public readonly TaskCompletionSource<ReceiveResult> CompletionSource;
public readonly byte[] RentedBuffer;
public readonly Memory<byte> UserBuffer;
public AsyncReadToken(byte[] rentedBuffer, Memory<byte> userBuffer, TaskCompletionSource<ReceiveResult> tcs,
CancellationToken cancellationToken = default)
{
RentedBuffer = rentedBuffer;
UserBuffer = userBuffer;
CompletionSource = tcs;
CancellationToken = cancellationToken;
}
}
private readonly struct AsyncWriteToken
{
public readonly CancellationToken CancellationToken;
public readonly TaskCompletionSource<int> CompletionSource;
public readonly byte[] RentedBuffer;
public AsyncWriteToken(byte[] rentedBuffer, TaskCompletionSource<int> tcs,
CancellationToken cancellationToken = default)
{
RentedBuffer = rentedBuffer;
CompletionSource = tcs;
CancellationToken = cancellationToken;
}
}
public AsyncSocketOperations(int bufferSize, int maxPooledObjectCount = MaximumPooledObjects, bool preallocateBuffers = false)
{
MaxPooledObjects = maxPooledObjectCount;
BufferSize = bufferSize;
sendBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
receiveBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
sendToBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
receiveFromBufferPool = ArrayPool<byte>.Create(bufferSize, maxPooledObjectCount);
connectAsyncArgsPool = new LeakTrackingObjectPool<SocketAsyncEventArgs>(
new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(),
maxPooledObjectCount));
socketAsyncArgsPool = new LeakTrackingObjectPool<SocketAsyncEventArgs>(
new DefaultObjectPool<SocketAsyncEventArgs>(new DefaultPooledObjectPolicy<SocketAsyncEventArgs>(),
maxPooledObjectCount));
for (int i = 0; i < MaxPooledObjects; i++)
{
SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();
connectArgs.Completed += HandleIOCompleted;
connectAsyncArgsPool.Return(connectArgs);
SocketAsyncEventArgs socketArgs = new SocketAsyncEventArgs();
socketArgs.Completed += HandleIOCompleted;
socketAsyncArgsPool.Return(socketArgs);
}
if (preallocateBuffers)
{
// TODO: Allocate and return array pool buffers
}
}
public int BufferSize { get; }
public int MaxPooledObjects { get; }
public Task<SocketAsyncEventArgs> AcceptAsync(Socket socket, CancellationToken cancellationToken = default)
{
TaskCompletionSource<SocketAsyncEventArgs> tcs = new TaskCompletionSource<SocketAsyncEventArgs>();
SocketAsyncEventArgs args = connectAsyncArgsPool.Get();
args.AcceptSocket = null;
args.UserToken = new AsyncAcceptToken(tcs, cancellationToken);
if (socket.AcceptAsync(args)) return tcs.Task;
return Task.FromResult(args);
}
public Task<ReceiveResult> ReceiveFromAsync(Socket socket, SocketAsyncEventArgs args, SocketFlags socketFlags,
Memory<byte> outputBuffer, CancellationToken cancellationToken = default)
{
TaskCompletionSource<ReceiveResult> tcs = new TaskCompletionSource<ReceiveResult>();
byte[] rentedReceiveFromBuffer = receiveFromBufferPool.Rent(BufferSize);
Memory<byte> rentedReceiveFromBufferMemory = new Memory<byte>(rentedReceiveFromBuffer);
args.SetBuffer(rentedReceiveFromBufferMemory);
args.SocketFlags = socketFlags;
args.UserToken = new AsyncReadToken(rentedReceiveFromBuffer, outputBuffer, tcs, cancellationToken);
// if the receive operation doesn't complete synchronously, returns the awaitable task
if (socket.ReceiveFromAsync(args)) return tcs.Task;
args.MemoryBuffer.CopyTo(outputBuffer);
ReceiveResult result = new ReceiveResult(args, outputBuffer, args.BytesTransferred, args.RemoteEndPoint);
receiveFromBufferPool.Return(rentedReceiveFromBuffer, true);
return Task.FromResult(result);
}
public Task<int> SendToAsync(Socket socket, SocketAsyncEventArgs args, SocketFlags socketFlags,
Memory<byte> inputBuffer, CancellationToken cancellationToken = default)
{
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
byte[] rentedSendToBuffer = sendToBufferPool.Rent(BufferSize);
Memory<byte> rentedSendToBufferMemory = new Memory<byte>(rentedSendToBuffer);
inputBuffer.CopyTo(rentedSendToBufferMemory);
args.SetBuffer(rentedSendToBufferMemory);
args.SocketFlags = socketFlags;
args.UserToken = new AsyncWriteToken(rentedSendToBuffer, tcs, cancellationToken);
// if the send operation doesn't complete synchronously, return the awaitable task
if (socket.SendToAsync(args)) return tcs.Task;
int result = args.BytesTransferred;
sendToBufferPool.Return(rentedSendToBuffer, true);
return Task.FromResult(result);
}
【问题讨论】:
-
即使只是阅读Wikipedia page 顶部的摘要 也是一个好的开始。
-
页面与这些方法有什么特别的关系?只需回答问题 将是一个好的开始。 :^)
-
是的,当构造函数看起来像这样时,套接字支持 TCP: Socket tempSocket = new Socket(ipe.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
标签: c# sockets asynchronous tcp