Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions crawl4ai/deep_crawling/bff_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,21 @@ async def _arun_best_first(
if not batch:
continue

# Process the current batch of URLs.
# Process the current batch of URLs concurrently, but process the
# results in the original priority-queue order. arun_many streams
# results as requests finish, so discovering links immediately can
# make subsequent queue ordering depend on network timing.
urls = [item[2] for item in batch]
batch_config = config.clone(deep_crawl_strategy=None, stream=True)
stream_gen = await crawler.arun_many(urls=urls, config=batch_config)
results_by_url: Dict[str, CrawlResult] = {}
async for result in stream_gen:
result_url = result.url
# Find the corresponding tuple from the batch.
corresponding = next((item for item in batch if item[2] == result_url), None)
if not corresponding:
results_by_url[result.url] = result

for score, depth, url, parent_url in batch:
result = results_by_url.get(url)
if result is None:
continue
score, depth, url, parent_url = corresponding
result.metadata = result.metadata or {}
result.metadata["depth"] = depth
result.metadata["parent_url"] = parent_url
Expand All @@ -240,7 +244,7 @@ async def _arun_best_first(
if result.success:
# Discover new links from this result
new_links: List[Tuple[str, Optional[str]]] = []
await self.link_discovery(result, result_url, depth, visited, new_links, depths)
await self.link_discovery(result, url, depth, visited, new_links, depths)

for new_url, new_parent in new_links:
new_depth = depths.get(new_url, depth + 1)
Expand Down
31 changes: 31 additions & 0 deletions tests/deep_crawling/test_deep_crawl_resume.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,37 @@ async def test_resume_maintains_priority_order(self):
# So -0.9 should be crawled first
assert crawl_order[0] == "https://example.com/high-priority"

@pytest.mark.asyncio
async def test_stream_results_follow_priority_order_with_out_of_order_batch(self):
saved_state = {
"strategy_type": "best_first",
"visited": ["https://example.com"],
"queue_items": [
{"score": -0.9, "depth": 1, "url": "https://example.com/high", "parent_url": "https://example.com"},
{"score": -0.1, "depth": 1, "url": "https://example.com/low", "parent_url": "https://example.com"},
],
"depths": {"https://example.com": 0},
"pages_crawled": 1,
}

async def mock_arun_many(urls, config):
async def gen():
for url in reversed(urls):
result = MagicMock(url=url, success=True, metadata={})
result.links = {"internal": [], "external": []}
yield result
return gen()

strategy = BestFirstCrawlingStrategy(max_depth=2, max_pages=10, resume_state=saved_state)
mock_crawler = MagicMock(arun_many=mock_arun_many)
results = [
result.url
async for result in strategy._arun_stream("https://example.com", mock_crawler, create_mock_config(stream=True))
]

assert results[:2] == ["https://example.com/high", "https://example.com/low"]



class TestCrossStrategyResume:
"""Tests that apply to all strategies."""
Expand Down