Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<PackageVersion Include="StyleCop.Analyzers" Version="1.2.0-beta.556" />
<PackageVersion Include="System.Collections.Immutable" Version="9.0.0" />
<PackageVersion Include="System.Reflection.Metadata" Version="9.0.0" />
<PackageVersion Include="System.Private.Uri" Version="4.3.2" />

<!-- For binding redirect testing, main package gets this transitively -->
<PackageVersion Include="System.IO.Pipelines" Version="9.0.0" />
Expand Down
24 changes: 21 additions & 3 deletions src/StackExchange.Redis/ChannelMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Buffers.Text;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -35,7 +36,7 @@ public sealed class ChannelMessageQueue : IAsyncEnumerable<ChannelMessage>
/// </summary>
public Task Completion => _queue.Reader.Completion;

internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber parent)
internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber? parent)
{
Channel = redisChannel;
_parent = parent;
Expand All @@ -49,8 +50,22 @@ internal ChannelMessageQueue(in RedisChannel redisChannel, RedisSubscriber paren

private void Write(in RedisChannel channel, in RedisValue value)
{
var writer = _queue.Writer;
writer.TryWrite(new ChannelMessage(this, channel, value));
try
{
_queue.Writer.TryWrite(new ChannelMessage(this, channel, value));
}
catch (Exception ex)
{
Debug.WriteLine("pub/sub ChannelWrite.TryWrite failed: " + ex.Message);
}
}

internal void SynchronizedWrite(in RedisChannel channel, in RedisValue value)
{
lock (this)
{
Write(channel, value);
}
}

/// <summary>
Expand Down Expand Up @@ -327,4 +342,7 @@ public async IAsyncEnumerator<ChannelMessage> GetAsyncEnumerator(CancellationTok
}
}
#endif

internal ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default)
=> _queue.Reader.WaitToReadAsync(cancellationToken);
}
13 changes: 10 additions & 3 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public static int ParseInt32(string key, string value, int minValue = int.MinVal
return tmp;
}

public static float ParseSingle(string key, string value)
{
if (!Format.TryParseDouble(value, out double tmp)) throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' requires a numeric value; the value '{value}' is not recognised.");
return (float)tmp;
}

internal static bool ParseBoolean(string key, string value)
{
if (!Format.TryParseBoolean(value, out bool tmp)) throw new ArgumentOutOfRangeException(key, $"Keyword '{key}' requires a boolean value; the value '{value}' is not recognised.");
Expand Down Expand Up @@ -944,9 +950,9 @@ public string ToString(bool includePassword)
};
}

private static void Append(StringBuilder sb, object value)
private static void Append(StringBuilder sb, object? value)
{
if (value == null) return;
if (value is null) return;
string s = Format.ToString(value);
if (!string.IsNullOrWhiteSpace(s))
{
Expand All @@ -957,7 +963,8 @@ private static void Append(StringBuilder sb, object value)

private static void Append(StringBuilder sb, string prefix, object? value)
{
string? s = value?.ToString();
if (value is null) return;
string? s = value.ToString();
if (!string.IsNullOrWhiteSpace(s))
{
if (sb.Length != 0) sb.Append(',');
Expand Down
30 changes: 28 additions & 2 deletions src/StackExchange.Redis/ConnectionMultiplexer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ public void UnRoot(int token)
}
}

private void OnHeartbeat()
internal void OnHeartbeat()
{
try
{
Expand Down Expand Up @@ -1131,7 +1131,7 @@ public IDatabase GetDatabase(int db = -1, object? asyncState = null)
}

// DB zero is stored separately, since 0-only is a massively common use-case
private const int MaxCachedDatabaseInstance = 16; // 17 items - [0,16]
internal const int MaxCachedDatabaseInstance = 16; // 17 items - [0,16]
// Side note: "databases 16" is the default in redis.conf; happy to store one extra to get nice alignment etc
private IDatabase? dbCacheZero;
private IDatabase[]? dbCacheLow;
Expand Down Expand Up @@ -1284,6 +1284,8 @@ public long OperationCount
}
}

internal uint LatencyTicks { get; private set; } = uint.MaxValue;

// note that the RedisChannel->byte[] converter is always direct, so this is not an alloc
// (we deal with channels far less frequently, so pay the encoding cost up-front)
internal byte[] ChannelPrefix => ((byte[]?)RawConfig.ChannelPrefix) ?? [];
Expand Down Expand Up @@ -2363,5 +2365,29 @@ private Task[] QuitAllServers()

long? IInternalConnectionMultiplexer.GetConnectionId(EndPoint endpoint, ConnectionType type)
=> GetServerEndPoint(endpoint)?.GetBridge(type)?.ConnectionId;

internal uint UpdateLatency()
{
var snapshot = GetServerSnapshot();
uint max = uint.MaxValue;
foreach (var server in snapshot)
{
if (server.IsConnected)
{
var latency = server.LatencyTicks;
if (max is uint.MaxValue || latency > max)
{
max = latency;
}
}
}

if (max != uint.MaxValue)
{
LatencyTicks = max;
}

return LatencyTicks;
}
}
}
106 changes: 106 additions & 0 deletions src/StackExchange.Redis/Interfaces/IConnectionGroup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// ReSharper disable once CheckNamespace
namespace StackExchange.Redis;

