
我正在接受来自不同客户端的多个tcp连接的ac#应用程序(.net 4)。 有一个接受套接字的tcp监听器。 B / W双工通讯。 数据使用Networkstream.Write方法发送,并使用Networkstream.read方法读取。 对于每个TCP连接,创build一个单独的线程。

问题是,前几天我们注意到其中一个客户停止了20分钟的数据读取(由于bug)。 由于连接没有中断,服务器上没有(IO)exception。 但是,我们注意到其他客户的数据也没有进行。 20分钟后,客户再次开始接收数据,很快其他客户也开始接收数据。

我知道networkingstream的写入方法是一种阻塞方法,我们没有使用任何超时。 所以写入已经被阻塞了( 在这里描述)。 但据我了解,每个TCP连接必须有一个单独的写入缓冲区,或者还有更多的东西在播放。 可以在TCP连接发送阻塞,影响同一应用程序中的其他TCP连接?

这里是写操作的伪代码。 对于每个连接,都有一个独立的线程进行单独的传出队列处理。

public class TCPServerListener : baseConnection { private readonly int _Port; private TcpListener _tcpListener; private Thread _thread; private List<TcpClientData> _tcpClientDataList = new List<TcpClientData>(); private long _messageDiscardTimeout; private bool LoopForClientConnection = true; public TCPServerListener(int port, ThreadPriority threadPriority) { try { // init property } catch (Exception ex) { // log } } public void SendMessageToAll(int type) { base.EnqueueMessageToSend(type, _tcpClientDataList); } public void SendMessageToList(int type, IList<TcpClient> tcpClientList) { base.EnqueueMessageToSend(type, tcpClientList); } public void SendMessage(int type, TcpClient tcpClient) { base.EnqueueMessageToSend(type, tcpClient); } private void AcceptClientConnections() { while (LoopForClientConnection) { try { Socket socket = _tcpListener.AcceptSocket(); TcpClientData tcpClientData = new TcpClientData(); tcpClientData.tcpClientThread = new Thread(new ParameterizedThreadStart(StartAsync)); tcpClientData.tcpClientThread.Priority = _threadPriority; tcpClientData.tcpClientThread.IsBackground = true; tcpClientData.tcpClientThread.Name = "CD" + tcpClientData.tcpClientThread.ManagedThreadId; tcpClientData.tcpClient = new TcpClient(); tcpClientData.tcpClient.Client = socket; _tcpClientDataList.Add(tcpClientData); tcpClientData.tcpClientThread.Start(tcpClientData.tcpClient); } catch (ThreadAbortException ex) { //log } catch (Exception ex) { //log } } } public override void Start() { base.Start(); _tcpListener = new TcpListener(System.Net.IPAddress.Any, _Port); _thread = new Thread(AcceptClientConnections); _thread.Priority = _threadPriority; _thread.IsBackground = true; _tcpListener.Start(); _thread.Start(); } public override void Stop() { // stop listener and terminate threads } } public class baseConnection { private Thread _InCommingThread; private Thread _OutGoingThread; protected ThreadPriority _threadPriority; protected BlockingCollection<MessageReceived> _InComingMessageQueue = new BlockingCollection<MessageReceived>(); protected BlockingCollection<MessageToSend> _OutgoingMessageQueue = new BlockingCollection<MessageToSend>(); public void StartAsync(Object oTcpClient) { TcpClient tcpClient = oTcpClient as TcpClient; if (tcpClient == null) return; using (tcpClient) { using (NetworkStream stream = tcpClient.GetStream()) { stream.ReadTimeout = Timeout.Infinite; stream.WriteTimeout = Timeout.Infinite; BinaryReader bodyReader = new BinaryReader(stream); while (tcpClient.Connected) { try { int messageType = bodyReader.ReadInt32(); // checks to verify messages // enqueue message in incoming queue _InComingMessageQueue.Add(new MessageReceived(messageType, tcpClient)); } catch (EndOfStreamException ex) { // log break; } catch (Exception ex) { // log Thread.Sleep(100); } } //RaiseDisconnected(tcpClient); } } } public virtual void Start() { _InCommingThread = new Thread(HandleInCommingMessnge); _InCommingThread.Priority = _threadPriority; _InCommingThread.IsBackground = true; _InCommingThread.Start(); _OutGoingThread = new Thread(HandleOutgoingQueue); _OutGoingThread.Priority = _threadPriority; _OutGoingThread.IsBackground = true; _OutGoingThread.Start(); } public virtual void Stop() { // stop the threads and free up resources } protected void EnqueueMessageToSend(int type, List<TcpClientData> tcpClientDataList) { tcpClientDataList.ForEach(x => _OutgoingMessageQueue.Add(new MessageToSend(type, x.tcpClient))); } protected void EnqueueMessageToSend(int type, IList<TcpClient> tcpClientList) { foreach (TcpClient tcpClient in tcpClientList) { _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient)); } } protected void EnqueueMessageToSend(int type, TcpClient tcpClient) { _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient)); } private void HandleOutgoingQueue() { while (true) { try { MessageToSend message = _OutgoingMessageQueue.Take(); if (message.tcpClient.Connected) { BinaryWriter writer = new BinaryWriter(message.tcpClient.GetStream()); writer.Write(message.type); } } catch (ThreadAbortException ex) { // log return; } catch (Exception ex) { //_logger.Error(ex.Message, ex); } } } private void HandleInCommingMessnge() { while (true) { try { MessageReceived messageReceived = _InComingMessageQueue.Take(); // handle message } catch (ThreadAbortException ex) { // log return; } catch (Exception ex) { // log //_logger.Error(ex.Message, ex); } } } public class MessageReceived { public MessageReceived(int type, TcpClient tcpClient) { this.tcpClient = tcpClient; this.type = type; } public int type; public TcpClient tcpClient; } public class MessageToSend { public MessageToSend(int type, TcpClient tcpClient) { this.tcpClient = tcpClient; this.type = type; } public int type; public TcpClient tcpClient; } public class TcpClientData { public Thread tcpClientThread; public TcpClient tcpClient; } } 


如果此代码在多个线程上运行,则当每个线程正试图向阻塞连接发送消息时,程序将立即阻止。 如果此循环在多个线程上运行,则可能会遇到的另一个问题是消息可能无法以相同连接的正确顺序到达。