How do I communicate with hardware asynchronously?



  • I have a device connected to the PC through a serial port. To work with it I use the Stream object obtained from SerialPort.BaseStream, so the communication logic will be identical for anything else that works through a Stream.
    There are two ways of communicating with the device:

    1. Communication on a request–response basis.
    2. Request–response communication combined with the possibility of receiving data on events raised by the device.

    How should this communication logic be implemented asynchronously? I wrote two implementations, but I don't like either of them.
    Option 1, using ManualResetEvent:

    /// <summary>
    /// Option 1: request/response exchange with a serial device. The sender arms a
    /// <see cref="ManualResetEvent"/>-based mailbox, writes the command and waits until the
    /// background receive loop posts the parsed answer into that mailbox.
    /// </summary>
    public class TestSerialPortV1
    {
        /// <summary>Outcome of one command attempt.</summary>
        private enum AnswerType
        {
            Executed,
            Busy,
            HardwareError,
            Disconnected,
            InterferenceInLine,
        }

        /// <summary>
        /// One-slot mailbox that hands a result back to the thread that requested it.
        /// <c>Reset</c> arms the mailbox, <c>Set</c> posts a result, and the <c>Wait</c>
        /// overloads block until a result has been posted.
        /// </summary>
        private class HybridMRE<TResult>
        {
            // Guards _isReset and _result so that arming, posting and reading never interleave.
            private readonly object _gate = new object();
            private readonly ManualResetEvent _mre;
            private bool _isReset;
            private TResult _result;

            /// <summary>True while the mailbox is armed and no result has been posted yet.</summary>
            public bool IsReset
            {
                get
                {
                    lock (_gate)
                    {
                        return _isReset;
                    }
                }
            }

            public HybridMRE()
            {
                _isReset = false;
                // Signaled while idle: Wait returns immediately (with default(TResult)) when
                // nothing has been requested, and can never miss a result that was posted
                // before the waiter arrived.
                _mre = new ManualResetEvent(true);
            }

            /// <summary>Arms the mailbox for a new request; does nothing if it is already armed.</summary>
            public void Reset()
            {
                lock (_gate)
                {
                    if (_isReset)
                    {
                        return;
                    }

                    _isReset = true;
                    _result = default(TResult);
                    _mre.Reset();
                }
            }

            /// <summary>Posts <paramref name="result"/> and releases waiters; ignored unless the mailbox is armed.</summary>
            public void Set(TResult result)
            {
                lock (_gate)
                {
                    if (!_isReset)
                    {
                        return;
                    }

                    _isReset = false;
                    _result = result;
                    _mre.Set();
                }
            }

            /// <summary>Waits indefinitely for a result.</summary>
            /// <returns>Always true once the wait completes.</returns>
            public bool Wait(out TResult result)
            {
                return Complete(_mre.WaitOne(), out result);
            }

            /// <summary>Waits up to <paramref name="millisecondsTimeout"/> milliseconds for a result.</summary>
            /// <returns>True if a result was available, false if the wait timed out.</returns>
            public bool Wait(int millisecondsTimeout, out TResult result)
            {
                return Complete(_mre.WaitOne(millisecondsTimeout), out result);
            }

            /// <summary>Waits up to <paramref name="timeout"/> for a result.</summary>
            /// <returns>True if a result was available, false if the wait timed out.</returns>
            public bool Wait(TimeSpan timeout, out TResult result)
            {
                return Complete(_mre.WaitOne(timeout), out result);
            }

            // Reads the stored result under the lock and passes the wait outcome through.
            private bool Complete(bool signaled, out TResult result)
            {
                lock (_gate)
                {
                    result = _result;
                }
                return signaled;
            }
        }

        private const int MaxAttempts = 3;
        private const int AnswerTimeoutMilliseconds = 1000;

        private readonly SerialPort _port;

        // Buffer that accumulates the bytes received from the port.
        private readonly byte[] _bytesForBuffer;
        private int _inIndex;
        private int _outIndex;

        private readonly HybridMRE<AnswerType> _mreForCommand1;

        /// <summary>Opens COM1 and starts the background receive loop.</summary>
        public TestSerialPortV1()
        {
            _port = new SerialPort("COM1");
            _port.Open();

            _bytesForBuffer = new byte[200];
            _inIndex = 0;
            _outIndex = 0;

            _mreForCommand1 = new HybridMRE<AnswerType>();

            // Without a running receive loop no answer is ever posted and every command times out.
            var task = ReceiveData();
        }

        /// <summary>
        /// Receive loop: appends incoming bytes to the buffer and, while a command is waiting,
        /// posts the parsed answer to its mailbox. Ends when the port reports an I/O error.
        /// </summary>
        private async Task ReceiveData()
        {
            try
            {
                while (true)
                {
                    if (_outIndex == _bytesForBuffer.Length)
                    {
                        // The buffer filled up without being consumed. A zero-length read would
                        // return immediately and spin forever, so drop the stale bytes instead.
                        _outIndex = 0;
                    }

                    // ConfigureAwait(false) keeps the loop off the constructing thread's
                    // synchronization context.
                    int received = await _port.BaseStream
                        .ReadAsync(_bytesForBuffer, _outIndex, _bytesForBuffer.Length - _outIndex)
                        .ConfigureAwait(false);
                    _outIndex += received;

                    // Logic that checks whether a complete answer has arrived; otherwise keep reading.

                    if (_mreForCommand1.IsReset)
                    {
                        // Validate the received answer.
                        AnswerType answer = TestAnswerForCommand1();
                        _mreForCommand1.Set(answer);
                    }
                }
            }
            catch (IOException)
            {
                // The port failed: release a pending command right away instead of letting it
                // run into its timeout. Set is a no-op when nothing is waiting.
                _mreForCommand1.Set(AnswerType.Disconnected);
            }
        }

        /// <summary>Placeholder for the validation of the answer to command 1.</summary>
        private AnswerType TestAnswerForCommand1()
        {
            // Some work.
            return AnswerType.InterferenceInLine;
        }

        /// <summary>
        /// Sends command 1 and waits for the answer, retrying up to three times on timeout,
        /// line interference or disconnection.
        /// </summary>
        /// <returns>True if the device reported the command as executed; otherwise false.</returns>
        public async Task<bool> ExecuteComman1()
        {
            for (int attempt = 0; attempt < MaxAttempts; attempt++)
            {
                // Arm the mailbox before writing so that an immediate answer is not dropped.
                _mreForCommand1.Reset();

                byte[] command1 = new byte[5];
                command1[0] = 0x5A;
                command1[2] = 0x3;
                await _port.BaseStream.WriteAsync(command1, 0, command1.Length);

                // The wait itself is blocking, so run it on a pool thread rather than
                // stalling the caller's thread inside an async method.
                AnswerType answer = default(AnswerType);
                bool answered = await Task.Run(
                    () => _mreForCommand1.Wait(AnswerTimeoutMilliseconds, out answer));
                if (!answered)
                {
                    // No answer within the timeout: try again.
                    continue;
                }

                switch (answer)
                {
                    case AnswerType.Executed:
                        return true;
                    case AnswerType.Busy:
                    case AnswerType.HardwareError:
                        return false;
                    default:
                        // InterferenceInLine or Disconnected: try again.
                        break;
                }
            }
            return false;
        }
    }

    Option 2, using Task:

    /// <summary>
    /// Option 2: request/response exchange with a serial device. Each command creates a cold
    /// (not yet started) task bound to a timeout token; the background receive loop starts
    /// that task when data arrives, and the sender awaits its result.
    /// </summary>
    public class TestSerialPortV2
    {
        /// <summary>Outcome of one command attempt.</summary>
        private enum AnswerType
        {
            Executed,
            Busy,
            HardwareError,
            Disconnected,
            InterferenceInLine,
        }

        private const int MaxAttempts = 3;
        private const int AnswerTimeoutMilliseconds = 10000;

        private readonly SerialPort _port;

        // Buffer that accumulates the bytes received from the port.
        private readonly byte[] _bytesForBuffer;
        private int _inIndex;
        private int _outIndex;

        // Task of the command currently awaiting an answer. It is in the Created state only
        // between being issued and being picked up by the receive loop.
        private Task<AnswerType> _taskForCommand1;

        /// <summary>Opens COM3 at 115200 baud and starts the background receive loop.</summary>
        public TestSerialPortV2()
        {
            _port = new SerialPort("COM3", 115200);
            _port.Open();

            _bytesForBuffer = new byte[200];
            _inIndex = 0;
            _outIndex = 0;

            // An already-completed placeholder, so the receive loop has nothing to start
            // until the first command is issued.
            _taskForCommand1 = Task.FromResult(AnswerType.Executed);

            var task = ReceiveData();
        }

        /// <summary>
        /// Receive loop: appends incoming bytes to the buffer and, if a command is waiting,
        /// starts its answer-validation task. Ends when the port reports an I/O error.
        /// </summary>
        private async Task ReceiveData()
        {
            try
            {
                while (true)
                {
                    if (_outIndex == _bytesForBuffer.Length)
                    {
                        // The buffer filled up without being consumed. A zero-length read would
                        // return immediately and spin forever, so drop the stale bytes instead.
                        _outIndex = 0;
                    }

                    int readedbytes = await _port.BaseStream.ReadAsync(_bytesForBuffer, _outIndex, _bytesForBuffer.Length - _outIndex);
                    _outIndex += readedbytes;

                    // Logic that checks whether a complete answer has arrived; otherwise keep reading.

                    // Work on a snapshot: the sender may replace the field at any moment.
                    Task<AnswerType> pending = _taskForCommand1;
                    if (pending.Status == TaskStatus.Created)
                    {
                        // Validate the received answer.
                        bool started;
                        try
                        {
                            pending.Start();
                            started = true;
                        }
                        catch (InvalidOperationException)
                        {
                            // The timeout cancelled the task between the status check and Start.
                            started = false;
                        }

                        if (started)
                        {
                            try
                            {
                                await pending;
                            }
                            catch (OperationCanceledException)
                            {
                                // Cancelled after being queued but before it began to run.
                                // The sender observes the cancellation; the loop must go on.
                            }
                        }
                    }

                    Console.WriteLine("Read something " + readedbytes);
                }
            }
            catch (IOException)
            {
                // The port failed; stop receiving. Pending commands end through their timeout.
            }
        }

        /// <summary>Placeholder for the validation of the answer to command 1 (simulates 100 ms of work).</summary>
        private AnswerType TestAnswerForCommand1()
        {
            Thread.Sleep(100);
            Console.WriteLine("CalculatingBytes");
            // Some work.
            return AnswerType.InterferenceInLine;
        }

        /// <summary>
        /// Sends command 1 and awaits the answer, retrying up to three times on timeout or
        /// line interference.
        /// </summary>
        /// <returns>True if the device reported the command as executed; otherwise false.</returns>
        public async Task<bool> ExecuteComman1()
        {
            for (int attempt = 0; attempt < MaxAttempts; attempt++)
            {
                // Dispose the source (and its timer) at the end of every attempt.
                using (var cts = new CancellationTokenSource())
                {
                    // Cold task: the receive loop starts it; cancelling the token first
                    // moves it to Canceled, which is how the timeout is reported.
                    Task<AnswerType> pending = new Task<AnswerType>(TestAnswerForCommand1, cts.Token);
                    _taskForCommand1 = pending;

                    byte[] command1 = new byte[5];
                    command1[0] = 0x5A;
                    command1[2] = 0x3;

                    await _port.BaseStream.WriteAsync(command1, 0, command1.Length);

                    // The timeout starts counting once the command has been written.
                    cts.CancelAfter(AnswerTimeoutMilliseconds);
                    try
                    {
                        switch (await pending)
                        {
                            case AnswerType.Executed:
                                return true;
                            case AnswerType.Busy:
                            case AnswerType.HardwareError:
                                return false;
                            default:
                                // InterferenceInLine or Disconnected: try again.
                                break;
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // No answer within the timeout (device presumably disconnected): try again.
                    }
                }
            }
            return false;
        }
    }



  • SerialPort has a DataReceived event that is raised when data arrives from the serial port. For asynchronous communication with the port in option 2 (Task), you can use TaskCompletionSource (https://msdn.microsoft.com/ru-ru/library/dd449174) and CancellationTokenSource (https://msdn.microsoft.com/ru-ru/library/system.threading.cancellationtokensource(v=vs.110).aspx). The sequence of operations:

    1. When a request is sent to the port, create a TaskCompletionSource and a CancellationTokenSource.
    2. Register a cancellation handler on the CancellationTokenSource via Token.Register() and set the timeout.
    3. Write the data to the port.
    4. Await the completion of the TaskCompletionSource's task.
    5. When data arrives from the port, try to process it.
    6. On success, complete the task via TaskCompletionSource.TrySetResult().
    7. On error, complete the task via TaskCompletionSource.TrySetException().
    8. On timeout, the CancellationTokenSource fires and cancels the task.

    Example of a class that behaves as described above:

    /// <summary>
    /// Frame-based connection over a serial port. A sent frame is paired with the next
    /// received frame through a <see cref="TaskCompletionSource{TResult}"/>; a
    /// <see cref="CancellationTokenSource"/> cancels the wait when <see cref="Timeout"/> expires.
    /// </summary>
    class ModemConnection : IDisposable
    {
        private readonly SerialPort _serialPort;
        private bool _disposed;

        // Completion source of the request currently awaiting a reply.
        private TaskCompletionSource<Frame> _taskCompletionSource = new TaskCompletionSource<Frame>();

        /// <summary>
        /// How long a sent frame waits for its reply. No initial value is assigned, so it
        /// starts at <see cref="TimeSpan.Zero"/>; set it before sending, otherwise the wait
        /// is cancelled almost immediately.
        /// </summary>
        public TimeSpan Timeout { get; set; }

        /// <summary>Creates the port with the given settings and subscribes to its data events; the port is not opened.</summary>
        public ModemConnection(string portName, int baudRate, Parity parity, int dataBits, StopBits stopBits)
        {
            _serialPort = new SerialPort(portName, baudRate, parity, dataBits, stopBits);
            _serialPort.DataReceived += _serialPort_DataReceived;
        }

        /// <summary>
        /// Writes <paramref name="frame"/> to the port and waits for the next received frame.
        /// </summary>
        /// <returns>The frame received in reply.</returns>
        /// <exception cref="OperationCanceledException">No reply arrived within <see cref="Timeout"/>.</exception>
        private async Task<Frame> SendFrame(Frame frame)
        {
            // Continuations must not run inline inside TrySetResult/TrySetCanceled, i.e. on the
            // serial port's event thread or on the timer thread.
            var pending = new TaskCompletionSource<Frame>(TaskCreationOptions.RunContinuationsAsynchronously);
            _taskCompletionSource = pending;

            // Both the source (it owns a timer) and the registration are released when the
            // request ends, whichever way it ends.
            using (var cts = new CancellationTokenSource())
            using (cts.Token.Register(tcs => ((TaskCompletionSource<Frame>)tcs).TrySetCanceled(), pending, false))
            {
                cts.CancelAfter(Timeout);

                // leaveOpen: true - the writer must not close the port's stream.
                var bw = new BinaryWriter(_serialPort.BaseStream, Encoding.ASCII, true);
                frame.Write(bw);
                bw.Flush();

                // Notify subscribers on a pool thread so a slow handler cannot delay the reply.
                Task.Run(() => OnOnFrameSent(frame));

                return await pending.Task.ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Reads one frame from the port, completes the pending request with it and raises
        /// <see cref="FrameReceived"/>. A read failure is forwarded to the pending request.
        /// </summary>
        private void _serialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
        {
            try
            {
                // Read straight from the port stream. A throw-away BufferedStream would read
                // ahead, and any bytes of the following frame it had buffered would be lost
                // together with it when this handler returns.
                var r = new BinaryReader(_serialPort.BaseStream, Encoding.ASCII, true);
                var frame = new Frame();
                frame.Read(r);
                _taskCompletionSource?.TrySetResult(frame);

                Task.Run(() => OnOnFrameReceived(frame));
            }
            catch (Exception ex)
            {
                _taskCompletionSource?.TrySetException(ex);
            }
        }

        /// <summary>Raised (on a pool thread) for every frame read from the port.</summary>
        public event EventHandler<Frame> FrameReceived;

        protected virtual void OnOnFrameReceived(Frame e)
        {
            FrameReceived?.Invoke(this, e);
        }

        /// <summary>Raised (on a pool thread) after a frame has been written to the port.</summary>
        public event EventHandler<Frame> FrameSent;

        protected virtual void OnOnFrameSent(Frame e)
        {
            FrameSent?.Invoke(this, e);
        }

        /// <summary>Opens the port if it is not already open.</summary>
        public void Open()
        {
            if (_serialPort.IsOpen)
            {
                return;
            }
            _serialPort.Open();
        }

        /// <summary>Unsubscribes from the port, releases it and cancels a request still waiting for a reply.</summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }
            _disposed = true;

            _serialPort.DataReceived -= _serialPort_DataReceived;
            _serialPort.Dispose();

            // No reply can arrive any more; do not leave the awaiter hanging until its timeout.
            _taskCompletionSource?.TrySetCanceled();
        }
    }




Suggested Topics

  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2
  • 2