
feat: add BrainLayer queue merge utility #283

Merged
EtanHey merged 2 commits into main from fix/queue-merge-utility on May 15, 2026

Conversation

@EtanHey
Owner

@EtanHey EtanHey commented May 15, 2026

Summary

  • Adds scripts/merge_queue.py to union an AirDrop'd M1 Pro BrainLayer queue directory into the live M4 Max ~/.brainlayer/queue/ without mirroring, deleting, or overwriting.
  • Adds testable brainlayer.queue_merge.merge_queue_dirs() logic with byte-hash idempotency, exact-content skip reporting, and same-filename/different-content collision renames.
  • Covers dry-run behavior, idempotent re-runs, filename collisions, and non-JSONL skips with fixture tests.

Operator Usage

python3 scripts/merge_queue.py ~/Downloads/queue --dry-run
python3 scripts/merge_queue.py ~/Downloads/queue
launchctl kickstart -k gui/$(id -u)/com.brainlayer.drain

This PR does not run the actual M1→M4 queue merge; Etan has not AirDropped the source queue yet.

Test Plan

  • RED: pytest tests/test_queue_merge.py -q failed with ModuleNotFoundError before implementation.
  • GREEN: pytest tests/test_queue_merge.py -q → 4 passed.
  • ruff check src/brainlayer/queue_merge.py scripts/merge_queue.py tests/test_queue_merge.py → all checks passed.
  • Manual CLI smoke with temp dirs using python3 scripts/merge_queue.py verified dry-run, first merge, and idempotent second run.
  • ./scripts/run_tests.sh → BrainLayer test gate passed: 1843 passed, 9 skipped, 75 deselected, 1 xfailed; MCP registration 3 passed; isolated eval/hook routing 32 passed; bun 1 passed; regression shell passed.
  • Pre-push hook reran ./scripts/run_tests.sh and passed again with the same gate.

Notes

  • Local cr review --plain was attempted before commit but hung after setup for ~5 minutes and was killed; PR review bots are requested below.
  • Waiting for orc approval before merge.

Note

Medium Risk
Introduces new filesystem merge logic for the write-arbitration queue; while append-only and well-tested, mistakes could duplicate events or affect drain behavior during a live merge.

Overview
Adds an operator-facing CLI (scripts/merge_queue.py) to merge an AirDrop’d queue directory into the live BrainLayer queue, supporting --dry-run/--dest and printing per-file actions plus a post-merge launchctl hint.

Implements brainlayer.queue_merge.merge_queue_dirs() to append-only copy .jsonl events with SHA-256 deduping (byte-identical content is skipped across reruns and across different filenames), deterministic collision renames (-merge-<hash>), and atomic temp-then-rename writes. Non-.jsonl files are ignored, and destination-file races during hash scanning are tolerated. Adds pytest coverage for dry-run, idempotency, collisions, non-JSONL skips, missing/equal dirs, dest creation, cross-filename dedupe, and drain-race handling.
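The dedupe-plus-atomic-copy scheme described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual API: `sha256_bytes`, `copy_atomic`, and `merge_one` are hypothetical names, and the real implementation also handles collision renames and result reporting.

```python
import hashlib
import os
import tempfile
from pathlib import Path


def sha256_bytes(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def copy_atomic(content: bytes, dest: Path) -> None:
    # Write to a temp file in the destination directory, then rename into
    # place; os.replace is atomic on POSIX, so a concurrent reader (e.g. a
    # drain daemon) never observes a partially written event file.
    fd, tmp = tempfile.mkstemp(dir=dest.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(content)
        os.replace(tmp, dest)
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise


def merge_one(src: Path, dest_dir: Path, existing_hashes: set[str]) -> str:
    # Skip byte-identical content regardless of filename; otherwise copy.
    # Never deletes or overwrites anything in dest_dir.
    content = src.read_bytes()
    digest = sha256_bytes(content)
    if digest in existing_hashes:
        return "skipped-exact"
    copy_atomic(content, dest_dir / src.name)
    existing_hashes.add(digest)
    return "copied"
```

Re-running `merge_one` over the same source is a no-op because the hash set already contains the content digest, which is the idempotency property the PR claims.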

Reviewed by Cursor Bugbot for commit 0d89b1e. Bugbot is set up for automated code reviews on this repo.

Note

Add merge_queue_dirs utility to safely merge AirDrop'd queue directories into the live queue

  • Adds src/brainlayer/queue_merge.py with merge_queue_dirs, which unions a source queue directory into a destination, copying only .jsonl files and skipping exact duplicates by SHA-256 content hash.
  • Filename collisions (same name, different content) are resolved by renaming with a deterministic -merge-<hash> suffix; no files are overwritten or deleted.
  • Adds scripts/merge_queue.py as a CLI entrypoint with --dry-run support, printing a per-file action summary and a launchctl kickstart hint after a live merge.
  • Full test coverage in tests/test_queue_merge.py covers idempotency, deduplication, collision renaming, missing source/dest handling, and dry-run behaviour.

Macroscope summarized 0d89b1e.


@greptile-apps greptile-apps Bot left a comment


Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@coderabbitai

coderabbitai Bot commented May 15, 2026

Warning

Rate limit exceeded

@EtanHey has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 47 minutes and 48 seconds before requesting another review.

After the wait time has elapsed, a review can be triggered with the @coderabbitai review command as a PR comment, or by pushing new commits.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: b4be12d3-ab10-40de-a850-bee22f978fa6

📥 Commits

Reviewing files that changed from the base of the PR and between 2cc24cc and 0d89b1e.

📒 Files selected for processing (3)
  • scripts/merge_queue.py
  • src/brainlayer/queue_merge.py
  • tests/test_queue_merge.py


@EtanHey
Owner Author

EtanHey commented May 15, 2026

@cursor @BugBot review

@EtanHey
Owner Author

EtanHey commented May 15, 2026

@coderabbitai review

@EtanHey
Owner Author

EtanHey commented May 15, 2026

@codex review

@coderabbitai

coderabbitai Bot commented May 15, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@cursor

cursor Bot commented May 15, 2026

I'll perform a thorough bugbot-style review of the queue merge utility implementation.

Code Review: Queue Merge Utility

Overall Assessment

The implementation is solid and well-designed. The code demonstrates careful attention to atomicity, idempotency, and edge cases. I found no critical bugs, but I have several recommendations for additional test coverage and minor improvements.


✅ Strengths

  1. Atomic writes (_copy_atomic): Using tmp file + replace pattern prevents partial writes - excellent!

  2. Hash-based deduplication: Using SHA256 content hashes instead of filenames for duplicate detection is the right approach for this use case.

  3. Collision handling: The deterministic collision naming (stem-merge-{hash[:12]}.jsonl) with counter fallback handles multiple edge cases correctly.

  4. Idempotency: The design ensures re-running the merge is safe:

    • Content hashes prevent duplicate copies
    • _collision_target checks for existing identical content
    • No destructive operations (no deletes, no overwrites)
  5. Clear separation of concerns: Core logic in queue_merge.py, CLI wrapper in scripts/, good module structure.

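The deterministic collision naming noted in point 3 can be illustrated with a small sketch. The helper name `collision_name` is hypothetical; the module's real implementation also carries the counter fallback for same-prefix hash collisions.

```python
from pathlib import Path


def collision_name(name: str, digest: str) -> str:
    # Same filename, different content: derive a deterministic rename from
    # the first 12 hex chars of the content hash, e.g.
    # "a.jsonl" -> "a-merge-<hash12>.jsonl". Determinism makes re-runs
    # converge on the same target name instead of minting new copies.
    p = Path(name)
    return f"{p.stem}-merge-{digest[:12]}{p.suffix}"
```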

⚠️ Potential Issues & Recommendations

1. Missing Test Coverage

The current 4 tests are good but incomplete. Add these test cases:

import pytest

# Assumes merge_queue_dirs is importable from the new module and that
# _write_event is the fixture helper already defined in tests/test_queue_merge.py.
from brainlayer.queue_merge import merge_queue_dirs

def test_queue_merge_raises_on_nonexistent_source(tmp_path):
    """Verify NotADirectoryError when source doesn't exist."""
    source = tmp_path / "missing"
    dest = tmp_path / "dest"
    dest.mkdir()
    with pytest.raises(NotADirectoryError):
        merge_queue_dirs(source, dest)

def test_queue_merge_raises_when_source_equals_dest(tmp_path):
    """Verify ValueError when source == dest."""
    queue_dir = tmp_path / "queue"
    queue_dir.mkdir()
    with pytest.raises(ValueError, match="must be different"):
        merge_queue_dirs(queue_dir, queue_dir)

def test_queue_merge_creates_dest_if_missing(tmp_path):
    """Verify dest directory is created if it doesn't exist."""
    source = tmp_path / "source"
    dest = tmp_path / "dest"
    source.mkdir()
    _write_event(source / "a.jsonl", "content")
    
    result = merge_queue_dirs(source, dest)
    
    assert dest.exists()
    assert result.copied == ["a.jsonl"]

def test_queue_merge_deduplicates_across_different_filenames(tmp_path):
    """Verify same content with different names is deduplicated."""
    source = tmp_path / "source"
    dest = tmp_path / "dest"
    source.mkdir()
    dest.mkdir()
    _write_event(source / "a.jsonl", "same content")
    _write_event(source / "b.jsonl", "same content")
    
    result = merge_queue_dirs(source, dest)
    
    # First file copied, second skipped (same hash)
    assert len(result.copied) == 1
    assert len(result.skipped_exact) == 1

2. Memory Usage with Large Files

Lines 33, 78: path.read_bytes() loads entire files into memory. For queue files this is probably acceptable, but consider:

  • Adding a comment documenting this assumption
  • If queue files can grow to GBs, consider streaming hash calculation:
import hashlib
from pathlib import Path

def _sha256_file(path: Path, chunk_size: int = 65536) -> str:
    """Stream-hash large files to avoid loading them into memory."""
    h = hashlib.sha256()
    with path.open('rb') as f:
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()

3. Redundant File Reads in Collision Loop

Lines 42-49: In _collision_target, the while loop reads candidate.read_bytes() on every iteration. If there are many collisions (unlikely but possible), this could be slow. Consider caching:

def _collision_target(...) -> Path:
    # ... existing code ...
    
    # Cache of path -> content we've already read
    seen_content: dict[Path, bytes] = {}
    
    if not target.exists() or target.read_bytes() == content:
        return target
    
    counter = 1
    while True:
        candidate = dest_dir / f"{stem}-merge-{content_hash[:12]}-{counter}{suffix}"
        if not candidate.exists():
            return candidate
        
        # Cache reads
        if candidate not in seen_content:
            seen_content[candidate] = candidate.read_bytes()
        
        if seen_content[candidate] == content:
            return candidate
        counter += 1

Impact: Low - collisions should be rare in practice. This is an optimization, not a correctness issue.

4. Race Condition if Drain Daemon is Running

Line 31-33: If the drain daemon deletes files while _read_existing_hashes is iterating, you could get a FileNotFoundError. Mitigation:

def _read_existing_hashes(queue_dir: Path) -> dict[str, str]:
    hashes: dict[str, str] = {}
    if not queue_dir.exists():
        return hashes
    for path in sorted(queue_dir.glob("*.jsonl")):
        if path.is_file():
            try:
                hashes[_sha256(path.read_bytes())] = path.name
            except FileNotFoundError:
                # File deleted by drain daemon, skip
                continue
    return hashes

Impact: Medium - The usage docs say to stop the drain daemon, but defensive code is better.

5. Unclear CLI Output for Collisions

Lines 44-47 in scripts/merge_queue.py: The output shows the original filename for collisions, but the actual copied file has a different name. Consider:

for orig_name, new_name in result.collision_renames:
    print(f"collision-renamed {orig_name} -> {new_name}")

This requires changing QueueMergeResult to track both names:

from dataclasses import dataclass, field

@dataclass
class QueueMergeResult:
    copied: list[str] = field(default_factory=list)
    skipped_exact: list[str] = field(default_factory=list)
    skipped_non_jsonl: list[str] = field(default_factory=list)
    collision_renames: list[tuple[str, str]] = field(default_factory=list)  # (original, renamed)

🔍 Edge Cases Verified (No Issues Found)

  1. ✅ Multiple files with same content → correctly deduplicated via hash
  2. ✅ Same filename, different content → correctly renamed with hash prefix
  3. ✅ Hash collision in first 12 chars → counter suffix handles it
  4. ✅ Re-running merge → correctly skipped via existing_hashes
  5. ✅ Dry-run mode → correctly doesn't write or update hashes
  6. ✅ Atomic writes → tmp file pattern is correct
  7. ✅ Source == dest validation → correctly raises ValueError

📋 Recommendations Summary

Must-fix before merge:

  • None - no blocking issues found

Should add (high value):

  1. Add 4 missing test cases (source validation, dest creation, deduplication)
  2. Add try/except for FileNotFoundError in _read_existing_hashes (race condition)

Nice to have (medium value):

  3. Improve CLI output to show renamed collision filenames
  4. Add docstring to _sha256 mentioning it loads the full file
  5. Consider streaming hash for future-proofing large files

Low priority:

  6. Cache file reads in collision counter loop


Final Verdict

APPROVED

This is production-ready code with good design. The test coverage is adequate for the current use case, though the additional tests would improve confidence. The code is safe, atomic, and idempotent as designed.

The only change I'd strongly recommend before the actual M1→M4 merge is adding the FileNotFoundError handler in case the drain daemon wasn't stopped.




@EtanHey
Owner Author

EtanHey commented May 15, 2026

Review follow-up pushed in 0d89b1e. Changes: added drain-race FileNotFoundError handling during destination hash scan, added collision rename reporting as old -> new, and expanded queue-merge tests from 4 to 9 cases covering validation, destination creation, cross-filename dedupe, and drain-consumed files.

Verification after the follow-up:

  • pytest tests/test_queue_merge.py -q → 9 passed
  • ruff check src/brainlayer/queue_merge.py scripts/merge_queue.py tests/test_queue_merge.py → all checks passed
  • ./scripts/run_tests.sh → BrainLayer test gate passed: 1848 passed, 9 skipped, 75 deselected, 1 xfailed; MCP registration 3 passed; isolated eval/hook routing 32 passed; bun 1 passed; regression shell passed.
  • Pre-push hook reran ./scripts/run_tests.sh and passed again.

@cursor @BugBot re-review
@codex review


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0d89b1e38e

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +72 to +73
result = QueueMergeResult()
existing_hashes = _read_existing_hashes(dest_dir)

P2 Badge Acquire the drain lock while merging queues

When this is run against the live queue, drain_once() in src/brainlayer/drain.py serializes its glob/read/unlink work with an exclusive fcntl.flock on the queue directory, but the merge scans existing hashes and later writes files without taking that same lock. In the normal LaunchAgent-running environment, the drain can therefore consume destination files between _read_existing_hashes() and the later collision/copy decisions, so the merge can act on stale state despite BrainLayer's one-writer queue contract. Hold the same queue-dir lock around the merge scan and writes, or require/verify the drain is stopped before merging.
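One way to honor the one-writer contract described in this comment is to hold the same directory lock around the merge's scan-and-write phase. The sketch below assumes drain_once() takes an exclusive fcntl.flock on a file descriptor opened on the queue directory, as the comment states; the helper name `queue_dir_lock` is hypothetical.

```python
import fcntl
import os
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def queue_dir_lock(queue_dir: Path):
    # Exclusive advisory flock on the queue directory itself. If the drain
    # daemon serializes its glob/read/unlink work under the same lock, the
    # merge cannot observe stale hash state between its scan and its writes.
    fd = os.open(queue_dir, os.O_RDONLY)
    try:
        fcntl.flock(fd, fcntl.LOCK_EX)
        yield
    finally:
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)
```

Usage would wrap both `_read_existing_hashes()` and the subsequent copy loop in a single `with queue_dir_lock(dest_dir):` block, so the hash snapshot and the writes happen under one critical section.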



@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


Reviewed by Cursor Bugbot for commit 0d89b1e.


@property
def total_actions(self) -> int:
    return len(self.copied) + len(self.skipped_exact) + len(self.skipped_non_jsonl)

Unused total_actions property never tested or called

Low Severity

The total_actions property on QueueMergeResult is defined but never used anywhere in the codebase — not in the CLI script scripts/merge_queue.py, not in the tests, and not by any other consumer. Grepping for total_actions returns only the definition itself. This is dead code that could silently rot without anyone noticing if the dataclass fields change.



@EtanHey EtanHey merged commit edcf281 into main May 15, 2026
7 checks passed
@EtanHey EtanHey deleted the fix/queue-merge-utility branch May 15, 2026 14:43
