using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
public class CrudMessage { }
public class PointerCrudMessage: CrudMessage {
public class ItemCrudMessage : CrudMessage
public class CreateMessage : ItemCrudMessage {}
public class ReadMessage : PointerCrudMessage{}
public class UpdateMessage : ItemCrudMessage {}
public class DeleteMessage : PointerCrudMessage{}
public class NotifyMessage
public static class CrudImplementation
public static IObservable<List<Report>> Core(IObservable<CrudMessage> messages, Action<NotifyMessage> publish, Func<long> genId, Func<List<Report>> getBackup)
var fork = messages.Publish().RefCount();
var create = fork.Select(m => m as CreateMessage).Where(m => m != null);
var update = fork.Select(m => m as UpdateMessage).Where(m => m != null);
var read = fork.Select(m => m as ReadMessage ).Where(m => m != null);
var delete = fork.Select(m => m as DeleteMessage).Where(m => m != null);
create.Select(m => new { m, id = genId() })
.Select(a => new { a.id, type = "C", action = new Action<List<Report>>(list => list.Add(Apply(a.m.Value, v => v.Id = a.id))) }),
read .Select(m => new { id = m.Id, type = "R", action = new Action<List<Report>>(list => { }) }),
update.Select(m => new { id = m.Value.Id, type = "U", action = new Action<List<Report>>(list => If(list, l => l.Any(x => x.Id == m.Value.Id), l => l[l.FindIndex(x => x.Id == m.Value.Id)] = m.Value)) }),
delete.Select(m => new { id = m.Id, type = "D", action = new Action<List<Report>>(list => If(list, l => l.Any(x => x.Id == m.Id), l => l.RemoveAt(l.FindIndex(x => x.Id == m.Id)))) })
.Scan(getBackup(), (cached, modification) =>
log("list.Count before [" + modification.type + "]: " + cached.Count);
var ret = Apply(cached, modification.action);
var found = ret.FirstOrDefault(x => x.Id == modification.id);
publish(new NotifyMessage() { Valid = found != null, Value = found !=null ? found : new Report{Id =-1L, Field = 0} });
log("list.Count after [" + modification.type + "]: " + cached.Count);
public static T Apply<T>(T v, Action<T> a)
public static T If<T>(this T target, Func<T, bool> predicate, Action<T> modify)
public static void log(string s) { Console.WriteLine(s); }
public static void Test_Cached_CRUD()
var scheduler = new TestScheduler();
const long START_ID = 54321L;
Func<List<Report>> dbRead = () => new List<Report>(){new Report( ) { Field = 123, Id = START_ID}};
var currentId = START_ID+1;
OnNext(TimeSpan.FromSeconds(11).Ticks, new UpdateMessage{ Value = new Report( ) { Field = 124, Id = START_ID} } as CrudMessage),
OnNext(TimeSpan.FromSeconds(12).Ticks, new CreateMessage{ Value = new Report( ) { Field = 125} } as CrudMessage),
OnNext(TimeSpan.FromSeconds(13).Ticks, new UpdateMessage{ Value = new Report( ) { Field = 126, Id = START_ID+1} } as CrudMessage),
OnNext(TimeSpan.FromSeconds(14).Ticks, new DeleteMessage{ Id = START_ID+1 } as CrudMessage),
var l = scheduler.CreateHotObservable(list);
var notifs = new List<Timestamped<NotifyMessage>>();
var result = scheduler.Start(
() => CrudImplementation.Core(scheduler.CreateHotObservable(list),
x => notifs.Add(new Timestamped<NotifyMessage>(x, scheduler.Now)),
0, 0, TimeSpan.FromMinutes(3).Ticks);
new { report= new Report( ) { Field = 124, Id = START_ID }, valid = true},
new { report= new Report( ) { Field = 125, Id = START_ID+1}, valid = true},
new { report= new Report( ) { Field = 126, Id = START_ID+1}, valid = true},
new { report= new Report( ) { Field = 0 , Id = -1L }, valid = false},
n => "" + n.Value.Value.Id + "-" + n.Value.Value.Field + "-" + n.Value.Valid ,
e => "" + e.report.Id + "-" + e.report.Field + "-" + e.valid
private static Recorded<Notification<T>> OnCompleted<T>(long virtualTime) { return new Recorded<Notification<T>>(virtualTime, Notification.CreateOnCompleted<T>()); }
private static Recorded<Notification<T>> OnNext<T>(long virtualTime, T message) { return new Recorded<Notification<T>>(virtualTime, Notification.CreateOnNext(message)); }
public static void Main()
MethodInfo[] methodInfos = typeof(Test).GetMethods(BindingFlags.Public | BindingFlags.Instance | BindingFlags.Static);
foreach (var info in methodInfos)
if (info.IsPublic && info.Name.StartsWith("Test_"))
Console.WriteLine(">> Entering " + info.Name + " <<");
public static class Utils
public static void Assert(Func<bool> expr, string message, bool skipWhenPass = false)
if (!expr()) Console.WriteLine("FAILED > difference on " + message);
else if (!skipWhenPass) Console.WriteLine("PASSED > same " + message);
public static void Assert2<T, U>(IEnumerable<T> output, IEnumerable<U> expec, Func<T, string> projectT, Func<U, string> projectU)
var expected = expec.ToArray();
var received = output.ToArray();
Assert(() => received.Count() == expected.Count(), "count of messages: expected " + expected.Count() + " vs " + received.Count() + " received");
if (received.Count() != expected.Count()) return;
foreach (var i in Enumerable.Range(0, received.Count()))
Assert(() => projectT(received[i]) == projectU(expected[i]), "kind of message for entry " + i + ": expected " + projectU(expected[i])
+ " vs " + projectT(received[i])