diff --git a/.github/workflows/run-test-suite.yml b/.github/workflows/run-test-suite.yml index 0be3d7c69..47f82e9df 100644 --- a/.github/workflows/run-test-suite.yml +++ b/.github/workflows/run-test-suite.yml @@ -108,6 +108,7 @@ jobs: run: ${{ inputs.test-command }} env: XDEBUG_MODE: off + TEMPORAL_TRANSCRIPT_DUMP_ON_FAIL: "1" - name: Check for failures if: steps.validate.outcome == 'failure' diff --git a/.run/Acceptance.run.xml b/.run/Acceptance.run.xml index ae1d8ef29..947e4841c 100644 --- a/.run/Acceptance.run.xml +++ b/.run/Acceptance.run.xml @@ -1,6 +1,11 @@ + + + + + - + \ No newline at end of file diff --git a/composer.json b/composer.json index 3632397e9..d3ea5fb04 100644 --- a/composer.json +++ b/composer.json @@ -104,7 +104,11 @@ "test:arch": "phpunit --testsuite=Arch --color=always --testdox", "test:accept": "tests/runner.php vendor/bin/phpunit --testsuite=Acceptance --color=always --testdox", "test:accept-slow": "tests/runner.php vendor/bin/phpunit --testsuite=\"Acceptance-Slow\" --color=always --testdox", - "test:accept-fast": "tests/runner.php vendor/bin/phpunit --testsuite=\"Acceptance-Fast\" --color=always --testdox" + "test:accept-fast": "tests/runner.php vendor/bin/phpunit --testsuite=\"Acceptance-Fast\" --color=always --testdox", + "transcripts:last": "php tests/Acceptance/transcript-merge.php", + "transcripts:list": "php tests/Acceptance/transcript-merge.php --list", + "transcripts:merge": "php tests/Acceptance/transcript-merge.php", + "transcripts:clean": "rm -rf runtime/tests/transcripts/*" }, "config": { "sort-packages": true, diff --git a/psalm-baseline.xml b/psalm-baseline.xml index cea94abbd..36b6ddf16 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -1430,6 +1430,82 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + $next($input)]]> + $next($input)]]> + + + + + fileDescriptor]]> + fileDescriptor]]> + + + + + + + fileDescriptor]]> + + + + + + + + + + + + getHeader()]]> + getPayloads()]]> + + + + + + + + + getSeconds() + \round($eventTime->getNanos() / 1_000_000_000, 6)]]> + + + + getSeconds()]]> + + diff --git a/runtime/.gitignore b/runtime/.gitignore index 72e8ffc0d..d6b7ef32c 100644 --- a/runtime/.gitignore +++ b/runtime/.gitignore @@ -1 +1,2 @@ * +!.gitignore diff --git a/testing/src/Transcript/MalformedTranscriptException.php b/testing/src/Transcript/MalformedTranscriptException.php new file mode 100644 index 000000000..d95a8d44a --- /dev/null +++ b/testing/src/Transcript/MalformedTranscriptException.php @@ -0,0 +1,23 @@ +buildAttributes(); + $this->transcript->writeMeta('activity_start', $attributes); + try { + $result = $next($input); + $this->transcript->writeMeta('activity_completed', $attributes); + return $result; + } catch (\Throwable $exception) { + $this->transcript->writeException('activity_throw', $attributes, $exception); + throw $exception; + } + } + + /** + * @return array + */ + private function buildAttributes(): array + { + try { + $info = Activity::getInfo(); + return [ + 'name' => $info->type->name, + 'attempt' => $info->attempt, + 'activity_id' => $info->id, + 'workflow_id' => $info->workflowExecution->getID(), + 'run_id' => $info->workflowExecution->getRunID(), + ]; + } catch (\Throwable) { + return ['name' => 'unknown', 'attempt' => 0]; + } + } +} diff --git a/testing/src/Transcript/TranscriptAdapter.php b/testing/src/Transcript/TranscriptAdapter.php new file mode 100644 index 000000000..c6bcf7db8 --- /dev/null +++ b/testing/src/Transcript/TranscriptAdapter.php @@ -0,0 +1,30 @@ +writer->writeLog((string) $level, (string) $message, $context); + } catch (\Throwable $error) { + $this->stderr->error('transcript-adapter-error', [ + 'message' => $error->getMessage(), + ]); + } + } +} diff --git a/testing/src/Transcript/TranscriptLine.php b/testing/src/Transcript/TranscriptLine.php new file mode 100644 index 000000000..d8d85f1ff --- /dev/null +++ b/testing/src/Transcript/TranscriptLine.php @@ -0,0 +1,32 @@ + $attributes + * @param array|null $payload + */ + public function __construct( + public readonly \DateTimeImmutable $timestamp, + public readonly int $processId, + public readonly int $sequence, + public readonly TranscriptSection $section, + public readonly array $attributes, + public readonly ?array $payload, + public readonly string $rawLine, + ) {} + + public function getAttribute(string $key): string|int|float|bool|null + { + return $this->attributes[$key] ?? null; + } + + public function hasAttribute(string $key): bool + { + return \array_key_exists($key, $this->attributes); + } +} diff --git a/testing/src/Transcript/TranscriptPaths.php b/testing/src/Transcript/TranscriptPaths.php new file mode 100644 index 000000000..6f4b80183 --- /dev/null +++ b/testing/src/Transcript/TranscriptPaths.php @@ -0,0 +1,102 @@ + $max ? \substr($value, 0, $max) : $value; + } +} diff --git a/testing/src/Transcript/TranscriptPlugin.php b/testing/src/Transcript/TranscriptPlugin.php new file mode 100644 index 000000000..7cf822327 --- /dev/null +++ b/testing/src/Transcript/TranscriptPlugin.php @@ -0,0 +1,27 @@ +addInterceptor(new TranscriptActivityInterceptor($this->transcript)); + $context->addInterceptor(new TranscriptWorkflowInterceptor($this->transcript)); + $next($context); + } +} diff --git a/testing/src/Transcript/TranscriptReader.php b/testing/src/Transcript/TranscriptReader.php new file mode 100644 index 000000000..a5c10939f --- /dev/null +++ b/testing/src/Transcript/TranscriptReader.php @@ -0,0 +1,192 @@ + */ + private array $files; + + public function __construct(string $directory) + { + $live = \glob($directory . '/*.log'); + $rotated = \glob($directory . '/*.log.*'); + $this->files = \array_values(\array_merge( + \is_array($live) ? $live : [], + \is_array($rotated) ? $rotated : [], + )); + } + + /** + * @return list + */ + public function getLines(): array + { + $lines = []; + foreach ($this->files as $file) { + $lineNumber = 0; + $handle = @\fopen($file, 'rb'); + if ($handle === false) { + continue; + } + try { + while (($raw = \fgets($handle)) !== false) { + $lineNumber++; + $raw = \rtrim($raw, "\n"); + if ($raw === '') { + continue; + } + $lines[] = $this->parseLine($raw, $file, $lineNumber); + } + } finally { + \fclose($handle); + } + } + \usort($lines, static function (TranscriptLine $a, TranscriptLine $b): int { + $byTimestamp = $a->timestamp <=> $b->timestamp; + if ($byTimestamp !== 0) { + return $byTimestamp; + } + $byProcess = $a->processId <=> $b->processId; + if ($byProcess !== 0) { + return $byProcess; + } + return $a->sequence <=> $b->sequence; + }); + return $lines; + } + + /** + * @return list + */ + public function findBySection(TranscriptSection $section): array + { + return \array_values(\array_filter( + $this->getLines(), + static fn(TranscriptLine $line): bool => $line->section === $section, + )); + } + + /** + * @return list + */ + public function getFiles(): array + { + return $this->files; + } + + /** + * Return lines that fall between the TEST_START and TEST_END boundaries for a specific test. + * If multiple boundary pairs exist (re-runs), the latest pair wins. + * + * @return list + */ + public function linesForTest(string $class, string $method): array + { + $lines = $this->getLines(); + $startLine = null; + $endLine = null; + foreach ($lines as $line) { + if ($line->section === TranscriptSection::TEST_START + && ($line->attributes['class'] ?? null) === $class + && ($line->attributes['method'] ?? null) === $method + ) { + $startLine = $line; + $endLine = null; + continue; + } + if ($line->section === TranscriptSection::TEST_END + && ($line->attributes['class'] ?? null) === $class + && ($line->attributes['method'] ?? null) === $method + ) { + $endLine = $line; + } + } + if ($startLine === null) { + return []; + } + $startTimestamp = $startLine->timestamp; + $endTimestamp = $endLine?->timestamp; + return \array_values(\array_filter( + $lines, + static function (TranscriptLine $candidate) use ($startTimestamp, $endTimestamp): bool { + if ($candidate->timestamp < $startTimestamp) { + return false; + } + if ($endTimestamp !== null && $candidate->timestamp > $endTimestamp) { + return false; + } + return true; + }, + )); + } + + private function parseLine(string $raw, string $file, int $lineNumber): TranscriptLine + { + $decoded = \json_decode($raw, true); + if (!\is_array($decoded)) { + throw new MalformedTranscriptException( + 'Line is not a valid JSON object: ' . \json_last_error_msg(), + $raw, + $lineNumber, + $file, + ); + } + + $sectionValue = $decoded['section'] ?? null; + if (!\is_string($sectionValue)) { + throw new MalformedTranscriptException('Missing section', $raw, $lineNumber, $file); + } + $sectionEnum = TranscriptSection::tryFrom($sectionValue); + if ($sectionEnum === null) { + throw new MalformedTranscriptException('Unknown section: ' . $sectionValue, $raw, $lineNumber, $file); + } + + $timestampRaw = $decoded['ts'] ?? null; + if (!\is_string($timestampRaw) || $timestampRaw === '') { + throw new MalformedTranscriptException('Missing or empty timestamp', $raw, $lineNumber, $file); + } + try { + $timestamp = new \DateTimeImmutable($timestampRaw); + } catch (\Throwable) { + throw new MalformedTranscriptException('Invalid timestamp: ' . $timestampRaw, $raw, $lineNumber, $file); + } + + $attributes = $decoded['attributes'] ?? []; + if (!\is_array($attributes)) { + throw new MalformedTranscriptException('attributes must be an object', $raw, $lineNumber, $file); + } + foreach ($attributes as $key => $value) { + if ($value !== null && !\is_scalar($value)) { + throw new MalformedTranscriptException( + \sprintf('attribute "%s" must be scalar or null, %s given', (string) $key, \get_debug_type($value)), + $raw, + $lineNumber, + $file, + ); + } + } + + $payload = $decoded['payload'] ?? null; + if ($payload !== null && !\is_array($payload)) { + throw new MalformedTranscriptException( + 'payload must be an object or null, ' . \get_debug_type($payload) . ' given', + $raw, + $lineNumber, + $file, + ); + } + + return new TranscriptLine( + timestamp: $timestamp, + processId: (int) ($decoded['pid'] ?? 0), + sequence: (int) ($decoded['seq'] ?? 0), + section: $sectionEnum, + attributes: $attributes, + payload: $payload, + rawLine: $raw, + ); + } +} diff --git a/testing/src/Transcript/TranscriptRun.php b/testing/src/Transcript/TranscriptRun.php new file mode 100644 index 000000000..7d35ea809 --- /dev/null +++ b/testing/src/Transcript/TranscriptRun.php @@ -0,0 +1,79 @@ + + */ + public function files(): array + { + $live = \glob($this->directory . '/*.log'); + $rotated = \glob($this->directory . '/*.log.*'); + return \array_values(\array_merge( + \is_array($live) ? $live : [], + \is_array($rotated) ? $rotated : [], + )); + } + + public function totalBytes(): int + { + $bytes = 0; + foreach ($this->files() as $file) { + $bytes += (int) @\filesize($file); + } + return $bytes; + } + + public function reader(): TranscriptReader + { + return new TranscriptReader($this->directory); + } + + public function merge(): string + { + $mergedDirectory = TranscriptPaths::mergedDirectory($this->directory); + try { + (new Filesystem())->mkdir($mergedDirectory); + } catch (IOException $ioError) { + throw new \RuntimeException( + "Failed to create merged directory: {$mergedDirectory} ({$ioError->getMessage()})", + previous: $ioError, + ); + } + $path = TranscriptPaths::mergedFile($this->directory); + $handle = \fopen($path, 'wb'); + if ($handle === false) { + throw new \RuntimeException("Failed to open merged file: {$path}"); + } + if (!\flock($handle, \LOCK_EX)) { + \fclose($handle); + throw new \RuntimeException("Failed to acquire lock on merged file: {$path}"); + } + try { + foreach ($this->reader()->getLines() as $line) { + $payload = $line->rawLine . "\n"; + $written = \fwrite($handle, $payload); + if ($written === false || $written < \strlen($payload)) { + throw new \RuntimeException("Short write while merging transcript at {$path}"); + } + } + } finally { + \flock($handle, \LOCK_UN); + \fclose($handle); + } + return $path; + } +} diff --git a/testing/src/Transcript/TranscriptSection.php b/testing/src/Transcript/TranscriptSection.php new file mode 100644 index 000000000..b4ad17885 --- /dev/null +++ b/testing/src/Transcript/TranscriptSection.php @@ -0,0 +1,22 @@ +stderr = $stderr ?? new NullLogger(); + $this->filesystem = new Filesystem(); + try { + $this->filesystem->mkdir($this->baseDirectory); + } catch (IOException $ioError) { + $this->stderr->warning('transcript-store: base directory create failed', [ + 'path' => $this->baseDirectory, + 'message' => $ioError->getMessage(), + ]); + } + } + + public static function create(?string $projectRoot = null, ?LoggerInterface $stderr = null): self + { + $projectRoot ??= \dirname(__DIR__, 4); + $configured = \getenv(self::BASE_DIR_ENV); + if (\is_string($configured) && $configured !== '') { + $base = \str_starts_with($configured, '/') + ? $configured + : $projectRoot . '/' . $configured; + return new self($base, $stderr); + } + return new self($projectRoot . '/' . self::DEFAULT_BASE_RELATIVE, $stderr); + } + + public static function currentRunIdFromEnvironment(): ?string + { + $runId = \getenv(self::RUN_ID_ENV); + return \is_string($runId) && $runId !== '' ? $runId : null; + } + + public static function currentRunIdOrOrphan(): string + { + return self::currentRunIdFromEnvironment() ?? ('orphan-' . (\getmypid() ?: 0)); + } + + public static function getOrCreateRunId(): string + { + $runId = self::currentRunIdFromEnvironment(); + if ($runId !== null) { + return $runId; + } + $runId = TranscriptPaths::generateRunId(); + \putenv(self::RUN_ID_ENV . '=' . $runId); + return $runId; + } + + public function runDirectory(string $runId): string + { + return TranscriptPaths::runDirectory($this->baseDirectory, $runId); + } + + public function ensureRunDirectory(string $runId): string + { + $directory = $this->runDirectory($runId); + $this->filesystem->mkdir($directory); + return $directory; + } + + /** + * @return list + */ + public function listRuns(): array + { + $entries = @\scandir($this->baseDirectory); + if ($entries === false) { + return []; + } + $runs = []; + foreach ($entries as $entry) { + if ($entry === '.' || $entry === '..' || TranscriptPaths::isReservedEntry($entry)) { + continue; + } + $path = $this->baseDirectory . '/' . $entry; + if (!\is_dir($path)) { + continue; + } + $mtime = @\filemtime($path); + $runs[] = new TranscriptRun( + id: $entry, + directory: $path, + mtime: $mtime === false ? null : $mtime, + ); + } + \usort( + $runs, + static function (TranscriptRun $a, TranscriptRun $b): int { + $byMtime = ($b->mtime ?? 0) <=> ($a->mtime ?? 0); + if ($byMtime !== 0) { + return $byMtime; + } + return $b->id <=> $a->id; + }, + ); + return $runs; + } + + public function latestRun(): ?TranscriptRun + { + return $this->listRuns()[0] ?? null; + } + + public function findRun(string $runId): ?TranscriptRun + { + $directory = TranscriptPaths::runDirectory($this->baseDirectory, $runId); + if (!\is_dir($directory)) { + return null; + } + $mtime = @\filemtime($directory); + return new TranscriptRun( + id: \basename($directory), + directory: $directory, + mtime: $mtime === false ? null : $mtime, + ); + } + + public function currentRun(): ?TranscriptRun + { + $runId = self::currentRunIdFromEnvironment(); + return $runId === null ? $this->latestRun() : $this->findRun($runId); + } + + public function readMergedRun(?string $runId = null): ?string + { + $run = $runId === null ? $this->currentRun() : $this->findRun($runId); + if ($run === null || $run->files() === []) { + return null; + } + $content = @\file_get_contents($run->merge()); + return \is_string($content) && $content !== '' ? $content : null; + } + + public function pruneOldRuns(int $keep): int + { + $keep = \max(0, $keep); + $stale = \array_slice($this->listRuns(), $keep); + $deleted = 0; + foreach ($stale as $run) { + try { + $this->filesystem->remove($run->directory); + $deleted++; + } catch (IOException $ioError) { + $this->stderr->warning('transcript-store: prune failed', [ + 'path' => $run->directory, + 'message' => $ioError->getMessage(), + ]); + } + } + return $deleted; + } + + public function createWriter(string $runId, string $processLabel): TranscriptWriter + { + $directory = $this->ensureRunDirectory($runId); + return new TranscriptWriter(TranscriptPaths::writerFile($directory, $processLabel), $this->stderr); + } +} diff --git a/testing/src/Transcript/TranscriptWorkflowInterceptor.php b/testing/src/Transcript/TranscriptWorkflowInterceptor.php new file mode 100644 index 000000000..1206e5f3c --- /dev/null +++ b/testing/src/Transcript/TranscriptWorkflowInterceptor.php @@ -0,0 +1,87 @@ + $input->info->type->name, + 'workflow_id' => $input->info->execution->getID(), + 'run_id' => $input->info->execution->getRunID(), + 'is_replaying' => $input->isReplaying, + ]; + $this->runPhase('workflow_execute', $attributes, fn() => $next($input)); + } + + public function handleSignal(SignalInput $input, callable $next): void + { + $attributes = [ + 'signal_name' => $input->signalName, + 'workflow_id' => $input->info->execution->getID(), + 'is_replaying' => $input->isReplaying, + ]; + $this->runPhase('workflow_signal', $attributes, fn() => $next($input)); + } + + public function handleQuery(QueryInput $input, callable $next): mixed + { + $attributes = [ + 'query_name' => $input->queryName, + 'workflow_id' => $input->info->execution->getID(), + ]; + return $this->runPhase('workflow_query', $attributes, fn() => $next($input)); + } + + public function handleUpdate(UpdateInput $input, callable $next): mixed + { + $attributes = [ + 'update_name' => $input->updateName, + 'update_id' => $input->updateId, + 'workflow_id' => $input->info->execution->getID(), + 'is_replaying' => $input->isReplaying, + ]; + return $this->runPhase('workflow_update', $attributes, fn() => $next($input)); + } + + public function validateUpdate(UpdateInput $input, callable $next): void + { + $next($input); + } + + /** + * @template T + * @param array $attributes + * @param callable(): T $execution + * @return T + */ + private function runPhase(string $phase, array $attributes, callable $execution): mixed + { + $this->transcript->writeMeta($phase . '_start', $attributes); + try { + $result = $execution(); + $this->transcript->writeMeta($phase . '_completed', $attributes); + return $result; + } catch (\Throwable $exception) { + $this->transcript->writeException($phase . '_failed', $attributes, $exception); + throw $exception; + } + } +} diff --git a/testing/src/Transcript/TranscriptWriter.php b/testing/src/Transcript/TranscriptWriter.php new file mode 100644 index 000000000..7be3d4434 --- /dev/null +++ b/testing/src/Transcript/TranscriptWriter.php @@ -0,0 +1,343 @@ +processId = \getmypid() ?: 0; + $this->currentPath = $path; + $this->stderr = $stderr ?? new NullLogger(); + $this->filesystem = new Filesystem(); + $this->openFileDescriptor($path); + $this->writeMeta('writer_initialized', [ + 'path' => $path, + 'worker_start_epoch_ms' => TranscriptPaths::currentEpochMs(), + ]); + \register_shutdown_function(function (): void { + if ($this->fileDescriptor !== null) { + @\fflush($this->fileDescriptor); + } + }); + } + + public function getPath(): string + { + return $this->currentPath; + } + + /** + * @param array $attributes + */ + public function write( + TranscriptSection $section, + array $attributes = [], + mixed $payload = null, + ): void { + if ($this->inWrite) { + return; + } + $this->inWrite = true; + try { + $this->doWrite($section, $attributes, $payload); + } catch (\Throwable $e) { + $this->stderr->error('transcript-writer-internal-error', [ + 'class' => $e::class, + 'message' => $e->getMessage(), + ]); + } finally { + $this->inWrite = false; + } + } + + public function writeLog(string $level, string $message, array $context = []): void + { + $this->write(TranscriptSection::LOG, [ + 'level' => $level, + 'message' => $message, + ], $context === [] ? null : $context); + } + + public function writeWireInbound(string $frame, array $headers, int $inboundBatchId): void + { + $this->write(TranscriptSection::WIRE_INBOUND, [ + 'inbound_batch_id' => $inboundBatchId, + 'bytes' => \strlen($frame), + ], [ + 'headers' => $headers, + 'body' => $this->safeDecodeFrame($frame), + ]); + } + + public function writeWireOutbound(string $frame, int $inboundBatchId, int $outboundSeq): void + { + $this->write(TranscriptSection::WIRE_OUTBOUND, [ + 'inbound_batch_id' => $inboundBatchId, + 'outbound_seq' => $outboundSeq, + 'bytes' => \strlen($frame), + ], [ + 'body' => $this->safeDecodeFrame($frame), + ]); + } + + public function writeWireError(\Throwable $error): void + { + $this->write(TranscriptSection::WIRE_ERROR, [ + 'class' => $error::class, + ], [ + 'message' => $error->getMessage(), + 'trace' => $error->getTraceAsString(), + ]); + } + + public function writeException(string $phase, array $attributes, \Throwable $exception): void + { + $this->write(TranscriptSection::EXCEPTION, ['phase' => $phase] + $attributes, [ + 'class' => $exception::class, + 'message' => $exception->getMessage(), + 'trace' => $exception->getTraceAsString(), + 'previous' => $exception->getPrevious()?->getMessage(), + ]); + } + + public function writeFatal(\Throwable $throwable): void + { + $this->write(TranscriptSection::FATAL, [ + 'class' => $throwable::class, + ], [ + 'message' => $throwable->getMessage(), + 'trace' => $throwable->getTraceAsString(), + 'file' => $throwable->getFile(), + 'line' => $throwable->getLine(), + ]); + } + + /** + * @param array $errorRecord + */ + public function writeFatalFromError(array $errorRecord): void + { + $this->write(TranscriptSection::FATAL, [ + 'type' => (int) ($errorRecord['type'] ?? 0), + 'file' => (string) ($errorRecord['file'] ?? ''), + 'line' => (int) ($errorRecord['line'] ?? 0), + ], [ + 'message' => (string) ($errorRecord['message'] ?? ''), + ]); + } + + public function writeError(int $type, string $message, string $file, int $line): void + { + $this->write(TranscriptSection::ERROR, [ + 'type' => $type, + 'file' => $file, + 'line' => $line, + ], [ + 'message' => $message, + ]); + } + + public function writeTestBoundary(TranscriptSection $boundary, array $attributes): void + { + if ($boundary !== TranscriptSection::TEST_START && $boundary !== TranscriptSection::TEST_END) { + return; + } + $this->write($boundary, $attributes); + } + + public function writeHistoryEvent(string $workflowId, string $runId, array $eventAttributes, string $attributesJson): void + { + $this->write(TranscriptSection::HISTORY, [ + 'workflow_id' => $workflowId, + 'run_id' => $runId, + ] + $eventAttributes, [ + 'attrs' => $attributesJson, + ]); + } + + public function writeHistoryError(string $workflowId, \Throwable $error): void + { + $this->write(TranscriptSection::HISTORY_ERROR, [ + 'workflow_id' => $workflowId, + 'class' => $error::class, + ], [ + 'message' => $error->getMessage(), + ]); + } + + public function writeMeta(string $event, array $attributes = []): void + { + $this->write(TranscriptSection::META, ['event' => $event] + $attributes); + } + + public function flush(): void + { + if ($this->fileDescriptor === null) { + return; + } + @\fflush($this->fileDescriptor); + } + + /** + * @param array $attributes + */ + private function doWrite(TranscriptSection $section, array $attributes, mixed $payload): void + { + if ($this->fileDescriptor === null) { + return; + } + $this->rotateIfNeeded(); + + $this->sequence++; + $record = [ + 'ts' => (new \DateTimeImmutable('now'))->format('Y-m-d\TH:i:s.uP'), + 'pid' => $this->processId, + 'seq' => $this->sequence, + 'section' => $section->value, + 'attributes' => (object) $attributes, + ]; + if ($payload !== null) { + $record['payload'] = $payload; + } + + $encoded = \json_encode($record, self::JSON_FLAGS); + if ($encoded === false) { + $encoded = \json_encode([ + 'ts' => $record['ts'], + 'pid' => $this->processId, + 'seq' => $this->sequence, + 'section' => $section->value, + 'attributes' => (object) $attributes, + 'payload' => ['error' => 'json_encode_failed', 'message' => \json_last_error_msg()], + ], self::JSON_FLAGS); + } + if ($encoded === false) { + $this->stderr->error('transcript-writer-internal-error: json fallback failed', [ + 'message' => \json_last_error_msg(), + ]); + return; + } + $line = $encoded . "\n"; + + if (!\flock($this->fileDescriptor, \LOCK_EX)) { + $this->stderr->error('transcript-writer-internal-error: flock failed'); + return; + } + try { + \fwrite($this->fileDescriptor, $line); + \fflush($this->fileDescriptor); + } finally { + \flock($this->fileDescriptor, \LOCK_UN); + } + } + + private function rotateIfNeeded(): void + { + $stat = @\fstat($this->fileDescriptor); + if ($stat === false) { + return; + } + if ($stat['size'] < self::SIZE_CAP_BYTES) { + return; + } + $this->rotationCounter++; + $rotated = TranscriptPaths::rotatedFile($this->currentPath, $this->rotationCounter); + try { + $this->filesystem->rename($this->currentPath, $rotated, true); + } catch (IOException $ioError) { + $this->stderr->error('transcript-writer-internal-error: rotate rename failed', [ + 'from' => $this->currentPath, + 'to' => $rotated, + 'message' => $ioError->getMessage(), + ]); + return; + } + $this->openFileDescriptor($this->currentPath); + $this->doWrite(TranscriptSection::META, [ + 'event' => 'writer_rotated', + 'from' => $rotated, + 'to' => $this->currentPath, + 'reason' => 'size_cap', + ], null); + } + + private function openFileDescriptor(string $path): void + { + try { + $this->filesystem->mkdir(\dirname($path)); + } catch (IOException $ioError) { + $this->stderr->error('transcript-writer-internal-error: mkdir failed', [ + 'path' => $path, + 'message' => $ioError->getMessage(), + ]); + } + $resource = @\fopen($path, 'ab'); + if ($resource === false) { + $this->stderr->error('transcript-writer-internal-error: fopen failed', ['path' => $path]); + if (\is_resource($this->fileDescriptor)) { + @\fclose($this->fileDescriptor); + } + $this->fileDescriptor = null; + return; + } + if (\is_resource($this->fileDescriptor)) { + @\fclose($this->fileDescriptor); + } + $this->fileDescriptor = $resource; + } + + private function safeDecodeFrame(string $frame): mixed + { + $trimmed = \ltrim($frame); + if ($trimmed !== '' && ($trimmed[0] === '{' || $trimmed[0] === '[')) { + $decoded = \json_decode($frame, true); + if ($decoded !== null || \json_last_error() === \JSON_ERROR_NONE) { + return ['encoding' => 'json', 'value' => $decoded]; + } + } + + $temporalFrame = WireFrameDecoder::decode($frame); + if ($temporalFrame !== null) { + return $temporalFrame; + } + + return [ + 'encoding' => 'raw', + 'preview_base64' => \base64_encode(\substr($frame, 0, 512)), + ]; + } +} diff --git a/testing/src/Transcript/WireFrameDecoder.php b/testing/src/Transcript/WireFrameDecoder.php new file mode 100644 index 000000000..229bf198d --- /dev/null +++ b/testing/src/Transcript/WireFrameDecoder.php @@ -0,0 +1,170 @@ +>}|null + */ + public static function decode(string $frame, ?DataConverterInterface $converter = null): ?array + { + if ($frame === '') { + return null; + } + + $proto = new Frame(); + try { + $proto->mergeFromString($frame); + } catch (\Throwable) { + return null; + } + + $messages = $proto->getMessages(); + if (\count($messages) === 0) { + return null; + } + + $converter ??= self::$defaultConverter ??= DataConverter::createDefault(); + + $decoded = []; + foreach ($messages as $message) { + $decoded[] = self::decodeMessage($message, $converter); + } + + return [ + 'encoding' => 'temporal-frame', + 'messages' => $decoded, + ]; + } + + /** + * Uses proto's native JSON serialization to extract non-default fields, then + * replaces bytes-shaped fields (`options`, `payloads`, `header`) with their + * SDK-decoded, human-readable representation. + * + * @return array + */ + private static function decodeMessage(Message $message, DataConverterInterface $converter): array + { + try { + $json = $message->serializeToJsonString(true); + } catch (\Throwable) { + return ['error' => 'proto_json_serialize_failed']; + } + $decoded = \json_decode($json, true) ?? []; + + if ($message->getOptions() !== '') { + $decoded['options'] = self::decodeJsonBytes($message->getOptions()); + } + if ($message->hasPayloads()) { + $decoded['payloads'] = self::decodePayloads($message->getPayloads(), $converter); + } + if ($message->hasHeader()) { + $decoded['header'] = self::decodeHeader($message->getHeader(), $converter); + } + + return $decoded; + } + + /** + * @return list + */ + private static function decodePayloads(Payloads $payloads, DataConverterInterface $converter): array + { + $out = []; + foreach ($payloads->getPayloads() as $payload) { + $out[] = self::decodeSinglePayload($payload, $converter); + } + return $out; + } + + /** + * @return array + */ + private static function decodeHeader(Header $header, DataConverterInterface $converter): array + { + $out = []; + /** @var MapField $fields */ + $fields = $header->getFields(); + foreach ($fields as $name => $payload) { + $out[$name] = self::decodeSinglePayload($payload, $converter); + } + return $out; + } + + /** + * Decodes a single payload using its own `metadata.encoding`. Falls back to a raw + * representation when encoding is absent (e.g., {@see \Temporal\DataConverter\RawValue}) + * or when the converter cannot interpret the bytes. + */ + private static function decodeSinglePayload(Payload $payload, DataConverterInterface $converter): mixed + { + /** @var MapField $meta */ + $meta = $payload->getMetadata(); + if (!isset($meta[EncodingKeys::METADATA_ENCODING_KEY])) { + return self::payloadFallback($payload); + } + try { + return $converter->fromPayload($payload, null); + } catch (\Throwable) { + return self::payloadFallback($payload); + } + } + + /** + * @return array + */ + private static function payloadFallback(Payload $payload): array + { + $metadata = []; + $meta = $payload->getMetadata(); + foreach ($meta as $key => $value) { + $metadata[$key] = self::bytesToReadable((string) $value); + } + return [ + 'metadata' => $metadata, + 'data' => self::bytesToReadable($payload->getData()), + ]; + } + + private static function decodeJsonBytes(string $bytes): mixed + { + $decoded = \json_decode($bytes, true); + if ($decoded !== null || \json_last_error() === \JSON_ERROR_NONE) { + return $decoded; + } + return self::bytesToReadable($bytes); + } + + /** + * Returns the bytes as a UTF-8 string when valid, otherwise wraps the + * bytes in a base64 representation so the JSON line stays well-formed. + */ + private static function bytesToReadable(string $bytes): mixed + { + if ($bytes === '') { + return ''; + } + if (\preg_match('//u', $bytes) === 1) { + return $bytes; + } + return ['encoding' => 'base64', 'value' => \base64_encode($bytes)]; + } +} diff --git a/testing/src/Transcript/WorkflowHistoryDumper.php b/testing/src/Transcript/WorkflowHistoryDumper.php new file mode 100644 index 000000000..53215b79f --- /dev/null +++ b/testing/src/Transcript/WorkflowHistoryDumper.php @@ -0,0 +1,126 @@ + $args Call arguments; WorkflowStubInterface entries contribute their execution. + */ + public function dump( + TranscriptWriter $transcript, + WorkflowClientInterface $workflowClient, + array $args, + ): void { + $executions = []; + $stubCount = 0; + foreach ($args as $arg) { + if (!$arg instanceof WorkflowStubInterface) { + continue; + } + $stubCount++; + $execution = $arg->getExecution(); + if ($execution->getRunID() === null) { + $transcript->writeMeta('history_skipped', [ + 'reason' => 'no_run_id', + 'workflow_id' => $execution->getID(), + ]); + continue; + } + $key = $execution->getID() . ':' . $execution->getRunID(); + $executions[$key] = $execution; + } + + if ($executions === [] && $stubCount === 0) { + $transcript->writeMeta('history_skipped', ['reason' => 'no_executions_inspected']); + return; + } + + foreach ($executions as $execution) { + $this->dumpExecution($transcript, $workflowClient, $execution); + } + } + + private function dumpExecution( + TranscriptWriter $transcript, + WorkflowClientInterface $workflowClient, + WorkflowExecution $execution, + ): void { + try { + $eventCount = 0; + $startSec = null; + foreach ($workflowClient->getWorkflowHistory($execution) as $event) { + $eventCount++; + $eventAttributes = [ + 'event_id' => (int) $event->getEventId(), + 'event_type' => EventType::name($event->getEventType()), + ]; + $eventTime = $event->getEventTime(); + if ($eventTime !== null) { + $sec = $eventTime->getSeconds() + \round($eventTime->getNanos() / 1_000_000_000, 6); + $eventAttributes['event_time'] = $eventTime->getSeconds() . '.' . $eventTime->getNanos(); + $startSec ??= $sec; + $eventAttributes['delta_ms'] = (int) \round(($sec - $startSec) * 1000); + } + $eventAttributes += $this->extractFailureSummary($event); + $payloadJson = '{}'; + try { + $payloadJson = $event->serializeToJsonString(); + } catch (\Throwable $serializationError) { + $eventAttributes['serialize_error'] = $serializationError->getMessage(); + } + $transcript->writeHistoryEvent( + $execution->getID(), + (string) $execution->getRunID(), + $eventAttributes, + $payloadJson, + ); + } + $transcript->writeMeta('history_dumped', [ + 'workflow_id' => $execution->getID(), + 'run_id' => $execution->getRunID(), + 'event_count' => $eventCount, + ]); + } catch (\Throwable $historyError) { + $transcript->writeHistoryError($execution->getID(), $historyError); + } + } + + /** + * Pulls a one-line summary out of *Failed / *FailedEventAttributes so + * the row can be grepped/scanned without parsing the full proto-JSON. + * + * @return array + */ + private function extractFailureSummary(HistoryEvent $event): array + { + $cause = $event->getStartChildWorkflowExecutionFailedEventAttributes()?->getCause() + ?? $event->getSignalExternalWorkflowExecutionFailedEventAttributes()?->getCause() + ?? $event->getRequestCancelExternalWorkflowExecutionFailedEventAttributes()?->getCause(); + if ($cause !== null) { + return ['cause' => (string) $cause]; + } + + $failure = $event->getActivityTaskFailedEventAttributes()?->getFailure() + ?? $event->getWorkflowTaskFailedEventAttributes()?->getFailure() + ?? $event->getNexusOperationFailedEventAttributes()?->getFailure() + ?? $event->getWorkflowExecutionFailedEventAttributes()?->getFailure() + ?? $event->getChildWorkflowExecutionFailedEventAttributes()?->getFailure() + ?? $event->getNexusOperationCancelRequestFailedEventAttributes()?->getFailure(); + if ($failure === null) { + return []; + } + return [ + 'failure_kind' => $failure->getFailureInfo(), + 'failure_message' => $failure->getMessage(), + ]; + } +} diff --git a/tests/Acceptance/.rr.yaml b/tests/Acceptance/.rr.yaml index aa13bec30..07c49f144 100644 --- a/tests/Acceptance/.rr.yaml +++ b/tests/Acceptance/.rr.yaml @@ -4,6 +4,8 @@ rpc: server: command: "php worker.php" + env: + TEMPORAL_TRANSCRIPT_DIR: "runtime/tests/transcripts" # Workflow and activity mesh service temporal: diff --git a/tests/Acceptance/App/Feature/WorkerFactory.php b/tests/Acceptance/App/Feature/WorkerFactory.php index c2c14f741..5c50eaae7 100644 --- a/tests/Acceptance/App/Feature/WorkerFactory.php +++ b/tests/Acceptance/App/Feature/WorkerFactory.php @@ -4,54 +4,69 @@ namespace Temporal\Tests\Acceptance\App\Feature; +use Psr\Log\LoggerInterface; use Spiral\Core\Attribute\Singleton; use Spiral\Core\InvokerInterface; use Temporal\Tests\Acceptance\App\Attribute\Worker; +use Temporal\Tests\Acceptance\App\Logger\FanoutLogger; use Temporal\Tests\Acceptance\App\Logger\LoggerFactory; +use Temporal\Testing\Transcript\TranscriptAdapter; +use Temporal\Testing\Transcript\TranscriptWriter; use Temporal\Tests\Acceptance\App\Runtime\Feature; -use Spiral\Core\Container\InjectorInterface; -use Temporal\Client\WorkflowStubInterface; use Temporal\Worker\WorkerFactoryInterface; use Temporal\Worker\WorkerInterface; use Temporal\Worker\WorkerOptions; -/** - * @implements InjectorInterface - */ #[Singleton] final class WorkerFactory { public function __construct( private readonly WorkerFactoryInterface $workerFactory, private readonly InvokerInterface $invoker, + private readonly LoggerInterface $logger, + private readonly ?TranscriptWriter $transcript = null, ) {} public function createWorker( Feature $feature, ): WorkerInterface { - // Find Worker attribute - $attr = self::findAttribute( + $attribute = self::findAttribute( ...\array_map(static fn(array $check): string => $check[0], $feature->checks), ...$feature->workflows, ...$feature->activities, ); - $options = $attr?->options === null ? null : $this->invoker->invoke($attr->options); - $interceptorProvider = $attr?->pipelineProvider === null ? null : $this->invoker->invoke($attr->pipelineProvider); - $logger = $attr?->logger === null ? null : $this->invoker->invoke($attr->logger); + $options = $attribute?->options === null ? null : $this->invoker->invoke($attribute->options); + $interceptorProvider = $attribute?->pipelineProvider === null + ? null + : $this->invoker->invoke($attribute->pipelineProvider); + $logger = $attribute?->logger === null ? null : $this->invoker->invoke($attribute->logger); + if ($logger !== null && !$logger instanceof LoggerInterface) { + throw new \InvalidArgumentException(sprintf("Logger must implement PSR-3 LoggerInterface, got %s", \get_debug_type($logger))); + } - // Add plugins from the attribute to the factory's registry (already instantiated, no invoker needed) - if ($attr?->plugins !== null) { - $this->workerFactory->getPluginRegistry()->merge($attr->plugins); + if ($attribute?->plugins !== null) { + $this->workerFactory->getPluginRegistry()->merge($attribute->plugins); } return $this->workerFactory->newWorker( $feature->taskQueue, $options ?? WorkerOptions::new()->withMaxConcurrentActivityExecutionSize(10), interceptorProvider: $interceptorProvider, - logger: $logger ?? LoggerFactory::createServerLogger($feature->taskQueue), + logger: $this->decorateLogger($logger, $feature), ); } + private function decorateLogger(?LoggerInterface $logger, Feature $feature): LoggerInterface + { + $serverLogger = LoggerFactory::createServerLogger($feature->taskQueue); + $loggers = [$this->logger, $logger, $serverLogger]; + if ($this->transcript !== null) { + $loggers[] = new TranscriptAdapter($this->transcript, $this->logger); + } + + return new FanoutLogger(...array_filter($loggers)); + } + /** * Find {@see Worker} attribute in the classes collection. * If more than one attribute is found, an exception is thrown. diff --git a/tests/Acceptance/App/Logger/FanoutLogger.php b/tests/Acceptance/App/Logger/FanoutLogger.php new file mode 100644 index 000000000..74316921c --- /dev/null +++ b/tests/Acceptance/App/Logger/FanoutLogger.php @@ -0,0 +1,37 @@ + */ + private readonly array $sinks; + + public function __construct( + private readonly LoggerInterface $stderr, + LoggerInterface ...$sinks, + ) { + $this->sinks = $sinks; + } + + public function log($level, \Stringable|string $message, array $context = []): void + { + foreach ($this->sinks as $sink) { + try { + $sink->log($level, $message, $context); + } catch (\Throwable $error) { + $this->stderr->error('fanout-logger-error', [ + 'sink' => $sink::class, + 'message' => $error->getMessage(), + ]); + } + } + } +} diff --git a/tests/Acceptance/App/Logger/LoggerFactory.php b/tests/Acceptance/App/Logger/LoggerFactory.php index 5dc4d4f95..94926ca0f 100644 --- a/tests/Acceptance/App/Logger/LoggerFactory.php +++ b/tests/Acceptance/App/Logger/LoggerFactory.php @@ -4,65 +4,29 @@ namespace Temporal\Tests\Acceptance\App\Logger; -/** - * Factory for creating logger instances used in acceptance tests. - * - * Provides methods to create both server-side and client-side loggers with appropriate - * configuration for test environments. - */ +use Symfony\Component\Filesystem\Filesystem; + final class LoggerFactory { - /** @var non-empty-string Default relative path for test logs */ private const DEFAULT_LOG_DIR = 'runtime/tests/logs'; - /** - * Create a server-side file logger. - * - * @param non-empty-string $taskQueue Task queue name used to identify the log file - * @param non-empty-string|null $baseDir Optional base directory, defaults to project root - * @return FileLogger Configured file logger instance - */ public static function createServerLogger( string $taskQueue, ?string $baseDir = null, ): FileLogger { - $logDir = self::getLogDirectory($baseDir); - return new FileLogger($logDir, $taskQueue); + return new FileLogger(self::getLogDirectory($baseDir), $taskQueue); } - /** - * Create a client-side logger for assertions. - * - * The client logger provides methods for reading and analyzing log entries - * created by the server-side logger. - * - * @param non-empty-string $taskQueue Task queue name used to identify the log file - * @param non-empty-string|null $baseDir Optional base directory, defaults to project root - * @return ClientLogger Configured client logger instance - */ public static function createClientLogger( string $taskQueue, ?string $baseDir = null, ): ClientLogger { - $logDir = self::getLogDirectory($baseDir); - return new ClientLogger($logDir, $taskQueue); + return new ClientLogger(self::getLogDirectory($baseDir), $taskQueue); } - /** - * Generate the log filename for a specific task queue. - * - * Uses sha1 hash of task queue name to handle special characters. - * - * @param non-empty-string $dir Directory path - * @param non-empty-string $taskQueue Task queue name - * @return non-empty-string Full path to the log file - */ - public static function getLogFilename( - string $dir, - string $taskQueue, - ): string { + public static function getLogFilename(string $dir, string $taskQueue): string + { $filename = \sha1($taskQueue) . '.log'; - return \preg_replace( '#/{2,}#', '/', @@ -70,24 +34,11 @@ public static function getLogFilename( ); } - /** - * Get the absolute path to the log directory. - * - * Creates the directory if it doesn't exist. - * - * @param non-empty-string|null $baseDir Optional base directory, defaults to project root - * @return non-empty-string Absolute path to the log directory - */ private static function getLogDirectory(?string $baseDir = null): string { - $baseDir ??= \dirname(__DIR__, 4); // Go up to project root + $baseDir ??= \dirname(__DIR__, 4); $logDir = $baseDir . '/' . self::DEFAULT_LOG_DIR; - - // Ensure directory exists - if (!\is_dir($logDir)) { - \mkdir($logDir, 0777, true); - } - + (new Filesystem())->mkdir($logDir); return $logDir; } } diff --git a/tests/Acceptance/App/Runtime/ContainerFacade.php b/tests/Acceptance/App/Runtime/ContainerFacade.php index 8229424c3..c5498bdeb 100644 --- a/tests/Acceptance/App/Runtime/ContainerFacade.php +++ b/tests/Acceptance/App/Runtime/ContainerFacade.php @@ -4,7 +4,9 @@ namespace Temporal\Tests\Acceptance\App\Runtime; +use Spiral\Core\Container; + final class ContainerFacade { - public static \Spiral\Core\Container $container; + public static Container $container; } diff --git a/tests/Acceptance/App/Runtime/FatalHandler.php b/tests/Acceptance/App/Runtime/FatalHandler.php new file mode 100644 index 000000000..9adee63c2 --- /dev/null +++ b/tests/Acceptance/App/Runtime/FatalHandler.php @@ -0,0 +1,89 @@ +writeError($type, $message, $file, $line); + } finally { + self::$inHandler = false; + } + return false; + }); + + \set_exception_handler(static function (\Throwable $throwable) use ($stderr, $writer): void { + if (self::$inHandler) { + return; + } + self::$inHandler = true; + try { + $writer->writeFatal($throwable); + $writer->flush(); + } finally { + self::$inHandler = false; + } + $stderr->critical('fatal', [ + 'class' => $throwable::class, + 'message' => $throwable->getMessage(), + ]); + exit(1); + }); + + \register_shutdown_function(static function () use ($writer): void { + $error = \error_get_last(); + if ($error === null) { + $writer->flush(); + return; + } + if (!\in_array((int) $error['type'], self::FATAL_ERROR_TYPES, true)) { + $writer->flush(); + return; + } + if (self::$inHandler) { + return; + } + self::$inHandler = true; + try { + $writer->writeFatalFromError($error); + $writer->flush(); + } finally { + self::$inHandler = false; + } + }); + + $writer->writeMeta('fatal_handler_registered', [ + 'pid' => \getmypid() ?: 0, + ]); + } +} diff --git a/tests/Acceptance/App/Runtime/RRStarter.php b/tests/Acceptance/App/Runtime/RRStarter.php index f3c653764..694f13306 100644 --- a/tests/Acceptance/App/Runtime/RRStarter.php +++ b/tests/Acceptance/App/Runtime/RRStarter.php @@ -6,10 +6,12 @@ use Temporal\Testing\Environment; use Temporal\Testing\SystemInfo; +use Temporal\Testing\Transcript\TranscriptStore; final class RRStarter { private Environment $environment; + public function __construct( private State $runtime, ?Environment $environment = null, @@ -18,15 +20,14 @@ public function __construct( \register_shutdown_function(fn() => $this->stop()); } - /** - * @param list $allowedTestClasses - */ - public function start(array $allowedTestClasses = []): void + public function start(): void { if ($this->environment->isRoadRunnerRunning()) { return; } + $allowedTestClasses = $this->runtime->allowedTestClasses; + $systemInfo = SystemInfo::detect(); $run = $this->runtime->command; @@ -62,8 +63,15 @@ public function start(array $allowedTestClasses = []): void $rrCommand[] = "tls.cert={$run->tlsCert}"; } + $envs = []; + $runId = \getenv('TEMPORAL_TRANSCRIPT_RUN_ID'); + if (\is_string($runId) && $runId !== '') { + $envs['TEMPORAL_TRANSCRIPT_RUN_ID'] = $runId; + } + $this->environment->startRoadRunner( rrCommand: $rrCommand, + envs: $envs, configFile: $this->runtime->rrConfigDir . DIRECTORY_SEPARATOR . '.rr.yaml', ); } diff --git a/tests/Acceptance/App/Runtime/State.php b/tests/Acceptance/App/Runtime/State.php index 1becbc3ee..9eea9d8aa 100644 --- a/tests/Acceptance/App/Runtime/State.php +++ b/tests/Acceptance/App/Runtime/State.php @@ -18,6 +18,13 @@ final class State /** @var non-empty-string */ public string $address; + /** + * @var list Test classes the worker pool was bootstrapped with. + * Honored by RRStarter on initial start AND on every restart, so failure-triggered + * restarts preserve the same selection instead of falling back to "register all". + */ + public array $allowedTestClasses = []; + /** * @param non-empty-string $rrConfigDir Dir with rr.yaml * @param non-empty-string $workDir Dir where tests are run diff --git a/tests/Acceptance/App/RuntimeBuilder.php b/tests/Acceptance/App/RuntimeBuilder.php index 7342616c7..64086f205 100644 --- a/tests/Acceptance/App/RuntimeBuilder.php +++ b/tests/Acceptance/App/RuntimeBuilder.php @@ -73,6 +73,7 @@ public static function createState( array $allowedTestClasses = [], ): State { $runtime = new State($command, \dirname(__DIR__), $workDir, $testCasesDir, $workers); + $runtime->allowedTestClasses = $allowedTestClasses; self::hydrateClasses($runtime, $allowedTestClasses); diff --git a/tests/Acceptance/App/TaskQueueResolver.php b/tests/Acceptance/App/TaskQueueResolver.php index fc12ecf91..399ab18fc 100644 --- a/tests/Acceptance/App/TaskQueueResolver.php +++ b/tests/Acceptance/App/TaskQueueResolver.php @@ -22,6 +22,7 @@ final class TaskQueueResolver \Temporal\Tests\Acceptance\Harness\Signal\Activities\ActivitiesTest::class, \Temporal\Tests\Acceptance\Extra\Versioning\Classic\ClassicTest::class, \Temporal\Tests\Acceptance\Extra\Versioning\Deployment\DeploymentTest::class, + \Temporal\Tests\Acceptance\Extra\Activity\ActivityPaused\ActivityPausedTest::class, ]; /** diff --git a/tests/Acceptance/App/TestCase.php b/tests/Acceptance/App/TestCase.php index 4acd00d62..f164575ca 100644 --- a/tests/Acceptance/App/TestCase.php +++ b/tests/Acceptance/App/TestCase.php @@ -22,6 +22,12 @@ use Temporal\Tests\Acceptance\App\Feature\WorkerFactory; use Temporal\Tests\Acceptance\App\Logger\ClientLogger; use Temporal\Tests\Acceptance\App\Logger\LoggerFactory; +use Temporal\Testing\Transcript\TranscriptLine; +use Temporal\Testing\Transcript\TranscriptSection; +use Temporal\Testing\Transcript\TranscriptStore; +use Temporal\Testing\Transcript\TranscriptWriter; +use Temporal\Testing\Transcript\WorkflowHistoryDumper; +use Temporal\Worker\Logger\StderrLogger; use Temporal\Tests\Acceptance\App\Runtime\ContainerFacade; use Temporal\Tests\Acceptance\App\Runtime\Feature; use Temporal\Tests\Acceptance\App\Runtime\RRStarter; @@ -50,6 +56,7 @@ protected function runTest(): mixed LoggerInterface::class => ClientLogger::class, ClientLogger::class => $logger, ]; + $workflowClient = $container->get(WorkflowClientInterface::class); // Auto-inject plugin-configured client from #[Worker(plugins: [...])] attribute $workerAttr = WorkerFactory::findAttribute(static::class); @@ -57,9 +64,8 @@ protected function runTest(): mixed $pluginRegistry = new PluginRegistry($workerAttr->plugins); $clientPlugins = $pluginRegistry->getPlugins(ClientPluginInterface::class); if ($clientPlugins !== []) { - $existingClient = $container->get(WorkflowClientInterface::class); $pluginClient = WorkflowClient::create( - serviceClient: $existingClient->getServiceClient(), + serviceClient: $workflowClient->getServiceClient(), options: (new ClientOptions())->withNamespace($runtime->namespace), pluginRegistry: new PluginRegistry($workerAttr->plugins), ); @@ -69,14 +75,28 @@ protected function runTest(): mixed return $container->runScope( new Scope(name: 'feature', bindings: $bindings), - function (Container $container): mixed { - $reflection = new \ReflectionMethod($this, $this->name()); - $args = $container->resolveArguments($reflection); - $this->setDependencyInput($args); + function (Container $container) use ($workflowClient): mixed { + $args = []; + $caughtException = null; + $startedAt = \microtime(true); + + $transcript = $container->has(TranscriptWriter::class) + ? $container->get(TranscriptWriter::class) + : null; + $dumper = new WorkflowHistoryDumper(); + $transcript?->writeTestBoundary(TranscriptSection::TEST_START, [ + 'class' => static::class, + 'method' => $this->name(), + ]); try { + $reflection = new \ReflectionMethod($this, $this->name()); + $args = $container->resolveArguments($reflection); + $this->setDependencyInput($args); + return parent::runTest(); } catch (\Throwable $e) { + $caughtException = $e; if ($e instanceof TemporalException) { echo \sprintf( "\n=== En error occurred while testing %s: %s (%s) ===\n", @@ -87,19 +107,11 @@ function (Container $container): mixed { echo "\n=== Stack trace ===\n"; echo $e->getTraceAsString(); echo "\n=== Workflow history ===\n"; - $this->printWorkflowHistory($container->get(WorkflowClientInterface::class), $args); - - $logRecords = $container->get(ClientLogger::class)->getRecords(); - if ($logRecords !== []) { - echo "\n=== Client log records ===\n"; - foreach ($logRecords as $record) { - echo \sprintf( - "[%s] %s%s\n", - $record->level, - $record->message, - \json_encode($record->context, JSON_UNESCAPED_UNICODE), - ); - } + $this->printWorkflowHistory($workflowClient, $args); + + $clientLogger = $container->get(ClientLogger::class); + foreach ($clientLogger->getRecords() as $record) { + $transcript?->writeLog($record->level, $record->message, $record->context); } echo "\n\n"; @@ -114,13 +126,63 @@ function (Container $container): mixed { throw $e; } finally { - // Cleanup: terminate injected workflow if any + if ($transcript !== null) { + $dumper->dump($transcript, $workflowClient, $args); + } foreach ($args as $arg) { if ($arg instanceof WorkflowStubInterface) { try { $arg->terminate('test-end'); } catch (\Throwable $e) { - // ignore + $transcript?->writeMeta('workflow_terminate_failed', [ + 'workflow_id' => $arg->getExecution()->getID(), + 'class' => $e::class, + 'message' => $e->getMessage(), + ]); + } + } + } + if ($transcript !== null) { + $status = match (true) { + $caughtException === null => 'passed', + $caughtException instanceof SkippedTest => 'skipped', + default => 'failed', + }; + $endAttributes = [ + 'class' => static::class, + 'method' => $this->name(), + 'status' => $status, + 'duration_ms' => (int) ((\microtime(true) - $startedAt) * 1000), + ]; + if ($caughtException !== null) { + $endAttributes['exception_class'] = $caughtException::class; + } + $transcript->writeTestBoundary(TranscriptSection::TEST_END, $endAttributes); + $transcript->flush(); + if ($caughtException !== null && !$caughtException instanceof SkippedTest) { + $stderr = $container->has(StderrLogger::class) + ? $container->get(StderrLogger::class) + : null; + $stderr?->error('transcript', ['path' => $transcript->getPath()]); + $runId = TranscriptStore::currentRunIdFromEnvironment(); + $store = TranscriptStore::create(stderr: $stderr); + $run = $runId === null ? $store->latestRun() : $store->findRun($runId); + if ($run !== null && $run->files() !== []) { + try { + $mergedPath = $run->merge(); + $stderr?->info("view merged transcript: less {$mergedPath}"); + if (self::shouldDumpTranscriptOnFail()) { + $content = @\file_get_contents($mergedPath); + if (\is_string($content) && $content !== '') { + $label = $runId !== null ? "transcript run {$runId}" : 'transcript'; + $stderr?->info("{$label} dump:\n" . $content); + } + } + } catch (\Throwable $mergeError) { + $stderr?->warning('transcript merge failed', [ + 'message' => $mergeError->getMessage(), + ]); + } } } } @@ -129,6 +191,24 @@ function (Container $container): mixed { ); } + private static function shouldDumpTranscriptOnFail(): bool + { + $flag = \getenv('TEMPORAL_TRANSCRIPT_DUMP_ON_FAIL'); + return \is_string($flag) && !\in_array(\strtolower($flag), ['', '0', 'false', 'off', 'no'], true); + } + + /** + * @return list + */ + protected function readCurrentTestTranscript(): array + { + $run = TranscriptStore::create()->currentRun(); + if ($run === null) { + return []; + } + return $run->reader()->linesForTest(static::class, $this->name()); + } + private function printWorkflowHistory(WorkflowClientInterface $workflowClient, array $args): void { foreach ($args as $arg) { @@ -140,10 +220,10 @@ private function printWorkflowHistory(WorkflowClientInterface $workflowClient, a ? 0 : $ts->getSeconds() + \round($ts->getNanos() / 1_000_000_000, 6); + $start = null; foreach ($workflowClient->getWorkflowHistory($arg->getExecution()) as $event) { $start ??= $fnTime($event->getEventTime()); echo "\n" . \str_pad((string) $event->getEventId(), 3, ' ', STR_PAD_LEFT) . ' '; - # Calculate delta time $deltaMs = \round(1_000 * ($fnTime($event->getEventTime()) - $start)); echo \str_pad(\number_format($deltaMs, 0, '.', "'"), 6, ' ', STR_PAD_LEFT) . 'ms '; echo \str_pad(EventType::name($event->getEventType()), 40, ' ', STR_PAD_RIGHT) . ' '; @@ -162,12 +242,10 @@ private function printWorkflowHistory(WorkflowClientInterface $workflowClient, a ?? $event->getWorkflowExecutionFailedEventAttributes()?->getFailure() ?? $event->getChildWorkflowExecutionFailedEventAttributes()?->getFailure() ?? $event->getNexusOperationCancelRequestFailedEventAttributes()?->getFailure(); - if ($failure === null) { continue; } - # Render failure echo "Failure:\n"; echo " ========== BEGIN ===========\n"; $this->renderFailure($failure, 1); @@ -185,7 +263,7 @@ private function renderFailure(Failure $failure, int $level): void echo $fnPad('Source: ' . $failure->getSource()) . "\n"; echo $fnPad('Info: ' . $failure->getFailureInfo()) . "\n"; echo $fnPad('Message: ' . $failure->getMessage()) . "\n"; - echo $fnPad("Stack trace:") . "\n"; + echo $fnPad('Stack trace:') . "\n"; echo $fnPad($failure->getStackTrace()) . "\n"; $previous = $failure->getCause(); if ($previous !== null) { diff --git a/tests/Acceptance/App/Transport/RecordingHost.php b/tests/Acceptance/App/Transport/RecordingHost.php new file mode 100644 index 000000000..ce2cd21fa --- /dev/null +++ b/tests/Acceptance/App/Transport/RecordingHost.php @@ -0,0 +1,73 @@ +record(fn() => $this->transcript->writeMeta('host_recording_started', [ + 'inner' => $inner::class, + 'pid' => \getmypid() ?: 0, + 'transcript_path' => $this->transcript->getPath(), + ])); + } + + public function waitBatch(): ?CommandBatch + { + try { + $batch = $this->inner->waitBatch(); + } catch (\Throwable $error) { + $this->record(fn() => $this->transcript->writeWireError($error)); + throw $error; + } + if ($batch === null) { + return null; + } + $this->inboundBatchId++; + $this->outboundSeq = 0; + $batchId = $this->inboundBatchId; + $this->record(fn() => $this->transcript->writeWireInbound($batch->messages, $batch->context, $batchId)); + return $batch; + } + + public function send(string $frame): void + { + $this->outboundSeq++; + $batchId = $this->inboundBatchId; + $sequence = $this->outboundSeq; + try { + $this->inner->send($frame); + } catch (\Throwable $error) { + $this->record(fn() => $this->transcript->writeWireError($error)); + throw $error; + } + $this->record(fn() => $this->transcript->writeWireOutbound($frame, $batchId, $sequence)); + } + + public function error(\Throwable $error): void + { + $this->record(fn() => $this->transcript->writeWireError($error)); + $this->inner->error($error); + } + + private function record(callable $write): void + { + try { + $write(); + } catch (\Throwable) { + } + } +} diff --git a/tests/Acceptance/ExecutionStartedSubscriber.php b/tests/Acceptance/ExecutionStartedSubscriber.php index e3ab349d0..56940a800 100644 --- a/tests/Acceptance/ExecutionStartedSubscriber.php +++ b/tests/Acceptance/ExecutionStartedSubscriber.php @@ -27,6 +27,8 @@ use Temporal\DataConverter\DataConverterInterface; use Temporal\Testing\Environment; use Temporal\Tests\Acceptance\App\Feature\WorkflowStubInjector; +use Temporal\Testing\Transcript\TranscriptStore; +use Temporal\Testing\Transcript\TranscriptWriter; use Temporal\Tests\Acceptance\App\Runtime\ContainerFacade; use Temporal\Tests\Acceptance\App\Runtime\RRStarter; use Temporal\Tests\Acceptance\App\Runtime\State; @@ -65,7 +67,10 @@ private function boot(ExecutionStarted $event): void } $logger = new StderrLogger(); - $logger->info('[selection] picked test classes after filtering', ['count' => \count($selectedTestClasses)]); + $logger->info('[selection] picked test classes after filtering', [ + 'count' => \count($selectedTestClasses), + 'classes' => $selectedTestClasses, + ]); RuntimeBuilder::init(); @@ -90,11 +95,20 @@ private function boot(ExecutionStarted $event): void $container->bindSingleton(State::class, $state); $container->bindSingleton(Environment::class, $environment); $container->bindSingleton(LoggerInterface::class, $logger); + $container->bindSingleton(StderrLogger::class, $logger); + + $runId = TranscriptStore::getOrCreateRunId(); + $logger->info('[transcript] run id', ['run_id' => $runId]); + + $transcriptStore = TranscriptStore::create(stderr: $logger); + $transcriptStore->pruneOldRuns(keep: 20); + $testTranscript = $transcriptStore->createWriter($runId, 'test'); + $container->bindSingleton(TranscriptWriter::class, $testTranscript); $temporalRunner = new TemporalStarter($environment); $rrRunner = new RRStarter($state, $environment); $temporalRunner->start(); - $rrRunner->start($selectedTestClasses); + $rrRunner->start(); $serviceClient = $state->command->tlsKey === null && $state->command->tlsCert === null ? ServiceClient::create($state->address) diff --git a/tests/Acceptance/Extra/Transcript/TranscriptHappyPathTest.php b/tests/Acceptance/Extra/Transcript/TranscriptHappyPathTest.php new file mode 100644 index 000000000..9312a9f37 --- /dev/null +++ b/tests/Acceptance/Extra/Transcript/TranscriptHappyPathTest.php @@ -0,0 +1,85 @@ +getResult('string'); + self::assertSame('hello-from-activity', $result); + + $lines = $this->readCurrentTestTranscript(); + self::assertNotEmpty($lines, 'No transcript lines were captured for this test'); + + $workflowStart = $this->findMeta($lines, 'workflow_execute_start'); + self::assertCount(1, $workflowStart, 'Expected exactly one workflow_execute_start META'); + self::assertSame('Extra_Transcript_TranscriptHappyPath_run', $workflowStart[0]->attributes['workflow_type']); + self::assertSame($stub->getExecution()->getID(), $workflowStart[0]->attributes['workflow_id']); + + $workflowCompleted = $this->findMeta($lines, 'workflow_execute_completed'); + self::assertCount(1, $workflowCompleted, 'Expected exactly one workflow_execute_completed META'); + + $activityStart = $this->findMeta($lines, 'activity_start'); + self::assertCount(1, $activityStart, 'Expected exactly one activity_start META'); + self::assertSame('Extra_Transcript_TranscriptHappyPath.greet', $activityStart[0]->attributes['name']); + self::assertSame(1, $activityStart[0]->attributes['attempt']); + + $activityCompleted = $this->findMeta($lines, 'activity_completed'); + self::assertCount(1, $activityCompleted, 'Expected exactly one activity_completed META'); + } + + /** + * @param list $lines + * @return list + */ + private function findMeta(array $lines, string $event): array + { + return \array_values(\array_filter( + $lines, + static fn(TranscriptLine $line): bool => $line->section === TranscriptSection::META + && ($line->attributes['event'] ?? null) === $event, + )); + } +} + +#[WorkflowInterface] +class HappyPathWorkflow +{ + #[WorkflowMethod(name: 'Extra_Transcript_TranscriptHappyPath_run')] + public function run(): \Generator + { + $activity = Workflow::newActivityStub( + HappyPathActivity::class, + ActivityOptions::new()->withScheduleToCloseTimeout(10), + ); + return yield $activity->greet(); + } +} + +#[ActivityInterface(prefix: 'Extra_Transcript_TranscriptHappyPath.')] +class HappyPathActivity +{ + #[ActivityMethod] + public function greet(): string + { + return 'hello-from-activity'; + } +} diff --git a/tests/Acceptance/Extra/Transcript/TranscriptRetryTest.php b/tests/Acceptance/Extra/Transcript/TranscriptRetryTest.php new file mode 100644 index 000000000..4d510ce82 --- /dev/null +++ b/tests/Acceptance/Extra/Transcript/TranscriptRetryTest.php @@ -0,0 +1,101 @@ +getResult('string'); + self::assertSame('eventually-ok', $result); + + $lines = $this->readCurrentTestTranscript(); + + $throwsByAttempt = []; + $messagesByAttempt = []; + foreach ($lines as $line) { + if ($line->section !== TranscriptSection::EXCEPTION) { + continue; + } + if (($line->attributes['phase'] ?? null) !== 'activity_throw') { + continue; + } + $attempt = (int) ($line->attributes['attempt'] ?? 0); + $throwsByAttempt[$attempt] = ($throwsByAttempt[$attempt] ?? 0) + 1; + $messagesByAttempt[$attempt] = (string) ($line->payload['message'] ?? ''); + } + self::assertSame(1, $throwsByAttempt[1] ?? 0, 'Exactly one activity_throw expected for attempt=1'); + self::assertSame(1, $throwsByAttempt[2] ?? 0, 'Exactly one activity_throw expected for attempt=2'); + self::assertArrayNotHasKey(3, $throwsByAttempt, 'Attempt 3 should succeed without throw'); + self::assertStringContainsString('boom-attempt-1', $messagesByAttempt[1] ?? ''); + self::assertStringContainsString('boom-attempt-2', $messagesByAttempt[2] ?? ''); + + $activityStartsByAttempt = []; + foreach ($lines as $line) { + if ($line->section !== TranscriptSection::META) { + continue; + } + if (($line->attributes['event'] ?? null) !== 'activity_start') { + continue; + } + $attempt = (int) ($line->attributes['attempt'] ?? 0); + $activityStartsByAttempt[$attempt] = ($activityStartsByAttempt[$attempt] ?? 0) + 1; + } + self::assertSame(1, $activityStartsByAttempt[1] ?? 0); + self::assertSame(1, $activityStartsByAttempt[2] ?? 0); + self::assertSame(1, $activityStartsByAttempt[3] ?? 0, 'Attempt 3 must record an activity_start'); + } +} + +#[WorkflowInterface] +class RetryWorkflow +{ + #[WorkflowMethod(name: 'Extra_Transcript_TranscriptRetry_run')] + public function run(): \Generator + { + $activity = Workflow::newActivityStub( + RetryActivity::class, + ActivityOptions::new() + ->withScheduleToCloseTimeout(30) + ->withRetryOptions(RetryOptions::new()->withMaximumAttempts(3)->withInitialInterval(1)), + ); + return yield $activity->flaky(); + } +} + +#[ActivityInterface(prefix: 'Extra_Transcript_TranscriptRetry.')] +class RetryActivity +{ + #[ActivityMethod] + public function flaky(): string + { + $attempt = Activity::getInfo()->attempt; + if ($attempt < 3) { + throw new ApplicationFailure( + "boom-attempt-{$attempt}", + 'TestFailure', + false, + ); + } + return 'eventually-ok'; + } +} diff --git a/tests/Acceptance/Extra/Transcript/TranscriptWorkflowFailureTest.php b/tests/Acceptance/Extra/Transcript/TranscriptWorkflowFailureTest.php new file mode 100644 index 000000000..30cd6acc5 --- /dev/null +++ b/tests/Acceptance/Extra/Transcript/TranscriptWorkflowFailureTest.php @@ -0,0 +1,59 @@ +getResult('string'); + } catch (\Throwable $exception) { + $thrown = $exception; + } + self::assertInstanceOf(WorkflowFailedException::class, $thrown); + + $lines = $this->readCurrentTestTranscript(); + + $executeMarkers = \array_values(\array_filter( + $lines, + static fn(TranscriptLine $line): bool => $line->section === TranscriptSection::META + && ($line->attributes['event'] ?? null) === 'workflow_execute_start', + )); + self::assertCount(1, $executeMarkers, 'Expected exactly one workflow_execute_start META'); + self::assertSame('Extra_Transcript_TranscriptWorkflowFailure_run', $executeMarkers[0]->attributes['workflow_type']); + self::assertSame($stub->getExecution()->getID(), $executeMarkers[0]->attributes['workflow_id']); + + $outbound = \array_values(\array_filter( + $lines, + static fn(TranscriptLine $line): bool => $line->section === TranscriptSection::WIRE_OUTBOUND, + )); + self::assertNotEmpty($outbound, 'Expected at least one WIRE_OUTBOUND frame from the worker'); + } +} + +#[WorkflowInterface] +class FailingWorkflow +{ + #[WorkflowMethod(name: 'Extra_Transcript_TranscriptWorkflowFailure_run')] + public function run(): \Generator + { + yield; + throw new ApplicationFailure('workflow-boom', 'TestWorkflowFailure', false); + } +} diff --git a/tests/Acceptance/transcript-merge.php b/tests/Acceptance/transcript-merge.php new file mode 100644 index 000000000..1b1d5f425 --- /dev/null +++ b/tests/Acceptance/transcript-merge.php @@ -0,0 +1,109 @@ +error('repeated flag', ['flag' => '--list']); + exit(2); + } + $listMode = true; + continue; + } + if ($arg === '--last' || $arg === 'last') { + if ($lastMode) { + $stderr->error('repeated flag', ['flag' => '--last']); + exit(2); + } + $lastMode = true; + continue; + } + if (\str_starts_with($arg, '-')) { + $stderr->error('unknown flag', ['flag' => $arg]); + exit(2); + } + if ($selector !== null) { + $stderr->error('only one positional selector accepted', ['previous' => $selector, 'new' => $arg]); + exit(2); + } + $selector = $arg; +} + +if ($listMode && $lastMode) { + $stderr->error('--list and --last are mutually exclusive'); + exit(2); +} +if ($listMode && $selector !== null) { + $stderr->error('--list does not accept a selector', ['selector' => $selector]); + exit(2); +} +if ($lastMode && $selector !== null) { + $stderr->error('--last does not accept a selector', ['selector' => $selector]); + exit(2); +} + +if ($listMode) { + exit(printRuns($store, $stderr)); +} + +try { + $run = $selector === null ? $store->latestRun() : $store->findRun($selector); +} catch (\InvalidArgumentException $invalidSelector) { + $stderr->error('invalid selector', ['selector' => $selector, 'message' => $invalidSelector->getMessage()]); + exit(2); +} +if ($run === null) { + $stderr->error( + $selector === null ? 'no transcript runs found' : 'transcript run not found', + ['base_directory' => $store->baseDirectory, 'selector' => $selector], + ); + $stderr->info('try `composer transcripts:list` to see known runs'); + exit(1); +} + +if ($run->files() === []) { + $stderr->error('no transcript files in run', ['directory' => $run->directory]); + exit(1); +} + +\fwrite(\STDOUT, $run->merge() . "\n"); +exit(0); + +function printRuns(TranscriptStore $store, StderrLogger $stderr): int +{ + $runs = $store->listRuns(); + if ($runs === []) { + $stderr->error('no transcript runs found', ['base_directory' => $store->baseDirectory]); + return 1; + } + \fwrite(\STDOUT, "Known transcript runs (newest first):\n"); + foreach ($runs as $run) { + \fwrite(\STDOUT, \sprintf( + " %s %s %d files %d bytes\n", + $run->id, + $run->mtime === null ? 'unknown' : \date('Y-m-d H:i:s', $run->mtime), + \count($run->files()), + $run->totalBytes(), + )); + } + return 0; +} diff --git a/tests/Acceptance/worker.php b/tests/Acceptance/worker.php index 4af53925c..08da5fa6d 100644 --- a/tests/Acceptance/worker.php +++ b/tests/Acceptance/worker.php @@ -23,21 +23,33 @@ use Temporal\DataConverter\ProtoConverter; use Temporal\DataConverter\ProtoJsonConverter; use Temporal\Internal\Support\StackRenderer; +use Temporal\Plugin\PluginRegistry; use Temporal\Testing\Command; +use Temporal\Testing\Transcript\TranscriptStore; +use Temporal\Testing\Transcript\TranscriptWriter; +use Temporal\Testing\Transcript\TranscriptPlugin; +use Temporal\Tests\Acceptance\App\Runtime\FatalHandler; use Temporal\Tests\Acceptance\App\Runtime\Feature; use Temporal\Tests\Acceptance\App\Runtime\State; use Temporal\Tests\Acceptance\App\RuntimeBuilder; use Temporal\Worker\Logger\StderrLogger; +use Temporal\Tests\Acceptance\App\Transport\RecordingHost; +use Temporal\Worker\Transport\RoadRunner; use Temporal\Worker\WorkerFactoryInterface; use Temporal\Worker\WorkerInterface; use Temporal\WorkerFactory; \chdir(__DIR__ . '/../..'); require './vendor/autoload.php'; + +$logger = new StderrLogger(); +$workerTranscript = TranscriptStore::create(stderr: $logger) + ->createWriter(TranscriptStore::currentRunIdOrOrphan(), 'worker'); +FatalHandler::register($workerTranscript, $logger); + RuntimeBuilder::init(); StackRenderer::addIgnoredPath(__FILE__); -$logger = new StderrLogger(); /** @var list $allowedTestClasses */ $allowedTestClasses = []; @@ -69,6 +81,8 @@ ); $run = $runtime->command; $container = new Spiral\Core\Container(); + $container->bindSingleton(TranscriptWriter::class, $workerTranscript); + $container->bindSingleton(LoggerInterface::class, $logger); $converters = [ new NullConverter(), @@ -82,9 +96,17 @@ } $converter = new DataConverter(...$converters); $container->bindSingleton(DataConverter::class, $converter); - $container->bindSingleton(WorkerFactoryInterface::class, WorkerFactory::create(converter: $converter)); - $workerFactory = $container->get(\Temporal\Tests\Acceptance\App\Feature\WorkerFactory::class); + $plugins = [new TranscriptPlugin($workerTranscript)]; + $container->bindSingleton( + WorkerFactoryInterface::class, + WorkerFactory::create( + converter: $converter, + pluginRegistry: new PluginRegistry($plugins), + ) + ); + + $workerFactory = $container->get(\Temporal\Tests\Acceptance\App\Feature\WorkerFactory::class); $getWorker = static function (Feature $feature) use (&$workers, $workerFactory): WorkerInterface { return $workers[$feature->taskQueue] ??= $workerFactory->createWorker($feature); }; @@ -119,7 +141,15 @@ $getWorker($feature)->registerActivityImplementations($container->make($activity)); } - $container->get(WorkerFactoryInterface::class)->run(); + $host = new RecordingHost(RoadRunner::create(), $workerTranscript); + $container->get(WorkerFactoryInterface::class)->run($host); } catch (\Throwable $e) { - td($e); + $workerTranscript->writeFatal($e); + $workerTranscript->flush(); + $logger->critical('fatal', [ + 'class' => $e::class, + 'message' => $e->getMessage(), + 'trace' => $e->getTraceAsString(), + ]); + exit(1); } diff --git a/tests/Fixtures/src/Activity/ExternalActivityFixturePaths.php b/tests/Fixtures/src/Activity/ExternalActivityFixturePaths.php new file mode 100644 index 000000000..a7d52f84d --- /dev/null +++ b/tests/Fixtures/src/Activity/ExternalActivityFixturePaths.php @@ -0,0 +1,20 @@ +taskToken); + file_put_contents(ExternalActivityFixturePaths::tokenPath(), Activity::getInfo()->taskToken); file_put_contents( - 'runtime/activityId', + ExternalActivityFixturePaths::idPath(), json_encode( [ 'id' => Activity::getInfo()->workflowExecution->getID(), diff --git a/tests/Functional/Client/ActivityCompletionClientTestCase.php b/tests/Functional/Client/ActivityCompletionClientTestCase.php index 122cbb8b4..02062e89c 100644 --- a/tests/Functional/Client/ActivityCompletionClientTestCase.php +++ b/tests/Functional/Client/ActivityCompletionClientTestCase.php @@ -18,6 +18,7 @@ use Temporal\Exception\Client\WorkflowFailedException; use Temporal\Exception\Failure\ActivityFailure; use Temporal\Exception\Failure\ApplicationFailure; +use Temporal\Tests\Activity\ExternalActivityFixturePaths; /** * @group client @@ -35,10 +36,10 @@ public function testCompleteAsyncActivityById() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(2); - $this->assertFileExists('runtime/activityId'); - $data = json_decode(file_get_contents('runtime/activityId')); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + $this->assertFileExists(ExternalActivityFixturePaths::idPath()); + $data = json_decode(file_get_contents(ExternalActivityFixturePaths::idPath())); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); @@ -57,10 +58,10 @@ public function testCompleteAsyncActivityByIdExplicit() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(1); - $this->assertFileExists('runtime/activityId'); - $data = json_decode(file_get_contents('runtime/activityId')); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + $this->assertFileExists(ExternalActivityFixturePaths::idPath()); + $data = json_decode(file_get_contents(ExternalActivityFixturePaths::idPath())); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); @@ -79,10 +80,10 @@ public function testCompleteAsyncActivityByIdInvalid() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(1); - $this->assertFileExists('runtime/activityId'); - $data = json_decode(file_get_contents('runtime/activityId')); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + $this->assertFileExists(ExternalActivityFixturePaths::idPath()); + $data = json_decode(file_get_contents(ExternalActivityFixturePaths::idPath())); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); @@ -105,10 +106,10 @@ public function testCompleteAsyncActivityByToken() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(1); - $this->assertFileExists('runtime/taskToken'); - $taskToken = file_get_contents('runtime/taskToken'); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + $this->assertFileExists(ExternalActivityFixturePaths::tokenPath()); + $taskToken = file_get_contents(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); @@ -127,11 +128,11 @@ public function testCompleteAsyncActivityByTokenInvalid() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(1); - $this->assertFileExists('runtime/taskToken'); - $taskToken = file_get_contents('runtime/taskToken'); + $this->assertFileExists(ExternalActivityFixturePaths::tokenPath()); + $taskToken = file_get_contents(ExternalActivityFixturePaths::tokenPath()); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); @@ -154,10 +155,10 @@ public function testCompleteAsyncActivityByTokenExceptionally() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(1); - $this->assertFileExists('runtime/taskToken'); - $taskToken = file_get_contents('runtime/taskToken'); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + $this->assertFileExists(ExternalActivityFixturePaths::tokenPath()); + $taskToken = file_get_contents(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); @@ -185,10 +186,10 @@ public function testCompleteAsyncActivityByTokenExceptionallyById() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(2); - $this->assertFileExists('runtime/taskToken'); - $data = json_decode(file_get_contents('runtime/activityId')); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + $this->assertFileExists(ExternalActivityFixturePaths::tokenPath()); + $data = json_decode(file_get_contents(ExternalActivityFixturePaths::idPath())); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); @@ -222,10 +223,10 @@ public function testHeartBeatByID() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(1); - $this->assertFileExists('runtime/taskToken'); - $data = json_decode(file_get_contents('runtime/activityId')); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + $this->assertFileExists(ExternalActivityFixturePaths::tokenPath()); + $data = json_decode(file_get_contents(ExternalActivityFixturePaths::idPath())); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); @@ -269,10 +270,10 @@ public function testHeartBeatByToken() $this->assertNotEmpty($e->getExecution()->getRunID()); sleep(1); - $this->assertFileExists('runtime/taskToken'); - $taskToken = file_get_contents('runtime/taskToken'); - unlink('runtime/taskToken'); - unlink('runtime/activityId'); + $this->assertFileExists(ExternalActivityFixturePaths::tokenPath()); + $taskToken = file_get_contents(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::tokenPath()); + unlink(ExternalActivityFixturePaths::idPath()); $act = $client->newActivityCompletionClient(); diff --git a/tests/Unit/Logger/FatalHandlerTestCase.php b/tests/Unit/Logger/FatalHandlerTestCase.php new file mode 100644 index 000000000..51f20c3de --- /dev/null +++ b/tests/Unit/Logger/FatalHandlerTestCase.php @@ -0,0 +1,115 @@ +directory . '/fatal.log'; + $script = $this->buildFixtureScript($logFile, <<<'PHP' + trigger_error('intentional fatal', E_USER_ERROR); + PHP); + $this->executeFixture($script); + + $reader = new TranscriptReader($this->directory); + $fatal = $reader->findBySection(TranscriptSection::FATAL); + self::assertNotEmpty($fatal, $this->diagnostic('Expected a [FATAL] line', $logFile)); + self::assertStringContainsString('intentional fatal', (string) ($fatal[0]->payload['message'] ?? '')); + } + + public function testUncaughtErrorIsRecordedAsFatalViaExceptionHandler(): void + { + $logFile = $this->directory . '/uncaught.log'; + $script = $this->buildFixtureScript($logFile, <<<'PHP' + throw new \Error('uncaught fatal'); + PHP); + $this->executeFixture($script); + + $reader = new TranscriptReader($this->directory); + $fatal = $reader->findBySection(TranscriptSection::FATAL); + self::assertNotEmpty($fatal, $this->diagnostic('FATAL marker missing', $logFile)); + self::assertSame(\Error::class, $fatal[0]->attributes['class']); + self::assertSame('uncaught fatal', (string) $fatal[0]->payload['message']); + } + + public function testWritesPriorToFatalArePreserved(): void + { + $logFile = $this->directory . '/preserved.log'; + $script = $this->buildFixtureScript($logFile, <<<'PHP' + $writer->writeTestBoundary(\Temporal\Testing\Transcript\TranscriptSection::TEST_START, ['name' => 'pre-fatal']); + $writer->writeLog('info', 'about to die', []); + trigger_error('boom', E_USER_ERROR); + PHP); + $this->executeFixture($script); + + $reader = new TranscriptReader($this->directory); + $boundaries = $reader->findBySection(TranscriptSection::TEST_START); + $logs = $reader->findBySection(TranscriptSection::LOG); + $fatal = $reader->findBySection(TranscriptSection::FATAL); + self::assertNotEmpty($boundaries, $this->diagnostic('TEST_START not preserved across fatal', $logFile)); + self::assertNotEmpty($logs, $this->diagnostic('LOG not preserved across fatal', $logFile)); + self::assertNotEmpty($fatal, $this->diagnostic('FATAL marker missing', $logFile)); + } + + private function diagnostic(string $message, string $logFile): string + { + return $message + . "\nfixture stdout/stderr:\n" . $this->lastFixtureOutput() + . "\ntranscript content:\n" . @\file_get_contents($logFile); + } + + private function buildFixtureScript(string $logFile, #[Language("PHP")]string $body): string + { + $baseDir = \dirname(__DIR__, 3); + $autoloadPath = \var_export($baseDir . '/vendor/autoload.php', true); + $logFileExport = \var_export($logFile, true); + return << */ + private array $lastFixtureOutput = []; + + private function executeFixture(string $script): void + { + $scriptPath = $this->directory . '/fixture-' . \uniqid('', true) . '.php'; + \file_put_contents($scriptPath, $script); + $command = \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg($scriptPath) . ' 2>&1'; + \exec($command, $output, $exitCode); + $this->lastFixtureOutput = $output; + self::assertNotSame(0, $exitCode, 'Fixture process should exit non-zero on fatal; output: ' . \implode("\n", $output)); + } + + private function lastFixtureOutput(): string + { + return \implode("\n", $this->lastFixtureOutput); + } +} diff --git a/tests/Unit/Logger/TranscriptTestSupport.php b/tests/Unit/Logger/TranscriptTestSupport.php new file mode 100644 index 000000000..2f40c7ea8 --- /dev/null +++ b/tests/Unit/Logger/TranscriptTestSupport.php @@ -0,0 +1,49 @@ +filesystem = new Filesystem(); + $this->directory = $this->makeTempDirectory(); + $this->filesystem->mkdir($this->directory); + } + + #[\Override] + protected function tearDown(): void + { + try { + $this->filesystem->remove($this->directory); + } catch (IOException) { + } + } + + private function makeTempDirectory(): string + { + $shortName = \strtolower((new \ReflectionClass(static::class))->getShortName()); + return \sys_get_temp_dir() + . '/' . $shortName + . '-' . (\getmypid() ?: 0) + . '-' . \uniqid('', true); + } +} diff --git a/tests/Unit/Logger/TranscriptWriterTestCase.php b/tests/Unit/Logger/TranscriptWriterTestCase.php new file mode 100644 index 000000000..90c66e35b --- /dev/null +++ b/tests/Unit/Logger/TranscriptWriterTestCase.php @@ -0,0 +1,358 @@ +directory . '/log.log'); + $writer->writeLog('info', 'hello world', ['key' => 'value']); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $logs = $reader->findBySection(TranscriptSection::LOG); + self::assertCount(1, $logs); + self::assertSame('info', $logs[0]->attributes['level']); + self::assertSame('hello world', $logs[0]->attributes['message']); + self::assertSame(['key' => 'value'], $logs[0]->payload); + } + + public function testMultiLineContextIsEscapedOnOneLine(): void + { + $writer = new TranscriptWriter($this->directory . '/log.log'); + $writer->writeLog('warning', "line1\nline2\rline3", []); + $writer->flush(); + + $raw = \file_get_contents($writer->getPath()); + $bodyLines = \array_values(\array_filter(\explode("\n", $raw), static fn(string $line): bool => $line !== '')); + $logLine = null; + foreach ($bodyLines as $line) { + if (\str_contains($line, '"section":"LOG"')) { + $logLine = $line; + break; + } + } + self::assertNotNull($logLine); + self::assertStringNotContainsString("\n", $logLine); + self::assertStringContainsString('line1\\nline2\\rline3', $logLine); + } + + public function testWriteWireRoundTripsFrameBytes(): void + { + $writer = new TranscriptWriter($this->directory . '/wire.log'); + $frame = '{"command":"InvokeActivity","payloads":["abc"]}'; + $writer->writeWireInbound($frame, ['tickTime' => '2026-05-13'], 42); + $writer->writeWireOutbound($frame, 42, 1); + $writer->writeWireOutbound($frame, 42, 2); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $inbound = $reader->findBySection(TranscriptSection::WIRE_INBOUND); + $outbound = $reader->findBySection(TranscriptSection::WIRE_OUTBOUND); + self::assertCount(1, $inbound); + self::assertCount(2, $outbound); + self::assertSame(42, $inbound[0]->attributes['inbound_batch_id']); + self::assertSame(\strlen($frame), $inbound[0]->attributes['bytes']); + self::assertSame(['tickTime' => '2026-05-13'], $inbound[0]->payload['headers']); + self::assertSame(42, $outbound[0]->attributes['inbound_batch_id']); + self::assertSame(1, $outbound[0]->attributes['outbound_seq']); + self::assertSame(2, $outbound[1]->attributes['outbound_seq']); + $decoded = $inbound[0]->payload['body']['value'] ?? null; + self::assertIsArray($decoded); + self::assertSame('InvokeActivity', $decoded['command']); + } + + public function testWriteWireDecodesTemporalProtobufFrame(): void + { + $jsonPayload = (new Payload()) + ->setMetadata(['encoding' => 'json/plain']) + ->setData('{"hello":"world","n":7}'); + $nullPayload = (new Payload()) + ->setMetadata(['encoding' => 'binary/null']) + ->setData(''); + $headerPayload = (new Payload()) + ->setMetadata(['encoding' => 'json/plain']) + ->setData('"trace-id"'); + + $message = new Message(); + $message->setId(42); + $message->setCommand('StartWorkflow'); + $message->setOptions('{"name":"Demo","attempt":1}'); + $message->setPayloads((new Payloads())->setPayloads([$jsonPayload, $nullPayload])); + $message->setHeader((new Header())->setFields(['traceId' => $headerPayload])); + $message->setHistoryLength(3); + $message->setHistorySize(375); + $message->setRunId('run-1'); + $message->setTaskQueue('default'); + $message->setReplay(true); + + $frame = (new Frame())->setMessages([$message])->serializeToString(); + + $writer = new TranscriptWriter($this->directory . '/proto-wire.log'); + $writer->writeWireInbound($frame, ['tickTime' => '2026-05-26'], 7); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $inbound = $reader->findBySection(TranscriptSection::WIRE_INBOUND); + self::assertCount(1, $inbound); + self::assertSame(\strlen($frame), $inbound[0]->attributes['bytes']); + + $body = $inbound[0]->payload['body']; + self::assertSame('temporal-frame', $body['encoding']); + self::assertCount(1, $body['messages']); + $decoded = $body['messages'][0]; + + self::assertSame('42', $decoded['id']); + self::assertSame('StartWorkflow', $decoded['command']); + self::assertSame(['name' => 'Demo', 'attempt' => 1], $decoded['options']); + self::assertSame('3', $decoded['history_length']); + self::assertSame('375', $decoded['history_size']); + self::assertSame('run-1', $decoded['run_id']); + self::assertSame('default', $decoded['task_queue']); + self::assertTrue($decoded['replay']); + + self::assertCount(2, $decoded['payloads']); + self::assertSame(['hello' => 'world', 'n' => 7], $decoded['payloads'][0]); + self::assertNull($decoded['payloads'][1]); + + self::assertSame(['traceId' => 'trace-id'], $decoded['header']); + } + + public function testWriteWireDecodesFailureAndPreservesNonUtf8Payload(): void + { + $cause = (new Failure()) + ->setMessage('inner') + ->setSource('PHP_SDK') + ->setStackTrace("at foo\nat bar"); + $failure = (new Failure()) + ->setMessage('Should not be called') + ->setSource('PHP_SDK') + ->setStackTrace('#0 outer') + ->setCause($cause); + + $binaryPayload = (new Payload()) + ->setMetadata(['encoding' => 'binary/plain']) + ->setData("\x00\xff\x01\x02non-utf8"); + + $message = (new Message()) + ->setCommand('CompleteWorkflow') + ->setFailure($failure) + ->setPayloads((new Payloads())->setPayloads([$binaryPayload])); + + $frame = (new Frame())->setMessages([$message])->serializeToString(); + + $writer = new TranscriptWriter($this->directory . '/proto-failure.log'); + $writer->writeWireOutbound($frame, 1, 1); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $outbound = $reader->findBySection(TranscriptSection::WIRE_OUTBOUND); + self::assertCount(1, $outbound); + + $decoded = $outbound[0]->payload['body']['messages'][0]; + self::assertSame('CompleteWorkflow', $decoded['command']); + self::assertSame('Should not be called', $decoded['failure']['message']); + self::assertSame('PHP_SDK', $decoded['failure']['source']); + self::assertSame('#0 outer', $decoded['failure']['stack_trace']); + self::assertSame('inner', $decoded['failure']['cause']['message']); + self::assertArrayNotHasKey('cause', $decoded['failure']['cause']); + self::assertArrayNotHasKey('id', $decoded); + self::assertArrayNotHasKey('replay', $decoded); + + $fallback = $decoded['payloads'][0]; + self::assertSame(['encoding' => 'binary/plain'], $fallback['metadata']); + self::assertSame('base64', $fallback['data']['encoding']); + self::assertSame("\x00\xff\x01\x02non-utf8", \base64_decode($fallback['data']['value'])); + } + + public function testWriteWireFallsBackToBase64ForUnparseableBinaryFrame(): void + { + $frame = "\x00\xff\x01\xfa\xfb\xfc"; + $writer = new TranscriptWriter($this->directory . '/garbage.log'); + $writer->writeWireInbound($frame, [], 1); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $inbound = $reader->findBySection(TranscriptSection::WIRE_INBOUND); + self::assertCount(1, $inbound); + $body = $inbound[0]->payload['body']; + self::assertSame('raw', $body['encoding']); + self::assertSame($frame, \base64_decode($body['preview_base64'])); + } + + public function testWriteExceptionCarriesClassAndTrace(): void + { + $writer = new TranscriptWriter($this->directory . '/exc.log'); + $writer->writeException('activity_throw', ['attempt' => 2], new \RuntimeException('boom')); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $exceptions = $reader->findBySection(TranscriptSection::EXCEPTION); + self::assertCount(1, $exceptions); + self::assertSame('activity_throw', $exceptions[0]->attributes['phase']); + self::assertSame(2, $exceptions[0]->attributes['attempt']); + self::assertSame(\RuntimeException::class, $exceptions[0]->payload['class']); + self::assertSame('boom', $exceptions[0]->payload['message']); + self::assertNotSame('', $exceptions[0]->payload['trace']); + } + + public function testWriteFatalCarriesThrowableMetadata(): void + { + $writer = new TranscriptWriter($this->directory . '/fatal.log'); + $writer->writeFatal(new \Error('boom')); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $fatal = $reader->findBySection(TranscriptSection::FATAL); + self::assertCount(1, $fatal); + self::assertSame(\Error::class, $fatal[0]->attributes['class']); + self::assertSame('boom', $fatal[0]->payload['message']); + } + + public function testWriteHistoryEventSerializesProtoJson(): void + { + $writer = new TranscriptWriter($this->directory . '/history.log'); + $writer->writeHistoryEvent('wf-1', 'run-1', ['event_id' => 5, 'event_type' => 'ActivityTaskScheduled'], '{"event":"abc"}'); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $history = $reader->findBySection(TranscriptSection::HISTORY); + self::assertCount(1, $history); + self::assertSame('wf-1', $history[0]->attributes['workflow_id']); + self::assertSame(5, $history[0]->attributes['event_id']); + self::assertSame('ActivityTaskScheduled', $history[0]->attributes['event_type']); + self::assertSame('{"event":"abc"}', $history[0]->payload['attrs']); + } + + public function testEveryLineCarriesPidAndIsoTimestamp(): void + { + $writer = new TranscriptWriter($this->directory . '/all.log'); + $writer->writeLog('info', 'one', []); + $writer->writeMeta('event_two', ['k' => 'v']); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $lines = $reader->getLines(); + self::assertGreaterThanOrEqual(2, \count($lines)); + $processId = \getmypid(); + foreach ($lines as $line) { + self::assertSame($processId, $line->processId); + self::assertNotNull($line->timestamp); + } + } + + public function testConcurrentWritersUnderLockExProduceWellFormedLines(): void + { + $path = $this->directory . '/concurrent.log'; + $childCount = 4; + $writesPerChild = 50; + $baseDir = \dirname(__DIR__, 3); + $autoloadPath = \var_export($baseDir . '/vendor/autoload.php', true); + $childPaths = []; + /** @var list $processes */ + $processes = []; + for ($i = 0; $i < $childCount; $i++) { + $script = $this->directory . "/child-{$i}.php"; + \file_put_contents($script, <<writeLog('info', "child-{$i}-write-\$j", []); + } + \$writer->flush(); + PHP); + $childPaths[] = $script; + $process = \proc_open([\PHP_BINARY, $script], [ + 0 => ['pipe', 'r'], + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ], $pipes); + if (!\is_resource($process)) { + self::fail("proc_open failed for child {$i}"); + } + \fclose($pipes[0]); + \fclose($pipes[1]); + $processes[] = ['process' => $process, 'stderr' => $pipes[2], 'index' => $i]; + } + foreach ($processes as $entry) { + $stderr = \stream_get_contents($entry['stderr']); + \fclose($entry['stderr']); + $exitCode = \proc_close($entry['process']); + if ($exitCode !== 0) { + self::fail(\sprintf( + "child %d exited with %d; stderr:\n%s", + $entry['index'], + $exitCode, + (string) $stderr, + )); + } + } + foreach ($childPaths as $script) { + @\unlink($script); + } + $reader = new TranscriptReader($this->directory); + $logs = $reader->findBySection(TranscriptSection::LOG); + self::assertSame($childCount * $writesPerChild, \count($logs)); + foreach ($logs as $line) { + self::assertStringStartsWith('child-', (string) $line->attributes['message']); + } + } + + public function testReaderRejectsMalformedLine(): void + { + $path = $this->directory . '/bad.log'; + \file_put_contents($path, "this line does not match the schema\n"); + $reader = new TranscriptReader($this->directory); + + $this->expectException(MalformedTranscriptException::class); + $reader->getLines(); + } + + public function testReaderIncludesRotatedLogFiles(): void + { + $writer = new TranscriptWriter($this->directory . '/rotated.log'); + $writer->writeLog('info', 'pre-rotation', []); + $writer->flush(); + \rename($this->directory . '/rotated.log', $this->directory . '/rotated.log.1'); + + $writer = new TranscriptWriter($this->directory . '/rotated.log'); + $writer->writeLog('info', 'post-rotation', []); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $messages = \array_map( + static fn($line): string => (string) $line->attributes['message'], + $reader->findBySection(TranscriptSection::LOG), + ); + self::assertContains('pre-rotation', $messages); + self::assertContains('post-rotation', $messages); + } +} diff --git a/tests/Unit/Logger/WorkflowHistoryDumperTestCase.php b/tests/Unit/Logger/WorkflowHistoryDumperTestCase.php new file mode 100644 index 000000000..5069bc9b5 --- /dev/null +++ b/tests/Unit/Logger/WorkflowHistoryDumperTestCase.php @@ -0,0 +1,307 @@ +newWriter('empty.log'); + $client = $this->createMock(WorkflowClientInterface::class); + $client->expects(self::never())->method('getWorkflowHistory'); + + (new WorkflowHistoryDumper())->dump($writer, $client, []); + $writer->flush(); + + $meta = $this->readMeta(); + self::assertCount(1, $meta); + self::assertSame('history_skipped', $meta[0]->attributes['event']); + self::assertSame('no_executions_inspected', $meta[0]->attributes['reason']); + } + + public function testWritesHistorySkippedWhenNoStubsPresent(): void + { + $writer = $this->newWriter('nonstub.log'); + $client = $this->createMock(WorkflowClientInterface::class); + $client->expects(self::never())->method('getWorkflowHistory'); + + (new WorkflowHistoryDumper())->dump($writer, $client, ['just-a-string', 42, new \stdClass()]); + $writer->flush(); + + $meta = $this->readMeta(); + self::assertSame('history_skipped', $meta[0]->attributes['event']); + } + + public function testWritesHistoryEventsAndDumpedMetaForSingleExecution(): void + { + $writer = $this->newWriter('single.log'); + $execution = new WorkflowExecution('wf-1', 'run-1'); + $stub = $this->createMock(WorkflowStubInterface::class); + $stub->method('getExecution')->willReturn($execution); + + $events = [ + $this->newEvent(1, EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, 1700000000, 123), + $this->newEvent(2, EventType::EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, 1700000001, 456), + ]; + $client = $this->createMock(WorkflowClientInterface::class); + $client->method('getWorkflowHistory')->willReturn($this->makeHistory($events)); + + (new WorkflowHistoryDumper())->dump($writer, $client, [$stub]); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $history = $reader->findBySection(TranscriptSection::HISTORY); + self::assertCount(2, $history); + self::assertSame(1, $history[0]->attributes['event_id']); + self::assertSame('EVENT_TYPE_WORKFLOW_EXECUTION_STARTED', $history[0]->attributes['event_type']); + self::assertSame('wf-1', $history[0]->attributes['workflow_id']); + self::assertSame('run-1', $history[0]->attributes['run_id']); + self::assertSame('1700000000.123', $history[0]->attributes['event_time']); + self::assertSame(2, $history[1]->attributes['event_id']); + + $dumpedMetas = \array_values(\array_filter( + $reader->findBySection(TranscriptSection::META), + static fn(TranscriptLine $line): bool => ($line->attributes['event'] ?? null) === 'history_dumped', + )); + self::assertCount(1, $dumpedMetas); + self::assertSame('wf-1', $dumpedMetas[0]->attributes['workflow_id']); + self::assertSame(2, $dumpedMetas[0]->attributes['event_count']); + } + + public function testDeduplicatesExecutionsWithSameIdAndRunId(): void + { + $writer = $this->newWriter('dedup.log'); + $execution = new WorkflowExecution('wf-dup', 'run-1'); + $stubA = $this->createMock(WorkflowStubInterface::class); + $stubA->method('getExecution')->willReturn($execution); + $stubB = $this->createMock(WorkflowStubInterface::class); + $stubB->method('getExecution')->willReturn(new WorkflowExecution('wf-dup', 'run-1')); + + $client = $this->createMock(WorkflowClientInterface::class); + $client->expects(self::once()) + ->method('getWorkflowHistory') + ->willReturn($this->makeHistory([ + $this->newEvent(1, EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED), + ])); + + (new WorkflowHistoryDumper())->dump($writer, $client, [$stubA, $stubB]); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $dumpedMetas = \array_values(\array_filter( + $reader->findBySection(TranscriptSection::META), + static fn(TranscriptLine $line): bool => ($line->attributes['event'] ?? null) === 'history_dumped', + )); + self::assertCount(1, $dumpedMetas, 'Same execution id+runId should be dumped once'); + } + + public function testDumpsBothExecutionsWhenSameIdButDifferentRunId(): void + { + $writer = $this->newWriter('two-runs.log'); + $stubA = $this->createMock(WorkflowStubInterface::class); + $stubA->method('getExecution')->willReturn(new WorkflowExecution('wf-retry', 'run-1')); + $stubB = $this->createMock(WorkflowStubInterface::class); + $stubB->method('getExecution')->willReturn(new WorkflowExecution('wf-retry', 'run-2')); + + $client = $this->createMock(WorkflowClientInterface::class); + $client->expects(self::exactly(2)) + ->method('getWorkflowHistory') + ->willReturnOnConsecutiveCalls( + $this->makeHistory([$this->newEvent(1, EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)]), + $this->makeHistory([$this->newEvent(1, EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)]), + ); + + (new WorkflowHistoryDumper())->dump($writer, $client, [$stubA, $stubB]); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $dumpedMetas = \array_values(\array_filter( + $reader->findBySection(TranscriptSection::META), + static fn(TranscriptLine $line): bool => ($line->attributes['event'] ?? null) === 'history_dumped', + )); + self::assertCount(2, $dumpedMetas); + $runIds = \array_map(static fn(TranscriptLine $line): mixed => $line->attributes['run_id'], $dumpedMetas); + self::assertEqualsCanonicalizing(['run-1', 'run-2'], $runIds); + } + + public function testWritesHistoryErrorWhenClientThrows(): void + { + $writer = $this->newWriter('err.log'); + $stub = $this->createMock(WorkflowStubInterface::class); + $stub->method('getExecution')->willReturn(new WorkflowExecution('wf-err', 'run-x')); + + $client = $this->createMock(WorkflowClientInterface::class); + $client->method('getWorkflowHistory')->willThrowException(new \RuntimeException('temporal-unreachable')); + + (new WorkflowHistoryDumper())->dump($writer, $client, [$stub]); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $errors = $reader->findBySection(TranscriptSection::HISTORY_ERROR); + self::assertCount(1, $errors); + self::assertSame('wf-err', $errors[0]->attributes['workflow_id']); + self::assertSame(\RuntimeException::class, $errors[0]->attributes['class']); + self::assertSame('temporal-unreachable', $errors[0]->payload['message']); + + $dumped = \array_filter( + $reader->findBySection(TranscriptSection::META), + static fn(TranscriptLine $line): bool => ($line->attributes['event'] ?? null) === 'history_dumped', + ); + self::assertEmpty($dumped); + } + + public function testEnrichesHistoryEntriesWithDeltaMsAndFailureSummary(): void + { + $writer = $this->newWriter('enrich.log'); + $stub = $this->createMock(WorkflowStubInterface::class); + $stub->method('getExecution')->willReturn(new WorkflowExecution('wf-1', 'run-1')); + + $start = $this->newEvent(1, EventType::EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, 1700000000, 0); + $failed = $this->newEvent(2, EventType::EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, 1700000000, 250_000_000); + $failed->setWorkflowExecutionFailedEventAttributes( + (new WorkflowExecutionFailedEventAttributes())->setFailure( + (new Failure()) + ->setMessage('Should not be called') + ->setSource('PHP_SDK') + ->setApplicationFailureInfo( + (new ApplicationFailureInfo())->setType('Exception'), + ), + ), + ); + + $client = $this->createMock(WorkflowClientInterface::class); + $client->method('getWorkflowHistory')->willReturn($this->makeHistory([$start, $failed])); + + (new WorkflowHistoryDumper())->dump($writer, $client, [$stub]); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $history = $reader->findBySection(TranscriptSection::HISTORY); + self::assertCount(2, $history); + self::assertSame(0, $history[0]->attributes['delta_ms']); + self::assertSame(250, $history[1]->attributes['delta_ms']); + self::assertSame('application_failure_info', $history[1]->attributes['failure_kind']); + self::assertSame('Should not be called', $history[1]->attributes['failure_message']); + self::assertArrayNotHasKey('failure_kind', $history[0]->attributes); + self::assertArrayNotHasKey('cause', $history[0]->attributes); + } + + public function testRecordsCauseAttributeForChildWorkflowStartFailure(): void + { + $writer = $this->newWriter('cause.log'); + $stub = $this->createMock(WorkflowStubInterface::class); + $stub->method('getExecution')->willReturn(new WorkflowExecution('wf-c', 'run-c')); + + $event = $this->newEvent(7, EventType::EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED, 1700000000, 0); + $event->setStartChildWorkflowExecutionFailedEventAttributes( + (new StartChildWorkflowExecutionFailedEventAttributes()) + ->setCause(WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE), + ); + + $client = $this->createMock(WorkflowClientInterface::class); + $client->method('getWorkflowHistory')->willReturn($this->makeHistory([$event])); + + (new WorkflowHistoryDumper())->dump($writer, $client, [$stub]); + $writer->flush(); + + $reader = new TranscriptReader($this->directory); + $history = $reader->findBySection(TranscriptSection::HISTORY); + self::assertCount(1, $history); + self::assertArrayHasKey('cause', $history[0]->attributes); + self::assertSame( + (string) WorkflowTaskFailedCause::WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, + (string) $history[0]->attributes['cause'], + ); + self::assertArrayNotHasKey('failure_kind', $history[0]->attributes); + } + + private function newWriter(string $name): TranscriptWriter + { + return new TranscriptWriter($this->directory . '/' . $name); + } + + /** + * @return list + */ + private function readMeta(): array + { + $reader = new TranscriptReader($this->directory); + return \array_values(\array_filter( + $reader->findBySection(TranscriptSection::META), + static fn(TranscriptLine $line): bool => ($line->attributes['event'] ?? null) !== 'writer_initialized', + )); + } + + private function newEvent(int $id, int $type, ?int $seconds = null, int $nanos = 0): HistoryEvent + { + $event = new HistoryEvent(); + $event->setEventId($id); + $event->setEventType($type); + if ($seconds !== null) { + $timestamp = new Timestamp(); + $timestamp->setSeconds($seconds); + $timestamp->setNanos($nanos); + $event->setEventTime($timestamp); + } + return $event; + } + + /** + * @param list $events + */ + private function makeHistory(array $events): WorkflowExecutionHistory + { + $history = new History(); + $history->setEvents($events); + $response = new GetWorkflowExecutionHistoryResponse(); + $response->setHistory($history); + $generator = (static function () use ($response): \Generator { + yield [$response]; + })(); + return new WorkflowExecutionHistory(Paginator::createFromGenerator($generator, null)); + } +} diff --git a/tests/Unit/Plugin/TranscriptPluginTestCase.php b/tests/Unit/Plugin/TranscriptPluginTestCase.php new file mode 100644 index 000000000..b5304987c --- /dev/null +++ b/tests/Unit/Plugin/TranscriptPluginTestCase.php @@ -0,0 +1,90 @@ +newWriter()); + + self::assertSame('temporal-php.transcript', $plugin->getName()); + } + + public function testConfigureWorkerAddsActivityAndWorkflowInterceptors(): void + { + $writer = $this->newWriter(); + $plugin = new TranscriptPlugin($writer); + $context = new WorkerPluginContext('test-queue', WorkerOptions::new()); + $nextCalled = false; + + $plugin->configureWorker($context, static function (WorkerPluginContext $received) use (&$nextCalled, $context): void { + $nextCalled = true; + self::assertSame($context, $received); + }); + + self::assertTrue($nextCalled, 'next callback must be invoked'); + $interceptors = $context->getInterceptors(); + self::assertCount(2, $interceptors); + self::assertInstanceOf(TranscriptActivityInterceptor::class, $interceptors[0]); + self::assertInstanceOf(TranscriptWorkflowInterceptor::class, $interceptors[1]); + } + + public function testConfigureWorkerAppendsInterceptorsWithoutClobberingExistingOnes(): void + { + $writer = $this->newWriter(); + $plugin = new TranscriptPlugin($writer); + $context = new WorkerPluginContext('test-queue', WorkerOptions::new()); + $existing = new TranscriptActivityInterceptor($writer); + $context->addInterceptor($existing); + + $plugin->configureWorker($context, static fn() => null); + + $interceptors = $context->getInterceptors(); + self::assertCount(3, $interceptors); + self::assertSame($existing, $interceptors[0]); + self::assertInstanceOf(TranscriptActivityInterceptor::class, $interceptors[1]); + self::assertInstanceOf(TranscriptWorkflowInterceptor::class, $interceptors[2]); + } + + public function testRegistryExposesPluginUnderWorkerPluginInterface(): void + { + $registry = new PluginRegistry(); + $plugin = new TranscriptPlugin($this->newWriter()); + $registry->add($plugin); + + $workerPlugins = $registry->getPlugins(WorkerPluginInterface::class); + self::assertCount(1, $workerPlugins); + self::assertSame($plugin, $workerPlugins[0]); + } + + private function newWriter(): TranscriptWriter + { + return new TranscriptWriter($this->directory . '/' . \uniqid('plugin-', true) . '.log'); + } +}