MovGP0        Über mich        Hilfen        Artikel        Weblinks        Literatur        Zitate        Notizen        Programmierung        MSCert        Physik      

Parallel Tasks

Bearbeiten
  • Task.Factory to create and schedule new tasks
  • use task.StartNew() for fork
  • use task.Wait() for join
  • may not start immediately; depend on concurrent scheduler
  • exceptions are cached and rethrown when all tasks are finished

Parallel Invoke

Bearbeiten
Parallel.Invoke(Method1, Method2, /*...*/);

equals to

Task task1 = Task.Factory.StartNew(Method1);
Task task2 = TaskFactory.StartNew(Method2);
// ...
Task.WaitAll(task1, task2, /*...*/);

Task Cancellation Token

Bearbeiten
using(var cancellationTokenSource = new CancellationTokenSource())
{
   var cancellationToken = cancellationTokenSource.Token;
   
   var task = Task.Factory.StartNew(() => 
   {
      for(/*...*/)
      {
         cancellationToken.ThrowIfCancellationRequested();
         // ...
      }
   }, cancellationToken);
   
   cancellationTokenSource.Cancel();
}

Exception Handling

Bearbeiten
  • test with task.Status == TaskStatus.Faulted for exceptions
  • also test task.Status == TaskStatus.Canceled for cancellation
  • exceptions are stored in aggregateException.InnerExceptions
  • aggregateException.Handle is called for each InnerException
try
{
   var task = Task.Factory.StartNew( () => /*...*/ );
   // ...
   task.Wait(); // join
}
catch(AggregateException aggregateException)
{
   aggregateException.Handle(exception => 
   {
      if(exception is /* ... */)
      {
         // handle exception
         return true; // exception was handled
      }

      return false; // exception was not handled
   });
}
  • Task.Wait()
  • Task.WaitAll(task1, task2, /* ... */);
  • Task.WaitAny(task1, task2, /* ... */);
    Note: Task.WaitAny does not throw exceptions; tasks must be checked explicitly!

Speculative Execution

Bearbeiten
public static void SpeculativeInvoke(params Action<CancellationToken>[] actions)
{
   var cancellationTokenSource = new CancellationTokenSource();
   var token = cancellationTokenSource.Token();
   var tasks = actions.Select(action => Task.Factory.StartNew(() => action(token), token));
   Task.WaitAny(tasks);
   cancellationTokenSource.Cancel();
   
   try
   {
      Task.WaitAll(tasks);
   }
   catch (AggregateException aggregateException)
   {
      // remove OperationCanceledException
      aggregateException.Flatten().Handle(e => e is OperationCanceledException);
   }
   finally
   {
      if(cancellationTokenSource != null)
      {
         cancellationTokenSource.Dispose();
      }
   }
}

Custom Thread Scheduling

Bearbeiten
  • implement custom thread scheduling when
    • library impose thread affinity constraints (ie. WCF), or
    • load balancing does not work well for a given application
Default Schedulers
// current scheduler
TaskScheduler.Current 

// default scheduler
TaskScheduler.Default 

// scheduler for syncronizing with synchronization context of the current thread
// does not work with main thread
TaskScheduler.FromCurrentSyncronizationContext
Custom Scheduler

Example is part of the ParallelExtensionExtras[1]

Antipatterns

Bearbeiten

Variables Captured by Closures

Bearbeiten
Do Don't
for(int i = 0; i < 4; i++)
{
   var tmp = i;
   Task.Factory.StartNew(() => Console.WriteLine(tmp));
}
for(int i = 0; i < 4; i++)
{
   // thread may start later and output only 4's
   Task.Factory.StartNew(() => Console.WriteLine(i)); 
}

Disposing a Resource Needed by a Task

Bearbeiten
Task<string> task;
using(var file = new StringReader("text"))
{
   task = Task<string>.Factory.StartNew(() => file.ReadLine());
}

Console.WriteLine(task.Result);

Thread Abort

Bearbeiten
  • Terminating task with Thread.Abort may bring the application in a possibly unstable state

Task Design

Bearbeiten
  • Scheduler can invoke a delegate when a task is starting

Life Cycle

Bearbeiten
TaskStatus
  • WaitingToRun → Running, Canceled
  • Running → Canceled, Faulted, RunToCompletion

Thread Pool

Bearbeiten
  • Application has a singleton queue, which can only be accessed by one thread at a time. Does not scale well on many processors.
    • Implemented as FIFO
  • Each Task has its own thread pool, additional to the global thread pool. Does scale.
    • Implemented as work-stealing queue
    • private and public end
    • private end has no synchronisation (cheap)
    • public end has synchronization (costly)
    • Tasks steals threads from other tasks, when their private queue and global queue is empty

Performance considerations

Bearbeiten
  • Task switch takes 6000 to 8.000 CPU cycles
  • Task creation takes 200.000 CPU cycles
  • Task consumes ~1MB on the stack
  • Decompose application into short tasks, so that the thread pool can optimize heuristics
  • Use simple custom thread scheduler with limited concurrency when task is long running[2]
  • Use ThreadePool.SetMaxThreads as a last resort
    • ThreadePool.SetMaxThreads = Environment.ProcessorCount by default

Long Running Task

Bearbeiten
  • use TaskCreationOptions.LongRunning to create a new task that is not from the ThreadPool for lengthy I/O operations

Referenzen

Bearbeiten
  1. Stephen Toub: A Tour of ParallelExtensionsExtras. In: Parallel Programming with .NET, MSDN. Microsoft, 4. April 2010, abgerufen am 11. Juli 2014.
  2. How to: Create a Task Scheduler That Limits Concurrency. In: MSDN. Microsoft, abgerufen am 28. Juli 2014 (englisch).