

Reactive Extensions


Overview

LINQ vs. Rx[1]

Single value, synchronous:
// Synchronous call
int x = f(42);

// Synchronous call
int y = g(x);

Single value, asynchronous:
// Asynchronous call
int x = await fAsync(42);

// Asynchronous call
int y = await gAsync(x);

Multi value, synchronous (LINQ over IEnumerable<T>):
// Shopping database
var res = from o in orders
          from p in o.Products
          where p.Price > 29.95m
          select p.Name;

// Synchronous MoveNext
res.ToList().ForEach(p => Console.WriteLine(p));

Multi value, asynchronous (Rx over IObservable<T>):
// Stock trade events
var res = from t in trades
          from q in t.Symbols
          where q.Quote > 29.95m
          select q.Symbol;

// Asynchronous OnNext
res.Subscribe(p => Console.WriteLine(p));

Interfaces

IObservable<T> vs. ISubject<T> vs. IObserver<T>
Important interfaces and subject types
Interface: Meaning
IObservable<T>:
  • IDisposable Subscribe(IObserver<T>)
  • IDisposable Subscribe(Action<T>) (extension method)

see also: Disposable.Empty

IObserver<T>: is passed to the observable
  • OnNext(T)
  • OnError(Exception)
  • OnCompleted()
ISubject<T>: an IObserver<T> and an IObservable<T> at the same time.
Subject<T>: the observer receives all data published from the time of subscription onward.
ReplaySubject<T>:
  • Caches all data and hands it to the observer at the time of subscription
  • The cache can be limited by a maximum number of buffered elements or by a maximum age
BehaviorSubject<T>: like a ReplaySubject<T> that buffers one element, but it requires an initial value, so an observer is guaranteed to receive a value at the time of subscription.
AsyncSubject<T>: publishes at most one value. The observer is called once, with the last value, when the sequence completes.
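
The differences between the subject types are easiest to see with a late subscriber. The following is a minimal sketch, assuming the System.Reactive NuGet package is referenced; SubjectDemo and LateSubscriber are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectDemo
{
    // Publishes "a" and "b", subscribes late, then publishes "c" and completes.
    // Returns what the late subscriber received.
    public static List<string> LateSubscriber(ISubject<string> subject)
    {
        var received = new List<string>();
        subject.OnNext("a");
        subject.OnNext("b");
        using (subject.Subscribe(value => received.Add(value)))
        {
            subject.OnNext("c");
            subject.OnCompleted();
        }
        return received;
    }

    public static void Main()
    {
        // Subject: only values published after the subscription
        Console.WriteLine(string.Join(",", LateSubscriber(new Subject<string>())));               // c
        // ReplaySubject: all cached values, then the live ones
        Console.WriteLine(string.Join(",", LateSubscriber(new ReplaySubject<string>())));         // a,b,c
        // BehaviorSubject: the latest value at subscription time, then the live ones
        Console.WriteLine(string.Join(",", LateSubscriber(new BehaviorSubject<string>("init")))); // b,c
        // AsyncSubject: only the last value, delivered on completion
        Console.WriteLine(string.Join(",", LateSubscriber(new AsyncSubject<string>())));          // c
    }
}
```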

Factory Methods

Factory methods to create an IObservable<T>
Observable.Return<T>(T): takes a value, publishes exactly that value once, then completes.
Observable.Empty<T>(): completes without publishing a value.
Observable.Never<T>(): infinite sequence without any notifications.
Observable.Throw<T>(Exception): signals OnError with the given exception without publishing a value.
Observable.Create<T>(observer => { ... }): factory method to build an observable from a subscribe function.
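
To make the behaviour of these factory methods visible, the notifications each one produces can be recorded. This is a small sketch, assuming the System.Reactive package; FactoryDemo and Record are hypothetical helper names, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FactoryDemo
{
    // Subscribes to the source and records every notification as a short string.
    public static List<string> Record<T>(IObservable<T> source)
    {
        var log = new List<string>();
        using (source.Subscribe(
            value => log.Add("OnNext(" + value + ")"),
            error => log.Add("OnError(" + error.Message + ")"),
            () => log.Add("OnCompleted")))
        {
            // These sources publish synchronously during Subscribe.
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Record(Observable.Return(42))));   // OnNext(42), OnCompleted
        Console.WriteLine(string.Join(", ", Record(Observable.Empty<int>()))); // OnCompleted
        Console.WriteLine(string.Join(", ", Record(Observable.Never<int>()))); // (nothing)
        Console.WriteLine(string.Join(", ", Record(
            Observable.Throw<int>(new InvalidOperationException("boom")))));   // OnError(boom)
    }
}
```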

Observable.Create

Create blocking observable
private static IObservable<string> Blocking()
{
   // The values are published and the thread sleeps before the method returns.
   var subject = new ReplaySubject<string>();
   subject.OnNext("a");
   subject.OnNext("b");
   subject.OnCompleted();
   Thread.Sleep(1000);
   return subject;
}
Create non-blocking observable
private static IObservable<string> NonBlocking()
{
   // The delegate runs only when an observer subscribes, so the method returns immediately.
   return Observable.Create<string>(observer => {
      observer.OnNext("a");
      observer.OnNext("b");
      observer.OnCompleted();
      Thread.Sleep(1000);
      return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed."));
   });
}
Observable.Empty and Observable.Return
public static IObservable<T> Empty<T>()
{
   return Observable.Create<T>(observer => {
      observer.OnCompleted();
      return Disposable.Empty;
   });
}
public static IObservable<T> Return<T>(T value)
{
   return Observable.Create<T>(observer => {
      observer.OnNext(value);
      observer.OnCompleted();
      return Disposable.Empty;
   });
}
Observable.Never and Observable.Throw
public static IObservable<T> Never<T>()
{
   return Observable.Create<T>(observer => {
      return Disposable.Empty;
   });
}
public static IObservable<T> Throw<T>(Exception exception)
{
   return Observable.Create<T>(observer => {
      observer.OnError(exception);
      return Disposable.Empty;
   });
}
Observable.Interval and Observable.Timer
public static IObservable<long> Interval(TimeSpan period)
{
   return Observable.Generate(0L, 
      i => true, 
      i => i + 1, 
      i => i, 
      i => period   // the first value is also delayed by period
   );
}
public static IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
{
   return Observable.Generate(0L, 
      i => true, 
      i => i + 1, 
      i => i, 
      i => i == 0 ? dueTime : period
   );
}

Examples

Timer and unfolding examples
var observable = Observable.Create<string>(observer => {
    ElapsedEventHandler handler = 
      (sender, args) => observer.OnNext(args.SignalTime.ToString());

    var timer = new Timer();
    timer.Interval = 1000;
    timer.Elapsed += handler;
    timer.Start();
    return Disposable.Create(() => { 
        timer.Elapsed -= handler;
        timer.Dispose();
        Console.WriteLine("Timer stopped.");
    });
});

using(var subscription = observable.Subscribe(Console.WriteLine))
{
    Thread.Sleep(5000);
}
void Main()
{
   var integers = Unfold(1, i => i + 1);
   foreach(var value in integers.Take(20))
   {
      Console.WriteLine(value);
   }
}

private static IEnumerable<T> Unfold<T>(T seed, Func<T, T> accumulator)
{
   var nextValue = seed;
   while(true)
   {
      yield return nextValue;
      nextValue = accumulator(nextValue);
   }
}

Note: The .NET Framework offers several timers to choose from:

  • System.Timers.Timer
  • System.Threading.Timer
  • System.Windows.Threading.DispatcherTimer
  • (System.Diagnostics.Stopwatch)

Unfold Methods

Observable.Range
void Main()
{
   Observable.Range(10,10).Subscribe(Console.WriteLine);
}
Observable.Generate (illustrative re-implementation of Range)
void Main()
{
   Range(10,10).Subscribe(Console.WriteLine);
}

public static IObservable<int> Range(int initial, int count)
{
   // i < initial + count yields exactly count values: 10..19 for Range(10,10)
   return Observable.Generate<int,int>(initial, i => i < initial + count, i => i + 1, i => i);
}
Observable.Interval and Observable.Timer
Interval every 250 milliseconds
var observable = Observable.Interval(TimeSpan.FromMilliseconds(250));
using(var subscription = observable.Subscribe(Console.WriteLine))
{
   // 0..18
   Thread.Sleep(5000);  
}
Event after 1 second
var observable = Observable.Timer(TimeSpan.FromSeconds(1));
using(var subscription = observable.Subscribe(Console.WriteLine, () => Console.WriteLine("fin")))
{
   // 0, fin
   Thread.Sleep(5000);  
}
Interval starting after 0 seconds
var observable = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(250));
using(var subscription = observable.Subscribe(Console.WriteLine))
{
   // 0..19
   Thread.Sleep(5000);  
}

Paradigm Transition


From Action/Func

Observable.Start
void StartAction()
{
   var start = Observable.Start(() => {
      Console.WriteLine("working...");
      Enumerable.Range(0, 100).ToList().ForEach(i => {
         Thread.Sleep(100);
         Console.Write(".");
      });
   });

   start.Subscribe(
      unit => Console.WriteLine("\nUnit published."), 
      () => Console.WriteLine("Action completed."));
}
void StartFunc()
{
   var start = Observable.Start(() => {
      Console.WriteLine("working...");
      Enumerable.Range(0, 100).ToList().ForEach(i => {
         Thread.Sleep(100);
         Console.Write(".");
      });
      return "Value";
   });

   start.Subscribe(
      value => Console.WriteLine(string.Format("\n{0} published.", value)), 
      () => Console.WriteLine("Action completed."));
}

From Events

// EventHandler delegate
var appActivated = Observable.FromEventPattern(
   handler => Application.Current.Activated += handler, 
   handler => Application.Current.Activated -= handler);

// Subclass of EventHandler
var propChanged = Observable.FromEventPattern<PropertyChangedEventHandler, PropertyChangedEventArgs>(
   handler => handler.Invoke, 
   handler => this.PropertyChanged += handler, 
   handler => this.PropertyChanged -= handler);

// EventHandler<TEventArgs>
var firstChanceException = Observable.FromEventPattern<FirstChanceExceptionEventArgs>(
   handler => AppDomain.CurrentDomain.FirstChanceException += handler,
   handler => AppDomain.CurrentDomain.FirstChanceException -= handler);

From Tasks

var task = Task.Factory.StartNew(() => "Test");
var source = task.ToObservable();
source.Subscribe(Console.WriteLine, () => Console.WriteLine("completed"));

From IEnumerable<T>


Note: use Stopwatch to measure the impact of converting a synchronous IEnumerable<T> into an asynchronous IObservable<T>!

// do not use; example code only
public static IObservable<T> ToObservable<T>(IEnumerable<T> source)
{
   return Observable.Create<T>(observer => {
      foreach(var item in source)
      {
         observer.OnNext(item);
      }
      // signal the end of the sequence once the source is exhausted
      observer.OnCompleted();
      return Disposable.Empty;
   });
}
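
The Stopwatch measurement suggested in the note above could look roughly like this. It is a sketch under assumptions: it uses the built-in ToObservable() extension from System.Reactive rather than the example code, and EnumerableCost, SumEnumerable and SumObservable are hypothetical names. Absolute timings depend on the machine, so none are shown.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;

public static class EnumerableCost
{
    // Sums the sequence by pulling values (IEnumerable<T>).
    public static long SumEnumerable(int count)
    {
        long sum = 0;
        foreach (var i in Enumerable.Range(0, count)) sum += i;
        return sum;
    }

    // Sums the same sequence by having values pushed (IObservable<T>).
    // With the default scheduler, Subscribe returns after the last value when called like this.
    public static long SumObservable(int count)
    {
        long sum = 0;
        using (Enumerable.Range(0, count).ToObservable().Subscribe(i => sum += i))
        {
        }
        return sum;
    }

    public static void Main()
    {
        const int count = 1000000;

        var watch = Stopwatch.StartNew();
        var pulled = SumEnumerable(count);
        Console.WriteLine("IEnumerable: {0} in {1} ms", pulled, watch.ElapsedMilliseconds);

        watch.Restart();
        var pushed = SumObservable(count);
        Console.WriteLine("IObservable: {0} in {1} ms", pushed, watch.ElapsedMilliseconds);
    }
}
```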

From APM (Asynchronous Programming Model)


Note: .NET 3.5/.NET 4.0 only. Was replaced in .NET 4.5/Rx 2.0 by the async/await pattern. See Rxx for examples of current version.

WebRequest and Stream (APM method signatures)
public class WebRequest
{
   public WebResponse GetResponse() { ... }
   public IAsyncResult BeginGetResponse(AsyncCallback callback, object state) { ... }
   public WebResponse EndGetResponse(IAsyncResult asyncResult) { ... }
}
public class Stream
{
   public int Read(byte[] buffer, int offset, int count) { ... }
   public IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) { ... }
   public int EndRead(IAsyncResult asyncResult) { ... }
}
// BeginRead takes <byte[], int, int, ...> 
// EndRead returns <..., int>
var observableStream = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);
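
With the Task-based API that superseded the APM, a single read can be wrapped without FromAsyncPattern. The following is a sketch of one possible approach, assuming System.Reactive and using Observable.FromAsync with Stream.ReadAsync; ReadDemo and ReadOnce are hypothetical names, and the blocking Wait() call is there only to keep the example short.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;

public static class ReadDemo
{
    // Wraps one Task-based read in an IObservable<int> and waits for the byte count.
    public static int ReadOnce(Stream stream, byte[] buffer)
    {
        IObservable<int> read = Observable.FromAsync(
            () => stream.ReadAsync(buffer, 0, buffer.Length));
        return read.Wait(); // blocks until the single value has arrived
    }

    public static void Main()
    {
        using (var stream = new MemoryStream(Encoding.ASCII.GetBytes("hello")))
        {
            var buffer = new byte[16];
            var bytesRead = ReadOnce(stream, buffer);
            Console.WriteLine(Encoding.ASCII.GetString(buffer, 0, bytesRead)); // hello
        }
    }
}
```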

Operators


Sequence Reduction

ABC's of Functional Programming
Name: Alternative names
Anamorphism: Ana, Unfold, Generate
Bind: Map, SelectMany, Projection, Transform
Catamorphism: Cata, Fold, Reduce, Accumulate, Inject

Rx Extension Methods
Groups: Creation, Reduction, Inspection, Aggregation, Transformation
Creation
  • covered by the factory methods above
Reduction
  • Where
  • Distinct
  • DistinctUntilChanged
  • IgnoreElements
  • Skip
    • SkipWhile
    • SkipLast
    • SkipUntil
  • Take
    • TakeWhile
    • TakeLast
    • TakeUntil
Inspection
  • Any
  • All
  • Contains
  • DefaultIfEmpty
  • ElementAt
  • SequenceEqual
Aggregation
  • Count
  • Min
  • Max
  • Sum
  • Average
  • First
  • Last
  • Single
Aggregation, custom
  • Aggregate
  • Scan
Aggregation, partitioning
  • MinBy
  • MaxBy
  • GroupBy
Transformation
  • Select
  • Cast
  • OfType
  • Timestamp
  • TimeInterval
  • Materialize
  • Dematerialize
  • SelectMany
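
A few of the operators listed above, one or two per group, applied to a short sequence. This is a sketch assuming System.Reactive; OperatorDemo and Collect are hypothetical helper names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class OperatorDemo
{
    // Blocks until the source completes and returns everything it published.
    public static List<T> Collect<T>(IObservable<T> source)
    {
        return source.ToEnumerable().ToList();
    }

    public static void Main()
    {
        var numbers = Observable.Range(1, 5); // 1, 2, 3, 4, 5

        // Reduction: keep only some of the values
        Console.WriteLine(string.Join(",", Collect(numbers.Where(i => i % 2 == 1)))); // 1,3,5
        Console.WriteLine(string.Join(",", Collect(numbers.Skip(1).Take(2))));        // 2,3

        // Transformation: project each value
        Console.WriteLine(string.Join(",", Collect(numbers.Select(i => i * 10))));    // 10,20,30,40,50

        // Aggregation: Scan publishes every intermediate result, Aggregate only the final one
        Console.WriteLine(string.Join(",", Collect(numbers.Scan((acc, i) => acc + i))));      // 1,3,6,10,15
        Console.WriteLine(string.Join(",", Collect(numbers.Aggregate((acc, i) => acc + i)))); // 15
    }
}
```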

Error Handling

  • Catch
  • Finally
  • Using
  • OnErrorResumeNext
  • Retry
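
How three of these operators treat a failing sequence can be sketched as follows, assuming System.Reactive. ErrorDemo and Record are hypothetical names, and the failing source is constructed only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ErrorDemo
{
    // Publishes 1 and 2, then fails with "boom".
    public static IObservable<int> Failing()
    {
        return Observable.Range(1, 2)
            .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));
    }

    // Records every notification of the source as a short string.
    public static string Record(IObservable<int> source)
    {
        var log = new List<string>();
        using (source.Subscribe(
            value => log.Add(value.ToString()),
            error => log.Add("error:" + error.Message),
            () => log.Add("done")))
        {
        }
        return string.Join(",", log);
    }

    public static void Main()
    {
        // Catch: switch to a fallback sequence when the source fails
        Console.WriteLine(Record(Failing().Catch(Observable.Return(-1))));             // 1,2,-1,done
        // OnErrorResumeNext: continue with the next sequence regardless of how the first ended
        Console.WriteLine(Record(Failing().OnErrorResumeNext(Observable.Return(99)))); // 1,2,99,done
        // Retry(2): subscribe at most twice, then pass the error on
        Console.WriteLine(Record(Failing().Retry(2)));                                 // 1,2,1,2,error:boom
    }
}
```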

Sequence Combination

Sequential
  • Concat
  • Repeat
  • StartWith
Concurrent
  • Amb
  • Merge
  • Switch
Pairing
  • CombineLatest
  • Zip
  • And-Then-When
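
The difference between the concurrent and the pairing operators shows up when two sources publish alternately. This is a sketch assuming System.Reactive, with subjects standing in for real event sources; CombineDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineDemo
{
    // Feeds two subjects alternately and records what each combinator publishes.
    public static Dictionary<string, string> Run()
    {
        var left = new Subject<int>();
        var right = new Subject<int>();
        var merged = new List<string>();
        var zipped = new List<string>();
        var latest = new List<string>();

        using (left.Merge(right).Subscribe(v => merged.Add(v.ToString())))
        using (left.Zip(right, (l, r) => l + "+" + r).Subscribe(v => zipped.Add(v)))
        using (left.CombineLatest(right, (l, r) => l + "/" + r).Subscribe(v => latest.Add(v)))
        {
            left.OnNext(1);
            right.OnNext(10);
            left.OnNext(2);
            right.OnNext(20);
        }

        return new Dictionary<string, string>
        {
            { "Merge", string.Join(",", merged) },         // every value from either source
            { "Zip", string.Join(",", zipped) },           // values paired by position
            { "CombineLatest", string.Join(",", latest) }  // newest value from each source
        };
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry.Key + ": " + entry.Value);
        }
        // Merge: 1,10,2,20
        // Zip: 1+10,2+20
        // CombineLatest: 1/10,2/10,2/20
    }
}
```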

Scheduler

  • UI application
    • handler blocks for < 50 ms
      • use TaskPoolScheduler when available
      • use ThreadPoolScheduler when TaskPoolScheduler is not available
    • handler blocks for > 50 ms (e.g. I/O)
      • use NewThreadScheduler
  • Service application
    • data from a queue
      • a dedicated EventLoopScheduler preserves the order of events
    • handler blocks for > 50 ms (e.g. I/O)
      • use NewThreadScheduler
    • handler blocks for < 50 ms
      • use TaskPoolScheduler if available
      • use ThreadPoolScheduler when TaskPoolScheduler is not available

Cost:

  • Creating a Thread Pool takes 500ms
  • Creating a Thread takes 50ms and 2MB of RAM
Important Schedulers
Scheduler: Usage
ImmediateScheduler: Scheduler.Immediate
CurrentThreadScheduler: Scheduler.CurrentThread
DispatcherScheduler: DispatcherScheduler.Current
EventLoopScheduler: new EventLoopScheduler()
NewThreadScheduler: NewThreadScheduler.Default, Scheduler.NewThread
ThreadPoolScheduler: ThreadPoolScheduler.Instance, Scheduler.ThreadPool
TaskPoolScheduler: TaskPoolScheduler.Default
WinForms UI scheduler: new WindowsFormsSynchronizationContext()
WPF UI scheduler: new SynchronizationContextScheduler(SynchronizationContext.Current)

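
A scheduler is typically applied with SubscribeOn (where the subscription work runs) and ObserveOn (where notifications are delivered). The following is a minimal sketch, assuming System.Reactive; SchedulerDemo and Run are hypothetical names, and the choice of NewThreadScheduler plus TaskPoolScheduler is only an example.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class SchedulerDemo
{
    // Returns the managed thread ids of { caller, producer, consumer }.
    public static int[] Run()
    {
        var callerThread = Thread.CurrentThread.ManagedThreadId;
        var producerThread = 0;
        var consumerThread = 0;
        var done = new ManualResetEventSlim();

        var source = Observable.Create<int>(observer =>
        {
            // Runs on the scheduler passed to SubscribeOn.
            producerThread = Thread.CurrentThread.ManagedThreadId;
            observer.OnNext(1);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        using (source
            .SubscribeOn(NewThreadScheduler.Default) // subscription work on a new thread
            .ObserveOn(TaskPoolScheduler.Default)    // notifications on the task pool
            .Subscribe(
                _ => consumerThread = Thread.CurrentThread.ManagedThreadId,
                () => done.Set()))
        {
            done.Wait(); // block the caller until OnCompleted has been delivered
        }

        return new[] { callerThread, producerThread, consumerThread };
    }

    public static void Main()
    {
        var ids = Run();
        Console.WriteLine("caller={0} producer={1} consumer={2}", ids[0], ids[1], ids[2]);
    }
}
```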
Handle ArgumentExceptions on EventSubscriptions

System.ArgumentException: Cannot bind to the target method because its signature or security transparency is not compatible with that of the delegate type.

Instead of:

return Observable.FromEvent<StrokeCollectionChangedEventHandler, StrokeCollectionChangedEventArgs>(
        eh => strokeCollection.StrokesChanged += eh,
        eh => strokeCollection.StrokesChanged -= eh);

try:

return Observable.FromEvent<StrokeCollectionChangedEventHandler, StrokeCollectionChangedEventArgs>(
        handler =>
        {
            StrokeCollectionChangedEventHandler eh = (sender, args) => handler(args);
            return eh;
        },
        eh => strokeCollection.StrokesChanged += eh,
        eh => strokeCollection.StrokesChanged -= eh);

Web sources


Rx on the server

TPL Dataflow

References

  1. Reactive Extensions v2.0 Beta available now! In: Reactive Extensions Team Blog. Microsoft, 12 March 2012, retrieved 26 May 2014.