How do I work with hardware asynchronously?
-
I have a device connected to the PC through a serial port. I work with it through a `Stream` object obtained from `SerialPort.BaseStream`, so the interaction logic will be identical for anything that works through `Stream`.
There are two ways of communicating with the device:
- Plain request/response communication.
- A combination of request/response and the ability to receive data on an event raised by the equipment.
How should the communication logic be implemented asynchronously? I wrote two implementation options, but I don't like either of them.
Option 1 through ManualResetEvent:
/// <summary>
/// Request/response communication with a serial device, where the answer is
/// handed from the receive loop to the requesting thread through a
/// <see cref="ManualResetEvent"/>-based helper.
/// </summary>
public class TestSerialPortV1
{
    private enum AnswerType
    {
        Executed,
        Busy,
        HardwareError,
        Disconnected,
        InterferenceInLine,
    }

    /// <summary>
    /// My class for returning the answer to the thread that requested the info.
    /// <c>Reset</c> arms the helper before a request is sent, <c>Set</c> publishes
    /// the answer and wakes the waiter, <c>Wait</c> blocks until the answer is
    /// published or the timeout elapses.
    /// </summary>
    private class HybridMRE<TResult>
    {
        // Guards _armed/_result so that Reset and Set cannot interleave.
        private readonly object _gate = new object();
        private readonly ManualResetEvent _mre;
        private volatile bool _armed;
        private TResult _result;

        /// <summary>True while a request is armed and no answer has been published yet.</summary>
        public bool IsReset { get { return _armed; } }

        public HybridMRE()
        {
            _armed = false;
            // Starts signaled: a Wait issued before any Reset returns at once
            // with default(TResult), exactly as the unarmed case did before.
            _mre = new ManualResetEvent(true);
        }

        /// <summary>Arms the helper for a new request and clears any previous answer.</summary>
        public void Reset()
        {
            lock (_gate)
            {
                _result = default(TResult);
                _armed = true;
                _mre.Reset();
            }
        }

        /// <summary>
        /// Publishes <paramref name="result"/> and wakes the waiter. Does nothing
        /// when no request is armed.
        /// </summary>
        public void Set(TResult result)
        {
            lock (_gate)
            {
                // The original compared the CompareExchange result with 0, so an
                // armed request was disarmed without ever being signaled.
                if (!_armed)
                {
                    return;
                }

                _result = result;
                _armed = false;
                _mre.Set();
            }
        }

        /// <summary>Blocks until an answer is published.</summary>
        public bool Wait(out TResult result)
        {
            return Complete(_mre.WaitOne(), out result);
        }

        /// <summary>Blocks until an answer is published or the timeout elapses.</summary>
        /// <returns>False when the wait timed out.</returns>
        public bool Wait(int millisecondsTimeout, out TResult result)
        {
            return Complete(_mre.WaitOne(millisecondsTimeout), out result);
        }

        /// <summary>Blocks until an answer is published or the timeout elapses.</summary>
        /// <returns>False when the wait timed out.</returns>
        public bool Wait(TimeSpan timeout, out TResult result)
        {
            return Complete(_mre.WaitOne(timeout), out result);
        }

        // Reads the published answer after a successful wait. The answer is
        // returned even when it was published before Wait was called.
        private bool Complete(bool signaled, out TResult result)
        {
            if (signaled)
            {
                lock (_gate)
                {
                    result = _result;
                }
            }
            else
            {
                result = default(TResult);
            }

            return signaled;
        }
    }

    private readonly SerialPort _port;

    // Buffer for storing the information.
    private readonly byte[] _bytesForBuffer;
    private int _inIndex;
    private int _outIndex;
    private readonly HybridMRE<AnswerType> _mreForCommand1;

    // Kept so the receive loop task stays reachable for inspection.
    private readonly Task _receiveLoop;

    /// <summary>Opens COM1 and starts the background receive loop.</summary>
    public TestSerialPortV1()
    {
        _port = new SerialPort("COM1");
        _port.Open();
        _bytesForBuffer = new byte[200];
        _inIndex = 0;
        _outIndex = 0;
        _mreForCommand1 = new HybridMRE<AnswerType>();

        // Without this nobody reads the port and every command times out.
        _receiveLoop = ReceiveData();
    }

