using System.Collections;
using System.Collections.Generic;
using System.Runtime.Serialization.Formatters;
using System.ComponentModel.DataAnnotations;
using System.Globalization;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Serialization;
public static partial class JsonExtensions
public static async IAsyncEnumerable<T?> DeserializeAsyncEnumerable<T>(Stream stream, JsonSerializerSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
var serializer = JsonSerializer.CreateDefault(settings);
var loadSettings = new JsonLoadSettings { LineInfoHandling = LineInfoHandling.Ignore };
using (var textReader = new StreamReader(stream, leaveOpen : true))
using (var reader = new JsonTextReader(textReader) { CloseInput = false })
await foreach (var token in LoadAsyncEnumerable(reader, loadSettings, cancellationToken ).ConfigureAwait(false))
yield return token.ToObject<T>(serializer);
public static async IAsyncEnumerable<JToken> LoadJTokenAsyncEnumerable(Stream stream, JsonLoadSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
using (var textReader = new StreamReader(stream, leaveOpen : true))
using (var reader = new JsonTextReader(textReader) { CloseInput = false })
await foreach (var token in LoadAsyncEnumerable(reader, settings, cancellationToken).ConfigureAwait(false))
public static async IAsyncEnumerable<JToken> LoadAsyncEnumerable(JsonTextReader reader, JsonLoadSettings? settings = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
(await reader.MoveToContentAndAssertAsync().ConfigureAwait(false)).AssertTokenType(JsonToken.StartArray);
cancellationToken.ThrowIfCancellationRequested();
while ((await reader.ReadToContentAndAssert(cancellationToken).ConfigureAwait(false)).TokenType != JsonToken.EndArray)
cancellationToken.ThrowIfCancellationRequested();
yield return await JToken.LoadAsync(reader, settings, cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
public static JsonReader AssertTokenType(this JsonReader reader, JsonToken tokenType) =>
reader.TokenType == tokenType ? reader : throw new JsonSerializationException(string.Format("Unexpected token {0}, expected {1}", reader.TokenType, tokenType));
public static async Task<JsonReader> ReadToContentAndAssert(this JsonReader reader, CancellationToken cancellationToken = default) =>
await (await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false)).MoveToContentAndAssertAsync(cancellationToken).ConfigureAwait(false);
public static async Task<JsonReader> MoveToContentAndAssertAsync(this JsonReader reader, CancellationToken cancellationToken = default)
throw new ArgumentNullException();
if (reader.TokenType == JsonToken.None)
await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false);
while (reader.TokenType == JsonToken.Comment)
await reader.ReadAndAssertAsync(cancellationToken).ConfigureAwait(false);
public static async Task<JsonReader> ReadAndAssertAsync(this JsonReader reader, CancellationToken cancellationToken = default)
throw new ArgumentNullException();
if (!await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
throw new JsonReaderException("Unexpected end of JSON stream.");
public class SurrogateStream : Stream
Stream BaseStream => baseStream ?? throw new ObjectDisposedException(nameof(BaseStream));
public SurrogateStream(Stream baseStream) => this.baseStream = baseStream ?? throw new ArgumentNullException(nameof(baseStream));
public override long Length => BaseStream.Length;
public override long Position { get => BaseStream.Position; set => BaseStream.Position = value; }
public override bool CanRead => BaseStream.CanRead;
public override bool CanWrite => BaseStream.CanWrite;
public override bool CanSeek => BaseStream.CanSeek;
public override bool CanTimeout => BaseStream.CanTimeout;
public override IAsyncResult BeginRead (byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => BaseStream.BeginRead(buffer, offset, count, callback, state);
public override IAsyncResult BeginWrite (byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => BaseStream.BeginWrite(buffer, offset, count, callback, state);
public override void CopyTo (System.IO.Stream destination, int bufferSize) => BaseStream.CopyTo(destination, bufferSize);
public override Task CopyToAsync (System.IO.Stream destination, int bufferSize, System.Threading.CancellationToken cancellationToken) => BaseStream.CopyToAsync(destination, bufferSize, cancellationToken);
protected override void Dispose (bool disposing)
Interlocked.Exchange(ref baseStream, null)?.Dispose();
public override ValueTask DisposeAsync() => Interlocked.Exchange(ref baseStream, null)?.DisposeAsync() ?? ValueTask.CompletedTask;
public override int EndRead (IAsyncResult asyncResult) => BaseStream.EndRead(asyncResult);
public override void EndWrite (IAsyncResult asyncResult) => BaseStream.EndWrite(asyncResult);
public override void Flush() => BaseStream.Flush();
public override Task FlushAsync (CancellationToken cancellationToken) => BaseStream.FlushAsync(cancellationToken);
public override int Read (byte[] buffer, int offset, int count) => BaseStream.Read(buffer, offset, count);
public override int Read (Span<byte> buffer) => BaseStream.Read(buffer);
public override Task<int> ReadAsync (byte[] buffer, int offset, int count, CancellationToken cancellationToken) => BaseStream.ReadAsync(buffer, offset, count, cancellationToken);
public override ValueTask<int> ReadAsync (Memory<byte> buffer, System.Threading.CancellationToken cancellationToken = default) => BaseStream.ReadAsync(buffer, cancellationToken);
public override int ReadByte () => BaseStream.ReadByte();
public override long Seek (long offset, System.IO.SeekOrigin origin) => BaseStream.Seek(offset, origin);
public override void SetLength (long value) => BaseStream.SetLength(value);
public override void Write (byte[] buffer, int offset, int count) => BaseStream.Write(buffer, offset, count);
public override void Write (ReadOnlySpan<byte> buffer) => BaseStream.Write(buffer);
public override System.Threading.Tasks.Task WriteAsync (byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken) => BaseStream.WriteAsync(buffer, offset, count, cancellationToken);
public override System.Threading.Tasks.ValueTask WriteAsync (ReadOnlyMemory<byte> buffer, System.Threading.CancellationToken cancellationToken = default) => BaseStream.WriteAsync(buffer, cancellationToken);
public override void WriteByte (byte value) => BaseStream.WriteByte(value);
public class AsyncOnlySurogateStream : SurrogateStream
public AsyncOnlySurogateStream(Stream baseStream) : base(baseStream) { }
public override void CopyTo (System.IO.Stream destination, int bufferSize) => throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
protected override void Dispose (bool disposing)
throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
public override void Flush() => throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
public override int Read (byte[] buffer, int offset, int count) => throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
public override int Read (Span<byte> buffer) => throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
public override int ReadByte () => throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
public override void Write (byte[] buffer, int offset, int count) => throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
public override void Write (ReadOnlySpan<byte> buffer) => throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
public override void WriteByte (byte value) => throw new InvalidOperationException(string.Format("Non async method {0} called", System.Reflection.MethodBase.GetCurrentMethod()?.Name));
public int Id { get; set; }
public string? Name { get; set; }
public static async Task Test()
await TestDeserializeAsync();
await TestLoadJTokenAsync();
Assert.ThrowsAsync<InvalidOperationException>(() => TestReadAsJsonNonAsyncFail());
Assert.ThrowsAsync<InvalidOperationException>(() => TestReadAsTextNonAsyncFail());
Assert.DoesNotThrowAsync(() => TestReadAsTextAsync());
Assert.DoesNotThrowAsync(() => TestReadAsJsonAsync());
public static async Task TestDeserializeAsync(CancellationToken cancellationToken = default)
var inputs = Enumerable.Range(0,12).Select(i => new Model { Id = i, Name = $"Model {i}" }).ToList();
using var innerStream = new MemoryStream();
using (var textWriter = new StreamWriter(innerStream, leaveOpen : true))
using (var writer = new JsonTextWriter(textWriter) { CloseOutput = false })
JsonSerializer.CreateDefault().Serialize(writer, inputs);
innerStream.Position = 0;
await using var stream = new AsyncOnlySurogateStream(innerStream);
await foreach (var token in JsonExtensions.LoadJTokenAsyncEnumerable(stream, cancellationToken : cancellationToken))
Console.WriteLine(token.ToString(Formatting.None));
public static async Task TestLoadJTokenAsync(CancellationToken cancellationToken = default)
var inputs = Enumerable.Range(0,10).Select(i => new Model { Id = i, Name = $"Token {i}" }).ToList();
using var innerStream = new MemoryStream();
using (var textWriter = new StreamWriter(innerStream, leaveOpen : true))
using (var writer = new JsonTextWriter(textWriter) { CloseOutput = false })
JsonSerializer.CreateDefault().Serialize(writer, inputs);
innerStream.Position = 0;
await using var stream = new AsyncOnlySurogateStream(innerStream);
await foreach (var model in JsonExtensions.DeserializeAsyncEnumerable<Model>(stream, cancellationToken : cancellationToken))
Console.WriteLine(JsonConvert.SerializeObject(model, Formatting.None));
public static async Task TestReadAsTextAsync()
using var innerStream = new MemoryStream(Encoding.UTF8.GetBytes(GetJson()));
await using (var stream = new AsyncOnlySurogateStream(innerStream))
using (var textReader = new StreamReader(stream, leaveOpen : true))
while ((line = await textReader.ReadLineAsync()) != null)
public static async Task TestReadAsTextNonAsyncFail()
using var innerStream = new MemoryStream(Encoding.UTF8.GetBytes(GetJson()));
await using (var stream = new AsyncOnlySurogateStream(innerStream))
using (var textReader = new StreamReader(stream, leaveOpen : false))
while ((line = await textReader.ReadLineAsync()) != null)
public static async Task TestReadAsJsonAsync()
var inputs = Enumerable.Range(0,12000).Select(i => new Model { Id = i, Name = $"Model {i}" }).ToList();
using var innerStream = new MemoryStream();
using (var textWriter = new StreamWriter(innerStream, leaveOpen : true))
using (var writer = new JsonTextWriter(textWriter) { CloseOutput = false })
JsonSerializer.CreateDefault().Serialize(writer, inputs);
Console.WriteLine($"\ninnerStream.Length = {innerStream.Length}");
innerStream.Position = 0;
await using (var stream = new AsyncOnlySurogateStream(innerStream))
using (var textReader = new StreamReader(stream, leaveOpen : true))
using (var reader = new JsonTextReader(textReader) { CloseInput = false })
while (await reader.ReadAsync())
public static async Task TestReadAsJsonNonAsyncFail()
using var innerStream = new MemoryStream(Encoding.UTF8.GetBytes(GetJson()));
await using (var stream = new AsyncOnlySurogateStream(innerStream))
using (var textReader = new StreamReader(stream, leaveOpen : true))
using (var reader = new JsonTextReader(textReader) { CloseInput = false })
Console.WriteLine($"TokenType={reader.TokenType}, Value={reader.Value}");
static string GetJson() => @"[{
""title"": ""example glossary"",
""GlossTerm"": ""Standard Generalized Markup Language"",
""Abbrev"": ""ISO 8879:1986"",
""para"": ""A meta-markup language, used to create markup languages such as DocBook."",
""GlossSeeAlso"": [""GML"", ""XML""]
""title"": ""example glossary"",
""title"": ""example glossary"",
null,true,false,1,2,""hello"",
public static async Task Main(string[] args)
Console.WriteLine("Environment version: {0} ({1})", System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription , Environment.Version);
Console.WriteLine("{0} version: {1}", typeof(JsonSerializer).Assembly.GetName().Name, typeof(JsonSerializer).Assembly.FullName);
Console.WriteLine("Failed with unhandled exception: ");