Skip to content

Latest commit

 

History

History
42 lines (30 loc) · 1.37 KB

File metadata and controls

42 lines (30 loc) · 1.37 KB

Allow creating observable over eventstore connection, and add some reactive extensions to handle events. Created observables are hot and refcount, so that subscription stay alive until their is subscribers.

nuget id :

EventStore.Client.Reactive

Create observable :

var observable = connection.CreateObservable("stream-id"); //subribe from end of stream

Then deserialize events :

var subscription = connection.CreateObservable(StreamId)
                             .Deserialize<MyEvent>()
                             .Subscribe(e => Console.WriteLine("event handled : " + e.ToString())

If position matter (eg to handle checkpoints) :

 var subscription = connection.CreateObservable(StreamId)
                              .DeserializeWithPosition<MyEvent>()
                              .HandleEvent(e => Console.WriteLine("handled : " + e.ToString()))
                              .Subscribe(pos => Console.WriteLine("last position handled : " + pos));

If you want to subscribe from the beginning of stream :

var observable = connection.CreateObservable("stream-id", 0);

Custom serializer

var subscription = connection.CreateObservable(StreamId)
                             .Deserialize<MyEvent>(new CustomEventSerializer())
			     .Subscribe(e => Console.WriteLine("event handled : " + e.ToString())