    /// <summary>
    /// Reads from the port until an <see cref="IOException"/> occurs and publishes
    /// an answer whenever a command is waiting for one.
    /// </summary>
    private async Task ReceiveData()
    {
        try
        {
            while (true)
            {
                if (_outIndex == _bytesForBuffer.Length)
                {
                    // Buffer is full and no answer was consumed: drop the stale
                    // bytes, otherwise ReadAsync is asked for 0 bytes and the
                    // loop spins without ever waiting for data.
                    _outIndex = 0;
                }

                // ConfigureAwait(false): ExecuteComman1 blocks its thread while
                // waiting, so this loop must not need that thread to continue.
                int bytesRead = await _port.BaseStream
                    .ReadAsync(_bytesForBuffer, _outIndex, _bytesForBuffer.Length - _outIndex)
                    .ConfigureAwait(false);
                _outIndex += bytesRead;

                // Logic that checks whether a whole answer has arrived; otherwise keep reading.
                if (_mreForCommand1.IsReset)
                {
                    // Check the received answer.
                    AnswerType answer = TestAnswerForCommand1();
                    _mreForCommand1.Set(answer);
                }
            }
        }
        catch (IOException)
        {
            // An I/O failure on the port ends the receive loop.
        }
    }

    private AnswerType TestAnswerForCommand1()
    {
        // Some work.
        return AnswerType.InterferenceInLine;
    }

    /// <summary>
    /// Sends command 1 and waits up to one second for the answer, making at most
    /// three attempts.
    /// </summary>
    /// <returns>
    /// True when the device answered <c>Executed</c>; false when it answered
    /// <c>Busy</c> or <c>HardwareError</c>, or when all attempts were used up.
    /// </returns>
    public async Task<bool> ExecuteComman1()
    {
        for (int i = 0; i < 3; i++)
        {
            _mreForCommand1.Reset();

            byte[] command1 = new byte[5];
            command1[0] = 0x5A;
            command1[2] = 0x3;

            // ConfigureAwait(false) keeps the blocking wait below off a captured
            // synchronization context (for example a UI thread).
            await _port.BaseStream.WriteAsync(command1, 0, 5).ConfigureAwait(false);

            AnswerType answer;
            if (!_mreForCommand1.Wait(1000, out answer))
            {
                // No answer in time: treated as disconnected, try again.
                continue;
            }

            switch (answer)
            {
                case AnswerType.Executed:
                    return true;
                case AnswerType.Busy:
                case AnswerType.HardwareError:
                    return false;
                case AnswerType.InterferenceInLine:
                    continue;
            }
        }

        return false;
    }
}
Option 2, using Task:
/// <summary>
/// Request/response communication with a serial device, where each request
/// creates a not-yet-started <see cref="Task{TResult}"/> that the receive loop
/// starts once data has arrived.
/// </summary>
public class TestSerialPortV2
{
    private enum AnswerType
    {
        Executed,
        Busy,
        HardwareError,
        Disconnected,
        InterferenceInLine,
    }

    private readonly SerialPort _port;
    private readonly byte[] _bytesForBuffer;
    private int _inIndex;
    private int _outIndex;
    private Task<AnswerType> _taskForCommand1;

    // Kept so the receive loop task stays reachable for inspection.
    private readonly Task _receiveLoop;

    /// <summary>Opens COM3 at 115200 baud and starts the background receive loop.</summary>
    public TestSerialPortV2()
    {
        _port = new SerialPort("COM3", 115200);
        _port.Open();
        _bytesForBuffer = new byte[200];
        _inIndex = 0;
        _outIndex = 0;
        _taskForCommand1 = Task.FromResult(AnswerType.Executed);
        _receiveLoop = ReceiveData();
    }

