using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.IO.Pipelines;
using System.Threading.Channels;
using System.Threading.Tasks;
public static async Task Main(string[] args)
int readBufferCapacity = 2048;
int writeBatchSize = 100;
using var httpClient = new HttpClient();
await foreach (var buffer in httpClient
.GetAsJsonDelimitedFromAsync("https://poc-jsond.free.beeceptor.com/json", readBufferCapacity)
Console.WriteLine($"Processing {buffer.Count} items");
public static class AsyncEnumerableExtensions
public static async IAsyncEnumerable<IReadOnlyCollection<T>> Buffer<T>(this IAsyncEnumerable<T> source,
int batchSize, [EnumeratorCancellation] CancellationToken ct = default)
var buffer = new List<T>(batchSize);
await foreach (var item in source.WithCancellation(ct))
if (buffer.Count == batchSize)
public static class HttpClientJsonDelimitedExtensions
public static async IAsyncEnumerable<JsonDocument> GetAsJsonDelimitedFromAsync(this HttpClient httpClient,
string url, int bufferCapacity = 1024, [EnumeratorCancellation] CancellationToken ct = default)
using var response = await httpClient.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, ct);
await using var stream = await response.Content.ReadAsStreamAsync(ct);
await foreach (var json in stream.ReadAsJsonAsync(bufferCapacity).WithCancellation(ct))
public static class StreamJsonDelimitedExtensions
private static readonly byte[] NewLineCharacters = "\n\r"u8.ToArray();
public static async IAsyncEnumerable<JsonDocument> ReadAsJsonAsync(this Stream source, int bufferCapacity = 1024, [EnumeratorCancellation] CancellationToken ct = default)
var channel = Channel.CreateBounded<JsonDocument>(new BoundedChannelOptions(bufferCapacity)
FullMode = BoundedChannelFullMode.Wait
var writerTask = source.WriteJsonToChannelAsync(channel.Writer, ct);
await foreach (var item in channel.Reader.ReadAllAsync(ct))
public static async Task WriteJsonToChannelAsync(this Stream stream, ChannelWriter<JsonDocument> writer, CancellationToken token)
var pipeReader = PipeReader.Create(stream, new StreamPipeReaderOptions(leaveOpen: true));
while (!token.IsCancellationRequested)
var result = await pipeReader.ReadAsync(token);
var buffer = result.Buffer;
var (position, items) = ReadItems(buffer, result.IsCompleted, token);
foreach (var item in items)
await writer.WriteAsync(item, token);
pipeReader.AdvanceTo(position, buffer.End);
await pipeReader.CompleteAsync();
private static (SequencePosition, List<JsonDocument>) ReadItems(in ReadOnlySequence<byte> sequence,
bool isCompleted, CancellationToken token)
var reader = new SequenceReader<byte>(sequence);
var items = new List<JsonDocument>();
while (!reader.End && !token.IsCancellationRequested)
if (reader.TryReadToAny(out ReadOnlySequence<byte> itemBytes, NewLineCharacters, advancePastDelimiter: true))
var item = JsonDocument.Parse(itemBytes);
var item = JsonDocument.Parse(sequence.Slice(reader.Position));
reader.Advance(sequence.Length);
return (reader.Position, items);