using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Reflection;
using System.Resources;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Runtime.Versioning;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using FxResources.System.IO.Pipelines;
using Microsoft.CodeAnalysis;
[assembly: CompilationRelaxations(8)]
[assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)]
[assembly: Debuggable(DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints)]
[assembly: InternalsVisibleTo("System.IO.Pipelines.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001004b86c4cb78549b34bab61a3b1800e23bfeb5b3ec390074041536a7e3cbd97f5f04cf0f857155a8928eaa29ebfd11cfbbad3ba70efea7bda3226c6a8d370a4cd303f714486b6ebc225985a638471e6ef571cc92a4613c00b8fa65d61ccee0cbe5f36330c9a01f4183559f1bef24cc2917c6d913e3a541333a1d05d9bed22b38cb")]
[assembly: TargetFramework(".NETStandard,Version=v2.0", FrameworkDisplayName = ".NET Standard 2.0")]
[assembly: AssemblyMetadata("Serviceable", "True")]
[assembly: AssemblyMetadata("PreferInbox", "True")]
[assembly: AssemblyDefaultAlias("System.IO.Pipelines")]
[assembly: NeutralResourcesLanguage("en-US")]
[assembly: CLSCompliant(true)]
[assembly: AssemblyMetadata("IsTrimmable", "True")]
[assembly: AssemblyMetadata("IsAotCompatible", "True")]
[assembly: DefaultDllImportSearchPaths(DllImportSearchPath.System32 | DllImportSearchPath.AssemblyDirectory)]
[assembly: AssemblyCompany("Microsoft Corporation")]
[assembly: AssemblyCopyright("© Microsoft Corporation. All rights reserved.")]
[assembly: AssemblyDescription("Single producer single consumer byte buffer management.\r\n\r\nCommonly Used Types:\r\nSystem.IO.Pipelines.Pipe\r\nSystem.IO.Pipelines.PipeWriter\r\nSystem.IO.Pipelines.PipeReader")]
[assembly: AssemblyFileVersion("10.0.726.21808")]
[assembly: AssemblyInformationalVersion("10.0.7+b16286c2284fecf303dbc12a0bb152476d662e44")]
[assembly: AssemblyProduct("Microsoft® .NET")]
[assembly: AssemblyTitle("System.IO.Pipelines")]
[assembly: AssemblyMetadata("RepositoryUrl", "https://github.com/dotnet/dotnet")]
[assembly: AssemblyVersion("10.0.0.0")]
[module: RefSafetyRules(11)]
[module: System.Runtime.CompilerServices.NullablePublicOnly(true)]
namespace Microsoft.CodeAnalysis
{
/// <summary>
/// Empty marker attribute. In this file it is applied to the compiler-generated attribute
/// types declared under <c>System.Runtime.CompilerServices</c> (and to itself).
/// </summary>
[CompilerGenerated]
[Microsoft.CodeAnalysis.Embedded]
internal sealed class EmbeddedAttribute : Attribute
{
}
}
// Assembly-private copies of compiler-emitted attributes. Every type here is marked
// [CompilerGenerated] and [Embedded]; each one only stores its constructor argument (if any).
// NOTE(review): the meaning of the stored flag/version values is defined by the C# compiler,
// not by anything in this file — consult the compiler documentation before interpreting them.
namespace System.Runtime.CompilerServices
{
/// <summary>Empty marker attribute; carries no data.</summary>
[CompilerGenerated]
[Microsoft.CodeAnalysis.Embedded]
internal sealed class IsReadOnlyAttribute : Attribute
{
}
/// <summary>Stores a byte array of nullable-annotation flags.</summary>
[CompilerGenerated]
[Microsoft.CodeAnalysis.Embedded]
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Event | AttributeTargets.Parameter | AttributeTargets.ReturnValue | AttributeTargets.GenericParameter, AllowMultiple = false, Inherited = false)]
internal sealed class NullableAttribute : Attribute
{
public readonly byte[] NullableFlags;
/// <summary>Wraps a single flag byte in a one-element array.</summary>
public NullableAttribute(byte P_0)
{
NullableFlags = new byte[1] { P_0 };
}
/// <summary>Stores the supplied array as-is (no copy is made).</summary>
public NullableAttribute(byte[] P_0)
{
NullableFlags = P_0;
}
}
/// <summary>Stores a single flag byte for a class, struct, method, interface or delegate.</summary>
[CompilerGenerated]
[Microsoft.CodeAnalysis.Embedded]
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Method | AttributeTargets.Interface | AttributeTargets.Delegate, AllowMultiple = false, Inherited = false)]
internal sealed class NullableContextAttribute : Attribute
{
public readonly byte Flag;
public NullableContextAttribute(byte P_0)
{
Flag = P_0;
}
}
/// <summary>Module-level attribute storing one boolean (<see cref="IncludesInternals"/>).</summary>
[CompilerGenerated]
[Microsoft.CodeAnalysis.Embedded]
[AttributeUsage(AttributeTargets.Module, AllowMultiple = false, Inherited = false)]
internal sealed class NullablePublicOnlyAttribute : Attribute
{
public readonly bool IncludesInternals;
public NullablePublicOnlyAttribute(bool P_0)
{
IncludesInternals = P_0;
}
}
/// <summary>Module-level attribute storing an integer version; this module is tagged with 11.</summary>
[CompilerGenerated]
[Microsoft.CodeAnalysis.Embedded]
[AttributeUsage(AttributeTargets.Module, AllowMultiple = false, Inherited = false)]
internal sealed class RefSafetyRulesAttribute : Attribute
{
public readonly int Version;
public RefSafetyRulesAttribute(int P_0)
{
Version = P_0;
}
}
/// <summary>
/// Attribute taking a marker-type name. The constructor argument is not stored;
/// it exists only in the attribute's metadata encoding.
/// </summary>
[CompilerGenerated]
[Microsoft.CodeAnalysis.Embedded]
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Enum | AttributeTargets.Method | AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Event | AttributeTargets.Interface | AttributeTargets.Delegate, AllowMultiple = false, Inherited = false)]
internal sealed class ExtensionMarkerAttribute : Attribute
{
public ExtensionMarkerAttribute(string name)
{
}
}
}
namespace FxResources.System.IO.Pipelines
{
/// <summary>
/// Empty type with no members.
/// NOTE(review): presumably exists only so its full name can identify this assembly's
/// embedded resource set — confirm against the build's resource naming.
/// </summary>
internal static class SR
{
}
}
namespace System
{
/// <summary>
/// Accessors for this assembly's localized strings, plus helpers for formatting them.
/// </summary>
/// <remarks>
/// When the <c>System.Resources.UseSystemResourceKeys</c> AppContext switch is enabled, lookups return the
/// resource key itself and the <c>Format</c> overloads join the key and its arguments with ", " instead of
/// calling <c>string.Format</c>.
/// </remarks>
internal static class SR
{
    // Base name of the embedded resource set; it matches the full name of the
    // FxResources.System.IO.Pipelines.SR marker type declared in this file.
    private const string ResourceBaseName = "FxResources.System.IO.Pipelines.SR";

    // Read once at type initialization; later changes to the switch are not observed.
    private static readonly bool s_usingResourceKeys = GetUsingResourceKeysSwitchValue();

    private static ResourceManager s_resourceManager;

    /// <summary>
    /// Lazily created resource manager for this assembly's strings.
    /// </summary>
    /// <remarks>
    /// The manager is built from the explicit base name. Inside <c>namespace System</c> the simple name
    /// <c>SR</c> binds to this class, so <c>new ResourceManager(typeof(SR))</c> would look for a resource set
    /// named after <c>System.SR</c>, miss, and make every lookup return null.
    /// </remarks>
    internal static ResourceManager ResourceManager =>
        s_resourceManager ??= new ResourceManager(ResourceBaseName, typeof(SR).Assembly);

    internal static string AdvanceToInvalidCursor => GetResourceString("AdvanceToInvalidCursor");
    internal static string ArgumentOutOfRange_NeedPosNum => GetResourceString("ArgumentOutOfRange_NeedPosNum");
    internal static string ConcurrentOperationsNotSupported => GetResourceString("ConcurrentOperationsNotSupported");
    internal static string FlushCanceledOnPipeWriter => GetResourceString("FlushCanceledOnPipeWriter");
    internal static string GetResultBeforeCompleted => GetResourceString("GetResultBeforeCompleted");
    internal static string InvalidExaminedOrConsumedPosition => GetResourceString("InvalidExaminedOrConsumedPosition");
    internal static string InvalidExaminedPosition => GetResourceString("InvalidExaminedPosition");
    internal static string InvalidZeroByteRead => GetResourceString("InvalidZeroByteRead");
    internal static string ObjectDisposed_StreamClosed => GetResourceString("ObjectDisposed_StreamClosed");
    internal static string NoReadingOperationToComplete => GetResourceString("NoReadingOperationToComplete");
    internal static string NotSupported_UnreadableStream => GetResourceString("NotSupported_UnreadableStream");
    internal static string NotSupported_UnwritableStream => GetResourceString("NotSupported_UnwritableStream");
    internal static string ReadCanceledOnPipeReader => GetResourceString("ReadCanceledOnPipeReader");
    internal static string ReaderAndWriterHasToBeCompleted => GetResourceString("ReaderAndWriterHasToBeCompleted");
    internal static string ReadingAfterCompleted => GetResourceString("ReadingAfterCompleted");
    internal static string ReadingIsInProgress => GetResourceString("ReadingIsInProgress");
    internal static string WritingAfterCompleted => GetResourceString("WritingAfterCompleted");
    internal static string UnflushedBytesNotSupported => GetResourceString("UnflushedBytesNotSupported");

    /// <summary>Reads the AppContext switch; an unset switch counts as disabled.</summary>
    private static bool GetUsingResourceKeysSwitchValue()
    {
        return AppContext.TryGetSwitch("System.Resources.UseSystemResourceKeys", out bool isEnabled) && isEnabled;
    }

    /// <summary>Returns true when resource keys are returned in place of localized text.</summary>
    internal static bool UsingResourceKeys()
    {
        return s_usingResourceKeys;
    }

    /// <summary>
    /// Looks up <paramref name="resourceKey"/>. Returns the key itself in resource-key mode, and
    /// null when the resource set is missing from the assembly.
    /// </summary>
    private static string GetResourceString(string resourceKey)
    {
        if (UsingResourceKeys())
        {
            return resourceKey;
        }

        string result = null;
        try
        {
            result = ResourceManager.GetString(resourceKey);
        }
        catch (MissingManifestResourceException)
        {
            // Deliberate best effort: a missing resource set yields null rather than
            // turning the construction of an error message into a second failure.
        }
        return result;
    }

    /// <summary>
    /// Looks up <paramref name="resourceKey"/> and falls back to <paramref name="defaultString"/> when
    /// the lookup produced null or merely echoed the key.
    /// </summary>
    private static string GetResourceString(string resourceKey, string defaultString)
    {
        string resourceString = GetResourceString(resourceKey);
        if (resourceString == null || resourceKey == resourceString)
        {
            return defaultString;
        }
        return resourceString;
    }

    /// <summary>Formats with one argument; in resource-key mode joins key and argument with ", ".</summary>
    internal static string Format(string resourceFormat, object? p1)
    {
        if (UsingResourceKeys())
        {
            return string.Join(", ", resourceFormat, p1);
        }
        return string.Format(resourceFormat, p1);
    }

    /// <summary>Formats with two arguments; in resource-key mode joins key and arguments with ", ".</summary>
    internal static string Format(string resourceFormat, object? p1, object? p2)
    {
        if (UsingResourceKeys())
        {
            return string.Join(", ", resourceFormat, p1, p2);
        }
        return string.Format(resourceFormat, p1, p2);
    }

    /// <summary>Formats with three arguments; in resource-key mode joins key and arguments with ", ".</summary>
    internal static string Format(string resourceFormat, object? p1, object? p2, object? p3)
    {
        if (UsingResourceKeys())
        {
            return string.Join(", ", resourceFormat, p1, p2, p3);
        }
        return string.Format(resourceFormat, p1, p2, p3);
    }

    /// <summary>
    /// Formats with an argument array. A null array returns <paramref name="resourceFormat"/> unchanged.
    /// </summary>
    internal static string Format(string resourceFormat, params object?[]? args)
    {
        if (args == null)
        {
            return resourceFormat;
        }
        if (UsingResourceKeys())
        {
            return resourceFormat + ", " + string.Join(", ", args);
        }
        return string.Format(resourceFormat, args);
    }

    /// <summary>Provider-aware one-argument format; the provider is ignored in resource-key mode.</summary>
    internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1)
    {
        if (UsingResourceKeys())
        {
            return string.Join(", ", resourceFormat, p1);
        }
        return string.Format(provider, resourceFormat, p1);
    }

    /// <summary>Provider-aware two-argument format; the provider is ignored in resource-key mode.</summary>
    internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1, object? p2)
    {
        if (UsingResourceKeys())
        {
            return string.Join(", ", resourceFormat, p1, p2);
        }
        return string.Format(provider, resourceFormat, p1, p2);
    }

    /// <summary>Provider-aware three-argument format; the provider is ignored in resource-key mode.</summary>
    internal static string Format(IFormatProvider? provider, string resourceFormat, object? p1, object? p2, object? p3)
    {
        if (UsingResourceKeys())
        {
            return string.Join(", ", resourceFormat, p1, p2, p3);
        }
        return string.Format(provider, resourceFormat, p1, p2, p3);
    }

    /// <summary>
    /// Provider-aware format with an argument array. A null array returns
    /// <paramref name="resourceFormat"/> unchanged.
    /// </summary>
    internal static string Format(IFormatProvider? provider, string resourceFormat, params object?[]? args)
    {
        if (args == null)
        {
            return resourceFormat;
        }
        if (UsingResourceKeys())
        {
            return resourceFormat + ", " + string.Join(", ", args);
        }
        return string.Format(provider, resourceFormat, args);
    }
}
/// <summary>
/// Argument-validation throw helpers: null checks and disposed-object checks.
/// </summary>
/// <remarks>
/// Only the callable static helpers are declared. The compiler-emitted extension skeleton types
/// (names of the form <c>&lt;G&gt;$…</c> / <c>&lt;M&gt;$…</c> with <c>throw null</c> bodies) are not
/// valid C# identifiers and merely duplicated these signatures, so they are not reproduced in source.
/// </remarks>
internal static class ExceptionPolyfills
{
    /// <summary>Throws <see cref="ArgumentNullException"/> when <paramref name="argument"/> is null.</summary>
    /// <param name="argument">The value to validate.</param>
    /// <param name="paramName">
    /// Name reported on the exception; filled in from the caller's argument expression when omitted.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="argument"/> is null.</exception>
    public static void ThrowIfNull([NotNull] object? argument, [CallerArgumentExpression("argument")] string? paramName = null)
    {
        if (argument == null)
        {
            ThrowArgumentNullException(paramName);
        }
    }

    // The throw lives in its own method so the check above stays small.
    [DoesNotReturn]
    private static void ThrowArgumentNullException(string? paramName)
    {
        throw new ArgumentNullException(paramName);
    }

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> naming the runtime type of
    /// <paramref name="instance"/> when <paramref name="condition"/> is true.
    /// </summary>
    /// <exception cref="ObjectDisposedException"><paramref name="condition"/> is true.</exception>
    public static void ThrowIf([DoesNotReturnIf(true)] bool condition, object instance)
    {
        if (condition)
        {
            ThrowObjectDisposedException(instance);
        }
    }

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> naming <paramref name="type"/> when
    /// <paramref name="condition"/> is true.
    /// </summary>
    /// <exception cref="ObjectDisposedException"><paramref name="condition"/> is true.</exception>
    public static void ThrowIf([DoesNotReturnIf(true)] bool condition, Type type)
    {
        if (condition)
        {
            ThrowObjectDisposedException(type);
        }
    }

    // A null instance yields an exception with a null object name rather than a NullReferenceException.
    [DoesNotReturn]
    private static void ThrowObjectDisposedException(object instance)
    {
        throw new ObjectDisposedException(instance?.GetType().FullName);
    }

