You've got .NET 1.0-era code that has evolved only superficially: with the arrival of lambdas, Delegate was mechanically replaced by Action, and new Thread(...).Start() was mechanically replaced by Task.Run(), but the code was never actually reworked for the TPL with async/await, and the concurrent collections were overlooked. As of .NET 4.5, all of this machinery is unnecessary in principle. If tasks must run strictly one after another, you simply post them to a single-threaded task scheduler. In essence, that one line is the whole solution; any code that must run before or after all the tasks is written before and after that line. Note that with this approach you still keep the cancellation token and the other joys of life.

Let's look at an example. Say we need to run two queues of tasks: within each queue the tasks run sequentially, but the queues themselves run in parallel. We create two schedulers with a concurrency limit of one, then start each task on the scheduler we need.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static readonly Random _rnd = new Random();
    static readonly LimitedConcurrencyTaskScheduler _schedulerFoo =
        new LimitedConcurrencyTaskScheduler(1);
    static readonly LimitedConcurrencyTaskScheduler _schedulerBar =
        new LimitedConcurrencyTaskScheduler(1);

    static void Main () => new Program().Run().Wait();

    async Task Run ()
    {
        Task queueFoo = RunQueue("Foo", _schedulerFoo,
            Enumerable.Range(0, 3).Select(i => (Action)(() => Foo("Foo"))));
        Task queueBar = RunQueue("Bar", _schedulerBar,
            Enumerable.Range(0, 3).Select(i => (Action)(() => Foo("Bar"))));
        await Task.WhenAll(queueFoo, queueBar);
        Console.WriteLine("Done!");
        Console.ReadKey();
    }

    async Task RunQueue (string name, TaskScheduler scheduler,
                         IEnumerable<Action> commands)
    {
        Console.WriteLine($"{name}: Start");
        await Task.WhenAll(commands.Select(c => RunTask(c, scheduler)));
        Console.WriteLine($"{name}: Finish");
    }

    async Task RunTask (Action task, TaskScheduler scheduler)
    {
        await Task.Factory.StartNew(task, CancellationToken.None,
            TaskCreationOptions.None, scheduler);
    }

    void Foo (string name)
    {
        int timeout = _rnd.Next(200);
        Console.WriteLine($"{name}: Start {timeout}");
        Thread.Sleep(timeout);
        Console.WriteLine($"{name}: Finish {timeout}");
    }
}
Sample output:

Foo: Start
Bar: Start
Foo: Start 165
Bar: Start 50
Bar: Finish 50
Bar: Start 39
Bar: Finish 39
Bar: Start 115
Foo: Finish 165
Foo: Start 116
Bar: Finish 115
Bar: Finish
Foo: Finish 116
Foo: Start 0
Foo: Finish 0
Foo: Finish
Done!
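As an aside, for the single-queue case you don't even have to write the scheduler yourself: since .NET 4.5 the framework ships ConcurrentExclusiveSchedulerPair, whose ExclusiveScheduler runs tasks one at a time. A self-contained sketch (the counters and timings here are mine, purely for illustration, not part of the example above):

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class ExclusiveDemo
{
    static void Main ()
    {
        // ExclusiveScheduler guarantees at most one task runs at a time,
        // so the three tasks below execute strictly sequentially.
        TaskScheduler exclusive = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
        int maxObserved = 0, current = 0;

        Task[] tasks = Enumerable.Range(0, 3)
            .Select(i => Task.Factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref current);
                maxObserved = Math.Max(maxObserved, now);
                Thread.Sleep(50); // simulate work
                Interlocked.Decrement(ref current);
            }, CancellationToken.None, TaskCreationOptions.None, exclusive))
            .ToArray();

        Task.WaitAll(tasks);
        Console.WriteLine($"Max concurrency observed: {maxObserved}"); // prints 1
    }
}

The custom scheduler below is still worth having when you want an arbitrary concurrency limit rather than strictly one.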
Although in this example all the tasks are queued at once, tasks can be added to a queue at any moment. Admittedly, the notions of the queue's start and finish then lose some of their meaning; if you explain what you need them for, something can be worked out.

Here I used the LimitedConcurrencyTaskScheduler from the MSDN parallel extensions samples: https://code.msdn.microsoft.com/ParExtSamples (the article describing it: http://blogs.msdn.com/b/pfxteam/archive/2010/04/09/9990424.aspx ).

/// <summary>
/// Provides a task scheduler that ensures a maximum concurrency level while running on top of the ThreadPool.
/// Source: http://code.msdn.microsoft.com/ParExtSamples
/// Documentation: http://blogs.msdn.com/b/pfxteam/archive/2010/04/09/9990424.aspx
/// License: MS-LPL
/// </summary>
public class LimitedConcurrencyTaskScheduler : TaskScheduler
{
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;

    private readonly int _maxDegreeOfParallelism;
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
    private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

    /// <summary>Initializes an instance of the LimitedConcurrencyTaskScheduler class with the specified degree of parallelism.</summary>
    /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
    public LimitedConcurrencyTaskScheduler (int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public override sealed int MaximumConcurrencyLevel => _maxDegreeOfParallelism;

    protected override sealed IEnumerable<Task> GetScheduledTasks ()
    {
        bool lockTaken = false;
        try {
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (lockTaken)
                return _tasks.ToArray();
            else
                throw new NotSupportedException();
        }
        finally {
            if (lockTaken)
                Monitor.Exit(_tasks);
        }
    }

    protected override sealed void QueueTask (Task task)
    {
        // Add the task to the list of tasks to be processed. If there aren't enough
        // delegates currently queued or running to process tasks, schedule another.
        lock (_tasks) {
            _tasks.AddLast(task);
            if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism) {
                ++_delegatesQueuedOrRunning;
                NotifyThreadPoolOfPendingWork();
            }
        }
    }

    /// <summary>Informs the ThreadPool that there's work to be executed for this scheduler.</summary>
    private void NotifyThreadPoolOfPendingWork ()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ => {
            // Note that the current thread is now processing work items.
            // This is necessary to enable inlining of tasks into this thread.
            _currentThreadIsProcessingItems = true;
            try {
                // Process all available items in the queue.
                while (true) {
                    Task item;
                    lock (_tasks) {
                        // When there are no more items to be processed,
                        // note that we're done processing, and get out.
                        if (_tasks.Count == 0) {
                            --_delegatesQueuedOrRunning;
                            break;
                        }
                        // Get the next item from the queue
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }
                    // Execute the task we pulled out of the queue
                    TryExecuteTask(item);
                }
            }
            finally {
                // We're done processing items on the current thread
                _currentThreadIsProcessingItems = false;
            }
        }, null);
    }

    protected override sealed bool TryExecuteTaskInline (Task task, bool taskWasPreviouslyQueued)
    {
        // If this thread isn't already processing a task, we don't support inlining
        if (!_currentThreadIsProcessingItems)
            return false;
        // If the task was previously queued, remove it from the queue
        if (taskWasPreviouslyQueued)
            TryDequeue(task);
        // Try to run the task.
        return TryExecuteTask(task);
    }

    protected override sealed bool TryDequeue (Task task)
    {
        lock (_tasks)
            return _tasks.Remove(task);
    }
}
There's also BlockingCollection<> with a ConcurrentQueue<> inside; just don't forget to call CompleteAdding when you're done adding items. There's also TPL Dataflow, where pipelines and the like are built without custom schedulers. You can add Rx as the cherry on top. Plenty of options.

Right, now @Vlad will drop by and tell us about the fancy things you can do with all this. :)
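To make the BlockingCollection<> option concrete, here is a minimal producer/consumer sketch (the names are illustrative). GetConsumingEnumerable blocks while the collection is empty and completes once CompleteAdding has been called and the queue has drained, so the consumer task finishes cleanly:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class QueueDemo
{
    static void Main ()
    {
        // ConcurrentQueue<> (FIFO) is the default backing store of BlockingCollection<>,
        // but we pass it explicitly for clarity.
        var queue = new BlockingCollection<string>(new ConcurrentQueue<string>());

        // A single consumer => items are processed strictly in order, one at a time.
        Task consumer = Task.Run(() =>
        {
            foreach (string item in queue.GetConsumingEnumerable())
                Console.WriteLine($"Processed {item}");
        });

        queue.Add("a");
        queue.Add("b");
        queue.Add("c");
        queue.CompleteAdding(); // without this, the consumer would block forever
        consumer.Wait();
    }
}

Prints "Processed a", "Processed b", "Processed c" in order.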