using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public interface IAgent<TState, TMessage>
Task Send(TMessage message);
void Post(TMessage message);
ActionBlock<TMessage> ActionBlock {get;}
class StatefulDataFlowAgent<TState, TMessage> : IAgent<TState, TMessage>
public TState State {get;set;}
public Task Completion => ActionBlock.Completion;
public ActionBlock<TMessage> ActionBlock {get;}
public StatefulDataFlowAgent(
Func<TState, TMessage, Task<TState>> action,
CancellationTokenSource cts = null)
var options = new ExecutionDataflowBlockOptions {
CancellationToken = cts != null ?
cts.Token : CancellationToken.None
ActionBlock = new ActionBlock<TMessage>(
async msg => State = await action(State, msg), options);
public Task Send(TMessage message) => ActionBlock.SendAsync(message);
public void Post(TMessage message) => ActionBlock.Post(message);
public static class Agent
public static IAgent<TState, TMessage> Start<TState, TMessage>(TState initialState,
Func<TState, TMessage, Task<TState>> action,
CancellationTokenSource cts = null) =>
new StatefulDataFlowAgent<TState, TMessage>(initialState, action, cts);
enum DirectionOfDictionaryResizing
public static int atomicRemovedIndexAfterAddition = -1;
public static int atomicRemovedIndexAfterRemoval = -1;
static Func<int, Task<int>> addItemAtIndex = (int i) =>
return Task.FromResult(ii);
static Func<int, Task<int>> removeItemAtIndex = (int i) =>
return Task.FromResult(ii);
public static void Main()
IAgent<int, DirectionOfDictionaryResizing> agentStateful = Agent.Start(0,
async (int state, DirectionOfDictionaryResizing direction) => {
if(direction.Equals(DirectionOfDictionaryResizing.Addition))
Interlocked.Increment(ref atomicRemovedIndexAfterAddition);
return await addItemAtIndex(0);
Interlocked.Increment(ref atomicRemovedIndexAfterRemoval);
return await removeItemAtIndex(0);
agentStateful.Post(DirectionOfDictionaryResizing.Addition);
agentStateful.Post(DirectionOfDictionaryResizing.Addition);
agentStateful.Post(DirectionOfDictionaryResizing.Removal);
agentStateful.ActionBlock.Complete();
agentStateful.ActionBlock.Completion.Wait();