    // A null type yields an exception with a null object name rather than a NullReferenceException.
    [DoesNotReturn]
    private static void ThrowObjectDisposedException(Type type)
    {
        throw new ObjectDisposedException(type?.FullName);
    }
}
}
// Assembly-private declarations of the nullable/flow-analysis attributes used in this file
// (for example [NotNull], [NotNullWhen], [DoesNotReturn], [DoesNotReturnIf]).
// They carry no behavior: each one only stores its constructor arguments.
namespace System.Diagnostics.CodeAnalysis
{
/// <summary>Marker attribute for properties, fields and parameters; carries no data.</summary>
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter, Inherited = false)]
internal sealed class AllowNullAttribute : Attribute
{
}
/// <summary>Marker attribute for properties, fields and parameters; carries no data.</summary>
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter, Inherited = false)]
internal sealed class DisallowNullAttribute : Attribute
{
}
/// <summary>Marker attribute for properties, fields, parameters and return values; carries no data.</summary>
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.ReturnValue, Inherited = false)]
internal sealed class MaybeNullAttribute : Attribute
{
}
/// <summary>Marker attribute for properties, fields, parameters and return values; carries no data.</summary>
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Field | AttributeTargets.Parameter | AttributeTargets.ReturnValue, Inherited = false)]
internal sealed class NotNullAttribute : Attribute
{
}
/// <summary>Parameter attribute that records the boolean return value it is conditioned on.</summary>
[AttributeUsage(AttributeTargets.Parameter, Inherited = false)]
internal sealed class MaybeNullWhenAttribute : Attribute
{
public bool ReturnValue { get; }
public MaybeNullWhenAttribute(bool returnValue)
{
ReturnValue = returnValue;
}
}
/// <summary>Parameter attribute that records the boolean return value it is conditioned on.</summary>
[AttributeUsage(AttributeTargets.Parameter, Inherited = false)]
internal sealed class NotNullWhenAttribute : Attribute
{
public bool ReturnValue { get; }
public NotNullWhenAttribute(bool returnValue)
{
ReturnValue = returnValue;
}
}
/// <summary>Records the name of the parameter it refers to; may be applied multiple times.</summary>
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Parameter | AttributeTargets.ReturnValue, AllowMultiple = true, Inherited = false)]
internal sealed class NotNullIfNotNullAttribute : Attribute
{
public string ParameterName { get; }
public NotNullIfNotNullAttribute(string parameterName)
{
ParameterName = parameterName;
}
}
/// <summary>Marker attribute for methods; carries no data.</summary>
[AttributeUsage(AttributeTargets.Method, Inherited = false)]
internal sealed class DoesNotReturnAttribute : Attribute
{
}
/// <summary>Parameter attribute that records the boolean parameter value it is conditioned on.</summary>
[AttributeUsage(AttributeTargets.Parameter, Inherited = false)]
internal sealed class DoesNotReturnIfAttribute : Attribute
{
public bool ParameterValue { get; }
public DoesNotReturnIfAttribute(bool parameterValue)
{
ParameterValue = parameterValue;
}
}
/// <summary>Records one or more member names; may be applied multiple times.</summary>
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Property, Inherited = false, AllowMultiple = true)]
internal sealed class MemberNotNullAttribute : Attribute
{
public string[] Members { get; }
/// <summary>Wraps a single member name in a one-element array.</summary>
public MemberNotNullAttribute(string member)
{
Members = new string[1] { member };
}
/// <summary>Stores the supplied array as-is (no copy is made).</summary>
public MemberNotNullAttribute(params string[] members)
{
Members = members;
}
}
/// <summary>Records a boolean return value together with one or more member names.</summary>
[AttributeUsage(AttributeTargets.Method | AttributeTargets.Property, Inherited = false, AllowMultiple = true)]
internal sealed class MemberNotNullWhenAttribute : Attribute
{
public bool ReturnValue { get; }
public string[] Members { get; }
/// <summary>Wraps a single member name in a one-element array.</summary>
public MemberNotNullWhenAttribute(bool returnValue, string member)
{
ReturnValue = returnValue;
Members = new string[1] { member };
}
/// <summary>Stores the supplied array as-is (no copy is made).</summary>
public MemberNotNullWhenAttribute(bool returnValue, params string[] members)
{
ReturnValue = returnValue;
Members = members;
}
}
}
namespace System.Runtime.CompilerServices
{
/// <summary>
/// Parameter attribute that records the name of another parameter. In this file it is used on
/// <c>ExceptionPolyfills.ThrowIfNull</c>'s <c>paramName</c>, pointing at <c>argument</c>.
/// </summary>
[AttributeUsage(AttributeTargets.Parameter, AllowMultiple = false, Inherited = false)]
internal sealed class CallerArgumentExpressionAttribute : Attribute
{
public string ParameterName { get; }
public CallerArgumentExpressionAttribute(string parameterName)
{
ParameterName = parameterName;
}
}
}
namespace System.Runtime.InteropServices
{
/// <summary>
/// Method attribute that records a native library name plus optional settings
/// (entry point, string marshalling, last-error capture). It only stores these values.
/// </summary>
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false, Inherited = false)]
internal sealed class LibraryImportAttribute : Attribute
{
/// <summary>Library name supplied to the constructor.</summary>
public string LibraryName { get; }
public string? EntryPoint { get; set; }
public StringMarshalling StringMarshalling { get; set; }
public Type? StringMarshallingCustomType { get; set; }
public bool SetLastError { get; set; }
public LibraryImportAttribute(string libraryName)
{
LibraryName = libraryName;
}
}
/// <summary>String marshalling choices; <c>Custom</c> is the zero (default) value.</summary>
internal enum StringMarshalling
{
Custom,
Utf8,
Utf16
}
}
namespace System.Threading
{
/// <summary>
/// Supplies an <c>UnsafeRegister</c> extension on <see cref="CancellationToken"/>.
/// </summary>
internal static class CancellationTokenExtensions
{
    /// <summary>
    /// Registers <paramref name="callback"/> on the token by forwarding to
    /// <see cref="CancellationToken.Register(Action{object}, object)"/>.
    /// </summary>
    /// <param name="cancellationToken">Token to register on.</param>
    /// <param name="callback">Delegate invoked with <paramref name="state"/> on cancellation.</param>
    /// <param name="state">Value handed to the callback.</param>
    /// <returns>The registration returned by <c>Register</c>.</returns>
    internal static CancellationTokenRegistration UnsafeRegister(this CancellationToken cancellationToken, Action<object> callback, object state)
        => cancellationToken.Register(callback, state);
}
}
namespace System.Threading.Tasks
{
/// <summary>
/// Adapts a <see cref="Task"/> to the Begin/End <see cref="IAsyncResult"/> pattern.
/// </summary>
internal static class TaskToAsyncResult
{
    /// <summary>IAsyncResult wrapper around a task, with an optional completion callback.</summary>
    private sealed class TaskAsyncResult : IAsyncResult
    {
        internal readonly Task _task;

        // Only assigned when the task is still running and a callback was supplied.
        private readonly AsyncCallback _callback;

        public object AsyncState { get; }

        public bool CompletedSynchronously { get; }

        public bool IsCompleted
        {
            get { return _task.IsCompleted; }
        }

        public WaitHandle AsyncWaitHandle
        {
            get
            {
                IAsyncResult taskAsResult = _task;
                return taskAsResult.AsyncWaitHandle;
            }
        }

        internal TaskAsyncResult(Task task, object state, AsyncCallback callback)
        {
            _task = task;
            AsyncState = state;

            if (!task.IsCompleted)
            {
                if (callback == null)
                {
                    return;
                }

                // Still running: invoke the callback from a continuation that ignores the captured context.
                _callback = callback;
                _task.ConfigureAwait(continueOnCapturedContext: false).GetAwaiter().OnCompleted(() => _callback(this));
                return;
            }

            // Already finished: report synchronous completion and call back inline.
            CompletedSynchronously = true;
            if (callback != null)
            {
                callback(this);
            }
        }
    }

    /// <summary>Wraps <paramref name="task"/> in an <see cref="IAsyncResult"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public static IAsyncResult Begin(Task task, AsyncCallback? callback, object? state)
    {
        ExceptionPolyfills.ThrowIfNull(task, "task");
        IAsyncResult wrapped = new TaskAsyncResult(task, state, callback);
        return wrapped;
    }

    /// <summary>Waits for the wrapped task and rethrows its failure, if any.</summary>
    public static void End(IAsyncResult asyncResult)
    {
        Task inner = Unwrap(asyncResult);
        inner.GetAwaiter().GetResult();
    }

    /// <summary>Waits for the wrapped task and returns its result.</summary>
    public static TResult End<TResult>(IAsyncResult asyncResult)
    {
        Task<TResult> inner = Unwrap<TResult>(asyncResult);
        return inner.GetAwaiter().GetResult();
    }

    /// <summary>Returns the task behind an <see cref="IAsyncResult"/> produced by <see cref="Begin"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="asyncResult"/> did not come from <see cref="Begin"/>.</exception>
    public static Task Unwrap(IAsyncResult asyncResult)
    {
        ExceptionPolyfills.ThrowIfNull(asyncResult, "asyncResult");
        if (asyncResult is TaskAsyncResult wrapper && wrapper._task != null)
        {
            return wrapper._task;
        }
        throw new ArgumentException(null, "asyncResult");
    }

    /// <summary>Returns the typed task behind an <see cref="IAsyncResult"/> produced by <see cref="Begin"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
    /// <exception cref="ArgumentException">
    /// <paramref name="asyncResult"/> did not come from <see cref="Begin"/>, or its task is not a
    /// <c>Task&lt;TResult&gt;</c>.
    /// </exception>
    public static Task<TResult> Unwrap<TResult>(IAsyncResult asyncResult)
    {
        ExceptionPolyfills.ThrowIfNull(asyncResult, "asyncResult");
        if (asyncResult is TaskAsyncResult wrapper && wrapper._task is Task<TResult> typed)
        {
            return typed;
        }
        throw new ArgumentException(null, "asyncResult");
    }
}
}
namespace System.IO
{
/// <summary>Argument validation shared by stream copy operations.</summary>
internal static class StreamHelpers
{
    /// <summary>
    /// Validates the arguments of a copy from <paramref name="source"/> to <paramref name="destination"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="destination"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
    /// <exception cref="ObjectDisposedException">
    /// A stream reports neither readable nor writable, which is treated as closed.
    /// </exception>
    /// <exception cref="NotSupportedException">
    /// The source is not readable, or the destination is not writable.
    /// </exception>
    public static void ValidateCopyToArgs(Stream source, Stream destination, int bufferSize)
    {
        ExceptionPolyfills.ThrowIfNull(destination, "destination");

        if (bufferSize <= 0)
        {
            throw new ArgumentOutOfRangeException("bufferSize", bufferSize, System.SR.ArgumentOutOfRange_NeedPosNum);
        }

        // Closed-stream checks come first for both streams; capability checks follow.
        bool sourceReadable = source.CanRead;
        if (!(sourceReadable || source.CanWrite))
        {
            throw new ObjectDisposedException(null, System.SR.ObjectDisposed_StreamClosed);
        }

        bool destinationWritable = destination.CanWrite;
        if (!(destinationWritable || destination.CanRead))
        {
            throw new ObjectDisposedException("destination", System.SR.ObjectDisposed_StreamClosed);
        }

        if (sourceReadable)
        {
            if (destinationWritable)
            {
                return;
            }
            throw new NotSupportedException(System.SR.NotSupported_UnwritableStream);
        }
        throw new NotSupportedException(System.SR.NotSupported_UnreadableStream);
    }
}
/// <summary>
/// Memory-based read/write extensions for <see cref="Stream"/>, implemented on top of the
/// array-based stream methods.
/// </summary>
/// <remarks>
/// Array-backed memory is handed to the stream directly. Any other memory goes through a
/// buffer rented from <see cref="ArrayPool{T}.Shared"/>, which is returned once the operation ends.
/// </remarks>
internal static class StreamExtensions
{
    /// <summary>Reads from <paramref name="stream"/> into <paramref name="buffer"/>.</summary>
    /// <returns>The number of bytes read.</returns>
    public static ValueTask<int> ReadAsync(this Stream stream, Memory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)buffer, out ArraySegment<byte> backing))
        {
            // No backing array: read into a rented array, then copy into the caller's memory.
            byte[] scratch = ArrayPool<byte>.Shared.Rent(buffer.Length);
            Task<int> pending = stream.ReadAsync(scratch, 0, buffer.Length, cancellationToken);
            return CopyBackAsync(pending, scratch, buffer);
        }

        Task<int> direct = stream.ReadAsync(backing.Array, backing.Offset, backing.Count, cancellationToken);
        return new ValueTask<int>(direct);
    }

    // Awaits the read, copies the bytes read into the destination, and always returns the rented array.
    private static async ValueTask<int> CopyBackAsync(Task<int> readTask, byte[] rented, Memory<byte> destination)
    {
        try
        {
            int bytesRead = await readTask.ConfigureAwait(continueOnCapturedContext: false);
            new Span<byte>(rented, 0, bytesRead).CopyTo(destination.Span);
            return bytesRead;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(rented);
        }
    }

    /// <summary>Writes <paramref name="buffer"/> to <paramref name="stream"/> synchronously.</summary>
    public static void Write(this Stream stream, ReadOnlyMemory<byte> buffer)
    {
        if (!MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> backing))
        {
            byte[] scratch = ArrayPool<byte>.Shared.Rent(buffer.Length);
            try
            {
                buffer.Span.CopyTo(scratch);
                stream.Write(scratch, 0, buffer.Length);
            }
            finally
            {
                ArrayPool<byte>.Shared.Return(scratch);
            }
            return;
        }

        stream.Write(backing.Array, backing.Offset, backing.Count);
    }

    /// <summary>Writes <paramref name="buffer"/> to <paramref name="stream"/> asynchronously.</summary>
    public static ValueTask WriteAsync(this Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (!MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> backing))
        {
            byte[] scratch = ArrayPool<byte>.Shared.Rent(buffer.Length);
            buffer.Span.CopyTo(scratch);
            Task pending = stream.WriteAsync(scratch, 0, buffer.Length, cancellationToken);
            return new ValueTask(FinishWriteAsync(pending, scratch));
        }

        Task direct = stream.WriteAsync(backing.Array, backing.Offset, backing.Count, cancellationToken);
        return new ValueTask(direct);
    }

    // Awaits the write and always returns the rented array afterwards.
    private static async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
    {
        try
        {
            await writeTask.ConfigureAwait(continueOnCapturedContext: false);
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(localBuffer);
        }
    }

    /// <summary>
    /// Copies <paramref name="source"/> to <paramref name="destination"/> with a buffer size of 81920 bytes.
    /// </summary>
    public static Task CopyToAsync(this Stream source, Stream destination, CancellationToken cancellationToken = default(CancellationToken))
    {
        const int copyBufferSize = 81920;
        return source.CopyToAsync(destination, copyBufferSize, cancellationToken);
    }
}
}
namespace System.IO.Pipelines
{
/// <summary>
/// One node of a linked list of byte buffers, exposed as a <see cref="ReadOnlySequenceSegment{T}"/>.
/// </summary>
/// <remarks>
/// A segment's memory comes either from an <see cref="IMemoryOwner{T}"/> or from an array that is
/// handed back to <see cref="ArrayPool{T}.Shared"/> on reset. <see cref="End"/> marks how much of
/// <see cref="AvailableMemory"/> is exposed through the base <c>Memory</c> property.
/// </remarks>
internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
{
    private IMemoryOwner<byte> _memoryOwner;
    private byte[] _array;
    private BufferSegment _next;
    private int _end;

    /// <summary>
    /// Number of bytes exposed by this segment. Setting it re-slices the base <c>Memory</c> to
    /// the first <c>value</c> bytes of <see cref="AvailableMemory"/>.
    /// </summary>
    public int End
    {
        get
        {
            return _end;
        }
        set
        {
            _end = value;
            Memory = AvailableMemory.Slice(0, value);
        }
    }

    /// <summary>
    /// The following segment, typed as <see cref="BufferSegment"/>. The setter keeps the base
    /// <c>Next</c> link in sync.
    /// </summary>
    public BufferSegment? NextSegment
    {
        get
        {
            return _next;
        }
        set
        {
            Next = value;
            _next = value;
        }
    }

    /// <summary>The memory owner if one is set, otherwise the pooled array (null when neither is set).</summary>
    internal object? MemoryOwner => ((object)_memoryOwner) ?? ((object)_array);

    /// <summary>The whole backing buffer, including bytes beyond <see cref="End"/>.</summary>
    public Memory<byte> AvailableMemory { get; private set; }

    /// <summary>Same value as <see cref="End"/>.</summary>
    public int Length => End;

    /// <summary>Bytes remaining in <see cref="AvailableMemory"/> after <see cref="End"/>.</summary>
    public int WritableBytes
    {
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        get
        {
            return AvailableMemory.Length - End;
        }
    }

    /// <summary>Backs the segment with memory from <paramref name="memoryOwner"/>.</summary>
    public void SetOwnedMemory(IMemoryOwner<byte> memoryOwner)
    {
        _memoryOwner = memoryOwner;
        AvailableMemory = memoryOwner.Memory;
    }

    /// <summary>
    /// Backs the segment with <paramref name="arrayPoolBuffer"/>, which is returned to
    /// <see cref="ArrayPool{T}.Shared"/> when the segment is reset.
    /// </summary>
    public void SetOwnedMemory(byte[] arrayPoolBuffer)
    {
        _array = arrayPoolBuffer;
        AvailableMemory = arrayPoolBuffer;
    }

    /// <summary>Releases the memory and clears the link and running index.</summary>
    public void Reset()
    {
        ResetMemory();
        Next = null;
        RunningIndex = 0L;
        _next = null;
    }

    /// <summary>
    /// Releases the backing memory (disposing the owner or returning the pooled array) and clears
    /// <see cref="End"/>, the base <c>Memory</c> and <see cref="AvailableMemory"/>.
    /// Does nothing to the pool when the segment holds no memory, so it is safe on a fresh segment
    /// and safe to call more than once.
    /// </summary>
    public void ResetMemory()
    {
        IMemoryOwner<byte> memoryOwner = _memoryOwner;
        if (memoryOwner != null)
        {
            _memoryOwner = null;
            memoryOwner.Dispose();
        }
        else if (_array != null)
        {
            // Guarded: ArrayPool<T>.Return rejects null, which previously made resetting an
            // empty or already-reset segment throw ArgumentNullException.
            ArrayPool<byte>.Shared.Return(_array);
            _array = null;
        }

        Memory = default(ReadOnlyMemory<byte>);
        _end = 0;
        AvailableMemory = default(Memory<byte>);
    }

    /// <summary>
    /// Links <paramref name="segment"/> after this one, then walks the chain from this segment
    /// and assigns each following segment a running index equal to its predecessor's running
    /// index plus the predecessor's length.
    /// </summary>
    public void SetNext(BufferSegment segment)
    {
        NextSegment = segment;

        BufferSegment current = this;
        while (current.Next != null)
        {
            BufferSegment following = current.NextSegment;
            following.RunningIndex = current.RunningIndex + current.Length;
            current = following;
        }
    }

    /// <summary>
    /// Distance in bytes between (<paramref name="startSegment"/>, <paramref name="startIndex"/>)
    /// and (<paramref name="endSegment"/>, <paramref name="endIndex"/>), computed from running indices.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal static long GetLength(BufferSegment startSegment, int startIndex, BufferSegment endSegment, int endIndex)
    {
        return endSegment.RunningIndex + (uint)endIndex - (startSegment.RunningIndex + (uint)startIndex);
    }

    /// <summary>
    /// Distance in bytes between the absolute <paramref name="startPosition"/> and
    /// (<paramref name="endSegment"/>, <paramref name="endIndex"/>).
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    internal static long GetLength(long startPosition, BufferSegment endSegment, int endIndex)
    {
        return endSegment.RunningIndex + (uint)endIndex - startPosition;
    }
}
/// <summary>
/// Immutable bundle of a completion callback, the state object to pass to it, and the
/// execution and synchronization contexts stored alongside it.
/// </summary>
internal readonly struct CompletionData
{
/// <summary>Callback to invoke.</summary>
public Action<object?> Completion { get; }
/// <summary>State object stored for <see cref="Completion"/>.</summary>
public object? CompletionState { get; }
/// <summary>Execution context stored with the callback; may be null.</summary>
public ExecutionContext? ExecutionContext { get; }
/// <summary>Synchronization context stored with the callback; may be null.</summary>
public SynchronizationContext? SynchronizationContext { get; }
/// <summary>Stores the four values unchanged.</summary>
public CompletionData(Action<object?> completion, object? completionState, ExecutionContext? executionContext, SynchronizationContext? synchronizationContext)
{
Completion = completion;
CompletionState = completionState;
ExecutionContext = executionContext;
SynchronizationContext = synchronizationContext;
}
}
/// <summary>
/// Outcome of a flush, stored as a set of <c>ResultFlags</c> bits.
/// </summary>
public struct FlushResult
{
    internal ResultFlags _resultFlags;

    /// <summary>True when the <c>Canceled</c> flag is set.</summary>
    public bool IsCanceled
    {
        get { return (_resultFlags & ResultFlags.Canceled) != 0; }
    }

    /// <summary>True when the <c>Completed</c> flag is set.</summary>
    public bool IsCompleted
    {
        get { return (_resultFlags & ResultFlags.Completed) != 0; }
    }

    /// <summary>Builds a result with the flags matching the two booleans.</summary>
    /// <param name="isCanceled">Sets the <c>Canceled</c> flag when true.</param>
    /// <param name="isCompleted">Sets the <c>Completed</c> flag when true.</param>
    public FlushResult(bool isCanceled, bool isCompleted)
    {
        ResultFlags flags = ResultFlags.None;
        if (isCompleted)
        {
            flags |= ResultFlags.Completed;
        }
        if (isCanceled)
        {
            flags |= ResultFlags.Canceled;
        }
        _resultFlags = flags;
    }
}
/// <summary>
/// Scheduler that runs every action immediately on the calling thread.
/// </summary>
internal sealed class InlineScheduler : PipeScheduler
{
    /// <summary>Invokes <paramref name="action"/> with <paramref name="state"/> before returning.</summary>
    public override void Schedule(Action<object?> action, object? state) => action(state);

