Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
a1ce62a
handle smigrating
nkaradzhov Nov 11, 2025
568f941
first approximation to handling smigrated
nkaradzhov Nov 13, 2025
2fabfd7
deduplicate notifications based on sequence id
nkaradzhov Dec 2, 2025
5bd24c0
add slotnumber to commands
nkaradzhov Jan 29, 2026
a32a26d
add support for extracting commands from queue
nkaradzhov Dec 1, 2025
aba451b
parse notification
nkaradzhov Dec 1, 2025
93373d4
work on main algo
nkaradzhov Dec 2, 2025
fea3ce2
fix: handle string values in push message reply comparison
nkaradzhov Dec 2, 2025
d7e9ab9
parse SMIGRATED according to new format
nkaradzhov Dec 2, 2025
a9e3504
comply with the new notification structure
nkaradzhov Dec 2, 2025
8c28e8d
refine algo
nkaradzhov Dec 2, 2025
f902741
handle pubSubNode replacement
nkaradzhov Dec 3, 2025
bf6a8ba
tests: merge all `after` functions into one
nkaradzhov Dec 5, 2025
6b22db5
tests: add `testWithProxiedCluster()` function
nkaradzhov Jan 29, 2026
1041b8f
Update index.ts
nkaradzhov Jan 29, 2026
0bf16ff
tests: add ProxyController for easier proxy comms
nkaradzhov Dec 5, 2025
f10adca
fix: access private queue through _self proxy and guard client close …
PavelPashov Dec 10, 2025
0c1c488
test(cluster): add fault injector infrastructure for hitless upgrade …
PavelPashov Dec 10, 2025
b70e70e
feat(test-utils): add RE database management and test utilities
nkaradzhov Jan 29, 2026
746de41
fix: fix command queue extraction and prepend logic
PavelPashov Dec 17, 2025
5e54b7d
test: add slot migration tests and refactor proxied fault injector
PavelPashov Dec 18, 2025
23ade08
fix: wait for ALL ports while spawning proxied redis
nkaradzhov Jan 7, 2026
8bba914
fix: handle partial PubSubListeners in resubscribeAllPubSubListeners
nkaradzhov Jan 7, 2026
1abe68c
refactor: maintenance tests and enhance fault injector client
nkaradzhov Feb 11, 2026
8b5c144
refactor: improve SMIGRATED push message parsing and add comprehensiv…
nkaradzhov Jan 29, 2026
d406f85
refactor: #handleSmigrated: move source cleanup outside destinations …
nkaradzhov Jan 29, 2026
35f2e14
refactor: add error handling to #handleSmigrated with try-catch-finally
nkaradzhov Jan 29, 2026
d5fc530
refactor: replace hardcoded node ID 'asdff' with meaningful smigrated…
nkaradzhov Jan 29, 2026
dad734e
fix: merge conflict residuals
nkaradzhov Jan 30, 2026
4ea16f2
refactor: remove extra db deletion
nkaradzhov Feb 2, 2026
1e0cde4
test: iterate over all trigger requirements and improve test naming
nkaradzhov Feb 3, 2026
8e44eea
uncomment tests
nkaradzhov Feb 3, 2026
266b402
test: refactor test naming to use single baseTestName variable with i…
nkaradzhov Feb 3, 2026
cbbb21f
remove debug logs
nkaradzhov Feb 4, 2026
85fee03
fix: prevent PubSub subscription loss during cluster maintenance
nkaradzhov Feb 4, 2026
a1ae692
Fix PubSub test hangs by awaiting publish batches
nkaradzhov Feb 4, 2026
2d6168d
Fix slot migration hangs during SMIGRATED handling
nkaradzhov Feb 4, 2026
9d56da1
improve FI debug logs
nkaradzhov Feb 6, 2026
a8099d5
implement unrelaxation
nkaradzhov Feb 6, 2026
f24e98a
chore: delete temp arch files
nkaradzhov Feb 10, 2026
429382f
fix: address PR comments
nkaradzhov Feb 11, 2026
6a1f243
fix: route commands to correct destinations during SMIGRATED handling
nkaradzhov Feb 11, 2026
5fa536a
fix: ensure slotNumber is passed to commands when options is undefined
nkaradzhov Feb 12, 2026
636a171
fix: schedule writes after moving slotless commands to destination node
nkaradzhov Feb 12, 2026
9b77b46
fix: properly emit error event
nkaradzhov Feb 12, 2026
6cfc2f5
fix: make cache check resilient to options object creation
nkaradzhov Feb 12, 2026
6cd61c2
fix: resolve flaky tests
nkaradzhov Feb 12, 2026
bac70ef
feat: enable test filtering
nkaradzhov Feb 13, 2026
32179b4
docs: add usage comment to smart-client-handoffs-oss.e2e.ts
nkaradzhov Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ dump.rdb
documentation/
tsconfig.tsbuildinfo
junit-results/
*.log
34 changes: 17 additions & 17 deletions packages/client/lib/RESP/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class Decoder {
this.#next = undefined;
return this.#decodeTypeValue(type, chunk);
}

