using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.IO.Pipelines;
using System.Text.Json.Serialization;
using System.Threading.Channels;
using System.Threading.Tasks;
public static async Task Main(string[] args)
int readBufferCapacity = 2048;
int writeBatchSize = 100;
using var httpClient = new HttpClient();
await foreach (var buffer in httpClient
.GetAsJsonDelimitedFromAsync<ApiModel>("https://poc-jsond.free.beeceptor.com/json", readBufferCapacity)
Console.WriteLine($"Processing {buffer.Count} items");
foreach (var item in buffer)
Console.WriteLine($"Processed: {item}");
public record ApiModel(string Id, [property:JsonPropertyName("name")]string Name);
public static class AsyncEnumerableExtensions
public static async IAsyncEnumerable<IReadOnlyCollection<T>> Buffer<T>(this IAsyncEnumerable<T> source,
int batchSize, [EnumeratorCancellation] CancellationToken ct = default)
var buffer = new List<T>(batchSize);
await foreach (var item in source.WithCancellation(ct))
if (buffer.Count == batchSize)
public static class HttpClientJsonDelimitedExtensions
public static async IAsyncEnumerable<T> GetAsJsonDelimitedFromAsync<T>(this HttpClient httpClient,
string url, int bufferCapacity = 1024, [EnumeratorCancellation] CancellationToken ct = default)
using var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, ct);
await using var stream = await response.Content.ReadAsStreamAsync(ct);
await foreach (var json in stream.ReadAsJsonAsync<T>(bufferCapacity).WithCancellation(ct))
public static class StreamJsonDelimitedExtensions
private static readonly byte[] NewLineCharacters = "\n\r"u8.ToArray();
public static async IAsyncEnumerable<T> ReadAsJsonAsync<T>(this Stream source, int bufferCapacity = 1024, JsonSerializerOptions jsonSerializerOptions = null, [EnumeratorCancellation] CancellationToken ct = default)
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(bufferCapacity)
FullMode = BoundedChannelFullMode.Wait
var writerTask = source.WriteJsonToChannelAsync(channel.Writer, jsonSerializerOptions, ct);
await foreach (var item in channel.Reader.ReadAllAsync(ct))
public static async Task WriteJsonToChannelAsync<T>(this Stream stream, ChannelWriter<T> writer, JsonSerializerOptions jsonSerializerOptions = null, CancellationToken token = default)
var pipeReader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true));
var items = new Queue<T>();
while (!token.IsCancellationRequested)
var result = await pipeReader.ReadAsync(token);
var buffer = result.Buffer;
var position = ReadItems(buffer, result.IsCompleted,items, jsonSerializerOptions, token);
while (items.TryDequeue(out var item))
await writer.WriteAsync(item, token);
pipeReader.AdvanceTo(position, buffer.End);
await pipeReader.CompleteAsync();
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static SequencePosition ReadItems<T>(in ReadOnlySequence<byte> sequence,
bool isCompleted, Queue<T> items, JsonSerializerOptions jsonSerializerOptions = default, CancellationToken token = default)
var reader = new SequenceReader<byte>(sequence);
while (!reader.End && !token.IsCancellationRequested)
if (reader.TryReadToAny(out ReadOnlySequence<byte> itemBytes, NewLineCharacters, advancePastDelimiter: true))
var jsonReader = new Utf8JsonReader(itemBytes);
var item = JsonSerializer.Deserialize<T>(ref jsonReader, jsonSerializerOptions);
var itemSequence = sequence.Slice(reader.Position);
var jsonReader = new Utf8JsonReader(itemSequence);
var item = JsonSerializer.Deserialize<T>(ref jsonReader, jsonSerializerOptions);
reader.Advance(sequence.Length);