    /// <summary>
    /// Reads from the port until an <see cref="IOException"/> occurs and starts
    /// the pending command task whenever one is waiting to be started.
    /// </summary>
    private async Task ReceiveData()
    {
        try
        {
            while (true)
            {
                if (_outIndex == _bytesForBuffer.Length)
                {
                    // Buffer is full: drop the stale bytes, otherwise ReadAsync
                    // is asked for 0 bytes and the loop spins without waiting.
                    _outIndex = 0;
                }

                int readedbytes = await _port.BaseStream.ReadAsync(
                    _bytesForBuffer, _outIndex, _bytesForBuffer.Length - _outIndex);
                _outIndex += readedbytes;

                // Logic that checks whether a whole answer has arrived; otherwise keep reading.
                // Snapshot the field: ExecuteComman1 may replace it at any time.
                Task<AnswerType> pending = _taskForCommand1;
                if (pending.Status == TaskStatus.Created)
                {
                    try
                    {
                        // Check the received answer.
                        pending.Start();
                        await pending;
                    }
                    catch (InvalidOperationException)
                    {
                        // The timeout canceled the task between the status check
                        // and Start(); the requester handles the cancellation.
                    }
                    catch (OperationCanceledException)
                    {
                        // The timeout canceled the task before it ran; the
                        // requester handles it, the receive loop must go on.
                    }
                }

                Console.WriteLine("Read something " + readedbytes);
            }
        }
        catch (IOException)
        {
            // An I/O failure on the port ends the receive loop.
        }
    }

    private AnswerType TestAnswerForCommand1()
    {
        Thread.Sleep(100);
        Console.WriteLine("CalculatingBytes");
        // Some work.
        return AnswerType.InterferenceInLine;
    }