#decodeTypeValue(type, chunk) {
switch (type) {
case RESP_TYPES.NULL:
Expand Down Expand Up @@ -128,7 +128,7 @@ export class Decoder {
chunk
)
);

case RESP_TYPES.DOUBLE:
return this.#handleDecodedValue(
this.onReply,
Expand All @@ -137,7 +137,7 @@ export class Decoder {
chunk
)
);

case RESP_TYPES.SIMPLE_STRING:
return this.#handleDecodedValue(
this.onReply,
Expand All @@ -146,7 +146,7 @@ export class Decoder {
chunk
)
);

case RESP_TYPES.BLOB_STRING:
return this.#handleDecodedValue(
this.onReply,
Expand All @@ -170,7 +170,7 @@ export class Decoder {
this.onErrorReply,
this.#decodeSimpleError(chunk)
);

case RESP_TYPES.BLOB_ERROR:
return this.#handleDecodedValue(
this.onErrorReply,
Expand All @@ -188,7 +188,7 @@ export class Decoder {
this.onReply,
this.#decodeSet(this.getTypeMapping(), chunk)
);

case RESP_TYPES.MAP:
return this.#handleDecodedValue(
this.onReply,
Expand Down Expand Up @@ -421,17 +421,17 @@ export class Decoder {
return this.#cursor === chunk.length ?
this.#decodeDoubleExponent.bind(this, d) :
this.#decodeDoubleExponent(d, chunk);

case ASCII['\r']:
this.#cursor = cursor + 2; // skip \r\n
return isNegative ? -double : double;
}

if (decimalIndex < Decoder.#DOUBLE_DECIMAL_MULTIPLIERS.length) {
double += (byte - ASCII['0']) * Decoder.#DOUBLE_DECIMAL_MULTIPLIERS[decimalIndex++];
}
} while (++cursor < chunk.length);

this.#cursor = cursor;
return this.#decodeDoubleDecimal.bind(this, isNegative, decimalIndex, double);
}
Expand Down Expand Up @@ -613,7 +613,7 @@ export class Decoder {
}