    /// <summary>Invokes <paramref name="action"/> with <paramref name="state"/> before returning.</summary>
    internal override void UnsafeSchedule(Action<object?> action, object? state) => action(state);
}
/// <summary>
/// A pair of pipe endpoints: a reader exposed as <see cref="Input"/> and a writer exposed as
/// <see cref="Output"/>.
/// </summary>
public interface IDuplexPipe
{
/// <summary>The reading endpoint.</summary>
PipeReader Input { get; }
/// <summary>The writing endpoint.</summary>
PipeWriter Output { get; }
}
/// <summary>
/// Array-backed LIFO stack of <see cref="BufferSegment"/> instances that grows by doubling.
/// </summary>
/// <remarks>
/// This is a mutable struct: use it through the field that stores it, because a copy would carry
/// its own <c>_size</c>. A <c>default</c> instance has no backing array and must not be used.
/// </remarks>
internal struct BufferSegmentStack
{
    /// <summary>
    /// Struct wrapper around a segment reference, so the backing array is an array of value types.
    /// NOTE(review): presumably this avoids the covariance check on reference-type array stores — confirm.
    /// </summary>
    private readonly struct SegmentAsValueType
    {
        private readonly BufferSegment _value;

        private SegmentAsValueType(BufferSegment value)
        {
            _value = value;
        }

        public static implicit operator SegmentAsValueType(BufferSegment s)
        {
            return new SegmentAsValueType(s);
        }

        public static implicit operator BufferSegment(SegmentAsValueType s)
        {
            return s._value;
        }
    }

    private SegmentAsValueType[] _array;
    private int _size;

    /// <summary>Number of segments currently on the stack.</summary>
    public int Count => _size;

    /// <summary>Creates an empty stack with room for <paramref name="size"/> segments.</summary>
    public BufferSegmentStack(int size)
    {
        _array = new SegmentAsValueType[size];
        _size = 0;
    }

    /// <summary>Pops the most recently pushed segment, if any.</summary>
    /// <param name="result">The popped segment, or null when the stack is empty.</param>
    /// <returns>True when a segment was popped.</returns>
    public bool TryPop([NotNullWhen(true)] out BufferSegment? result)
    {
        SegmentAsValueType[] slots = _array;
        int top = _size - 1;

        // A single unsigned comparison rejects both an empty stack (top == -1) and an out-of-range index.
        if ((uint)top >= (uint)slots.Length)
        {
            result = null;
            return false;
        }

        _size = top;
        result = slots[top];
        slots[top] = default(SegmentAsValueType);   // drop the reference so the slot does not keep it alive
        return true;
    }

    /// <summary>Pushes <paramref name="item"/>, growing the backing array when it is full.</summary>
    public void Push(BufferSegment item)
    {
        SegmentAsValueType[] slots = _array;
        int count = _size;

        if ((uint)count < (uint)slots.Length)
        {
            slots[count] = item;
            _size = count + 1;
            return;
        }

        PushWithResize(item);
    }

    // Slow path, kept out of line so Push stays small.
    [MethodImpl(MethodImplOptions.NoInlining)]
    private void PushWithResize(BufferSegment item)
    {
        // Doubling a zero-length array gives another zero-length array, so a stack created with
        // size 0 could never accept a push. Always grow to at least one slot.
        Array.Resize(ref _array, Math.Max(1, 2 * _array.Length));
        _array[_size] = item;
        _size++;
    }
}
public sealed class Pipe
{
/// <summary>
/// The pipe's <see cref="PipeReader"/> and the value-task source for pending reads.
/// Every member forwards to the owning <see cref="Pipe"/>.
/// </summary>
private sealed class DefaultPipeReader : PipeReader, IValueTaskSource<ReadResult>
{
    private readonly Pipe _pipe;

    public DefaultPipeReader(Pipe pipe)
    {
        _pipe = pipe;
    }

    public override bool TryRead(out ReadResult result) => _pipe.TryRead(out result);

    public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))
        => _pipe.ReadAsync(cancellationToken);

    protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumBytes, CancellationToken cancellationToken)
        => _pipe.ReadAtLeastAsync(minimumBytes, cancellationToken);

    public override void AdvanceTo(SequencePosition consumed) => _pipe.AdvanceReader(in consumed);

    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
        => _pipe.AdvanceReader(in consumed, in examined);

    public override void CancelPendingRead() => _pipe.CancelPendingRead();

    public override void Complete(Exception exception = null) => _pipe.CompleteReader(exception);

    public override void OnWriterCompleted(Action<Exception, object> callback, object state)
        => _pipe.OnWriterCompleted(callback, state);

    // IValueTaskSource<ReadResult>: the token is not forwarded to the pipe.
    public ValueTaskSourceStatus GetStatus(short token) => _pipe.GetReadAsyncStatus();

    public ReadResult GetResult(short token) => _pipe.GetReadAsyncResult();

    public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        => _pipe.OnReadAsyncCompleted(continuation, state, flags);
}
/// <summary>
/// The pipe's <see cref="PipeWriter"/> and the value-task source for pending flushes.
/// Every member forwards to the owning <see cref="Pipe"/>.
/// </summary>
private sealed class DefaultPipeWriter : PipeWriter, IValueTaskSource<FlushResult>
{
    private readonly Pipe _pipe;

    /// <summary>Always true: this writer reports its unflushed byte count.</summary>
    public override bool CanGetUnflushedBytes
    {
        get { return true; }
    }

    public override long UnflushedBytes
    {
        get { return _pipe.GetUnflushedBytes(); }
    }

    public DefaultPipeWriter(Pipe pipe)
    {
        _pipe = pipe;
    }

    public override void Complete(Exception exception = null) => _pipe.CompleteWriter(exception);

    public override void CancelPendingFlush() => _pipe.CancelPendingFlush();

    public override void OnReaderCompleted(Action<Exception, object> callback, object state)
        => _pipe.OnReaderCompleted(callback, state);

    public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
        => _pipe.FlushAsync(cancellationToken);

    public override void Advance(int bytes) => _pipe.Advance(bytes);

    public override Memory<byte> GetMemory(int sizeHint = 0) => _pipe.GetMemory(sizeHint);

    public override Span<byte> GetSpan(int sizeHint = 0) => _pipe.GetSpan(sizeHint);

    // IValueTaskSource<FlushResult>: the token is not forwarded to the pipe.
    public ValueTaskSourceStatus GetStatus(short token) => _pipe.GetFlushAsyncStatus();

    public FlushResult GetResult(short token) => _pipe.GetFlushAsyncResult();

    public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
        => _pipe.OnFlushAsyncCompleted(continuation, state, flags);