/// <summary>
/// A group of connections to redis servers, that manages connections to multiple
/// servers, routing traffic based on the availability of the servers and their
/// relative <see cref="ConnectionGroupMember.Weight"/>.
/// </summary>
public interface IConnectionGroup : IConnectionMultiplexer
{
/// <summary>
/// A change occured to one of the connection groups.
/// </summary>
event EventHandler<GroupConnectionChangedEventArgs>? ConnectionChanged;

/// <summary>
/// Adds a new member to the group.
/// </summary>
Task AddAsync(ConnectionGroupMember group, TextWriter? log = null);

/// <summary>
/// Removes a member from the group.
/// </summary>
bool Remove(ConnectionGroupMember group);

/// <summary>
/// Get the members of the group.
/// </summary>
ReadOnlySpan<ConnectionGroupMember> GetMembers();
}

/// <summary>
/// Represents a change to a connection group.
/// </summary>
public class GroupConnectionChangedEventArgs(GroupConnectionChangedEventArgs.ChangeType type, ConnectionGroupMember group, ConnectionGroupMember? previousGroup = null) : EventArgs, ICompletable
{
/// <summary>
/// The group relating to the change. For <see cref="ChangeType.ActiveChanged"/>, this is the new group.
/// </summary>
public ConnectionGroupMember Group => group;

/// <summary>
/// The previous group relating to the change, if applicable.
/// </summary>
public ConnectionGroupMember? PreviousGroup => previousGroup;

/// <summary>
/// The type of change that occurred.
/// </summary>
public ChangeType Type => type;

private EventHandler<GroupConnectionChangedEventArgs>? _handler;
private object? _sender;

/// <summary>
/// The type of change that occurred.
/// </summary>
public enum ChangeType
{
/// <summary>
/// Unused.
/// </summary>
Unknown = 0,

/// <summary>
/// A new connection group was added.
/// </summary>
Added = 1,

/// <summary>
/// A connection group was removed.
/// </summary>
Removed = 2,

/// <summary>
/// A connection group became disconnected.
/// </summary>
Disconnected = 3,

/// <summary>
/// A connection group became reconnected.
/// </summary>
Reconnected = 4,

/// <summary>
/// The active connection group changed, changing how traffic is routed.
/// </summary>
ActiveChanged = 5,
}

internal void CompleteAsWorker(EventHandler<GroupConnectionChangedEventArgs> handler, object sender)
{
_handler = handler;
_sender = sender;
ConnectionMultiplexer.CompleteAsWorker(this);
}

void ICompletable.AppendStormLog(StringBuilder sb) { }

bool ICompletable.TryComplete(bool isAsync) => ConnectionMultiplexer.TryCompleteHandler(_handler, _sender!, this, isAsync);
}
66 changes: 66 additions & 0 deletions src/StackExchange.Redis/MultiGroupDatabase.Async.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using System.Net;
using System.Threading.Tasks;

namespace StackExchange.Redis;