#decodeVerbatimStringFormat(stringLength, chunk) {
const formatCb = this.#decodeStringWithLength.bind(this, 3, 1, String);
const formatCb = this.#decodeStringWithLength.bind(this, 3, 1, String);
return this.#cursor >= chunk.length ?
this.#continueDecodeVerbatimStringFormat.bind(this, stringLength, formatCb) :
this.#continueDecodeVerbatimStringFormat(stringLength, formatCb, chunk);
Expand Down Expand Up @@ -689,13 +689,13 @@ export class Decoder {

case RESP_TYPES.BIG_NUMBER:
return this.#decodeBigNumber(typeMapping[RESP_TYPES.BIG_NUMBER], chunk);

case RESP_TYPES.DOUBLE:
return this.#decodeDouble(typeMapping[RESP_TYPES.DOUBLE], chunk);

case RESP_TYPES.SIMPLE_STRING:
return this.#decodeSimpleString(typeMapping[RESP_TYPES.SIMPLE_STRING], chunk);

case RESP_TYPES.BLOB_STRING:
return this.#decodeBlobString(typeMapping[RESP_TYPES.BLOB_STRING], chunk);

Expand All @@ -704,7 +704,7 @@ export class Decoder {

case RESP_TYPES.SIMPLE_ERROR:
return this.#decodeSimpleError(chunk);

case RESP_TYPES.BLOB_ERROR:
return this.#decodeBlobError(chunk);

Expand All @@ -713,7 +713,7 @@ export class Decoder {

case RESP_TYPES.SET:
return this.#decodeSet(typeMapping, chunk);

case RESP_TYPES.MAP:
return this.#decodeMap(typeMapping, chunk);

Expand Down Expand Up @@ -997,7 +997,7 @@ export class Decoder {
// decode simple string map key as string (and not as buffer)
case RESP_TYPES.SIMPLE_STRING:
return this.#decodeSimpleString(String, chunk);

// decode blob string map key as string (and not as buffer)
case RESP_TYPES.BLOB_STRING:
return this.#decodeBlobString(String, chunk);
Expand Down Expand Up @@ -1028,7 +1028,7 @@ export class Decoder {
this.#decodeNestedType.bind(this, typeMapping),
typeMapping
);
}
}

const value = this.#decodeNestedType(typeMapping, chunk);
if (typeof value === 'function') {
Expand Down
5 changes: 2 additions & 3 deletions packages/client/lib/client/cache.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,12 @@ describe("Client Side Cache", () => {
});
});
});
it('should reflect comprehensive cache operations in stats via BasicClientSideCache', async function () {

describe('comprehensive stats', () => {
const csc = new BasicClientSideCache({
maxEntries: 2, // Small size to easily trigger evictions
});

testUtils.testWithClient('comprehensive_stats_run', async client => {
testUtils.testWithClient('should reflect comprehensive cache operations in stats via BasicClientSideCache', async client => {

// --- Phase 1: Initial misses and loads ---
await client.set('keyA', 'valueA_1');
Expand Down
88 changes: 85 additions & 3 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ export interface CommandOptions<T = TypeMapping> {
* Timeout for the command in milliseconds
*/
timeout?: number;
/**
* @internal
* The slot the command is targeted to (if any)
*/
slotNumber?: number;
Comment thread
nkaradzhov marked this conversation as resolved.
}

export interface CommandToWrite extends CommandWaitingForReply {
Expand All @@ -33,6 +38,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
listener: () => unknown;
originalTimeout: number | undefined;
} | undefined;
slotNumber?: number
}

interface CommandWaitingForReply {
Expand Down Expand Up @@ -186,14 +192,34 @@ export default class RedisCommandsQueue {
this.#pushHandlers.push(handler);
}

async waitForInflightCommandsToComplete(): Promise<void> {
async waitForInflightCommandsToComplete(options?: { timeoutMs?: number, flushOnTimeout?: boolean }): Promise<void> {
// In-flight commands already completed
if(this.#waitingForReply.length === 0) {
return
};
// Otherwise wait for in-flight commands to fire `empty` event
return new Promise(resolve => {
this.#waitingForReply.events.on('empty', resolve)
const onEmpty = () => {
if (timeoutId) clearTimeout(timeoutId);
resolve();
};

let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutMs = options?.timeoutMs;
if (timeoutMs !== undefined && timeoutMs > 0) {
timeoutId = setTimeout(() => {
this.#waitingForReply.events.off('empty', onEmpty);
const pendingCount = this.#waitingForReply.length;
dbgMaintenance(`waitForInflightCommandsToComplete timed out after ${timeoutMs}ms with ${pendingCount} commands still waiting`);
if (options?.flushOnTimeout && pendingCount > 0) {
dbgMaintenance(`Flushing ${pendingCount} commands that timed out waiting for reply`);
this.#flushWaitingForReply(new TimeoutError());
}
resolve(); // Resolve instead of reject - we don't want to fail the migration
}, timeoutMs);
}

this.#waitingForReply.events.once('empty', onEmpty);
});
}

Expand All @@ -219,6 +245,7 @@ export default class RedisCommandsQueue {
channelsCounter: undefined,
typeMapping: options?.typeMapping
};
value.slotNumber = options?.slotNumber

// If #maintenanceCommandTimeout was explicitly set, we should
// use it instead of the timeout provided by the command
Expand Down Expand Up @@ -283,7 +310,8 @@ export default class RedisCommandsQueue {
if (Array.isArray(reply)) {
if (this.#onPush(reply)) return;

if (PONG.equals(reply[0] as Buffer)) {
const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0];
if (PONG.equals(firstElement as Buffer)) {
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
Expand Down Expand Up @@ -342,6 +370,10 @@ export default class RedisCommandsQueue {
return this.#pubSub.removeAllListeners();
}

removeShardedPubSubListenersForSlots(slots: Set<number>) {
return this.#pubSub.removeShardedPubSubListenersForSlots(slots);
}

resubscribe(chainId?: symbol) {
const commands = this.#pubSub.resubscribe();
if (!commands.length) return;
Expand Down Expand Up @@ -541,4 +573,54 @@ export default class RedisCommandsQueue {
this.#waitingForReply.length === 0
);
}

/**
*
* Extracts commands for the given slots from the toWrite queue.
* Some commands dont have "slotNumber", which means they are not designated to particular slot/node.
* We ignore those.
*/
extractCommandsForSlots(slots: Set<number>): CommandToWrite[] {
const result: CommandToWrite[] = [];
let current = this.#toWrite.head;
while(current !== undefined) {
if(current.value.slotNumber !== undefined && slots.has(current.value.slotNumber)) {
result.push(current.value);
const toRemove = current;
current = current.next;
this.#toWrite.remove(toRemove);
} else {
// Move to next node even if we don't extract this command
current = current.next;
}
}
return result;
}

/**
* Gets all commands from the write queue without removing them.
*/
extractAllCommands(): CommandToWrite[] {
const result: CommandToWrite[] = [];
let current = this.#toWrite.head;
while(current) {
result.push(current.value);
this.#toWrite.remove(current);
current = current.next;
}
return result;
}

/**
* Prepends commands to the write queue in reverse.
*/
prependCommandsToWrite(commands: CommandToWrite[]) {
if (!commands.length) {
return;
}

for (let i = commands.length - 1; i >= 0; i--) {
this.#toWrite.unshift(commands[i]);
}
}
}
Loading
Loading