using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace Plane.Communication
{
    /// <summary>
    /// Provides the ability to communicate over the UDP protocol.
    /// </summary>
    /// <remarks>
    /// Incoming datagrams are handed to <see cref="EnqueueDatagram"/> and buffered in a queue;
    /// <see cref="ReadAsync"/> then exposes them as a continuous byte stream.
    /// NOTE(review): <c>_currentByteArray</c> and <c>_currentIndex</c> are not synchronized, so this
    /// presumably expects a single reader at a time — confirm against callers.
    /// </remarks>
    public abstract class UdpConnectionBase : ExceptionThrownEventSource, IConnection
    {
        /// <summary>
        /// Delay, in milliseconds, between polls of the input queue while waiting for data.
        /// </summary>
        private const int PollDelayMilliseconds = 5;

        /// <summary>
        /// Datagrams received but not yet consumed by <see cref="ReadAsync"/>.
        /// </summary>
        private readonly ConcurrentQueue<byte[]> _inputQueue = new ConcurrentQueue<byte[]>();

        /// <summary>
        /// The datagram currently being consumed by <see cref="ReadAsync"/>, or <c>null</c> if none.
        /// </summary>
        private byte[] _currentByteArray;

        /// <summary>
        /// Index of the next unread byte in <see cref="_currentByteArray"/>.
        /// </summary>
        private int _currentIndex;

        /// <summary>
        /// Gets or sets a value indicating whether the connection is open.
        /// Reads, writes and <see cref="EnqueueDatagram"/> are no-ops while this is <c>false</c>.
        /// </summary>
        public bool IsOpen { get; protected set; }

        /// <summary>
        /// Closes the connection.
        /// </summary>
        public abstract void Close();

        /// <summary>
        /// Opens the connection asynchronously.
        /// </summary>
        /// <returns>A task that represents the asynchronous open operation.</returns>
        public abstract Task OpenAsync();

        /// <summary>
        /// Gets the number of bytes available to read. This implementation always returns 0.
        /// </summary>
        /// <returns>Always 0.</returns>
        public int BytesToRead()
        {
            return 0;
        }

        /// <summary>
        /// Reads exactly <paramref name="size"/> bytes from the queued datagrams into
        /// <paramref name="buffer"/>, waiting (by polling) until enough data has arrived.
        /// </summary>
        /// <param name="buffer">The destination buffer.</param>
        /// <param name="offset">The index in <paramref name="buffer"/> at which to start writing.</param>
        /// <param name="size">The number of bytes to read.</param>
        /// <returns>
        /// <paramref name="size"/> when all requested bytes were read; 0 when the connection is not
        /// open, is closed while waiting for data, or an exception occurred (in which case the
        /// exception is reported through <c>RaiseExceptionThrown</c>). In the 0 case some bytes may
        /// already have been copied into <paramref name="buffer"/>.
        /// </returns>
        public async Task<int> ReadAsync(byte[] buffer, int offset, int size)
        {
            if (!IsOpen)
            {
                return 0;
            }

            try
            {
                var end = offset + size;
                for (int i = offset; i < end; i++, _currentIndex++)
                {
                    // The current datagram is missing or fully consumed: fetch the next one.
                    // This is a loop so that empty datagrams are skipped.
                    while (_currentByteArray == null || _currentIndex >= _currentByteArray.Length)
                    {
                        // TryDequeue alone covers both "queue is empty" and "lost a race",
                        // so there is no need to poll Count first.
                        while (!_inputQueue.TryDequeue(out _currentByteArray))
                        {
                            if (!IsOpen)
                            {
                                return 0;
                            }

                            await Task.Delay(PollDelayMilliseconds).ConfigureAwait(false);
                        }

                        _currentIndex = 0;
                    }

                    buffer[i] = _currentByteArray[_currentIndex];
                }

                return size;
            }
            catch (Exception ex)
            {
                RaiseExceptionThrown(ex);
                return 0;
            }
        }

        /// <summary>
        /// Sends <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at
        /// <paramref name="offset"/>, as a single datagram. Does nothing if the connection is not open.
        /// </summary>
        /// <param name="buffer">The buffer containing the data to send.</param>
        /// <param name="offset">The index in <paramref name="buffer"/> of the first byte to send.</param>
        /// <param name="count">The number of bytes to send.</param>
        /// <returns>A task that represents the asynchronous write operation.</returns>
        /// <remarks>
        /// Exceptions are not propagated to the caller; they are reported through
        /// <c>RaiseExceptionThrown</c>.
        /// </remarks>
        public async Task WriteAsync(byte[] buffer, int offset, int count)
        {
            if (!IsOpen)
            {
                return;
            }

            try
            {
                if (offset == 0)
                {
                    // No copy needed: SendAsync takes the byte count separately.
                    await SendAsync(buffer, count);
                }
                else
                {
                    // SendAsync has no offset parameter, so copy the requested slice out first.
                    var data = new byte[count];
                    Array.Copy(buffer, offset, data, 0, count);
                    await SendAsync(data, count);
                }
            }
            catch (Exception ex)
            {
                RaiseExceptionThrown(ex);
            }
        }

#if DEBUG && LOG_PACKETS
        // Textual dump of every enqueued datagram, for debugging only.
        private System.Text.StringBuilder _log = new System.Text.StringBuilder();
#endif

        /// <summary>
        /// Queues a received datagram so that <see cref="ReadAsync"/> can consume it.
        /// The datagram is ignored if it is <c>null</c> or the connection is not open.
        /// </summary>
        /// <param name="datagram">The received datagram.</param>
        internal void EnqueueDatagram(byte[] datagram)
        {
            if (datagram != null && IsOpen)
            {
#if DEBUG && LOG_PACKETS
                _log.AppendLine("------------");
                for (int i = 0; i < datagram.Length; i++)
                {
                    _log.Append(datagram[i]).Append(" ");
                }
                _log.AppendLine();
#endif
                _inputQueue.Enqueue(datagram);
            }
        }

        /// <summary>
        /// Sends the first <paramref name="bytes"/> bytes of <paramref name="datagram"/>.
        /// </summary>
        /// <param name="datagram">The buffer containing the data to send.</param>
        /// <param name="bytes">The number of bytes to send.</param>
        /// <returns>A task that represents the asynchronous send operation.</returns>
        protected abstract Task SendAsync(byte[] datagram, int bytes);
    }
}