    public override ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
        => _pipe.WriteAsync(source, cancellationToken);
}
// Cached delegates. Each takes its target as the state object, so no closure is captured per call.
// Casts the state to Pipe and calls ReaderCancellationRequested on it.
private static readonly Action<object> s_signalReaderAwaitable = delegate(object state)
{
((Pipe)state).ReaderCancellationRequested();
};
// Casts the state to Pipe and calls WriterCancellationRequested on it.
private static readonly Action<object> s_signalWriterAwaitable = delegate(object state)
{
((Pipe)state).WriterCancellationRequested();
};
// Casts the state to PipeCompletionCallbacks and executes it.
private static readonly Action<object> s_invokeCompletionCallbacks = delegate(object state)
{
((PipeCompletionCallbacks)state).Execute();
};
// Method-group delegates for ExecuteWithExecutionContext / ExecuteWithoutExecutionContext
// (both defined outside this view), cached once per delegate type they are passed as.
private static readonly ContextCallback s_executionContextRawCallback = ExecuteWithoutExecutionContext;
private static readonly SendOrPostCallback s_syncContextExecutionContextCallback = ExecuteWithExecutionContext;
private static readonly SendOrPostCallback s_syncContextExecuteWithoutExecutionContextCallback = ExecuteWithoutExecutionContext;
private static readonly Action<object> s_scheduleWithExecutionContextCallback = ExecuteWithExecutionContext;
// Pool of reusable BufferSegment instances. Mutable struct: deliberately not readonly.
private BufferSegmentStack _bufferSegmentPool;
private readonly DefaultPipeReader _reader;
private readonly DefaultPipeWriter _writer;
private readonly PipeOptions _options;
// Lock object; exposed to the rest of the class through SyncObj.
private readonly object _sync = new object();
private long _unconsumedBytes;
private long _unflushedBytes;
// Mutable structs mutated in place: deliberately not readonly.
private PipeAwaitable _readerAwaitable;
private PipeAwaitable _writerAwaitable;
private PipeCompletion _writerCompletion;
private PipeCompletion _readerCompletion;
// Starts at -1 (and is put back to -1 by ResetState); set to 0 when the first segment is allocated.
private long _lastExaminedIndex = -1L;
// Read side: head/tail segments and the indices within them.
private BufferSegment _readHead;
private int _readHeadIndex;
private bool _disposed;
private BufferSegment _readTail;
private int _readTailIndex;
private int _minimumReadBytes;
// Write side: the segment currently being written, the memory last handed out from it,
// and the bytes buffered in it that have not yet been added to the segment's End.
private BufferSegment _writingHead;
private Memory<byte> _writingHeadMemory;
private int _writingHeadBytesBuffered;
private PipeOperationState _operationState;
// Shortcuts onto _options.
private bool UseSynchronizationContext => _options.UseSynchronizationContext;
private int MinimumSegmentSize => _options.MinimumSegmentSize;
private long PauseWriterThreshold => _options.PauseWriterThreshold;
private long ResumeWriterThreshold => _options.ResumeWriterThreshold;
private PipeScheduler ReaderScheduler => _options.ReaderScheduler;
private PipeScheduler WriterScheduler => _options.WriterScheduler;
private object SyncObj => _sync;
// Current value of _unconsumedBytes.
internal long Length => _unconsumedBytes;
/// <summary>The reading endpoint of this pipe.</summary>
public PipeReader Reader => _reader;
/// <summary>The writing endpoint of this pipe.</summary>
public PipeWriter Writer => _writer;
/// <summary>Creates a pipe configured with <c>PipeOptions.Default</c>.</summary>
public Pipe()
: this(PipeOptions.Default)
{
}
/// <summary>Creates a pipe configured with <paramref name="options"/>.</summary>
/// <param name="options">Pipe settings; must not be null.</param>
public Pipe(PipeOptions options)
{
    if (options == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
    }

    // _options goes first: UseSynchronizationContext (used below) reads it.
    _options = options;
    _bufferSegmentPool = new BufferSegmentStack(options.InitialSegmentPoolSize);

    _operationState = default(PipeOperationState);
    _writerCompletion = default(PipeCompletion);
    _readerCompletion = default(PipeCompletion);

    // The writer awaitable starts completed; the reader awaitable starts incomplete.
    _writerAwaitable = new PipeAwaitable(completed: true, UseSynchronizationContext);
    _readerAwaitable = new PipeAwaitable(completed: false, UseSynchronizationContext);

    _writer = new DefaultPipeWriter(this);
    _reader = new DefaultPipeReader(this);
}
/// <summary>
/// Puts the completions, awaitables, indices and byte counters back to their initial values.
/// Segment references and the segment pool are not touched here.
/// </summary>
private void ResetState()
{
    _readerCompletion.Reset();
    _writerCompletion.Reset();

    // Same starting states as in the constructor: reader incomplete, writer completed.
    _readerAwaitable = new PipeAwaitable(completed: false, UseSynchronizationContext);
    _writerAwaitable = new PipeAwaitable(completed: true, UseSynchronizationContext);

    _unconsumedBytes = 0L;
    _unflushedBytes = 0L;
    _lastExaminedIndex = -1L;
    _readHeadIndex = 0;
    _readTailIndex = 0;
}
/// <summary>
/// Returns the writable memory of the current write head, allocating one first when needed.
/// </summary>
/// <param name="sizeHint">Minimum number of bytes wanted; must not be negative.</param>
internal Memory<byte> GetMemory(int sizeHint)
{
    // Check order matters: a completed writer is reported before a negative size hint.
    bool writerDone = _writerCompletion.IsCompleted;
    if (writerDone)
    {
        ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
    }

    if (sizeHint < 0)
    {
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
    }

    AllocateWriteHeadIfNeeded(sizeHint);

    Memory<byte> writable = _writingHeadMemory;
    return writable;
}
/// <summary>
/// Returns the writable span of the current write head, allocating one first when needed.
/// </summary>
/// <param name="sizeHint">Minimum number of bytes wanted; must not be negative.</param>
internal Span<byte> GetSpan(int sizeHint)
{
    // Check order matters: a completed writer is reported before a negative size hint.
    bool writerDone = _writerCompletion.IsCompleted;
    if (writerDone)
    {
        ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
    }

    if (sizeHint < 0)
    {
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
    }

    AllocateWriteHeadIfNeeded(sizeHint);

    Span<byte> writable = _writingHeadMemory.Span;
    return writable;
}
/// <summary>
/// Fast-path check: returns without taking the lock when a write is already active and the
/// current write-head memory is non-empty and at least <paramref name="sizeHint"/> bytes long.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AllocateWriteHeadIfNeeded(int sizeHint)
{
    if (_operationState.IsWritingActive && _writingHeadMemory.Length != 0 && _writingHeadMemory.Length >= sizeHint)
    {
        return;
    }

    AllocateWriteHeadSynchronized(sizeHint);
}
/// <summary>
/// Under <c>SyncObj</c>, marks a write as begun and makes sure the write head refers to a segment
/// whose remaining memory is non-empty and at least <paramref name="sizeHint"/> bytes long.
/// </summary>
/// <param name="sizeHint">Minimum number of writable bytes required.</param>
private void AllocateWriteHeadSynchronized(int sizeHint)
{
lock (SyncObj)
{
_operationState.BeginWrite();
if (_writingHead == null)
{
// No write head exists: the new segment becomes write head, read head and read tail at once.
BufferSegment readTail = AllocateSegment(sizeHint);
_writingHead = (_readHead = (_readTail = readTail));
_lastExaminedIndex = 0L;
return;
}
int length = _writingHeadMemory.Length;
if (length == 0 || length < sizeHint)
{
if (_writingHeadBytesBuffered > 0)
{
// Fold bytes that were advanced but not yet committed into the segment's End before leaving it.
_writingHead.End += _writingHeadBytesBuffered;
_writingHeadBytesBuffered = 0;
}
if (_writingHead.Length == 0)
{
// The head segment reports zero length: replace its memory rather than chaining a new segment.
_writingHead.ResetMemory();
RentMemory(_writingHead, sizeHint);
}
else
{
// The head segment has content: link a new segment after it and make that the write head.
BufferSegment bufferSegment = AllocateSegment(sizeHint);
_writingHead.SetNext(bufferSegment);
_writingHead = bufferSegment;
}
}
}
}
/// <summary>Obtains a segment (pooled or new) and rents memory for it.</summary>
/// <param name="sizeHint">Minimum number of bytes requested for the segment's memory.</param>
/// <returns>The segment, with memory attached by <c>RentMemory</c>.</returns>
private BufferSegment AllocateSegment(int sizeHint)
{
    BufferSegment segment = CreateSegmentUnsynchronized();
    RentMemory(segment, sizeHint);
    return segment;
}
/// <summary>
/// Rents a buffer for <paramref name="segment"/> and points <c>_writingHeadMemory</c> at the
/// segment's available memory.
/// </summary>
/// <param name="segment">Segment that takes ownership of the rented buffer.</param>
/// <param name="sizeHint">Minimum number of bytes requested.</param>
private void RentMemory(BufferSegment segment, int sizeHint)
{
    bool hasCustomPool = !_options.IsDefaultSharedMemoryPool;
    MemoryPool<byte> customPool = hasCustomPool ? _options.Pool : null;

    // -1 means "no custom pool limit", so any non-negative hint falls through to the array pool.
    int customPoolLimit = hasCustomPool ? customPool.MaxBufferSize : -1;

    if (sizeHint > customPoolLimit)
    {
        int arraySize = GetSegmentSize(sizeHint);
        byte[] rentedArray = ArrayPool<byte>.Shared.Rent(arraySize);
        segment.SetOwnedMemory(rentedArray);
    }
    else
    {
        int pooledSize = GetSegmentSize(sizeHint, customPoolLimit);
        segment.SetOwnedMemory(customPool.Rent(pooledSize));
    }

    _writingHeadMemory = segment.AvailableMemory;
}
/// <summary>
/// Raises <paramref name="sizeHint"/> to at least <c>MinimumSegmentSize</c>, then caps it at
/// <paramref name="maxBufferSize"/>.
/// </summary>
/// <param name="sizeHint">Requested size in bytes.</param>
/// <param name="maxBufferSize">Upper bound on the returned size.</param>
/// <returns>The adjusted segment size.</returns>
private int GetSegmentSize(int sizeHint, int maxBufferSize = int.MaxValue)
{
    int atLeastMinimum = Math.Max(MinimumSegmentSize, sizeHint);
    int cappedAtMaximum = Math.Min(maxBufferSize, atLeastMinimum);
    return cappedAtMaximum;
}
/// <summary>Pops a segment from the segment pool, or creates a new one when the pool has none.</summary>
/// <returns>A pooled or newly constructed <c>BufferSegment</c>.</returns>
private BufferSegment CreateSegmentUnsynchronized()
{
    return _bufferSegmentPool.TryPop(out BufferSegment pooled)
        ? pooled
        : new BufferSegment();
}
/// <summary>
/// Pushes <paramref name="segment"/> back onto the segment pool unless the pool already holds
/// <c>MaxSegmentPoolSize</c> entries, in which case the segment is simply dropped.
/// </summary>
/// <param name="segment">Segment to recycle.</param>
private void ReturnSegmentUnsynchronized(BufferSegment segment)
{
    bool poolIsFull = _bufferSegmentPool.Count >= _options.MaxSegmentPoolSize;
    if (poolIsFull)
    {
        return;
    }

    _bufferSegmentPool.Push(segment);
}
/// <summary>
/// Ends the current write and publishes any unflushed bytes to the read side. No lock is taken here;
/// the callers visible in this file invoke it while holding <c>SyncObj</c>.
/// </summary>
/// <returns>
/// <c>false</c> when there were no unflushed bytes, or when the unconsumed total is still below
/// <c>_minimumReadBytes</c> after the commit; otherwise <c>true</c>.
/// </returns>
internal bool CommitUnsynchronized()
{
_operationState.EndWrite();
if (_unflushedBytes == 0L)
{
// Nothing has been advanced since the previous commit.
return false;
}
// Fold the bytes buffered on the write head into the segment, then expose them by moving the read tail.
_writingHead.End += _writingHeadBytesBuffered;
_readTail = _writingHead;
_readTailIndex = _writingHead.End;
// Remember the pre-commit total so an upward crossing of the pause threshold can be detected.
long unconsumedBytes = _unconsumedBytes;
_unconsumedBytes += _unflushedBytes;
bool result = true;
if (_unconsumedBytes < _minimumReadBytes)
{
result = false;
}
else if (PauseWriterThreshold > 0 && unconsumedBytes < PauseWriterThreshold && _unconsumedBytes >= PauseWriterThreshold && !_readerCompletion.IsCompleted)
{
// This commit crossed the pause threshold while the reader is not completed: make the writer awaitable pending.
_writerAwaitable.SetUncompleted();
}
_unflushedBytes = 0L;
_writingHeadBytesBuffered = 0;
return result;
}
/// <summary>Records that <paramref name="bytes"/> bytes were written into the current write-head memory.</summary>
/// <param name="bytes">Number of bytes written; must lie between 0 and the remaining write-head length.</param>
internal void Advance(int bytes)
{
    lock (SyncObj)
    {
        // The unsigned comparison rejects negative values and values beyond the remaining memory in one test.
        if ((uint)bytes > (uint)_writingHeadMemory.Length)
        {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
        }

        // Once the reader has completed, advancing is a no-op.
        if (_readerCompletion.IsCompleted)
        {
            return;
        }

        AdvanceCore(bytes);
    }
}
/// <summary>
/// Adds <paramref name="bytesWritten"/> to the buffered and unflushed counters and slices the same
/// number of bytes off the front of the write-head memory.
/// </summary>
/// <param name="bytesWritten">Number of bytes just written.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void AdvanceCore(int bytesWritten)
{
    _writingHeadBytesBuffered += bytesWritten;
    _unflushedBytes += bytesWritten;

    // Slicing stays last so the counters are updated before any range failure surfaces.
    Memory<byte> remaining = _writingHeadMemory.Slice(bytesWritten);
    _writingHeadMemory = remaining;
}
/// <summary>Commits written bytes and schedules any reader continuation released by the commit.</summary>
/// <param name="cancellationToken">Token observed for the flush; if already cancelled a cancelled task is returned immediately.</param>
/// <returns>The flush result produced by <c>PrepareFlushUnsynchronized</c>.</returns>
internal ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        Task<FlushResult> canceled = Task.FromCanceled<FlushResult>(cancellationToken);
        return new ValueTask<FlushResult>(canceled);
    }

    ValueTask<FlushResult> flushTask;
    CompletionData readerContinuation;

    lock (SyncObj)
    {
        PrepareFlushUnsynchronized(out readerContinuation, out flushTask, cancellationToken);
    }

    // The continuation is scheduled after the lock has been released.
    TrySchedule(ReaderScheduler, in readerContinuation);
    return flushTask;
}
/// <summary>
/// Commits pending bytes, begins a writer-awaitable operation and produces the flush task together
/// with the reader continuation (if any) that the caller should schedule.
/// </summary>
/// <param name="completionData">Reader continuation to schedule, or default when the commit returned false.</param>
/// <param name="result">Completed task when the writer awaitable is completed; otherwise a task backed by <c>_writer</c>.</param>
/// <param name="cancellationToken">Token passed to the writer awaitable.</param>
private void PrepareFlushUnsynchronized(out CompletionData completionData, out ValueTask<FlushResult> result, CancellationToken cancellationToken)
{
    bool wakeReader = CommitUnsynchronized();

    // The token is attached before the reader awaitable is completed below.
    _writerAwaitable.BeginOperation(cancellationToken, s_signalWriterAwaitable, this);

    if (!_writerAwaitable.IsCompleted)
    {
        result = new ValueTask<FlushResult>(_writer, 0);
    }
    else
    {
        FlushResult flushResult = default;
        GetFlushResult(ref flushResult);
        result = new ValueTask<FlushResult>(flushResult);
    }

    if (!wakeReader)
    {
        completionData = default;
        return;
    }

    _readerAwaitable.Complete(out completionData);
}
/// <summary>
/// Marks the writer as completed, commits pending bytes, completes the reader awaitable and
/// schedules writer-completion callbacks and the reader continuation on <c>ReaderScheduler</c>.
/// </summary>
/// <param name="exception">Optional exception recorded with the writer completion.</param>
internal void CompleteWriter(Exception? exception)
{
    CompletionData readerContinuation;
    PipeCompletionCallbacks callbacks;
    bool readerAlreadyCompleted;

    lock (SyncObj)
    {
        CommitUnsynchronized();
        callbacks = _writerCompletion.TryComplete(exception);
        _readerAwaitable.Complete(out readerContinuation);
        readerAlreadyCompleted = _readerCompletion.IsCompleted;
    }

    // With both sides completed the pipe's segments can be released.
    if (readerAlreadyCompleted)
    {
        CompletePipe();
    }

    if (callbacks != null)
    {
        ScheduleCallbacks(ReaderScheduler, callbacks);
    }

    TrySchedule(ReaderScheduler, in readerContinuation);
}
/// <summary>Advances the reader, treating <paramref name="consumed"/> as both the consumed and the examined position.</summary>
/// <param name="consumed">Position up to which data has been consumed.</param>
internal void AdvanceReader(in SequencePosition consumed) => AdvanceReader(in consumed, in consumed);
/// <summary>Unpacks the two positions into segment/index pairs and forwards them to the segment-based overload.</summary>
/// <param name="consumed">Position up to which data has been consumed.</param>
/// <param name="examined">Position up to which data has been examined.</param>
internal void AdvanceReader(in SequencePosition consumed, in SequencePosition examined)
{
    if (_readerCompletion.IsCompleted)
    {
        ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
    }

    // Evaluation order matches the argument order of the forwarded call.
    BufferSegment consumedSegment = (BufferSegment)consumed.GetObject();
    int consumedIndex = consumed.GetInteger();
    BufferSegment examinedSegment = (BufferSegment)examined.GetObject();
    int examinedIndex = examined.GetInteger();

    AdvanceReader(consumedSegment, consumedIndex, examinedSegment, examinedIndex);
}
/// <summary>
/// Applies a reader advance expressed as segment/index pairs: updates the unconsumed-byte count from the
/// examined position, moves the read head to the consumed position, recycles segments that lie before the
/// new read head, and completes the writer awaitable when the count drops below <c>ResumeWriterThreshold</c>.
/// </summary>
/// <param name="consumedSegment">Segment holding the consumed position, or null.</param>
/// <param name="consumedIndex">Index of the consumed position inside <paramref name="consumedSegment"/>.</param>
/// <param name="examinedSegment">Segment holding the examined position, or null.</param>
/// <param name="examinedIndex">Index of the examined position inside <paramref name="examinedSegment"/>.</param>
/// <remarks>
/// An examined position that lies before the consumed position, or before the position examined by the
/// previous advance, is rejected through
/// <c>ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition</c>.
/// </remarks>
private void AdvanceReader(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
    // The examined position may not lie before the consumed position.
    if (consumedSegment != null && examinedSegment != null && BufferSegment.GetLength(consumedSegment, consumedIndex, examinedSegment, examinedIndex) < 0)
    {
        ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
    }

    BufferSegment returnStart = null;
    BufferSegment returnEnd = null;
    CompletionData completionData = default(CompletionData);

    lock (SyncObj)
    {
        bool examinedEverything = false;
        if (examinedSegment == _readTail)
        {
            examinedEverything = examinedIndex == _readTailIndex;
        }

        if (examinedSegment != null && _lastExaminedIndex >= 0)
        {
            long examinedBytes = BufferSegment.GetLength(_lastExaminedIndex, examinedSegment, examinedIndex);
            long oldLength = _unconsumedBytes;

            // Moving the examined position backwards would make the delta negative and silently
            // increase _unconsumedBytes, corrupting Length and the pause/resume accounting.
            // Reject it before any state is modified.
            if (examinedBytes < 0)
            {
                ThrowHelper.ThrowInvalidOperationException_InvalidExaminedOrConsumedPosition();
            }

            _unconsumedBytes -= examinedBytes;

            // Stored as an absolute position (segment running index plus offset).
            _lastExaminedIndex = examinedSegment.RunningIndex + examinedIndex;

            // Resume the writer only on a downward crossing of the resume threshold.
            if (oldLength >= ResumeWriterThreshold && _unconsumedBytes < ResumeWriterThreshold)
            {
                _writerAwaitable.Complete(out completionData);
            }
        }

        if (consumedSegment != null)
        {
            if (_readHead == null)
            {
                ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
                return;
            }

            returnStart = _readHead;
            returnEnd = consumedSegment;

            if (consumedIndex == returnEnd.Length)
            {
                if (_writingHead != returnEnd)
                {
                    // The fully consumed segment is not the write head, so it can be returned too.
                    MoveReturnEndToNextBlock();
                }
                else if (_writingHeadBytesBuffered == 0 && !_operationState.IsWritingActive)
                {
                    // The write head is fully consumed, has nothing buffered and no write is active: release it.
                    _writingHead = null;
                    _writingHeadMemory = default(Memory<byte>);
                    MoveReturnEndToNextBlock();
                }
                else
                {
                    _readHead = consumedSegment;
                    _readHeadIndex = consumedIndex;
                }
            }
            else
            {
                _readHead = consumedSegment;
                _readHeadIndex = consumedIndex;
            }
        }

        // Everything available was examined and the writer is not completed: the reader awaitable becomes pending.
        if (examinedEverything && !_writerCompletion.IsCompleted)
        {
            _readerAwaitable.SetUncompleted();
        }

        // Recycle every segment from the old read head up to (excluding) returnEnd.
        while (returnStart != null && returnStart != returnEnd)
        {
            BufferSegment? next = returnStart.NextSegment;
            returnStart.Reset();
            ReturnSegmentUnsynchronized(returnStart);
            returnStart = next;
        }

        _operationState.EndRead();
    }

    TrySchedule(WriterScheduler, in completionData);

    // Steps returnEnd (and the read head, plus the read tail when it pointed at the same segment)
    // onto the following segment.
    void MoveReturnEndToNextBlock()
    {
        BufferSegment nextBlock = returnEnd.NextSegment;
        if (_readTail == returnEnd)
        {
            _readTail = nextBlock;
            _readTailIndex = 0;
        }

        _readHead = nextBlock;
        _readHeadIndex = 0;
        returnEnd = nextBlock;
    }
}
/// <summary>
/// Marks the reader as completed, ends any active read, completes the writer awaitable and schedules
/// reader-completion callbacks and the writer continuation on <c>WriterScheduler</c>.
/// </summary>
/// <param name="exception">Optional exception recorded with the reader completion.</param>
internal void CompleteReader(Exception? exception)
{
    CompletionData writerContinuation;
    PipeCompletionCallbacks callbacks;
    bool writerAlreadyCompleted;

    lock (SyncObj)
    {
        // A read that is still active is ended before the completion is recorded.
        if (_operationState.IsReadingActive)
        {
            _operationState.EndRead();
        }

        callbacks = _readerCompletion.TryComplete(exception);
        _writerAwaitable.Complete(out writerContinuation);
        writerAlreadyCompleted = _writerCompletion.IsCompleted;
    }

    // With both sides completed the pipe's segments can be released.
    if (writerAlreadyCompleted)
    {
        CompletePipe();
    }

    if (callbacks != null)
    {
        ScheduleCallbacks(WriterScheduler, callbacks);
    }

    TrySchedule(WriterScheduler, in writerContinuation);
}
/// <summary>Registers a callback on the writer completion; if the writer is already completed the callbacks are scheduled right away.</summary>
/// <param name="callback">Callback to register; null is reported through <c>ThrowHelper.ThrowArgumentNullException</c>.</param>
/// <param name="state">State object stored with the callback.</param>
internal void OnWriterCompleted(Action<Exception?, object?> callback, object? state)
{
    if (callback is null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
    }

    PipeCompletionCallbacks readyCallbacks;
    lock (SyncObj)
    {
        readyCallbacks = _writerCompletion.AddCallback(callback, state);
    }

    // AddCallback returns a batch only when the completion has already happened.
    if (readyCallbacks is null)
    {
        return;
    }

    ScheduleCallbacks(ReaderScheduler, readyCallbacks);
}
/// <summary>Marks the reader awaitable as cancelled and schedules its continuation, if any, on <c>ReaderScheduler</c>.</summary>
internal void CancelPendingRead()
{
    CompletionData readerContinuation;

    lock (SyncObj)
    {
        _readerAwaitable.Cancel(out readerContinuation);
    }

    TrySchedule(ReaderScheduler, in readerContinuation);
}
/// <summary>Marks the writer awaitable as cancelled and schedules its continuation, if any, on <c>WriterScheduler</c>.</summary>
internal void CancelPendingFlush()
{
    CompletionData writerContinuation;

    lock (SyncObj)
    {
        _writerAwaitable.Cancel(out writerContinuation);
    }

    TrySchedule(WriterScheduler, in writerContinuation);
}
/// <summary>Registers a callback on the reader completion; if the reader is already completed the callbacks are scheduled right away.</summary>
/// <param name="callback">Callback to register; null is reported through <c>ThrowHelper.ThrowArgumentNullException</c>.</param>
/// <param name="state">State object stored with the callback.</param>
internal void OnReaderCompleted(Action<Exception?, object?> callback, object? state)
{
    if (callback is null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callback);
    }

    PipeCompletionCallbacks readyCallbacks;
    lock (SyncObj)
    {
        readyCallbacks = _readerCompletion.AddCallback(callback, state);
    }

    // AddCallback returns a batch only when the completion has already happened.
    if (readyCallbacks is null)
    {
        return;
    }

    ScheduleCallbacks(WriterScheduler, readyCallbacks);
}
/// <summary>
/// Starts a read that completes synchronously only when at least <paramref name="minimumBytes"/> unconsumed
/// bytes are available, or the read result is cancelled or completed; otherwise it returns a task backed by
/// <c>_reader</c> and records the minimum in <c>_minimumReadBytes</c>.
/// </summary>
/// <param name="minimumBytes">Minimum number of unconsumed bytes required for a synchronous result.</param>
/// <param name="token">Token observed for the read; if already cancelled a cancelled task is returned immediately.</param>
internal ValueTask<ReadResult> ReadAtLeastAsync(int minimumBytes, CancellationToken token)
{
if (_readerCompletion.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
if (token.IsCancellationRequested)
{
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(token));
}
CompletionData completionData = default(CompletionData);
ValueTask<ReadResult> result2;
lock (SyncObj)
{
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
if (_readerAwaitable.IsCompleted)
{
GetReadResult(out var result);
if (_unconsumedBytes >= minimumBytes || result.IsCanceled || result.IsCompleted)
{
return new ValueTask<ReadResult>(result);
}
// Not enough data: undo what GetReadResult did (awaitable back to pending, read state ended)
// and begin the operation again so the token is attached to the now-pending awaitable.
_readerAwaitable.SetUncompleted();
_operationState.EndRead();
_readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);
}
// A pending writer awaitable is completed here so the writer can run while this read waits.
if (!_writerAwaitable.IsCompleted)
{
_writerAwaitable.Complete(out completionData);
}
_minimumReadBytes = minimumBytes;
result2 = new ValueTask<ReadResult>(_reader, 0);
}
// The writer continuation, if one was extracted, is scheduled outside the lock.
TrySchedule(WriterScheduler, in completionData);
return result2;
}
/// <summary>Starts a read, completing synchronously when the reader awaitable is already completed.</summary>
/// <param name="token">Token observed for the read; if already cancelled a cancelled task is returned immediately.</param>
/// <returns>A completed task carrying the read result, or a task backed by <c>_reader</c>.</returns>
internal ValueTask<ReadResult> ReadAsync(CancellationToken token)
{
    if (_readerCompletion.IsCompleted)
    {
        ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
    }

    if (token.IsCancellationRequested)
    {
        Task<ReadResult> canceled = Task.FromCanceled<ReadResult>(token);
        return new ValueTask<ReadResult>(canceled);
    }

    lock (SyncObj)
    {
        _readerAwaitable.BeginOperation(token, s_signalReaderAwaitable, this);

        if (!_readerAwaitable.IsCompleted)
        {
            // Asynchronous path: the reader object acts as the value task source.
            return new ValueTask<ReadResult>(_reader, 0);
        }

        GetReadResult(out ReadResult ready);
        return new ValueTask<ReadResult>(ready);
    }
}
/// <summary>Attempts a synchronous read.</summary>
/// <param name="result">The read result when the method returns true; default otherwise.</param>
/// <returns>
/// True when there are unconsumed bytes or the reader awaitable is completed; false otherwise, after
/// the read state has been marked as tentatively reading.
/// </returns>
internal bool TryRead(out ReadResult result)
{
    lock (SyncObj)
    {
        if (_readerCompletion.IsCompleted)
        {
            ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
        }

        bool nothingAvailable = _unconsumedBytes <= 0 && !_readerAwaitable.IsCompleted;
        if (!nothingAvailable)
        {
            GetReadResult(out result);
            return true;
        }

        // A read operation has already been begun on the awaitable.
        if (_readerAwaitable.IsRunning)
        {
            ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
        }

        _operationState.BeginReadTentative();
        result = default;
        return false;
    }
}
/// <summary>Queues the completion-callback batch on <paramref name="scheduler"/> via <c>s_invokeCompletionCallbacks</c>.</summary>
/// <param name="scheduler">Scheduler that runs the callbacks.</param>
/// <param name="completionCallbacks">Batch passed as the scheduled state.</param>
private static void ScheduleCallbacks(PipeScheduler scheduler, PipeCompletionCallbacks completionCallbacks)
    => scheduler.UnsafeSchedule(s_invokeCompletionCallbacks, completionCallbacks);
