Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion StackExchange.Redis.sln.DotSettings
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@
<s:Boolean x:Key="/Default/UserDictionary/Words/=xreadgroup/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=xrevrange/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=zcard/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=zscan/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:Boolean x:Key="/Default/UserDictionary/Words/=zscan/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=zset/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
15 changes: 15 additions & 0 deletions docs/exp/SER004.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# RESPite

RESPite is an experimental library that provides high-performance low-level RESP (Redis, etc) parsing and serialization.
It is used as the IO core for StackExchange.Redis v3+. You should not (yet) use it directly unless you have a very
good reason to do so.

```xml
<NoWarn>$(NoWarn);SER004</NoWarn>
```

or more granularly / locally in C#:

``` c#
#pragma warning disable SER004
```
21 changes: 21 additions & 0 deletions docs/exp/SER005.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Unit Testing

Unit testing is great! Yay, do more of that!

This type is provided for external unit testing, in particular by people using modules or server features
not directly implemented by SE.Redis - for example to verify messsage parsing or formatting without
talking to a RESP server.

These types are considered slightly more... *mercurial*. We encourage you to use them, but *occasionally*
(not just for fun) you might need to update your test code if we tweak something. This should not impact
"real" library usage.

```xml
<NoWarn>$(NoWarn);SER005</NoWarn>
```

or more granularly / locally in C#:

``` c#
#pragma warning disable SER005
```
1 change: 1 addition & 0 deletions src/RESPite/PublicAPI/net6.0/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#nullable enable
1 change: 1 addition & 0 deletions src/RESPite/PublicAPI/net6.0/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#nullable enable
10 changes: 0 additions & 10 deletions src/RESPite/Shared/AsciiHash.Public.cs

This file was deleted.

7 changes: 2 additions & 5 deletions src/RESPite/Shared/AsciiHash.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
Expand All @@ -7,8 +6,6 @@

namespace RESPite;

#pragma warning disable SA1205 // deliberately omit accessibility - see AsciiHash.Public.cs

