feat: concurrent add pipeline with crash-safe mutation recovery #104
Open
gwokhou wants to merge 8 commits into
Add openkb/mutation.py, the transactional layer for KB mutations, plus tests. MutationSnapshot snapshots the target paths, journals the intent, and restores on failure across active/committed/rolled_back states (children restored before parents; self-cleans its backup dir on partial failure). snapshot_paths writes the active journal as the recovery signal; recover_pending_journals rolls back active journals and discards terminal ones. publish_staged_tree copies staged raw/source artifacts into place. Module and tests only — wired into the add path in the next commit.
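The snapshot, journal, and recovery cycle described above can be illustrated with a minimal sketch. It is not the code in `openkb/mutation.py`: the function names with a `_sketch` suffix, the `journal-*` directory layout, and the `journal.json` format are all assumptions made for illustration, and it handles plain files only.

```python
import json
import shutil
import tempfile
from pathlib import Path


def _write_journal(journal_dir: Path, state: str, entries: list) -> None:
    payload = {"state": state, "entries": entries}
    (journal_dir / "journal.json").write_text(json.dumps(payload))


def snapshot_sketch(kb_root: Path, targets: list) -> Path:
    """Back up each target file, then write an 'active' journal (hypothetical layout)."""
    journal_dir = Path(tempfile.mkdtemp(prefix="journal-", dir=kb_root))
    entries = []
    for index, target in enumerate(targets):
        backup = None
        if target.exists():
            backup = journal_dir / f"{index}.bak"
            shutil.copy2(target, backup)
        # backup=None records that the target did not exist before the mutation.
        entries.append({"target": str(target), "backup": str(backup) if backup else None})
    # The journal is written before any mutation; 'active' is the recovery signal.
    _write_journal(journal_dir, "active", entries)
    return journal_dir


def mark_committed_sketch(journal_dir: Path) -> None:
    entries = json.loads((journal_dir / "journal.json").read_text())["entries"]
    _write_journal(journal_dir, "committed", entries)


def recover_sketch(kb_root: Path) -> int:
    """Roll back active journals, discard terminal ones; returns the rollback count."""
    rolled_back = 0
    for journal_dir in sorted(kb_root.glob("journal-*")):
        journal_file = journal_dir / "journal.json"
        if journal_file.exists():
            journal = json.loads(journal_file.read_text())
            if journal["state"] == "active":
                for entry in reversed(journal["entries"]):
                    target = Path(entry["target"])
                    if entry["backup"] is None:
                        target.unlink(missing_ok=True)  # created by the failed run
                    else:
                        shutil.copy2(entry["backup"], target)
                rolled_back += 1
        shutil.rmtree(journal_dir)
    return rolled_back
```

A journal directory without a `journal.json` is simply discarded here, on the assumption that nothing in the KB is mutated before the journal is written.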
Parallelize directory `add`: prepare files concurrently (hash, prefilter, staging, convert) while live-KB mutation stays serialized under the mutation lock.
- Split add into prepare (file-local, into an isolated staging dir) and commit (under the lock): snapshot -> publish -> index -> compile -> registry write -> mark_committed, with snapshot rollback on failure.
- convert_document gains assume_locked/staging_dir/doc_name_override so parallel prepares never write the live KB unlocked.
- Reserve doc_names in scan order via HashRegistry.memory so same-stem files behave like serial adds.
- New file_processing_jobs config (default 2).
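The prepare/commit split can be sketched as follows. This is an illustration under stated assumptions, not the PR's CLI code: `prepare`, `commit`, `add_directory`, and `mutation_lock` are hypothetical stand-ins, and the "KB" is just a list. It shows one way to keep commits in scan order while preparation runs concurrently: submit in scan order, then consume the futures in submission order.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

mutation_lock = threading.Lock()  # stand-in for the KB mutation lock


def prepare(path: str) -> dict:
    """File-local work only (hash, convert, stage); touches no shared state."""
    return {"path": path, "staged": path.upper()}


def commit(prepared: dict, live_kb: list) -> None:
    """Mutates the shared KB, so callers must hold mutation_lock."""
    live_kb.append(prepared["staged"])


def add_directory(paths: list, jobs: int = 2) -> list:
    live_kb = []
    with ThreadPoolExecutor(max_workers=jobs) as pool:
        # Submit in scan order; the workers may finish in any order.
        futures = [pool.submit(prepare, p) for p in paths]
        # Consume in submission order so commits follow scan order.
        for future in futures:
            prepared = future.result()
            with mutation_lock:
                commit(prepared, live_kb)
    return live_kb
```

Iterating the futures list instead of using `as_completed` trades a little latency for deterministic output order, which matches the stated goal of stable logs.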
Document file_processing_jobs: add it to the Settings yaml example and note (README Advanced options + config.yaml.example) that only file preparation is parallelized while live-KB mutation stays serialized, so raising it helps mainly when conversion is the bottleneck.
Add DEBUG-level timing logs across the add pipeline (lock_wait, prefilter, prepare, index, compile, commit) via _log_add_timing, gated behind isEnabledFor(DEBUG) so there is no cost when disabled. Surfaces where time goes during concurrent directory add.
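A gated timing helper along these lines might look like the sketch below. The signature of the PR's `_log_add_timing` is not shown in this conversation, so `log_timing`, the logger name, and the message format are assumptions.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("openkb.add.sketch")  # hypothetical logger name


@contextmanager
def log_timing(phase: str, **fields):
    """Time the wrapped block, but only when DEBUG logging is enabled."""
    if not logger.isEnabledFor(logging.DEBUG):
        # Disabled: skip the clock reads and string formatting entirely.
        yield
        return
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        extra = " ".join(f"{key}={value}" for key, value in fields.items())
        logger.debug("add timing phase=%s elapsed_ms=%.1f %s", phase, elapsed_ms, extra)
```

Usage would be `with log_timing("prepare", file=name): ...` around each phase; the `finally` clause means a phase that raises is still timed.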
Hash directory inputs up front and skip files whose hash is already in the registry before spawning prepare workers, so known duplicates no longer go through conversion and staging. Hashing runs across the jobs workers; files that fail to hash surface as per-file "failed" outcomes instead of aborting the batch.
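The up-front hash prefilter can be sketched like this. SHA-256, the `prefilter` name, the `known_hashes` set, and the outcome labels other than "failed" are assumptions; the real registry lookup is not shown in this conversation.

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def sha256_of(path: Path) -> str:
    """Stream the file in 1 MiB chunks so large inputs are not held in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def prefilter(paths, known_hashes, jobs=2):
    """Return (files still to prepare, per-file outcomes for the rest)."""

    def hash_one(path):
        try:
            return path, sha256_of(path), None
        except OSError as exc:
            # A file that cannot be read fails on its own, not the whole batch.
            return path, None, exc

    to_prepare, outcomes = [], {}
    with ThreadPoolExecutor(max_workers=jobs) as pool:
        # pool.map yields results in input order, preserving scan order.
        for path, digest, error in pool.map(hash_one, paths):
            if error is not None:
                outcomes[path] = "failed"
            elif digest in known_hashes:
                outcomes[path] = "skipped"
            else:
                to_prepare.append((path, digest))
    return to_prepare, outcomes
```

This sketch does not deduplicate identical files within the same batch; it only filters against hashes that are already known.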
recover_pending_journals was wired only into _kb_mutation_lock (the add path), so remove/recompile/lint/chat, which take kb_ingest_lock directly via _with_kb_lock, never drained. An `openkb add` that crashed mid-commit left an ACTIVE journal that an intervening `openkb remove` ignored and a later `openkb add` then rolled back, clobbering the remove's hashes.json edits and resurrecting the removed document.
Move draining into kb_lock's first exclusive acquisition so every mutation entry point restores the KB to a known state before mutating. Delay-import mutation from locks to break the locks<->mutation cycle, and drop the now-redundant drain (plus its double-scan/double-log per file) from _kb_mutation_lock.
Co-Authored-By: Claude <noreply@anthropic.com>
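Draining on the first exclusive acquisition can be sketched with a re-entrant lock and a depth counter. This is a simplified in-process illustration: `kb_lock_sketch` and the `drain` callback are hypothetical, and the real `kb_lock` is presumably a cross-process lock whose implementation is not shown in this conversation.

```python
import threading
from contextlib import contextmanager

_lock = threading.RLock()   # stand-in for the exclusive KB lock
_held = threading.local()   # per-thread nesting depth


@contextmanager
def kb_lock_sketch(drain):
    """Take the lock; run drain() only on the outermost acquisition."""
    with _lock:
        depth = getattr(_held, "depth", 0)
        _held.depth = depth + 1
        try:
            if depth == 0:
                # Every mutation entry point passes through here, so pending
                # journals are recovered before any caller mutates the KB.
                drain()
            yield
        finally:
            _held.depth = depth
```

In a real module, the import cycle mentioned above could be avoided by importing the recovery function inside the lock function body rather than at module top level.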
- test_exclusive_lock_drains_active_journal_before_yielding: regression guard for the lock-level drain. An ACTIVE journal left by a crashed add is rolled back the moment any exclusive lock is taken, not only on the add path.
- test_add_directory_jobs_gt1_runs_real_pipeline: end-to-end exercise of the jobs>1 ThreadPoolExecutor branch (real prepare + real commit, only LLM compile mocked). Every prior jobs>1 test mocked both halves, so futures ordering, staging publish, registry writes, and cleanup were never run.
Co-Authored-By: Claude <noreply@anthropic.com>
114abea to a48d054
Cleanup pass over the concurrent-ingestion + mutation-journal feature. Behavior preserved (full suite: 820 passed).
- mutation: stream _copy_file_atomic (mkstemp + copyfileobj + fsync + os.replace) instead of buffering whole files; route snapshot_paths file backups through it so every file copy in the module is atomic and streaming, and large raw PDFs no longer spike peak memory.
- converter: drop the dead ConvertResult.staging_dir field (only _PreparedAdd.staging_dir is ever read); split convert_document into a lock-acquiring wrapper + _convert_document_locked so the parallel prepare path calls the locked form directly, removing the assume_locked flag and its recursive self-call.
- cli: hoist a module-level logger (drop four local rebinding sites); unify the jobs==1 and jobs>1 commit loops into one ThreadPoolExecutor path; document the defensive long-doc snapshot paths.
Co-Authored-By: Claude <noreply@anthropic.com>
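The mkstemp + copyfileobj + fsync + os.replace sequence named in the mutation bullet can be sketched as below. The function name `copy_file_atomic_sketch` and its error handling are assumptions; the PR's `_copy_file_atomic` may differ in detail.

```python
import os
import shutil
import tempfile
from pathlib import Path


def copy_file_atomic_sketch(src: Path, dst: Path) -> None:
    """Stream src into a temp file beside dst, then atomically rename it."""
    dst.parent.mkdir(parents=True, exist_ok=True)
    # The temp file lives in dst's directory so os.replace stays on one
    # filesystem, which is what makes the final rename atomic.
    fd, tmp_name = tempfile.mkstemp(dir=dst.parent, prefix=dst.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as out, src.open("rb") as inp:
            shutil.copyfileobj(inp, out)  # chunked copy, bounded memory
            out.flush()
            os.fsync(out.fileno())        # push the data to disk before renaming
        os.replace(tmp_name, dst)
    except BaseException:
        # Never leave a partial temp file behind; dst is untouched on failure.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
```

A reader of `dst` sees either the old content or the complete new content, never a half-written file. Durability of the rename itself would additionally need an fsync on the parent directory, which this sketch omits.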
Summary
Parallelize `openkb add <dir>` and make every KB mutation crash-recoverable through a journaled snapshot/rollback layer.
Why
Directory `add` was strictly serial: ingest time scaled linearly with file count and was dominated by conversion. Worse, a crash between convert and the registry write orphaned raw/source artifacts in the live KB with no way to recover or clean them up.
Changes
- File preparation (hash, prefilter, staging, convert) runs in a `ThreadPoolExecutor` (`file_processing_jobs`, default 2); live-KB mutation (publish, PageIndex indexing, LLM compile, registry write, log append) stays under the mutation lock. Commits run in scan order for stable `log.md` / CLI output.
- Transactional mutation layer (`openkb/mutation.py`). Each commit snapshots the KB paths it will touch, journals the intent, and rolls back on failure. `recover_pending_journals` rolls back any `active` journal left by an interrupted run and discards `committed`/`rolled_back` ones. SQLite sidecars (`pageindex.db-wal`/`-shm`/`-journal`) are snapshotted too, so long-doc failures don't leave sidecars newer than the db.
- Recovery runs when the KB lock is first acquired exclusively, so `remove`/`recompile`/`lint`/`chat` also restore the KB before mutating, not just `add`. This prevents a crashed `add`'s journal from later clobbering an intervening `remove`.
- `convert_document` writes into an isolated staging dir published atomically at commit, so a crash can no longer orphan live-KB artifacts.
Tests
`test_mutation.py` + `test_add_command.py` cover snapshot rollback, commit protection, partial-failure cleanup, lock-driven recovery, and the jobs>1 pipeline end-to-end (real prepare + commit, only LLM compile mocked). Full suite: 820 passed.
🤖 Generated with Claude Code