    /// <summary>
    /// Sends command 1 and waits up to ten seconds for the answer, making at most
    /// three attempts.
    /// </summary>
    /// <returns>
    /// True when the device answered <c>Executed</c>; false when it answered
    /// <c>Busy</c> or <c>HardwareError</c>, or when all attempts were used up.
    /// </returns>
    public async Task<bool> ExecuteComman1()
    {
        for (int i = 0; i < 3; i++)
        {
            // Disposed at the end of every attempt so the timeout timer does
            // not outlive the request.
            using (CancellationTokenSource cts = new CancellationTokenSource())
            {
                Task<AnswerType> pending = new Task<AnswerType>(TestAnswerForCommand1, cts.Token);
                _taskForCommand1 = pending;

                byte[] command1 = new byte[5];
                command1[0] = 0x5A;
                command1[2] = 0x3;
                await _port.BaseStream.WriteAsync(command1, 0, 5);
                //await _port.BaseStream.FlushAsync();

                cts.CancelAfter(10000);

                try
                {
                    AnswerType answer = await pending;
                    switch (answer)
                    {
                        case AnswerType.Executed:
                            return true;
                        case AnswerType.Busy:
                        case AnswerType.HardwareError:
                            return false;
                        case AnswerType.InterferenceInLine:
                            continue;
                    }
                }
                catch (OperationCanceledException)
                {
                    // No answer in time: treated as disconnected, try again.
                }
            }
        }

        return false;
    }
}
-
SerialPort has a DataReceived event that is raised when data arrives from the serial port. For asynchronous communication with the port in option 2 (Task), you can use TaskCompletionSource (https://msdn.microsoft.com/ru-ru/library/dd449174) and CancellationTokenSource (https://msdn.microsoft.com/ru-ru/library/system.threading.cancellationtokensource(v=vs.110).aspx). Order of work:
- When a request is sent to the port, a TaskCompletionSource and a CancellationTokenSource are created.
- A cancellation handler is registered on the CancellationTokenSource via Token.Register(), and the timeout is set.
- The data is written to the port.
- The task of the TaskCompletionSource is then awaited.
- When data arrives from the port, we try to process it.
- If processing succeeds, the task is completed through TaskCompletionSource.TrySetResult().
- In case of an error, the task is completed through TaskCompletionSource.TrySetException().
- In the case of a timeout, the CancellationTokenSource fires and the task is canceled.
Example of a class with the behavior described above:
/// <summary>
/// Request/response connection over a serial port. A sent frame is paired with
/// the next received frame through a <see cref="TaskCompletionSource{TResult}"/>;
/// a <see cref="CancellationTokenSource"/> provides the timeout.
/// </summary>
class ModemConnection : IDisposable
{
    private readonly SerialPort _serialPort;
    private bool _disposed;

    private TaskCompletionSource<Frame> _taskCompletionSource = NewCompletionSource();

    /// <summary>
    /// Time to wait for the answer to a sent frame. It is passed unchanged to
    /// <see cref="CancellationTokenSource.CancelAfter(TimeSpan)"/>, so set it
    /// before sending: the default value is <see cref="TimeSpan.Zero"/>.
    /// </summary>
    public TimeSpan Timeout { get; set; }

    public ModemConnection(string portName, int baudRate, Parity parity, int dataBits, StopBits stopBits)
    {
        _serialPort = new SerialPort(portName, baudRate, parity, dataBits, stopBits);
        _serialPort.DataReceived += _serialPort_DataReceived;
    }

    // Continuations run asynchronously so that code awaiting the answer never
    // executes inline on the thread that raises DataReceived.
    private static TaskCompletionSource<Frame> NewCompletionSource()
    {
        return new TaskCompletionSource<Frame>(TaskCreationOptions.RunContinuationsAsynchronously);
    }

    /// <summary>
    /// Writes <paramref name="frame"/> to the port and waits for the next
    /// received frame.
    /// </summary>
    /// <returns>The frame received in response.</returns>
    /// <exception cref="OperationCanceledException">
    /// No frame was received within <see cref="Timeout"/>.
    /// </exception>
    private async Task<Frame> SendFrame(Frame frame)
    {
        // Await our own completion source: the field may be replaced by a
        // later call before this one finishes.
        var pending = NewCompletionSource();
        _taskCompletionSource = pending;

        // Both the source and its registration are released when the request
        // ends, whether it completed, timed out or failed.
        using (var cts = new CancellationTokenSource())
        using (cts.Token.Register(() => pending.TrySetCanceled()))
        {
            cts.CancelAfter(Timeout);

            // leaveOpen: true - disposing the writer must not close the port stream.
            using (var bw = new BinaryWriter(_serialPort.BaseStream, Encoding.ASCII, true))
            {
                frame.Write(bw);
                bw.Flush();
            }

            // Subscribers are notified on a pool thread so they cannot delay the wait.
            Task.Run(() => OnOnFrameSent(frame));

            return await pending.Task.ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Reads one frame from the port, completes the pending request with it and
    /// raises <see cref="FrameReceived"/>. A read failure is forwarded to the
    /// pending request as an exception.
    /// </summary>
    private void _serialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
    {
        try
        {
            var frame = new Frame();

            // Read straight from the port stream: a BufferedStream created per
            // event could read ahead and lose those bytes when it is discarded.
            using (var r = new BinaryReader(_serialPort.BaseStream, Encoding.ASCII, true))
            {
                frame.Read(r);
            }

            _taskCompletionSource?.TrySetResult(frame);
            Task.Run(() => OnOnFrameReceived(frame));
        }
        catch (Exception ex)
        {
            _taskCompletionSource?.TrySetException(ex);
        }
    }

    public event EventHandler<Frame> FrameReceived;

    protected virtual void OnOnFrameReceived(Frame e)
    {
        FrameReceived?.Invoke(this, e);
    }

    public event EventHandler<Frame> FrameSent;

    protected virtual void OnOnFrameSent(Frame e)
    {
        FrameSent?.Invoke(this, e);
    }

    /// <summary>Opens the port if it is not open yet.</summary>
    public void Open()
    {
        if (_serialPort.IsOpen)
        {
            return;
        }

        _serialPort.Open();
    }

    /// <summary>
    /// Detaches from the port, cancels a request that is still waiting for its
    /// answer and disposes the port. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _serialPort.DataReceived -= _serialPort_DataReceived;

        // Release a caller that is still awaiting an answer.
        _taskCompletionSource?.TrySetCanceled();
        _serialPort.Dispose();
    }
}