/// <summary>
/// Schedules the continuation carried by <paramref name="completionData"/>, if there is one. Continuations
/// without a captured synchronization or execution context go straight to <paramref name="scheduler"/>.
/// </summary>
/// <param name="scheduler">Scheduler used when no synchronization context was captured.</param>
/// <param name="completionData">Continuation, state and captured contexts.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void TrySchedule(PipeScheduler scheduler, in CompletionData completionData)
{
    Action<object> continuation = completionData.Completion;
    if (continuation == null)
    {
        // Nothing was waiting.
        return;
    }

    bool hasCapturedContext = completionData.SynchronizationContext != null || completionData.ExecutionContext != null;
    if (hasCapturedContext)
    {
        ScheduleWithContext(scheduler, in completionData);
    }
    else
    {
        scheduler.UnsafeSchedule(continuation, completionData.CompletionState);
    }
}
/// <summary>
/// Schedules a continuation that captured a synchronization context and/or an execution context,
/// choosing the callback that matches which of the two is present.
/// </summary>
/// <param name="scheduler">Scheduler used when no synchronization context was captured.</param>
/// <param name="completionData">Continuation data; passed as the state object of the chosen callback.</param>
[MethodImpl(MethodImplOptions.NoInlining)]
private static void ScheduleWithContext(PipeScheduler scheduler, in CompletionData completionData)
{
    var syncContext = completionData.SynchronizationContext;

    if (syncContext == null)
    {
        // Only an execution context was captured: run through the scheduler.
        scheduler.UnsafeSchedule(s_scheduleWithExecutionContextCallback, completionData);
        return;
    }

    if (completionData.ExecutionContext != null)
    {
        syncContext.Post(s_syncContextExecutionContextCallback, completionData);
    }
    else
    {
        syncContext.Post(s_syncContextExecuteWithoutExecutionContextCallback, completionData);
    }
}
/// <summary>Unboxes the <c>CompletionData</c> in <paramref name="state"/> and invokes its continuation with its state.</summary>
/// <param name="state">A boxed <c>CompletionData</c>.</param>
private static void ExecuteWithoutExecutionContext(object state)
{
    var data = (CompletionData)state;
    data.Completion.Invoke(data.CompletionState);
}
/// <summary>Runs <c>s_executionContextRawCallback</c> under the execution context stored in the boxed <c>CompletionData</c>.</summary>
/// <param name="state">A boxed <c>CompletionData</c>; also forwarded unchanged as the callback state.</param>
private static void ExecuteWithExecutionContext(object state)
{
    var data = (CompletionData)state;
    ExecutionContext.Run(data.ExecutionContext, s_executionContextRawCallback, state);
}
/// <summary>
/// Runs once per pipe lifetime (guarded by <c>_disposed</c>): resets every segment reachable from the
/// read head (or the read tail when there is no read head) and clears the segment pointers.
/// </summary>
private void CompletePipe()
{
    lock (SyncObj)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        BufferSegment current = _readHead ?? _readTail;
        while (current != null)
        {
            // Read the successor before Reset() so the chain can still be followed.
            BufferSegment following = current.NextSegment;
            current.Reset();
            current = following;
        }

        _writingHead = null;
        _writingHeadMemory = default;
        _readHead = null;
        _readTail = null;
        _lastExaminedIndex = -1;
    }
}
/// <summary>Reports the status of the pending read for the value task source.</summary>
/// <returns>
/// Pending while the reader awaitable is not completed; otherwise Faulted when the writer completion
/// is faulted, else Succeeded.
/// </returns>
internal ValueTaskSourceStatus GetReadAsyncStatus()
{
    if (!_readerAwaitable.IsCompleted)
    {
        return ValueTaskSourceStatus.Pending;
    }

    return _writerCompletion.IsFaulted
        ? ValueTaskSourceStatus.Faulted
        : ValueTaskSourceStatus.Succeeded;
}
/// <summary>Registers the continuation for a pending read on the reader awaitable.</summary>
/// <param name="continuation">Continuation to run when the read completes.</param>
/// <param name="state">State passed to the continuation.</param>
/// <param name="flags">Flags controlling context capture.</param>
internal void OnReadAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags)
{
    CompletionData toSchedule;
    bool continuationAlreadyRegistered;

    lock (SyncObj)
    {
        _readerAwaitable.OnCompleted(continuation, state, flags, out toSchedule, out continuationAlreadyRegistered);
    }

    // A second continuation on the same awaitable completes the writer with an error.
    if (continuationAlreadyRegistered)
    {
        Writer.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation());
    }

    TrySchedule(ReaderScheduler, in toSchedule);
}
/// <summary>
/// Produces the result of a completed asynchronous read and releases the cancellation registration
/// held by the reader awaitable.
/// </summary>
/// <returns>The read result built by <c>GetReadResult</c>.</returns>
internal ReadResult GetReadAsyncResult()
{
CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
CancellationToken cancellationToken = default(CancellationToken);
ReadResult result;
try
{
lock (SyncObj)
{
if (!_readerAwaitable.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
}
cancellationTokenRegistration = _readerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken);
GetReadResult(out result);
}
}
finally
{
// The registration is disposed after the lock block has exited, including when the try block throws.
cancellationTokenRegistration.Dispose();
}
if (result.IsCanceled)
{
// Throws only when the released token itself is cancelled; otherwise the cancelled result is returned.
cancellationToken.ThrowIfCancellationRequested();
}
return result;
}
/// <summary>
/// Builds a read result from the current read head/tail, observes (and clears) cancellation on the reader
/// awaitable, marks the read state, and resets <c>_minimumReadBytes</c>.
/// </summary>
/// <param name="result">Receives the read result.</param>
private void GetReadResult(out ReadResult result)
{
    // Rethrows the writer's stored exception, if there is one.
    bool writerCompleted = _writerCompletion.IsCompletedOrThrow();
    bool wasCanceled = _readerAwaitable.ObserveCancellation();

    BufferSegment head = _readHead;
    ReadOnlySequence<byte> data = head != null
        ? new ReadOnlySequence<byte>(head, _readHeadIndex, _readTail, _readTailIndex)
        : default;

    result = new ReadResult(data, wasCanceled, writerCompleted);

    // A cancelled read only marks the state tentatively.
    if (!wasCanceled)
    {
        _operationState.BeginRead();
    }
    else
    {
        _operationState.BeginReadTentative();
    }

    _minimumReadBytes = 0;
}
/// <summary>Reports the status of the pending flush for the value task source.</summary>
/// <returns>
/// Pending while the writer awaitable is not completed; otherwise Faulted when the reader completion
/// is faulted, else Succeeded.
/// </returns>
internal ValueTaskSourceStatus GetFlushAsyncStatus()
{
    if (!_writerAwaitable.IsCompleted)
    {
        return ValueTaskSourceStatus.Pending;
    }

    return _readerCompletion.IsFaulted
        ? ValueTaskSourceStatus.Faulted
        : ValueTaskSourceStatus.Succeeded;
}
/// <summary>
/// Produces the result of a completed asynchronous flush and releases the cancellation registration
/// held by the writer awaitable.
/// </summary>
/// <returns>The flush result filled in by <c>GetFlushResult</c>.</returns>
internal FlushResult GetFlushAsyncResult()
{
FlushResult result = default(FlushResult);
CancellationToken cancellationToken = default(CancellationToken);
CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
try
{
lock (SyncObj)
{
if (!_writerAwaitable.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_GetResultNotCompleted();
}
GetFlushResult(ref result);
cancellationTokenRegistration = _writerAwaitable.ReleaseCancellationTokenRegistration(out cancellationToken);
return result;
}
}
finally
{
// Runs after the lock is released, on both the return and the exception path.
cancellationTokenRegistration.Dispose();
// If the released token is cancelled this throws from the finally block, replacing the returned result.
cancellationToken.ThrowIfCancellationRequested();
}
}
/// <summary>Returns the current value of the unflushed-byte counter.</summary>
internal long GetUnflushedBytes() => _unflushedBytes;
/// <summary>
/// Sets the Canceled flag on <paramref name="result"/> when the writer awaitable observed a cancellation,
/// and the Completed flag when the reader has completed (rethrowing the reader's stored exception, if any).
/// </summary>
/// <param name="result">Flush result whose flags are updated in place.</param>
private void GetFlushResult(ref FlushResult result)
{
    // The cancellation flag is applied before the completion check, which may throw.
    bool wasCanceled = _writerAwaitable.ObserveCancellation();
    if (wasCanceled)
    {
        result._resultFlags |= ResultFlags.Canceled;
    }

    bool readerCompleted = _readerCompletion.IsCompletedOrThrow();
    if (readerCompleted)
    {
        result._resultFlags |= ResultFlags.Completed;
    }
}
/// <summary>
/// Copies <paramref name="source"/> into the pipe and flushes it in a single locked step.
/// </summary>
/// <param name="source">Bytes to write.</param>
/// <param name="cancellationToken">Token observed for the flush; if already cancelled a cancelled task is returned before anything is written.</param>
/// <returns>
/// A completed result flagged as completed when the reader has already completed; otherwise the task
/// produced by <c>PrepareFlushUnsynchronized</c>.
/// </returns>
internal ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
if (_writerCompletion.IsCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
// Rethrows the reader's stored exception, if any; a cleanly completed reader short-circuits the write.
if (_readerCompletion.IsCompletedOrThrow())
{
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: true));
}
if (cancellationToken.IsCancellationRequested)
{
return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
}
CompletionData completionData;
ValueTask<FlushResult> result;
lock (SyncObj)
{
// A size hint of 0 accepts whatever write-head memory is available or gets allocated.
AllocateWriteHeadIfNeeded(0);
if (source.Length <= _writingHeadMemory.Length)
{
// Fits in the current write head: single copy.
source.CopyTo(_writingHeadMemory);
AdvanceCore(source.Length);
}
else
{
// Does not fit: spread the copy over additional segments.
WriteMultiSegment(source.Span);
}
PrepareFlushUnsynchronized(out completionData, out result, cancellationToken);
}
// The reader continuation, if any, is scheduled outside the lock.
TrySchedule(ReaderScheduler, in completionData);
return result;
}
/// <summary>
/// Copies <paramref name="source"/> into the write head, appending a new segment each time the current
/// one fills up, until every byte has been copied.
/// </summary>
/// <param name="source">Bytes to copy.</param>
private void WriteMultiSegment(ReadOnlySpan<byte> source)
{
    Span<byte> destination = _writingHeadMemory.Span;

    for (;;)
    {
        int chunk = Math.Min(destination.Length, source.Length);
        source.Slice(0, chunk).CopyTo(destination);
        source = source.Slice(chunk);
        AdvanceCore(chunk);

        if (source.IsEmpty)
        {
            return;
        }

        // Current segment is full: fold the buffered bytes into it and chain a fresh segment.
        _writingHead.End += _writingHeadBytesBuffered;
        _writingHeadBytesBuffered = 0;

        BufferSegment appended = AllocateSegment(0);
        _writingHead.SetNext(appended);
        _writingHead = appended;

        destination = _writingHeadMemory.Span;
    }
}
/// <summary>Registers the continuation for a pending flush on the writer awaitable.</summary>
/// <param name="continuation">Continuation to run when the flush completes.</param>
/// <param name="state">State passed to the continuation.</param>
/// <param name="flags">Flags controlling context capture.</param>
internal void OnFlushAsyncCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags)
{
    CompletionData toSchedule;
    bool continuationAlreadyRegistered;

    lock (SyncObj)
    {
        _writerAwaitable.OnCompleted(continuation, state, flags, out toSchedule, out continuationAlreadyRegistered);
    }

    // A second continuation on the same awaitable completes the reader with an error.
    if (continuationAlreadyRegistered)
    {
        Reader.Complete(ThrowHelper.CreateInvalidOperationException_NoConcurrentOperation());
    }

    TrySchedule(WriterScheduler, in toSchedule);
}
/// <summary>Notifies the reader awaitable that its cancellation token fired and schedules any released continuation on <c>ReaderScheduler</c>.</summary>
private void ReaderCancellationRequested()
{
    CompletionData readerContinuation;

    lock (SyncObj)
    {
        _readerAwaitable.CancellationTokenFired(out readerContinuation);
    }

    TrySchedule(ReaderScheduler, in readerContinuation);
}
/// <summary>Notifies the writer awaitable that its cancellation token fired and schedules any released continuation on <c>WriterScheduler</c>.</summary>
private void WriterCancellationRequested()
{
    CompletionData writerContinuation;

    lock (SyncObj)
    {
        _writerAwaitable.CancellationTokenFired(out writerContinuation);
    }

    TrySchedule(WriterScheduler, in writerContinuation);
}
/// <summary>
/// Returns the pipe to its initial state so it can be reused. Only permitted after the pipe has been
/// completed (the <c>_disposed</c> flag set by <c>CompletePipe</c>); otherwise the failure is reported
/// through <c>ThrowHelper.ThrowInvalidOperationException_ResetIncompleteReaderWriter</c>.
/// </summary>
public void Reset()
{
    lock (SyncObj)
    {
        bool pipeWasCompleted = _disposed;
        if (!pipeWasCompleted)
        {
            ThrowHelper.ThrowInvalidOperationException_ResetIncompleteReaderWriter();
        }

        _disposed = false;
        ResetState();
    }
}
}
/// <summary>
/// Tracks the state of one side's awaitable: a set of state flags, the registered continuation with its
/// state and captured contexts, and the cancellation token registration of the operation in flight.
/// The struct performs no locking of its own.
/// </summary>
[DebuggerDisplay("CanceledState = {_awaitableState}, IsCompleted = {IsCompleted}")]
internal struct PipeAwaitable
{
// Bit flags stored in _awaitableState.
[Flags]
private enum AwaitableState
{
None = 0,
Completed = 1,
Running = 2,
Canceled = 4,
UseSynchronizationContext = 8
}
// Holder for the contexts captured in OnCompleted; allocated only when one of them is captured.
private sealed class SchedulingContext
{
public SynchronizationContext SynchronizationContext { get; set; }
public ExecutionContext ExecutionContext { get; set; }
}
private AwaitableState _awaitableState;
private Action<object> _completion;
private object _completionState;
private SchedulingContext _schedulingContext;
private CancellationTokenRegistration _cancellationTokenRegistration;
private CancellationToken _cancellationToken;
private CancellationToken CancellationToken => _cancellationToken;
/// <summary>True when either the Completed or the Canceled flag is set.</summary>
public bool IsCompleted => (_awaitableState & (AwaitableState.Completed | AwaitableState.Canceled)) != 0;
/// <summary>True when the Running flag is set.</summary>
public bool IsRunning => (_awaitableState & AwaitableState.Running) != 0;
/// <summary>Creates an awaitable with the given initial completion state.</summary>
/// <param name="completed">Whether the Completed flag starts set.</param>
/// <param name="useSynchronizationContext">Whether OnCompleted is allowed to capture a synchronization context.</param>
public PipeAwaitable(bool completed, bool useSynchronizationContext)
{
_awaitableState = (completed ? AwaitableState.Completed : AwaitableState.None) | (useSynchronizationContext ? AwaitableState.UseSynchronizationContext : AwaitableState.None);
_completion = null;
_completionState = null;
_cancellationTokenRegistration = default(CancellationTokenRegistration);
_schedulingContext = null;
_cancellationToken = CancellationToken.None;
}
/// <summary>
/// Marks the awaitable as running. When the token can be cancelled and the awaitable is not yet completed,
/// <paramref name="callback"/> is registered on the token and the token is stored.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void BeginOperation(CancellationToken cancellationToken, Action<object?> callback, object? state)
{
if (cancellationToken.CanBeCanceled && !IsCompleted)
{
_cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, callback, state);
// A default registration is followed by a cancellation check that throws before any flag is changed.
if (_cancellationTokenRegistration == default(CancellationTokenRegistration))
{
cancellationToken.ThrowIfCancellationRequested();
}
_cancellationToken = cancellationToken;
}
_awaitableState |= AwaitableState.Running;
}
/// <summary>Sets the Completed flag and hands back the registered continuation, if any.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Complete(out CompletionData completionData)
{
ExtractCompletion(out completionData);
_awaitableState |= AwaitableState.Completed;
}
// Moves the stored continuation, its state and captured contexts into completionData and clears the
// fields; completionData is default when no continuation was stored.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ExtractCompletion(out CompletionData completionData)
{
Action<object> completion = _completion;
object completionState = _completionState;
SchedulingContext schedulingContext = _schedulingContext;
ExecutionContext executionContext = schedulingContext?.ExecutionContext;
SynchronizationContext synchronizationContext = schedulingContext?.SynchronizationContext;
_completion = null;
_completionState = null;
_schedulingContext = null;
completionData = ((completion != null) ? new CompletionData(completion, completionState, executionContext, synchronizationContext) : default(CompletionData));
}
/// <summary>Clears the Completed flag; the Canceled flag is left untouched.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SetUncompleted()
{
_awaitableState &= ~AwaitableState.Completed;
}
/// <summary>
/// Registers <paramref name="continuation"/>. When the awaitable is already completed, or a continuation
/// is already stored (<paramref name="doubleCompletion"/> is true), the new continuation is returned in
/// <paramref name="completionData"/> instead of being stored.
/// </summary>
public void OnCompleted(Action<object?> continuation, object? state, ValueTaskSourceOnCompletedFlags flags, out CompletionData completionData, out bool doubleCompletion)
{
completionData = default(CompletionData);
doubleCompletion = _completion != null;
if (IsCompleted | doubleCompletion)
{
completionData = new CompletionData(continuation, state, _schedulingContext?.ExecutionContext, _schedulingContext?.SynchronizationContext);
return;
}
_completion = continuation;
_completionState = state;
// Capture the synchronization context only when both the awaitable and the caller's flags allow it.
if ((_awaitableState & AwaitableState.UseSynchronizationContext) != 0 && (flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
{
SynchronizationContext current = SynchronizationContext.Current;
// A context whose type is exactly the base SynchronizationContext is not captured.
if (current != null && current.GetType() != typeof(SynchronizationContext))
{
if (_schedulingContext == null)
{
_schedulingContext = new SchedulingContext();
}
_schedulingContext.SynchronizationContext = current;
}
}
if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
{
if (_schedulingContext == null)
{
_schedulingContext = new SchedulingContext();
}
_schedulingContext.ExecutionContext = ExecutionContext.Capture();
}
}
/// <summary>Sets the Canceled flag and hands back the registered continuation, if any.</summary>
public void Cancel(out CompletionData completionData)
{
ExtractCompletion(out completionData);
_awaitableState |= AwaitableState.Canceled;
}
/// <summary>Cancels the awaitable only if the currently stored token reports cancellation; otherwise does nothing.</summary>
public void CancellationTokenFired(out CompletionData completionData)
{
if (CancellationToken.IsCancellationRequested)
{
Cancel(out completionData);
}
else
{
completionData = default(CompletionData);
}
}
/// <summary>Returns whether the Canceled flag was set, then clears both the Running and the Canceled flags.</summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool ObserveCancellation()
{
bool result = (_awaitableState & AwaitableState.Canceled) == AwaitableState.Canceled;
_awaitableState &= ~(AwaitableState.Running | AwaitableState.Canceled);
return result;
}
/// <summary>
/// Hands the stored registration and token to the caller and resets both fields to default; the
/// registration is returned undisposed.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public CancellationTokenRegistration ReleaseCancellationTokenRegistration(out CancellationToken cancellationToken)
{
cancellationToken = CancellationToken;
CancellationTokenRegistration cancellationTokenRegistration = _cancellationTokenRegistration;
_cancellationToken = default(CancellationToken);
_cancellationTokenRegistration = default(CancellationTokenRegistration);
return cancellationTokenRegistration;
}
}
/// <summary>
/// Records whether one side of the pipe has completed (optionally with an exception) and holds the
/// callbacks registered for that completion.
/// </summary>
[DebuggerDisplay("IsCompleted = {IsCompleted}")]
internal struct PipeCompletion
{
    // Sentinel stored in _state for a completion without an exception.
    private static readonly object s_completedSuccessfully = new object();

    // null = not completed; the sentinel = completed; ExceptionDispatchInfo = completed with an exception.
    private object _state;
    private List<PipeCompletionCallback> _callbacks;

    /// <summary>True once a completion state has been recorded.</summary>
    public bool IsCompleted
    {
        get { return _state != null; }
    }

    /// <summary>True when the recorded completion carries an exception.</summary>
    public bool IsFaulted
    {
        get { return _state is ExceptionDispatchInfo; }
    }

    /// <summary>Records the completion if none has been recorded yet and returns any pending callbacks.</summary>
    /// <param name="exception">Optional exception to capture with the completion.</param>
    /// <returns>The pending callback batch, or null when no callbacks are registered.</returns>
    public PipeCompletionCallbacks? TryComplete(Exception? exception = null)
    {
        // Only the first completion is recorded.
        if (_state == null)
        {
            _state = exception == null
                ? s_completedSuccessfully
                : (object)ExceptionDispatchInfo.Capture(exception);
        }

        return GetCallbacks();
    }

    /// <summary>Registers a callback; when already completed, returns the pending batch (including this callback).</summary>
    /// <param name="callback">Callback to register.</param>
    /// <param name="state">State stored with the callback.</param>
    /// <returns>The pending callback batch when already completed; otherwise null.</returns>
    public PipeCompletionCallbacks? AddCallback(Action<Exception?, object?> callback, object? state)
    {
        _callbacks ??= new List<PipeCompletionCallback>();
        _callbacks.Add(new PipeCompletionCallback(callback, state));

        return IsCompleted ? GetCallbacks() : null;
    }

    /// <summary>Returns false when not completed; rethrows the captured exception if there is one; otherwise returns true.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsCompletedOrThrow()
    {
        if (_state == null)
        {
            return false;
        }

        (_state as ExceptionDispatchInfo)?.Throw();
        return true;
    }

    // Detaches the registered callbacks and wraps them with the captured exception info (if any).
    private PipeCompletionCallbacks GetCallbacks()
    {
        List<PipeCompletionCallback> pending = _callbacks;
        if (pending != null)
        {
            _callbacks = null;
            return new PipeCompletionCallbacks(pending, _state as ExceptionDispatchInfo);
        }

        return null;
    }

    /// <summary>Clears the recorded completion state.</summary>
    public void Reset()
    {
        _state = null;
    }

    /// <inheritdoc/>
    public override string ToString() => $"{nameof(IsCompleted)}: {IsCompleted}";
}
/// <summary>Pairs a completion callback with the state object that is passed to it.</summary>
internal readonly struct PipeCompletionCallback
{
    /// <summary>Delegate to invoke; receives an exception (which may be null) and a state object.</summary>
    public readonly Action<Exception?, object?> Callback;

    /// <summary>Value stored alongside the callback.</summary>
    public readonly object? State;

    /// <summary>Stores the callback and its state.</summary>
    /// <param name="callback">Delegate to store.</param>
    /// <param name="state">State to store.</param>
    public PipeCompletionCallback(Action<Exception?, object?> callback, object? state)
    {
        this.State = state;
        this.Callback = callback;
    }
}
/// <summary>A batch of completion callbacks together with the exception (if any) to pass to each of them.</summary>
internal sealed class PipeCompletionCallbacks
{
    private readonly List<PipeCompletionCallback> _callbacks;
    private readonly Exception _exception;

    /// <summary>Creates the batch.</summary>
    /// <param name="callbacks">Callbacks to run.</param>
    /// <param name="edi">Captured exception info; its source exception is what the callbacks receive.</param>
    public PipeCompletionCallbacks(List<PipeCompletionCallback> callbacks, ExceptionDispatchInfo? edi)
    {
        _callbacks = callbacks;
        _exception = edi?.SourceException;
    }

    /// <summary>
    /// Invokes every callback in order. Exceptions thrown by callbacks do not stop the remaining ones;
    /// they are collected and rethrown together as an <see cref="AggregateException"/>.
    /// </summary>
    public void Execute()
    {
        int total = _callbacks.Count;
        if (total == 0)
        {
            return;
        }

        List<Exception> failures = null;
        for (int index = 0; index < total; index++)
        {
            PipeCompletionCallback entry = _callbacks[index];
            try
            {
                entry.Callback(_exception, entry.State);
            }
            catch (Exception failure)
            {
                // The list is allocated lazily on the first failure.
                (failures ??= new List<Exception>()).Add(failure);
            }
        }

        if (failures != null)
        {
            throw new AggregateException(failures);
        }
    }
}
/// <summary>Settings that control how a <see cref="Pipe"/> buffers data and schedules continuations.</summary>
public class PipeOptions
{
    private const int DefaultMinimumSegmentSize = 4096;
    private const long DefaultPauseWriterThreshold = 65536;
    private const long DefaultResumeWriterThreshold = 32768;

    /// <summary>Options instance built entirely from default argument values.</summary>
    public static PipeOptions Default { get; } = new PipeOptions();

    /// <summary>Whether awaitables may capture the current synchronization context.</summary>
    public bool UseSynchronizationContext { get; }

    /// <summary>Unconsumed-byte count at which the writer awaitable becomes pending.</summary>
    public long PauseWriterThreshold { get; }

    /// <summary>Unconsumed-byte count below which a paused writer is resumed.</summary>
    public long ResumeWriterThreshold { get; }

    /// <summary>Minimum size requested for a segment's buffer.</summary>
    public int MinimumSegmentSize { get; }

    /// <summary>Scheduler for writer-side continuations.</summary>
    public PipeScheduler WriterScheduler { get; }

    /// <summary>Scheduler for reader-side continuations.</summary>
    public PipeScheduler ReaderScheduler { get; }

    /// <summary>Memory pool used for segment buffers.</summary>
    public MemoryPool<byte> Pool { get; }

    internal bool IsDefaultSharedMemoryPool { get; }
    internal int InitialSegmentPoolSize { get; }
    internal int MaxSegmentPoolSize { get; }

    /// <summary>Creates the options, substituting defaults for omitted arguments.</summary>
    /// <param name="pool">Memory pool; null selects <c>MemoryPool&lt;byte&gt;.Shared</c>.</param>
    /// <param name="readerScheduler">Reader scheduler; null selects <c>PipeScheduler.ThreadPool</c>.</param>
    /// <param name="writerScheduler">Writer scheduler; null selects <c>PipeScheduler.ThreadPool</c>.</param>
    /// <param name="pauseWriterThreshold">-1 selects 65536; any other negative value is rejected.</param>
    /// <param name="resumeWriterThreshold">
    /// -1 selects 32768 and 0 is raised to 1; a negative value, or a value above a positive
    /// <paramref name="pauseWriterThreshold"/>, is rejected.
    /// </param>
    /// <param name="minimumSegmentSize">-1 selects 4096.</param>
    /// <param name="useSynchronizationContext">Value for <see cref="UseSynchronizationContext"/>.</param>
    public PipeOptions(MemoryPool<byte>? pool = null, PipeScheduler? readerScheduler = null, PipeScheduler? writerScheduler = null, long pauseWriterThreshold = -1L, long resumeWriterThreshold = -1L, int minimumSegmentSize = -1, bool useSynchronizationContext = true)
    {
        MinimumSegmentSize = minimumSegmentSize == -1 ? DefaultMinimumSegmentSize : minimumSegmentSize;
        InitialSegmentPoolSize = 4;
        MaxSegmentPoolSize = 256;

        if (pauseWriterThreshold == -1)
        {
            pauseWriterThreshold = DefaultPauseWriterThreshold;
        }
        else if (pauseWriterThreshold < 0)
        {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.pauseWriterThreshold);
        }

        if (resumeWriterThreshold == -1)
        {
            resumeWriterThreshold = DefaultResumeWriterThreshold;
        }
        else if (resumeWriterThreshold == 0)
        {
            resumeWriterThreshold = 1;
        }

        // The upper bound applies only while the pause threshold is enabled (greater than zero).
        bool resumeIsNegative = resumeWriterThreshold < 0;
        bool resumeExceedsPause = pauseWriterThreshold > 0 && resumeWriterThreshold > pauseWriterThreshold;
        if (resumeIsNegative || resumeExceedsPause)
        {
            ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.resumeWriterThreshold);
        }

        Pool = pool ?? MemoryPool<byte>.Shared;
        IsDefaultSharedMemoryPool = ReferenceEquals(Pool, MemoryPool<byte>.Shared);
        ReaderScheduler = readerScheduler ?? PipeScheduler.ThreadPool;
        WriterScheduler = writerScheduler ?? PipeScheduler.ThreadPool;
        PauseWriterThreshold = pauseWriterThreshold;
        ResumeWriterThreshold = resumeWriterThreshold;
        UseSynchronizationContext = useSynchronizationContext;
    }
}
public abstract class PipeReader
{
private PipeReaderStream _stream;
public abstract bool TryRead(out ReadResult result);
public abstract ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken));
public ValueTask<ReadResult> ReadAtLeastAsync(int minimumSize, CancellationToken cancellationToken = default(CancellationToken))
{
if (minimumSize < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.minimumSize);
}
return ReadAtLeastAsyncCore(minimumSize, cancellationToken);
}
protected virtual async ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
{
ReadResult result;
while (true)
{
result = await ReadAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
ReadOnlySequence<byte> buffer = result.Buffer;
if (buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled)
{
break;
}
AdvanceTo(buffer.Start, buffer.End);
}
return result;
}
public abstract void AdvanceTo(SequencePosition consumed);
public abstract void AdvanceTo(SequencePosition consumed, SequencePosition examined);
public virtual Stream AsStream(bool leaveOpen = false)
{
if (_stream == null)
{
_stream = new PipeReaderStream(this, leaveOpen);
}
else if (leaveOpen)
{
_stream.LeaveOpen = leaveOpen;
}
return _stream;
}
public abstract void CancelPendingRead();
public abstract void Complete(Exception? exception = null);
public virtual ValueTask CompleteAsync(Exception? exception = null)
{
try
{
Complete(exception);
return default(ValueTask);
}
catch (Exception exception2)
{
return new ValueTask(Task.FromException(exception2));
}
}
[Obsolete("OnWriterCompleted has been deprecated and may not be invoked on all implementations of PipeReader.")]
public virtual void OnWriterCompleted(Action<Exception?, object?> callback, object? state)
{
}
public static PipeReader Create(Stream stream, StreamPipeReaderOptions? readerOptions = null)
{
return new StreamPipeReader(stream, readerOptions ?? StreamPipeReaderOptions.s_default);
}
public static PipeReader Create(ReadOnlySequence<byte> sequence)
{
return new SequencePipeReader(sequence);
}
public virtual Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken))
{
if (destination == null)
{
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination);
}
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}
return CopyToAsyncCore(destination, (PipeWriter destination, ReadOnlyMemory<byte> memory, CancellationToken cancellationToken) => destination.WriteAsync(memory, cancellationToken), cancellationToken);
}
public virtual Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default(CancellationToken))
{
if (destination == null)
{
ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination);
}
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(cancellationToken);
}
return CopyToAsyncCore(destination, delegate(Stream destination, ReadOnlyMemory<byte> memory, CancellationToken cancellationToken)
{
ValueTask writeTask2 = StreamExtensions.WriteAsync(destination, memory, cancellationToken);
if (writeTask2.IsCompletedSuccessfully)
{
writeTask2.GetAwaiter().GetResult();
return new ValueTask<FlushResult>(new FlushResult(isCanceled: false, isCompleted: false));
}
return Awaited(writeTask2);
}, cancellationToken);
static async ValueTask<FlushResult> Awaited(ValueTask writeTask)
{
await writeTask.ConfigureAwait(continueOnCapturedContext: false);
return new FlushResult(isCanceled: false, isCompleted: false);
}
}
/// <summary>
/// Shared copy loop: repeatedly reads from this reader and hands every non-empty memory
/// segment to <paramref name="writeAsync"/>.
/// </summary>
/// <typeparam name="TStream">Type of the destination passed through to <paramref name="writeAsync"/>.</typeparam>
/// <param name="destination">The destination forwarded to <paramref name="writeAsync"/>.</param>
/// <param name="writeAsync">Callback that writes one segment and reports the resulting flush state.</param>
/// <param name="cancellationToken">Token forwarded to each read and each write.</param>
/// <returns>A task that completes when the read reports completion or a write reports a completed flush.</returns>
private async Task CopyToAsyncCore<TStream>(TStream destination, Func<TStream, ReadOnlyMemory<byte>, CancellationToken, ValueTask<FlushResult>> writeAsync, CancellationToken cancellationToken)
{
    while (true)
    {
        ReadResult readResult = await ReadAsync(cancellationToken).ConfigureAwait(false);
        ReadOnlySequence<byte> data = readResult.Buffer;
        SequencePosition cursor = data.Start;
        SequencePosition consumed = cursor;

        try
        {
            if (readResult.IsCanceled)
            {
                ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
            }

            while (data.TryGet(ref cursor, out ReadOnlyMemory<byte> chunk))
            {
                if (!chunk.IsEmpty)
                {
                    FlushResult flush = await writeAsync(destination, chunk, cancellationToken).ConfigureAwait(false);
                    if (flush.IsCanceled)
                    {
                        ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
                    }

                    if (flush.IsCompleted)
                    {
                        // The segment was written; record it as consumed before stopping.
                        consumed = cursor;
                        return;
                    }
                }

                // Empty or fully written segment: everything up to the cursor is consumed.
                consumed = cursor;
            }

            consumed = data.End;
            if (readResult.IsCompleted)
            {
                break;
            }
        }
        finally
        {
            // Always advance, including on exceptions and the early return,
            // using whatever position was last recorded as consumed.
            AdvanceTo(consumed);
        }
    }
}
}
/// <summary>
/// Bit-flag tracker recording whether a read, a tentative read, and/or a write is in progress.
/// </summary>
[DebuggerDisplay("State = {_state}")]
internal struct PipeOperationState
{
    /// <summary>Individual operation flags; each value occupies its own bit.</summary>
    [Flags]
    internal enum State : byte
    {
        Reading = 1 << 0,
        ReadingTentative = 1 << 1,
        Writing = 1 << 2
    }