internal sealed partial class MultiGroupDatabase
{
// Async methods - Core operations
public Task<RedisValue> DebugObjectAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().DebugObjectAsync(key, flags);

public Task<EndPoint?> IdentifyEndpointAsync(RedisKey key = default, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().IdentifyEndpointAsync(key, flags);

public Task KeyMigrateAsync(RedisKey key, EndPoint toServer, int toDatabase = 0, int timeoutMilliseconds = 0, MigrateOptions migrateOptions = MigrateOptions.None, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().KeyMigrateAsync(key, toServer, toDatabase, timeoutMilliseconds, migrateOptions, flags);

public Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().PingAsync(flags);

public Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().PublishAsync(channel, message, flags);

public Task<RedisResult> ExecuteAsync(string command, params object[] args)
=> GetActiveDatabase().ExecuteAsync(command, args);

public Task<RedisResult> ExecuteAsync(string command, System.Collections.Generic.ICollection<object>? args, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().ExecuteAsync(command, args, flags);

public Task<RedisResult> ScriptEvaluateAsync(string script, RedisKey[]? keys = null, RedisValue[]? values = null, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().ScriptEvaluateAsync(script, keys, values, flags);

public Task<RedisResult> ScriptEvaluateAsync(byte[] hash, RedisKey[]? keys = null, RedisValue[]? values = null, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().ScriptEvaluateAsync(hash, keys, values, flags);

public Task<RedisResult> ScriptEvaluateAsync(LuaScript script, object? parameters = null, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().ScriptEvaluateAsync(script, parameters, flags);

public Task<RedisResult> ScriptEvaluateAsync(LoadedLuaScript script, object? parameters = null, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().ScriptEvaluateAsync(script, parameters, flags);

public Task<RedisResult> ScriptEvaluateReadOnlyAsync(string script, RedisKey[]? keys = null, RedisValue[]? values = null, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().ScriptEvaluateReadOnlyAsync(script, keys, values, flags);

public Task<RedisResult> ScriptEvaluateReadOnlyAsync(byte[] hash, RedisKey[]? keys = null, RedisValue[]? values = null, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().ScriptEvaluateReadOnlyAsync(hash, keys, values, flags);

public Task<bool> LockExtendAsync(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().LockExtendAsync(key, value, expiry, flags);

public Task<RedisValue> LockQueryAsync(RedisKey key, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().LockQueryAsync(key, flags);

public Task<bool> LockReleaseAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().LockReleaseAsync(key, value, flags);

public Task<bool> LockTakeAsync(RedisKey key, RedisValue value, TimeSpan expiry, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().LockTakeAsync(key, value, expiry, flags);

public Task<RedisValue[]> SortAsync(RedisKey key, long skip = 0, long take = -1, Order order = Order.Ascending, SortType sortType = SortType.Numeric, RedisValue by = default, RedisValue[]? get = null, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().SortAsync(key, skip, take, order, sortType, by, get, flags);

public Task<long> SortAndStoreAsync(RedisKey destination, RedisKey key, long skip = 0, long take = -1, Order order = Order.Ascending, SortType sortType = SortType.Numeric, RedisValue by = default, RedisValue[]? get = null, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().SortAndStoreAsync(destination, key, skip, take, order, sortType, by, get, flags);
}
52 changes: 52 additions & 0 deletions src/StackExchange.Redis/MultiGroupDatabase.Geo.Async.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Threading.Tasks;

namespace StackExchange.Redis;

internal sealed partial class MultiGroupDatabase
{
// Geo Async
public Task<bool> GeoAddAsync(RedisKey key, double longitude, double latitude, RedisValue member, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoAddAsync(key, longitude, latitude, member, flags);

public Task<bool> GeoAddAsync(RedisKey key, GeoEntry value, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoAddAsync(key, value, flags);

public Task<long> GeoAddAsync(RedisKey key, GeoEntry[] values, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoAddAsync(key, values, flags);

public Task<bool> GeoRemoveAsync(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoRemoveAsync(key, member, flags);

public Task<double?> GeoDistanceAsync(RedisKey key, RedisValue member1, RedisValue member2, GeoUnit unit = GeoUnit.Meters, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoDistanceAsync(key, member1, member2, unit, flags);

public Task<string?[]> GeoHashAsync(RedisKey key, RedisValue[] members, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoHashAsync(key, members, flags);

public Task<string?> GeoHashAsync(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoHashAsync(key, member, flags);

public Task<GeoPosition?[]> GeoPositionAsync(RedisKey key, RedisValue[] members, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoPositionAsync(key, members, flags);

public Task<GeoPosition?> GeoPositionAsync(RedisKey key, RedisValue member, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoPositionAsync(key, member, flags);

public Task<GeoRadiusResult[]> GeoRadiusAsync(RedisKey key, RedisValue member, double radius, GeoUnit unit = GeoUnit.Meters, int count = -1, Order? order = null, GeoRadiusOptions options = GeoRadiusOptions.Default, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoRadiusAsync(key, member, radius, unit, count, order, options, flags);

public Task<GeoRadiusResult[]> GeoRadiusAsync(RedisKey key, double longitude, double latitude, double radius, GeoUnit unit = GeoUnit.Meters, int count = -1, Order? order = null, GeoRadiusOptions options = GeoRadiusOptions.Default, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoRadiusAsync(key, longitude, latitude, radius, unit, count, order, options, flags);

public Task<GeoRadiusResult[]> GeoSearchAsync(RedisKey key, RedisValue member, GeoSearchShape shape, int count = -1, bool demandClosest = true, Order? order = null, GeoRadiusOptions options = GeoRadiusOptions.Default, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoSearchAsync(key, member, shape, count, demandClosest, order, options, flags);

public Task<GeoRadiusResult[]> GeoSearchAsync(RedisKey key, double longitude, double latitude, GeoSearchShape shape, int count = -1, bool demandClosest = true, Order? order = null, GeoRadiusOptions options = GeoRadiusOptions.Default, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoSearchAsync(key, longitude, latitude, shape, count, demandClosest, order, options, flags);

public Task<long> GeoSearchAndStoreAsync(RedisKey sourceKey, RedisKey destinationKey, RedisValue member, GeoSearchShape shape, int count = -1, bool demandClosest = true, Order? order = null, bool storeDistances = false, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoSearchAndStoreAsync(sourceKey, destinationKey, member, shape, count, demandClosest, order, storeDistances, flags);

public Task<long> GeoSearchAndStoreAsync(RedisKey sourceKey, RedisKey destinationKey, double longitude, double latitude, GeoSearchShape shape, int count = -1, bool demandClosest = true, Order? order = null, bool storeDistances = false, CommandFlags flags = CommandFlags.None)
=> GetActiveDatabase().GeoSearchAndStoreAsync(sourceKey, destinationKey, longitude, latitude, shape, count, demandClosest, order, storeDistances, flags);
}
Loading
Loading