/// <summary>
/// This type is intended to provide fast hashing functions for small ASCII strings, for example well-known
/// RESP literals that are usually identifiable by their length and initial bytes; it is not intended
Expand All @@ -22,7 +19,7 @@ namespace RESPite;
Inherited = false)]
[Conditional("DEBUG")] // evaporate in release
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
sealed partial class AsciiHashAttribute(string token = "") : Attribute
public sealed partial class AsciiHashAttribute(string token = "") : Attribute
{
/// <summary>
/// The token expected when parsing data, if different from the implied value. The implied
Expand All @@ -38,7 +35,7 @@ sealed partial class AsciiHashAttribute(string token = "") : Attribute

// note: instance members are in AsciiHash.Instance.cs.
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
readonly partial struct AsciiHash
public readonly partial struct AsciiHash
{
/// <summary>
/// In-place ASCII upper-case conversion.
Expand Down
17 changes: 7 additions & 10 deletions src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using RESPite.Messages;

namespace StackExchange.Redis;

Expand All @@ -11,18 +12,14 @@ public readonly struct LatencyHistoryEntry

private sealed class Processor : ArrayResultProcessor<LatencyHistoryEntry>
{
protected override bool TryParse(in RawResult raw, out LatencyHistoryEntry parsed)
protected override bool TryParse(ref RespReader reader, out LatencyHistoryEntry parsed)
{
if (raw.Resp2TypeArray == ResultType.Array)
if (reader.IsAggregate
&& reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var timestamp)
&& reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var duration))
{
var items = raw.GetItems();
if (items.Length >= 2
&& items[0].TryGetInt64(out var timestamp)
&& items[1].TryGetInt64(out var duration))
{
parsed = new LatencyHistoryEntry(timestamp, duration);
return true;
}
parsed = new LatencyHistoryEntry(timestamp, duration);
return true;
}
parsed = default;
return false;
Expand Down
17 changes: 9 additions & 8 deletions src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using RESPite.Messages;

namespace StackExchange.Redis;

Expand All @@ -11,17 +12,17 @@ public readonly struct LatencyLatestEntry

private sealed class Processor : ArrayResultProcessor<LatencyLatestEntry>
{
protected override bool TryParse(in RawResult raw, out LatencyLatestEntry parsed)
protected override bool TryParse(ref RespReader reader, out LatencyLatestEntry parsed)
{
if (raw.Resp2TypeArray == ResultType.Array)
if (reader.IsAggregate && reader.TryMoveNext() && reader.IsScalar)
{
var items = raw.GetItems();
if (items.Length >= 4
&& items[1].TryGetInt64(out var timestamp)
&& items[2].TryGetInt64(out var duration)
&& items[3].TryGetInt64(out var maxDuration))
var eventName = reader.ReadString()!;

if (reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var timestamp)
&& reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var duration)
&& reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var maxDuration))
{
parsed = new LatencyLatestEntry(items[0].GetString()!, timestamp, duration, maxDuration);
parsed = new LatencyLatestEntry(eventName, timestamp, duration, maxDuration);
return true;
}
}
Expand Down
132 changes: 132 additions & 0 deletions src/StackExchange.Redis/BufferedStreamWriter.Async.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace StackExchange.Redis;

internal sealed class BufferedAsyncStreamWriter : CycleBufferStreamWriter, IValueTaskSource
{
private ManualResetValueTaskSourceCore<bool> _readerTask;

public BufferedAsyncStreamWriter(Stream target, CancellationToken cancellationToken = default)
: base(target, cancellationToken)
{
WriteComplete = Task.Run(CopyOutAsync, cancellationToken);
_readerTask.RunContinuationsAsynchronously = true; // we never want the flusher to take over the copying
}

public override Task WriteComplete { get; }

private async Task CopyOutAsync()
{
try
{
while (true)
{
ValueTask pending = AwaitWake();
if (!pending.IsCompleted)
{
lock (this)
{
// double-checked marking inactive
if (!pending.IsCompleted) OnWriterInactive(); // update state flags
}
}
// await activation and check status;
await pending.ConfigureAwait(false);

StateFlags stateFlags;
while (true)
{
ReadOnlyMemory<byte> memory;
lock (this)
{
stateFlags = State;
var minBytes = (stateFlags & StateFlags.Flush) == 0 ? -1 : 1;
if (!GetFirstChunkInsideLock(minBytes, out memory))
{
// out of data; remove flush flag and wait for more work
stateFlags &= ~StateFlags.Flush;
break;
}
}

if (IsFaulted) ThrowCompleteOrFaulted(); // this is cheap to check ongoing
if (!memory.IsEmpty)
{
OnWritten(memory.Length);
OnDebugBufferLog(memory);

await Target.WriteAsync(memory, CancellationToken).ConfigureAwait(false);
}

lock (this)
{
DiscardCommitted(memory.Length);
}
}
await Target.FlushAsync(CancellationToken).ConfigureAwait(false);

if ((stateFlags & StateFlags.Closed) != 0) break;
}

// recycle on clean exit (only), since we know the buffers aren't being used
lock (this)
{
ReleaseBuffer();
}
}
catch (Exception ex)
{
Complete(ex);
}
// note we do *not* close the stream here - we have to settle for flushing; Close is explicit
}

private ValueTask AwaitWake()
{
lock (this) // guard all transitions
{
return new(this, _readerTask.Version);
}
}

void IValueTaskSource.GetResult(short token)
{
lock (this) // guard all transitions
{
_readerTask.GetResult(token); // may throw, note
_readerTask.Reset();
}
}

ValueTaskSourceStatus IValueTaskSource.GetStatus(short token)
{
lock (this) // guard all transitions
{
return _readerTask.GetStatus(token);
}
}

void IValueTaskSource.OnCompleted(
Action<object?> continuation,
object? state,
short token,
ValueTaskSourceOnCompletedFlags flags)
{
lock (this) // guard all transitions
{
_readerTask.OnCompleted(continuation, state, token, flags);
}
}

protected override void OnWakeReader()
{
lock (this) // guard all transitions
{
_readerTask.SetResult(true);
}
}
}
53 changes: 53 additions & 0 deletions src/StackExchange.Redis/BufferedStreamWriter.Pipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

namespace StackExchange.Redis;

internal sealed class PipeStreamWriter : BufferedStreamWriter
{
private readonly PipeWriter _writer;

public PipeStreamWriter(Stream target, CancellationToken cancellationToken = default)
: base(target, cancellationToken)
{
var pipe = new Pipe();
WriteComplete = pipe.Reader.CopyToAsync(Target, cancellationToken);
_writer = pipe.Writer;
}

public override Task WriteComplete { get; }

private long _nonFlushed;
public override void Advance(int count)
{
_nonFlushed += count;
_writer.Advance(count);
}

public override void Flush()
{
var tmp = _nonFlushed;
_nonFlushed = 0;
OnWritten(tmp);
var pending = _writer.FlushAsync();
if (pending.IsCompleted)
{
pending.GetAwaiter().GetResult();
}
else
{
// this is bad, but: this type is a temporary kludge while I fix a bug;
// this only happens during back-pressure events, which should be rare
pending.AsTask().Wait(CancellationToken);
}
}

public override Memory<byte> GetMemory(int sizeHint = 0) => _writer.GetMemory(sizeHint);

public override Span<byte> GetSpan(int sizeHint = 0) => _writer.GetSpan(sizeHint);

public override void Complete(Exception? exception = null) => _writer.Complete(exception);
}
Loading
Loading