    private State _state;

    /// <summary>True while the <see cref="State.Writing"/> flag is set.</summary>
    public bool IsWritingActive => (_state & State.Writing) != 0;

    /// <summary>True while the <see cref="State.Reading"/> flag is set (tentative reads do not count).</summary>
    public bool IsReadingActive => (_state & State.Reading) != 0;

    /// <summary>Sets the Reading flag; reports an "already reading" error through ThrowHelper if it is already set.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void BeginRead()
    {
        if ((_state & State.Reading) != 0)
        {
            ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
        }

        _state |= State.Reading;
    }

    /// <summary>Sets the ReadingTentative flag; reports an "already reading" error through ThrowHelper if Reading is set.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void BeginReadTentative()
    {
        if ((_state & State.Reading) != 0)
        {
            ThrowHelper.ThrowInvalidOperationException_AlreadyReading();
        }

        _state |= State.ReadingTentative;
    }

    /// <summary>Clears both read flags; reports a "no read to complete" error through ThrowHelper if neither is set.</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void EndRead()
    {
        const State AnyRead = State.Reading | State.ReadingTentative;

        if ((_state & AnyRead) == 0)
        {
            ThrowHelper.ThrowInvalidOperationException_NoReadToComplete();
        }

        _state &= ~AnyRead;
    }

    /// <summary>Sets the Writing flag (no validation is performed).</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void BeginWrite()
    {
        _state |= State.Writing;
    }

    /// <summary>Clears the Writing flag (no validation is performed).</summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public void EndWrite()
    {
        _state &= ~State.Writing;
    }
}
/// <summary>
/// Read-only, non-seekable <see cref="Stream"/> facade over a <see cref="PipeReader"/>.
/// </summary>
internal sealed class PipeReaderStream : Stream
{
    private readonly PipeReader _pipeReader;

    /// <summary>Creates the stream over <paramref name="pipeReader"/>.</summary>
    /// <param name="pipeReader">The reader that supplies the data.</param>
    /// <param name="leaveOpen">When true, disposing the stream does not complete the reader.</param>
    public PipeReaderStream(PipeReader pipeReader, bool leaveOpen)
    {
        _pipeReader = pipeReader;
        LeaveOpen = leaveOpen;
    }

    /// <summary>Whether disposing this stream leaves the underlying reader uncompleted.</summary>
    internal bool LeaveOpen { get; set; }

    public override bool CanRead => true;

    public override bool CanSeek => false;

    public override bool CanWrite => false;

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Length => throw new NotSupportedException();

    /// <summary>Not supported; getter and setter always throw <see cref="NotSupportedException"/>.</summary>
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    /// <summary>Completes the reader unless <see cref="LeaveOpen"/> is set, then runs the base disposal.</summary>
    protected override void Dispose(bool disposing)
    {
        if (!LeaveOpen)
        {
            _pipeReader.Complete();
        }

        base.Dispose(disposing);
    }

    /// <summary>No-op: a read-only stream has nothing to flush.</summary>
    public override void Flush()
    {
    }

    /// <summary>Synchronously reads up to <paramref name="count"/> bytes into <paramref name="buffer"/>.</summary>
    /// <returns>The number of bytes copied, or 0 when the reader has completed with no data.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
        }

        Span<byte> target = new Span<byte>(buffer, offset, count);
        return ReadInternal(target);
    }

    /// <summary>Reads a single byte.</summary>
    /// <returns>The byte value, or -1 when no byte could be read.</returns>
    public override int ReadByte()
    {
        Span<byte> oneByte = stackalloc byte[1];
        return ReadInternal(oneByte) == 0 ? -1 : oneByte[0];
    }

    // Blocking read: uses the result directly when the read completed synchronously,
    // otherwise blocks on the task.
    private int ReadInternal(Span<byte> buffer)
    {
        ValueTask<ReadResult> pending = _pipeReader.ReadAsync();

        ReadResult result;
        if (pending.IsCompletedSuccessfully)
        {
            result = pending.Result;
        }
        else
        {
            result = pending.AsTask().GetAwaiter().GetResult();
        }

        return HandleReadResult(result, buffer);
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value) => throw new NotSupportedException();

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    /// <summary>APM entry point implemented on top of <see cref="ReadAsync(byte[], int, int, CancellationToken)"/>.</summary>
    public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
    {
        Task<int> readTask = ReadAsync(buffer, offset, count, default(CancellationToken));
        return TaskToAsyncResult.Begin(readTask, callback, state);
    }

    /// <summary>Completes an operation started by <see cref="BeginRead"/>.</summary>
    public sealed override int EndRead(IAsyncResult asyncResult) => TaskToAsyncResult.End<int>(asyncResult);

    /// <summary>Asynchronously reads up to <paramref name="count"/> bytes into <paramref name="buffer"/>.</summary>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        if (buffer == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
        }

        Memory<byte> target = new Memory<byte>(buffer, offset, count);
        return ReadAsyncInternal(target, cancellationToken).AsTask();
    }

    private async ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
    {
        ReadResult result = await _pipeReader.ReadAsync(cancellationToken).ConfigureAwait(false);
        return HandleReadResult(result, buffer.Span);
    }

    // Copies as much of the read result as fits into the caller's buffer and advances
    // the reader past exactly the bytes that were copied.
    private int HandleReadResult(ReadResult result, Span<byte> buffer)
    {
        if (result.IsCanceled)
        {
            ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
        }

        ReadOnlySequence<byte> available = result.Buffer;
        long availableLength = available.Length;
        SequencePosition consumed = available.Start;

        try
        {
            if (availableLength == 0L)
            {
                if (result.IsCompleted)
                {
                    return 0;
                }
            }
            else
            {
                int bytesToCopy = (int)Math.Min(availableLength, buffer.Length);
                ReadOnlySequence<byte> slice = bytesToCopy == availableLength
                    ? available
                    : available.Slice(0, bytesToCopy);
                consumed = slice.End;
                slice.CopyTo(buffer);
                return bytesToCopy;
            }
        }
        finally
        {
            // Runs on every exit from the try block, including both returns above.
            _pipeReader.AdvanceTo(consumed);
        }

        // Reached only for an empty result that is neither canceled nor completed.
        ThrowHelper.ThrowInvalidOperationException_InvalidZeroByteRead();
        return 0;
    }

    /// <summary>Validates the arguments, then delegates the copy to the underlying reader.</summary>
    public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
    {
        StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);
        return _pipeReader.CopyToAsync(destination, cancellationToken);
    }
}
/// <summary>
/// Abstraction over how callbacks are scheduled; exposes two shared scheduler instances.
/// </summary>
public abstract class PipeScheduler
{
    private static readonly ThreadPoolScheduler s_threadPoolScheduler = new ThreadPoolScheduler();
    private static readonly InlineScheduler s_inlineScheduler = new InlineScheduler();

