MovGP0        Über mich        Hilfen        Artikel        Weblinks        Literatur        Zitate        Notizen        Programmierung        MSCert        Physik      

Parallel Aggregation

Bearbeiten
  • aka. Parallel Reduction
  • operation must be associative
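    • the grouping of the operations does not affect the result (if partial results may also be combined in any order, the operation must additionally be commutative)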
  • avoids synchronisation for aggregates as with Parallel.For or Parallel.ForEach
    • however, Parallel.For or Parallel.ForEach have overloaded versions for aggregation
Serial LINQ PLINQ
double[] sequence = ...
double total = 0.0d;

// Accumulate the normalized values one by one, in array order.
foreach(double value in sequence)
{
   total += Normalize(value);
}

return total;
double[] sequence = ...

// Sequential LINQ: project every element through Normalize, then add up the results.
return sequence.Select(x => Normalize(x)).Sum();
double[] sequence = ...

// PLINQ: AsParallel() turns the array into a parallel query before Select and Sum are applied.
return sequence.AsParallel().Select(x => Normalize(x)).Sum();
double[] sequence = ...

// PLINQ with a custom aggregation: multiplies all normalized values,
// starting from the seed 1.0 (the neutral element of multiplication).
return sequence.AsParallel().Select(x => Normalize(x)).Aggregate(1.0d, (y1, y2) => y1 * y2);

Multiset Union

Bearbeiten
Serial PLINQ
/// <summary>
/// Sequentially collects the friends of the friends of <paramref name="id"/> and
/// ranks them as friend candidates.
/// </summary>
/// <param name="id">Subscriber whose friends' friend lists are examined.</param>
/// <param name="maxCandidates">Forwarded to <c>Multiset.MostNumerous</c> as the result limit.</param>
/// <returns>The result of <c>Multiset.MostNumerous</c> over the union of all per-friend multisets.</returns>
public IdMultisetItemList PotentialFriendsSequential(SubscriberId id, int maxCandidates)
{
   // Map: build one multiset of friends-of-friends per friend
   var foafsList = new List<IdMultiset>();
   
   foreach(SubscriberId friend in subscribers[id].Friends)
   {
      // presumably FriendsCopy() hands out a private copy, so RemoveWhere
      // does not modify the friend's own set — verify against its definition
      var foafs = subscribers[friend].FriendsCopy();
      // drop the subscriber itself and everyone who already is a friend
      foafs.RemoveWhere(foaf => foaf == id || subscribers[id].Friends.Contains(foaf));
      foafsList.Add(Multiset.Create(foafs));
   }

   // Reduce: merge the per-friend multisets into a single one
   var candidates = new IdMultiset();
   
   foreach(IdMultiset foafs in foafsList)
   {
      candidates = Multiset.Union(foafs, candidates);
   }

   // Postprocess: let Multiset.MostNumerous pick the result from the merged multiset
   return Multiset.MostNumerous(candidates, maxCandidates);
}
/// <summary>
/// PLINQ variant: builds the (candidate, occurrence count) items with a single
/// parallel query and hands them to <c>Multiset.MostNumerous</c>.
/// </summary>
/// <param name="id">Subscriber whose friends' friend lists are examined.</param>
/// <param name="maxCandidates">Forwarded to <c>Multiset.MostNumerous</c> as the result limit.</param>
public IdMultisetItemList PotentialFriendsPlinq(SubscriberId id, int maxCandidates)
{
   return Multiset.MostNumerous(
      subscribers[id].Friends.AsParallel()
         // flatten the friend lists of all friends into one sequence
         .SelectMany(f => subscribers[f].Friends)
         // keep only entries that are neither the subscriber nor already a friend
         .Where(person => !(person == id) && !subscribers[id].Friends.Contains(person))
         // one group per distinct candidate; the group size is its occurrence count
         .GroupBy(person => person)
         .Select(g => new IdMultisetItem(g.Key, g.Count())),
      maxCandidates);
}

Parallel Loops for Aggregation

Bearbeiten
Parallel Loop PLINQ
double[] sequence = ...
var sumGuard = new object();
var total = 0.0d;

Parallel.ForEach(sequence, 
   // local init: every task starts its private subtotal at zero
   () => 0.0d,
   // body: add the normalized element to the task-local subtotal (no locking needed here)
   (item, state, subtotal) => Normalize(item) + subtotal,
   // local finally: merge a finished task's subtotal into the shared total
   subtotal =>
   {
      // the shared total is written by several tasks, so the update is serialized
      lock(sumGuard)
      {
         total += subtotal;
      }
   });

return total;
double[] sequence = ...

// Same aggregation expressed as a PLINQ query: normalize in parallel, then sum.
var normalized = sequence.AsParallel().Select(value => Normalize(value));
return normalized.Sum();

Partitioner

Bearbeiten
double[] sequence = ...
var sumGuard = new object();
var total = 0.0d;
// partitions the index range of the array into sub-ranges (Item1 .. Item2)
var indexRanges = Partitioner.Create(0, sequence.Length);

Parallel.ForEach(indexRanges, 
   // local init: every task starts its private subtotal at zero
   () => 0.0d,
   // body: sum up one whole index range into the task-local subtotal
   (range, state, subtotal) =>
   {
      int index = range.Item1;

      while(index < range.Item2)
      {
         subtotal += Normalize(sequence[index]);
         index++;
      }

      return subtotal;
   },
   // local finally: merge a finished task's subtotal into the shared total
   subtotal =>
   {
      // the shared total is written by several tasks, so the update is serialized
      lock(sumGuard)
      {
         total += subtotal;
      }
   });

return total;

PLINQ Aggregation with Range

Bearbeiten
  • lock free
  • more scalable
// Builds a histogram of simulation results with PLINQ's four-delegate Aggregate overload:
// seed factory, accumulator update, accumulator combine, result selector.
return ParallelEnumerable.Range(0, count).Aggregate(
   // accumulator seed: an empty histogram paired with its own Random instance,
   // so no accumulator shares a histogram or random generator with another one
   () => new Tuple<int[], Random>(MakeEmptyHistogram(), new Random(SampleUtilities.MakeRandomSeed())), 
   // run simulation and add result to local accumulator
   (localAccumulator, i) => 
   {
      var sample = localAccumulator.Item2.NextDouble(); 
      
      // only samples strictly between 0 and 1 are simulated
      if(sample > 0.0 && sample < 1.0)
      {
         var simulationResult = DoSimulation(sample, mean, stdDev);
         int histogramBucket = (int)Math.Floor(simulationResult / BucketSize);
         
         // results outside the table range are ignored
         if(0 <= histogramBucket && histogramBucket < TableSize)
         {
            localAccumulator.Item1[histogramBucket] += 1;
         }
      }

      return localAccumulator;
   }, 
   // combine local results
   (localAccumulator1, localAccumulator2) => 
   {
      var combinedHistograms = CombineHistograms(localAccumulator1.Item1, localAccumulator2.Item1);
      // the combined accumulator carries no Random; presumably no further samples
      // are drawn from it after combining — TODO confirm
      return new Tuple<int[], Random>(combinedHistograms, null);
   }, 
   // global result: only the histogram is returned
   finalAccumulator => finalAccumulator.Item1
);