Plane.Sdk3/PlaneGcsSdk_Shared/Communication/UdpConnectionBase.cs
2017-02-27 02:02:19 +08:00

116 lines
3.4 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace Plane.Communication
{
/// <summary>
/// 提供采用 UDP 协议通信的能力。
/// </summary>
public abstract class UdpConnectionBase : ExceptionThrownEventSource, IConnection
{
private byte[] _currentByteArray;
private int _currentIndex;
private ConcurrentQueue<byte[]> _inputQueue = new ConcurrentQueue<byte[]>();
public bool IsOpen { get; protected set; }
public abstract void Close();
public abstract Task OpenAsync();
public async Task<int> ReadAsync(byte[] buffer, int offset, int size)
{
if (!IsOpen)
{
return 0;
}
try
{
var indexLessThan = offset + size;
for (int i = offset; i < indexLessThan; i++, _currentIndex++)
{
while (_currentByteArray == null || _currentIndex >= _currentByteArray.Length)
{
while (_inputQueue.Count <= 0)
{
if (!IsOpen)
{
return 0;
}
await Task.Delay(5).ConfigureAwait(false);
}
while (!_inputQueue.TryDequeue(out _currentByteArray))
{
if (!IsOpen)
{
return 0;
}
await Task.Delay(5).ConfigureAwait(false);
}
_currentIndex = 0;
}
buffer[i] = _currentByteArray[_currentIndex];
}
return size;
}
catch (Exception ex)
{
RaiseExceptionThrown(ex);
return 0;
}
}
public async Task WriteAsync(byte[] buffer, int offset, int count)
{
if (IsOpen)
{
try
{
if (offset == 0)
{
await SendAsync(buffer, count);
}
else
{
var data = new byte[count];
for (int i = 0, j = offset; i < count; i++, j++)
{
data[i] = buffer[j];
}
await SendAsync(data, count);
}
}
catch (Exception ex)
{
RaiseExceptionThrown(ex);
}
}
}
#if DEBUG && LOG_PACKETS
private System.Text.StringBuilder _log = new System.Text.StringBuilder();
#endif
internal void EnqueueDatagram(byte[] datagram)
{
if (datagram != null && IsOpen)
{
#if DEBUG && LOG_PACKETS
_log.AppendLine("------------");
for (int i = 0; i < datagram.Length; i++)
{
_log.Append(datagram[i]).Append(" ");
}
_log.AppendLine();
#endif
_inputQueue.Enqueue(datagram);
}
}
protected abstract Task SendAsync(byte[] datagram, int bytes);
}
}