    /// <summary>The shared <see cref="ThreadPoolScheduler"/> instance.</summary>
    public static PipeScheduler ThreadPool
    {
        get { return s_threadPoolScheduler; }
    }

    /// <summary>The shared <see cref="InlineScheduler"/> instance.</summary>
    public static PipeScheduler Inline
    {
        get { return s_inlineScheduler; }
    }

    /// <summary>Requests that <paramref name="action"/> be run with <paramref name="state"/>.</summary>
    /// <param name="action">The callback to run.</param>
    /// <param name="state">The argument passed to <paramref name="action"/>.</param>
    public abstract void Schedule(Action<object?> action, object? state);

    /// <summary>
    /// Internal scheduling entry point; the default implementation simply forwards to <see cref="Schedule"/>.
    /// </summary>
    internal virtual void UnsafeSchedule(Action<object?> action, object? state) => Schedule(action, state);
}
/// <summary>
/// Abstract write side of a pipe; also usable as an <see cref="IBufferWriter{T}"/> of bytes.
/// </summary>
public abstract class PipeWriter : IBufferWriter<byte>
{
    // Lazily created by AsStream and reused by later calls.
    private PipeWriterStream _stream;

    /// <summary>Whether <see cref="UnflushedBytes"/> is supported; false in the base implementation.</summary>
    public virtual bool CanGetUnflushedBytes => false;

    /// <summary>Not supported by the base implementation; throws the exception created by ThrowHelper.</summary>
    public virtual long UnflushedBytes => throw ThrowHelper.CreateNotSupportedException_UnflushedBytes();

    /// <summary>Marks the writer as complete, optionally with an error.</summary>
    public abstract void Complete(Exception? exception = null);

    /// <summary>
    /// Calls <see cref="Complete"/> synchronously and converts any exception it throws into a faulted task.
    /// </summary>
    public virtual ValueTask CompleteAsync(Exception? exception = null)
    {
        try
        {
            Complete(exception);
        }
        catch (Exception failure)
        {
            return new ValueTask(Task.FromException(failure));
        }

        return default(ValueTask);
    }

    /// <summary>Cancels a pending flush.</summary>
    public abstract void CancelPendingFlush();

    /// <summary>Deprecated hook; the base implementation does nothing with the callback.</summary>
    [Obsolete("OnReaderCompleted has been deprecated and may not be invoked on all implementations of PipeWriter.")]
    public virtual void OnReaderCompleted(Action<Exception?, object?> callback, object? state)
    {
    }

    public abstract ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default(CancellationToken));

    public abstract void Advance(int bytes);

    public abstract Memory<byte> GetMemory(int sizeHint = 0);

    public abstract Span<byte> GetSpan(int sizeHint = 0);

    /// <summary>
    /// Returns a stream over this writer. The stream is created once and cached;
    /// a later call with <paramref name="leaveOpen"/> set to true switches the cached
    /// stream to leave-open mode, while a later call with false leaves it unchanged.
    /// </summary>
    public virtual Stream AsStream(bool leaveOpen = false)
    {
        if (_stream == null)
        {
            _stream = new PipeWriterStream(this, leaveOpen);
            return _stream;
        }

        if (leaveOpen)
        {
            _stream.LeaveOpen = true;
        }

        return _stream;
    }

    /// <summary>Creates a writer over <paramref name="stream"/>, using the shared default options when none are given.</summary>
    public static PipeWriter Create(Stream stream, StreamPipeWriterOptions? writerOptions = null)
    {
        StreamPipeWriterOptions effectiveOptions = writerOptions ?? StreamPipeWriterOptions.s_default;
        return new StreamPipeWriter(stream, effectiveOptions);
    }

    /// <summary>Copies <paramref name="source"/> into the writer's buffers, then flushes.</summary>
    public virtual ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken = default(CancellationToken))
    {
        ReadOnlySpan<byte> payload = source.Span;
        this.Write(payload);
        return FlushAsync(cancellationToken);
    }

    /// <summary>
    /// Pumps <paramref name="source"/> into this writer: read into writer memory, advance, flush,
    /// and repeat until the stream returns 0 bytes or a flush reports completion.
    /// </summary>
    protected internal virtual async Task CopyFromAsync(Stream source, CancellationToken cancellationToken = default(CancellationToken))
    {
        while (true)
        {
            Memory<byte> target = GetMemory();
            int bytesRead = await StreamExtensions.ReadAsync(source, target, cancellationToken).ConfigureAwait(false);
            if (bytesRead == 0)
            {
                // End of the source stream.
                break;
            }

            Advance(bytesRead);

            FlushResult flush = await FlushAsync(cancellationToken).ConfigureAwait(false);
            if (flush.IsCanceled)
            {
                ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
            }

            if (flush.IsCompleted)
            {
                break;
            }
        }
    }
}
/// <summary>
/// Write-only, non-seekable <see cref="Stream"/> facade over a <see cref="PipeWriter"/>.
/// </summary>
internal sealed class PipeWriterStream : Stream
{
    private readonly PipeWriter _pipeWriter;

    /// <summary>Whether disposing this stream leaves the underlying writer uncompleted.</summary>
    internal bool LeaveOpen { get; set; }

    public override bool CanRead => false;

    public override bool CanSeek => false;

    public override bool CanWrite => true;

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>Not supported; getter and setter always throw <see cref="NotSupportedException"/>.</summary>
    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>Creates the stream over <paramref name="pipeWriter"/>.</summary>
    /// <param name="pipeWriter">The writer that receives the data.</param>
    /// <param name="leaveOpen">When true, disposing the stream does not complete the writer.</param>
    public PipeWriterStream(PipeWriter pipeWriter, bool leaveOpen)
    {
        _pipeWriter = pipeWriter;
        LeaveOpen = leaveOpen;
    }

    /// <summary>Completes the writer unless <see cref="LeaveOpen"/> is set, then runs the base disposal.</summary>
    protected override void Dispose(bool disposing)
    {
        if (!LeaveOpen)
        {
            _pipeWriter.Complete();
        }

        // Chain to the base class so Stream's own disposal logic always runs.
        base.Dispose(disposing);
    }

    /// <summary>Blocks until the asynchronous flush has finished.</summary>
    public override void Flush()
    {
        FlushAsync().GetAwaiter().GetResult();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>APM entry point implemented on top of <see cref="WriteAsync(byte[], int, int, CancellationToken)"/>.</summary>
    public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
    {
        return TaskToAsyncResult.Begin(WriteAsync(buffer, offset, count, default(CancellationToken)), callback, state);
    }

    /// <summary>Completes an operation started by <see cref="BeginWrite"/>.</summary>
    public sealed override void EndWrite(IAsyncResult asyncResult)
    {
        TaskToAsyncResult.End(asyncResult);
    }

    /// <summary>Blocks until the asynchronous write (including its flush) has finished.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
    }

    /// <summary>Writes the given array segment to the underlying writer and flushes it.</summary>
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        if (buffer == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.buffer);
        }

        return GetFlushResultAsTask(_pipeWriter.WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken));
    }

    /// <summary>Flushes the underlying writer.</summary>
    public override Task FlushAsync(CancellationToken cancellationToken)
    {
        return GetFlushResultAsTask(_pipeWriter.FlushAsync(cancellationToken));
    }

    // Converts a flush result into a plain Task, reporting a canceled flush through ThrowHelper.
    // When the flush already completed successfully this avoids allocating an async state machine.
    private static Task GetFlushResultAsTask(ValueTask<FlushResult> valueTask)
    {
        if (valueTask.IsCompletedSuccessfully)
        {
            if (valueTask.Result.IsCanceled)
            {
                ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
            }

            return Task.CompletedTask;
        }

        return AwaitTask(valueTask);

        static async Task AwaitTask(ValueTask<FlushResult> valueTask)
        {
            FlushResult flushResult = await valueTask.ConfigureAwait(false);
            if (flushResult.IsCanceled)
            {
                ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
            }
        }
    }
}
/// <summary>
/// Outcome of a pipe read: the bytes that were read plus canceled/completed flags.
/// </summary>
public readonly struct ReadResult
{
    internal readonly ReadOnlySequence<byte> _resultBuffer;
    internal readonly ResultFlags _resultFlags;

    /// <summary>Creates a result from its three components.</summary>
    /// <param name="buffer">The bytes that were read.</param>
    /// <param name="isCanceled">Whether the read was canceled.</param>
    /// <param name="isCompleted">Whether the read reported completion.</param>
    public ReadResult(ReadOnlySequence<byte> buffer, bool isCanceled, bool isCompleted)
    {
        ResultFlags flags = isCompleted ? ResultFlags.Completed : ResultFlags.None;
        if (isCanceled)
        {
            flags |= ResultFlags.Canceled;
        }

        _resultBuffer = buffer;
        _resultFlags = flags;
    }

    /// <summary>The bytes that were read.</summary>
    public ReadOnlySequence<byte> Buffer => _resultBuffer;

    /// <summary>True when the Canceled flag is set.</summary>
    public bool IsCanceled => (_resultFlags & ResultFlags.Canceled) == ResultFlags.Canceled;

    /// <summary>True when the Completed flag is set.</summary>
    public bool IsCompleted => (_resultFlags & ResultFlags.Completed) == ResultFlags.Completed;
}

/// <summary>Bit flags stored inside <see cref="ReadResult"/>.</summary>
[Flags]
internal enum ResultFlags : byte
{
    None = 0,
    Canceled = 1 << 0,
    Completed = 1 << 1
}
/// <summary>
/// <see cref="PipeReader"/> over a fixed, in-memory <see cref="ReadOnlySequence{T}"/>.
/// Every successful read is reported as completed.
/// </summary>
internal sealed class SequencePipeReader : PipeReader
{
    private ReadOnlySequence<byte> _sequence;
    private bool _isReaderCompleted;

    // 1 when the next read should be reported as canceled; reset to 0 by TryRead.
    private int _cancelNext;

    /// <summary>Creates a reader that exposes <paramref name="sequence"/>.</summary>
    public SequencePipeReader(ReadOnlySequence<byte> sequence)
    {
        _sequence = sequence;
    }

    /// <summary>Consumes up to <paramref name="consumed"/>, treating the same position as examined.</summary>
    public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed);

    /// <summary>
    /// Drops everything before <paramref name="consumed"/>; <paramref name="examined"/> is not used.
    /// </summary>
    public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
    {
        ThrowIfCompleted();

        // Consuming through the end leaves the canonical empty sequence rather than an empty slice.
        _sequence = consumed.Equals(_sequence.End)
            ? ReadOnlySequence<byte>.Empty
            : _sequence.Slice(consumed);
    }

    /// <summary>Flags the next read to be reported as canceled.</summary>
    public override void CancelPendingRead()
    {
        Interlocked.Exchange(ref _cancelNext, 1);
    }

    /// <summary>Marks the reader completed and releases the sequence; later calls have no further effect.</summary>
    public override void Complete(Exception? exception = null)
    {
        if (_isReaderCompleted)
        {
            return;
        }

        _isReaderCompleted = true;
        _sequence = ReadOnlySequence<byte>.Empty;
    }

    /// <summary>
    /// Returns the remaining data synchronously, or an empty completed result when nothing is left.
    /// </summary>
    public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        if (!TryRead(out ReadResult result))
        {
            result = new ReadResult(ReadOnlySequence<byte>.Empty, isCanceled: false, isCompleted: true);
        }

        return new ValueTask<ReadResult>(result);
    }

    /// <summary>
    /// Produces a result when a cancellation is pending or data remains; otherwise returns false.
    /// </summary>
    public override bool TryRead(out ReadResult result)
    {
        ThrowIfCompleted();

        // Read and clear the pending-cancel flag in one atomic step.
        bool canceled = Interlocked.Exchange(ref _cancelNext, 0) == 1;

        if (!canceled && _sequence.Length <= 0)
        {
            result = default(ReadResult);
            return false;
        }

        result = new ReadResult(_sequence, canceled, isCompleted: true);
        return true;
    }

    private void ThrowIfCompleted()
    {
        if (!_isReaderCompleted)
        {
            return;
        }

        ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
    }
}
/// <summary>
/// Extension methods connecting <see cref="Stream"/> to <see cref="PipeWriter"/>.
/// </summary>
public static class StreamPipeExtensions
{
    /// <summary>
    /// Copies <paramref name="source"/> into <paramref name="destination"/> by delegating to the
    /// writer's <c>CopyFromAsync</c>.
    /// </summary>
    /// <param name="source">The stream to read from; a null value is reported through ThrowHelper.</param>
    /// <param name="destination">The writer to copy into; a null value is reported through ThrowHelper.</param>
    /// <param name="cancellationToken">Token checked up front and passed on to the copy.</param>
    /// <returns>A canceled task when the token is already canceled; otherwise the copy task.</returns>
    public static Task CopyToAsync(this Stream source, PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken))
    {
        if (source == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.source);
        }

        if (destination == null)
        {
            ThrowHelper.ThrowArgumentNullException(ExceptionArgument.destination);
        }

        return cancellationToken.IsCancellationRequested
            ? Task.FromCanceled(cancellationToken)
            : destination.CopyFromAsync(source, cancellationToken);
    }
}
internal sealed class StreamPipeReader : PipeReader
{
internal const int InitialSegmentPoolSize = 4;
internal const int MaxSegmentPoolSize = 256;
private CancellationTokenSource _internalTokenSource;
private bool _isReaderCompleted;
private BufferSegment _readHead;
private int _readIndex;
private BufferSegment _readTail;
private long _bufferedBytes;
private bool _examinedEverything;
private readonly object _lock = new object();
private BufferSegmentStack _bufferSegmentPool;
private readonly StreamPipeReaderOptions _options;
private bool LeaveOpen => _options.LeaveOpen;
private bool UseZeroByteReads => _options.UseZeroByteReads;
private int BufferSize => _options.BufferSize;
private int MaxBufferSize => _options.MaxBufferSize;
private int MinimumReadThreshold => _options.MinimumReadSize;
private MemoryPool<byte> Pool => _options.Pool;
public Stream InnerStream { get; }
/// <summary>
/// Returns the current internal cancellation source, creating it under the lock when none exists.
/// </summary>
private CancellationTokenSource InternalTokenSource
{
    get
    {
        lock (_lock)
        {
            if (_internalTokenSource == null)
            {
                _internalTokenSource = new CancellationTokenSource();
            }

            return _internalTokenSource;
        }
    }
}
/// <summary>
/// Creates a reader over <paramref name="readingStream"/> configured by <paramref name="options"/>.
/// </summary>
/// <param name="readingStream">The stream to read from; a null value is reported through ThrowHelper.</param>
/// <param name="options">Reader configuration; a null value is reported through ThrowHelper.</param>
public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
{
    if (readingStream == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.readingStream);
    }

    if (options == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
    }

    _options = options;
    InnerStream = readingStream;
    _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
}
/// <summary>Consumes up to <paramref name="consumed"/>, treating the same position as examined.</summary>
public override void AdvanceTo(SequencePosition consumed)
    => AdvanceTo(consumed, consumed);
/// <summary>
/// Unpacks both positions into their segment/index pairs and forwards them to the
/// segment-based overload. Reports an error through ThrowHelper if the reader is completed.
/// </summary>
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
    ThrowIfCompleted();

    BufferSegment consumedSegment = (BufferSegment)consumed.GetObject();
    int consumedIndex = consumed.GetInteger();
    BufferSegment examinedSegment = (BufferSegment)examined.GetObject();
    int examinedIndex = examined.GetInteger();

    AdvanceTo(consumedSegment, consumedIndex, examinedSegment, examinedIndex);
}
/// <summary>
/// Moves the read head to the consumed position, updates the buffered-byte count and the
/// "examined everything" flag, and returns the segments that precede the new head.
/// Does nothing when either segment argument is null.
/// </summary>
private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
if (consumedSegment != null && examinedSegment != null)
{
if (_readHead == null)
{
ThrowHelper.ThrowInvalidOperationException_AdvanceToInvalidCursor();
}
// bufferSegment: first segment to return; bufferSegment2: first segment to keep (exclusive end of the return loop).
BufferSegment bufferSegment = _readHead;
BufferSegment bufferSegment2 = consumedSegment;
// Presumably the byte count between the current read position and the consumed position
// (semantics of BufferSegment.GetLength are not visible here — confirm).
long length = BufferSegment.GetLength(bufferSegment, _readIndex, consumedSegment, consumedIndex);
_bufferedBytes -= length;
// Only true when the examined position is exactly the end of the tail segment.
_examinedEverything = false;
if (examinedSegment == _readTail)
{
_examinedEverything = examinedIndex == _readTail.End;
}
if (_bufferedBytes == 0L)
{
// Nothing left buffered: reset the read state; a null end marker makes the loop below return every segment.
bufferSegment2 = null;
_readHead = null;
_readTail = null;
_readIndex = 0;
}
else if (consumedIndex == bufferSegment2.Length)
{
// The consumed index equals the consumed segment's Length: the head moves to the next segment,
// so the consumed segment itself is returned as well.
BufferSegment bufferSegment3 = (_readHead = bufferSegment2.NextSegment);
_readIndex = 0;
bufferSegment2 = bufferSegment3;
}
else
{
// Otherwise the head stays inside the consumed segment at the consumed index.
_readHead = consumedSegment;
_readIndex = consumedIndex;
}
// Return every segment from the old head up to (not including) the end marker.
// NextSegment is read before the segment is returned.
while (bufferSegment != bufferSegment2)
{
BufferSegment? nextSegment = bufferSegment.NextSegment;
ReturnSegmentUnsynchronized(bufferSegment);
bufferSegment = nextSegment;
}
}
}
/// <summary>Cancels the internal token source (creating it first if necessary).</summary>
public override void CancelPendingRead()
    => InternalTokenSource.Cancel();
/// <summary>
/// Completes the reader and disposes the inner stream when completion says it is needed.
/// <paramref name="exception"/> is not used by this implementation.
/// </summary>
public override void Complete(Exception? exception = null)
{
    bool disposeInnerStream = CompleteAndGetNeedsDispose();
    if (!disposeInnerStream)
    {
        return;
    }

    InnerStream.Dispose();
}
/// <summary>
/// Marks the reader completed and resets every segment reachable from the read head.
/// </summary>
/// <returns>
/// False when the reader was already completed; otherwise true exactly when the inner
/// stream should be disposed (that is, when LeaveOpen is false).
/// </returns>
private bool CompleteAndGetNeedsDispose()
{
    if (_isReaderCompleted)
    {
        return false;
    }

    _isReaderCompleted = true;

    for (BufferSegment current = _readHead; current != null;)
    {
        // Capture the link before Reset() is called on the segment.
        BufferSegment next = current.NextSegment;
        current.Reset();
        current = next;
    }

    return !LeaveOpen;
}
/// <summary>Reads with no minimum size requirement.</summary>
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default(CancellationToken))
    => ReadInternalAsync(null, cancellationToken);
/// <summary>Reads using <paramref name="minimumSize"/> as the minimum size passed to the shared read path.</summary>
protected override ValueTask<ReadResult> ReadAtLeastAsyncCore(int minimumSize, CancellationToken cancellationToken)
    => ReadInternalAsync(minimumSize, cancellationToken);
/// <summary>
/// Shared implementation behind ReadAsync and ReadAtLeastAsyncCore. Returns already-buffered data
/// synchronously when it satisfies the request; otherwise reads from the inner stream.
/// </summary>
/// <param name="minimumSize">Minimum number of buffered bytes wanted, or null for no minimum.</param>
/// <param name="cancellationToken">Caller's token; a canceled token yields a canceled task up front.</param>
private ValueTask<ReadResult> ReadInternalAsync(int? minimumSize, CancellationToken cancellationToken)
{
ThrowIfCompleted();
if (cancellationToken.IsCancellationRequested)
{
return new ValueTask<ReadResult>(Task.FromCanceled<ReadResult>(cancellationToken));
}
// Snapshot the token source once so the sync check and the async core observe the same instance.
CancellationTokenSource internalTokenSource = InternalTokenSource;
// Synchronous path: accept the buffered result when there is no minimum, the minimum is met,
// or the result is flagged completed or canceled.
if (TryReadInternal(internalTokenSource, out var result) && (!minimumSize.HasValue || result.Buffer.Length >= minimumSize || result.IsCompleted || result.IsCanceled))
{
return new ValueTask<ReadResult>(result);
}
return Core(this, minimumSize, internalTokenSource, cancellationToken);
static async ValueTask<ReadResult> Core(StreamPipeReader reader, int? minimumSize, CancellationTokenSource tokenSource, CancellationToken cancellationToken)
{
// Bridge the caller's token to the reader's Cancel() for the duration of this read.
CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
if (cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state)
{
((StreamPipeReader)state).Cancel();
}, reader);
}
using (cancellationTokenRegistration)
{
bool isCanceled = false;
bool isCompleted = false;
try
{
// Optional zero-byte read, issued only when nothing is buffered, before any buffer is allocated.
if (reader.UseZeroByteReads && reader._bufferedBytes == 0L)
{
await StreamExtensions.ReadAsync(reader.InnerStream, Memory<byte>.Empty, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
}
do
{
reader.AllocateReadTail(minimumSize);
// Read into the part of the tail segment's memory that starts at its current End.
Memory<byte> buffer = reader._readTail.AvailableMemory.Slice(reader._readTail.End);
int num = await StreamExtensions.ReadAsync(reader.InnerStream, buffer, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
reader._readTail.End += num;
reader._bufferedBytes += num;
// A 0-byte read marks the result as completed and ends the loop.
if (num == 0)
{
isCompleted = true;
break;
}
}
// Keep reading only when a minimum was requested and has not been reached yet.
while (minimumSize.HasValue && reader._bufferedBytes < minimumSize);
}
catch (OperationCanceledException ex)
{
// Drop the internal token source so a later access creates a fresh one.
reader.ClearCancellationToken();
if (cancellationToken.IsCancellationRequested)
{
// The caller's token is canceled: rethrow as a new exception associated with that token.
throw new OperationCanceledException(ex.Message, ex, cancellationToken);
}
if (!tokenSource.IsCancellationRequested)
{
// Neither the caller's token nor the internal source is canceled: propagate unchanged.
throw;
}
// The internal source was canceled: report it via the result's canceled flag instead of throwing.
isCanceled = true;
}
return new ReadResult(reader.GetCurrentReadOnlySequence(), isCanceled, isCompleted);
}
}
}
/// <summary>
/// Writes the currently buffered segments to <paramref name="destination"/>, then copies the
/// rest of the inner stream to it.
/// </summary>
/// <param name="destination">The writer that receives the data.</param>
/// <param name="cancellationToken">Caller's token; cancellation is routed through the reader's Cancel().</param>
public override async Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfCompleted();
CancellationTokenSource tokenSource = InternalTokenSource;
// A cancellation already pending on the internal source is reported as a canceled read.
if (tokenSource.IsCancellationRequested)
{
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
}
CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
if (cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state)
{
((StreamPipeReader)state).Cancel();
}, this);
}
using (cancellationTokenRegistration)
{
// Discard assignment with no effect on behavior
// (NOTE(review): looks like a decompiler artifact — confirm before removing).
_ = 1;
try
{
BufferSegment segment = _readHead;
int start = _readIndex;
try
{
// Write each buffered segment; only the first one starts at the read index.
while (segment != null)
{
FlushResult flushResult = await destination.WriteAsync(segment.Memory.Slice(start), tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
if (flushResult.IsCanceled)
{
ThrowHelper.ThrowOperationCanceledException_FlushCanceled();
}
segment = segment.NextSegment;
start = 0;
if (flushResult.IsCompleted)
{
return;
}
}
}
finally
{
// Runs on every exit from the loop. When a segment is still current (exception or early return),
// the reader is advanced to that segment's End; after a normal loop exit segment is null and
// no advance happens here.
if (segment != null)
{
AdvanceTo(segment, segment.End, segment, segment.End);
}
}
await InnerStream.CopyToAsync(destination, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
}
catch (OperationCanceledException)
{
// Drop the internal token source so a later access creates a fresh one, then propagate.
ClearCancellationToken();
throw;
}
}
}
/// <summary>
/// Writes the currently buffered segments to the stream <paramref name="destination"/>, then
/// copies the rest of the inner stream to it.
/// </summary>
/// <param name="destination">The stream that receives the data.</param>
/// <param name="cancellationToken">Caller's token; cancellation is routed through the reader's Cancel().</param>
public override async Task CopyToAsync(Stream destination, CancellationToken cancellationToken = default(CancellationToken))
{
ThrowIfCompleted();
CancellationTokenSource tokenSource = InternalTokenSource;
// A cancellation already pending on the internal source is reported as a canceled read.
if (tokenSource.IsCancellationRequested)
{
ThrowHelper.ThrowOperationCanceledException_ReadCanceled();
}
CancellationTokenRegistration cancellationTokenRegistration = default(CancellationTokenRegistration);
if (cancellationToken.CanBeCanceled)
{
cancellationTokenRegistration = CancellationTokenExtensions.UnsafeRegister(cancellationToken, delegate(object state)
{
((StreamPipeReader)state).Cancel();
}, this);
}
using (cancellationTokenRegistration)
{
// Discard assignment with no effect on behavior
// (NOTE(review): looks like a decompiler artifact — confirm before removing).
_ = 1;
try
{
BufferSegment segment = _readHead;
int start = _readIndex;
try
{
// Write each buffered segment; only the first one starts at the read index.
while (segment != null)
{
await StreamExtensions.WriteAsync(destination, segment.Memory.Slice(start), tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
segment = segment.NextSegment;
start = 0;
}
}
finally
{
// When the loop was left by an exception, segment is still the one being written and the
// reader is advanced to its End; after a normal loop exit segment is null and nothing happens.
if (segment != null)
{
AdvanceTo(segment, segment.End, segment, segment.End);
}
}
await StreamExtensions.CopyToAsync(InnerStream, destination, tokenSource.Token).ConfigureAwait(continueOnCapturedContext: false);
}
catch (OperationCanceledException)
{
// Drop the internal token source so a later access creates a fresh one, then propagate.
ClearCancellationToken();
throw;
}
}
}
/// <summary>
/// Discards the current internal token source under the lock, so the next access to
/// InternalTokenSource creates a new one. The discarded source is not disposed here.
/// </summary>
private void ClearCancellationToken()
{
lock (_lock)
{
_internalTokenSource = null;
}
}
/// <summary>Reports a "no reading allowed" error through ThrowHelper once the reader has been completed.</summary>
private void ThrowIfCompleted()
{
    if (!_isReaderCompleted)
    {
        return;
    }

    ThrowHelper.ThrowInvalidOperationException_NoReadingAllowed();
}
/// <summary>
/// Attempts a synchronous read from already-buffered data.
/// Reports an error through ThrowHelper if the reader is completed.
/// </summary>
/// <param name="result">The read result when the method returns true.</param>
public override bool TryRead(out ReadResult result)
{
    ThrowIfCompleted();

    CancellationTokenSource tokenSource = InternalTokenSource;
    return TryReadInternal(tokenSource, out result);
}
/// <summary>
/// Produces a result when <paramref name="source"/> has been canceled or when buffered data
/// exists that has not been fully examined. The result is never flagged completed.
/// </summary>
/// <param name="source">The token source whose cancellation state is checked.</param>
/// <param name="result">The read result when the method returns true; default otherwise.</param>
private bool TryReadInternal(CancellationTokenSource source, out ReadResult result)
{
    bool canceled = source.IsCancellationRequested;
    bool hasUnexaminedData = _bufferedBytes > 0 && !_examinedEverything;

    if (!canceled && !hasUnexaminedData)
    {
        result = default(ReadResult);
        return false;
    }

    if (canceled)
    {
        // The cancellation is being reported now; drop the source so it is not reported again.
        ClearCancellationToken();
    }

    result = new ReadResult(GetCurrentReadOnlySequence(), canceled, isCompleted: false);
    return true;
}
/// <summary>
/// Builds a sequence spanning from the read head/index to the end of the tail segment,
/// or the default (empty) sequence when there is no read head.
/// </summary>
private ReadOnlySequence<byte> GetCurrentReadOnlySequence()
{
    return _readHead == null
        ? default(ReadOnlySequence<byte>)
        : new ReadOnlySequence<byte>(_readHead, _readIndex, _readTail, _readTail.End);
}
/// <summary>
/// Ensures there is a tail segment to read into: creates the first segment when none exists,
/// or appends a new one when the tail's writable space is below the minimum read threshold.
/// </summary>
/// <param name="minimumSize">Optional size request forwarded to segment allocation.</param>
private void AllocateReadTail(int? minimumSize = null)
{
    if (_readHead == null)
    {
        BufferSegment first = AllocateSegment(minimumSize);
        _readHead = first;
        _readTail = first;
        return;
    }

    if (_readTail.WritableBytes >= MinimumReadThreshold)
    {
        // The current tail still has enough room.
        return;
    }

    BufferSegment appended = AllocateSegment(minimumSize);
    _readTail.SetNext(appended);
    _readTail = appended;
}
/// <summary>
/// Obtains a segment and backs it with memory from the configured pool when the request fits
/// within that pool's maximum buffer size, or from the shared array pool otherwise.
/// </summary>
/// <param name="minimumSize">Requested size; the configured buffer size is used when null.</param>
private BufferSegment AllocateSegment(int? minimumSize = null)
{
    BufferSegment segment = CreateSegmentUnsynchronized();
    int requestedSize = minimumSize ?? BufferSize;

    // -1 when the default shared pool is configured, which routes every request to the array pool.
    int poolLimit = _options.IsDefaultSharedMemoryPool ? -1 : _options.Pool.MaxBufferSize;

    if (requestedSize > poolLimit)
    {
        int arraySize = GetSegmentSize(requestedSize, MaxBufferSize);
        segment.SetOwnedMemory(ArrayPool<byte>.Shared.Rent(arraySize));
    }
    else
    {
        int pooledSize = GetSegmentSize(requestedSize, poolLimit);
        segment.SetOwnedMemory(_options.Pool.Rent(pooledSize));
    }

    return segment;
}
/// <summary>
/// Raises <paramref name="sizeHint"/> to at least the configured buffer size, then caps it at
/// <paramref name="maxBufferSize"/>.
/// </summary>
private int GetSegmentSize(int sizeHint, int maxBufferSize)
    => Math.Min(maxBufferSize, Math.Max(BufferSize, sizeHint));
/// <summary>Reuses a segment from the segment pool when one is available; otherwise creates a new one.</summary>
private BufferSegment CreateSegmentUnsynchronized()
{
    return _bufferSegmentPool.TryPop(out BufferSegment pooled)
        ? pooled
        : new BufferSegment();
}
/// <summary>
/// Resets <paramref name="segment"/> and keeps it for reuse unless the segment pool
/// already holds <c>MaxSegmentPoolSize</c> entries.
/// </summary>
private void ReturnSegmentUnsynchronized(BufferSegment segment)
{
    segment.Reset();

    if (_bufferSegmentPool.Count >= MaxSegmentPoolSize)
    {
        // Pool is full: let this segment go.
        return;
    }

    _bufferSegmentPool.Push(segment);
}
/// <summary>Cancels the internal token source; used as the callback target for caller-token registrations.</summary>
private void Cancel()
    => InternalTokenSource.Cancel();
}
/// <summary>
/// Immutable configuration for a stream-backed pipe reader.
/// </summary>
public class StreamPipeReaderOptions
{
    private const int DefaultBufferSize = 4096;

    // 2 MiB.
    internal const int DefaultMaxBufferSize = 2097152;

    private const int DefaultMinimumReadSize = 1024;

    /// <summary>Shared instance built entirely from default values.</summary>
    internal static readonly StreamPipeReaderOptions s_default = new StreamPipeReaderOptions();

    /// <summary>Buffer size in bytes; 4096 unless overridden.</summary>
    public int BufferSize { get; }

    /// <summary>Upper bound on buffer size; always <see cref="DefaultMaxBufferSize"/>.</summary>
    internal int MaxBufferSize { get; } = DefaultMaxBufferSize;

    /// <summary>Minimum read size in bytes; 1024 unless overridden.</summary>
    public int MinimumReadSize { get; }

    /// <summary>The memory pool to use; <see cref="MemoryPool{T}.Shared"/> when none was supplied.</summary>
    public MemoryPool<byte> Pool { get; }

    /// <summary>The leave-open flag supplied at construction.</summary>
    public bool LeaveOpen { get; }

    /// <summary>The zero-byte-read flag supplied at construction.</summary>
    public bool UseZeroByteReads { get; }

    /// <summary>True when <see cref="Pool"/> is the shared default pool.</summary>
    internal bool IsDefaultSharedMemoryPool { get; }

    /// <summary>
    /// Creates options with zero-byte reads disabled. See the five-parameter constructor for
    /// parameter semantics.
    /// </summary>
    public StreamPipeReaderOptions(MemoryPool<byte>? pool, int bufferSize, int minimumReadSize, bool leaveOpen)
        : this(pool, bufferSize, minimumReadSize, leaveOpen, useZeroByteReads: false)
    {
    }

    /// <summary>Creates options, substituting defaults for omitted values.</summary>
    /// <param name="pool">Memory pool, or null for <see cref="MemoryPool{T}.Shared"/>.</param>
    /// <param name="bufferSize">Buffer size in bytes, or -1 for the default (4096).</param>
    /// <param name="minimumReadSize">Minimum read size in bytes, or -1 for the default (1024).</param>
    /// <param name="leaveOpen">Value for <see cref="LeaveOpen"/>.</param>
    /// <param name="useZeroByteReads">Value for <see cref="UseZeroByteReads"/>.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="bufferSize"/> or <paramref name="minimumReadSize"/> is zero or negative
    /// (other than the -1 sentinel).
    /// </exception>
    public StreamPipeReaderOptions(MemoryPool<byte>? pool = null, int bufferSize = -1, int minimumReadSize = -1, bool leaveOpen = false, bool useZeroByteReads = false)
    {
        Pool = pool ?? MemoryPool<byte>.Shared;
        IsDefaultSharedMemoryPool = Pool == MemoryPool<byte>.Shared;
        BufferSize = ResolveSize(bufferSize, DefaultBufferSize, nameof(bufferSize));
        MinimumReadSize = ResolveSize(minimumReadSize, DefaultMinimumReadSize, nameof(minimumReadSize));
        LeaveOpen = leaveOpen;
        UseZeroByteReads = useZeroByteReads;
    }

    // Maps the -1 sentinel to the default, rejects every other non-positive value, and passes
    // positive values through unchanged.
    private static int ResolveSize(int requested, int defaultValue, string paramName)
    {
        if (requested == -1)
        {
            return defaultValue;
        }

        if (requested <= 0)
        {
            throw new ArgumentOutOfRangeException(paramName);
        }

        return requested;
    }
}
internal sealed class StreamPipeWriter : PipeWriter
{
internal const int InitialSegmentPoolSize = 4;
internal const int MaxSegmentPoolSize = 256;
private readonly int _minimumBufferSize;
private BufferSegment _head;
private BufferSegment _tail;
private Memory<byte> _tailMemory;
private int _tailBytesBuffered;
private long _bytesBuffered;
private readonly MemoryPool<byte> _pool;
private readonly int _maxPooledBufferSize;
private CancellationTokenSource _internalTokenSource;
private bool _isCompleted;
private readonly object _lockObject = new object();
private BufferSegmentStack _bufferSegmentPool;
private readonly bool _leaveOpen;
/// <summary>
/// Returns the current internal cancellation source, creating it under the lock when none exists.
/// </summary>
private CancellationTokenSource InternalTokenSource
{
    get
    {
        lock (_lockObject)
        {
            if (_internalTokenSource == null)
            {
                _internalTokenSource = new CancellationTokenSource();
            }

            return _internalTokenSource;
        }
    }
}
public Stream InnerStream { get; }
public override bool CanGetUnflushedBytes => true;
public override long UnflushedBytes => _bytesBuffered;
/// <summary>
/// Creates a writer over <paramref name="writingStream"/> configured by <paramref name="options"/>.
/// </summary>
/// <param name="writingStream">The stream to write to; a null value is reported through ThrowHelper.</param>
/// <param name="options">Writer configuration; a null value is reported through ThrowHelper.</param>
public StreamPipeWriter(Stream writingStream, StreamPipeWriterOptions options)
{
    if (writingStream == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.writingStream);
    }

    if (options == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.options);
    }

    InnerStream = writingStream;
    _leaveOpen = options.LeaveOpen;
    _minimumBufferSize = options.MinimumBufferSize;

    // The shared pool is represented as "no pool" (null) with a max pooled size of -1.
    bool usesSharedPool = options.Pool == MemoryPool<byte>.Shared;
    _pool = usesSharedPool ? null : options.Pool;
    _maxPooledBufferSize = _pool == null ? -1 : _pool.MaxBufferSize;

    _bufferSegmentPool = new BufferSegmentStack(InitialSegmentPoolSize);
}
/// <summary>
/// Records that <paramref name="bytes"/> bytes were written into the current tail memory.
/// </summary>
/// <param name="bytes">
/// Number of bytes written; values that are negative or larger than the remaining tail memory
/// are reported through ThrowHelper.
/// </param>
public override void Advance(int bytes)
{
    if (bytes < 0 || bytes > _tailMemory.Length)
    {
        ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.bytes);
    }

    _tailMemory = _tailMemory.Slice(bytes);
    _bytesBuffered += bytes;
    _tailBytesBuffered += bytes;
}
/// <summary>
/// Returns the writer's current tail memory after calling AllocateMemory with the size hint.
/// </summary>
/// <param name="sizeHint">Requested size; negative values are reported through ThrowHelper.</param>
/// <returns>The tail memory available for writing.</returns>
public override Memory<byte> GetMemory(int sizeHint = 0)
{
// Writing is rejected once the writer has been completed.
if (_isCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
if (sizeHint < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
}
// Presumably ensures the tail can hold at least sizeHint bytes (AllocateMemory is not visible here — confirm).
AllocateMemory(sizeHint);
return _tailMemory;
}
public override Span<byte> GetSpan(int sizeHint = 0)
{
if (_isCompleted)
{
ThrowHelper.ThrowInvalidOperationException_NoWritingAllowed();
}
if (sizeHint < 0)
{
ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.sizeHint);
}
Allocat