diff --git a/crawl4ai/deep_crawling/bff_strategy.py b/crawl4ai/deep_crawling/bff_strategy.py index fdb962485..b8ead2622 100644 --- a/crawl4ai/deep_crawling/bff_strategy.py +++ b/crawl4ai/deep_crawling/bff_strategy.py @@ -210,17 +210,21 @@ async def _arun_best_first( if not batch: continue - # Process the current batch of URLs. + # Process the current batch of URLs concurrently, but process the + # results in the original priority-queue order. arun_many streams + # results as requests finish, so discovering links immediately can + # make subsequent queue ordering depend on network timing. urls = [item[2] for item in batch] batch_config = config.clone(deep_crawl_strategy=None, stream=True) stream_gen = await crawler.arun_many(urls=urls, config=batch_config) + results_by_url: Dict[str, CrawlResult] = {} async for result in stream_gen: - result_url = result.url - # Find the corresponding tuple from the batch. - corresponding = next((item for item in batch if item[2] == result_url), None) - if not corresponding: + results_by_url[result.url] = result + + for score, depth, url, parent_url in batch: + result = results_by_url.get(url) + if result is None: continue - score, depth, url, parent_url = corresponding result.metadata = result.metadata or {} result.metadata["depth"] = depth result.metadata["parent_url"] = parent_url @@ -240,7 +244,7 @@ async def _arun_best_first( if result.success: # Discover new links from this result new_links: List[Tuple[str, Optional[str]]] = [] - await self.link_discovery(result, result_url, depth, visited, new_links, depths) + await self.link_discovery(result, url, depth, visited, new_links, depths) for new_url, new_parent in new_links: new_depth = depths.get(new_url, depth + 1) diff --git a/tests/deep_crawling/test_deep_crawl_resume.py b/tests/deep_crawling/test_deep_crawl_resume.py index 4b41e0fec..51ea74036 100644 --- a/tests/deep_crawling/test_deep_crawl_resume.py +++ b/tests/deep_crawling/test_deep_crawl_resume.py @@ -507,6 +507,37 @@ async def test_resume_maintains_priority_order(self): # So -0.9 should be crawled first assert crawl_order[0] == "https://example.com/high-priority" + @pytest.mark.asyncio + async def test_stream_results_follow_priority_order_with_out_of_order_batch(self): + saved_state = { + "strategy_type": "best_first", + "visited": ["https://example.com"], + "queue_items": [ + {"score": -0.9, "depth": 1, "url": "https://example.com/high", "parent_url": "https://example.com"}, + {"score": -0.1, "depth": 1, "url": "https://example.com/low", "parent_url": "https://example.com"}, + ], + "depths": {"https://example.com": 0}, + "pages_crawled": 1, + } + + async def mock_arun_many(urls, config): + async def gen(): + for url in reversed(urls): + result = MagicMock(url=url, success=True, metadata={}) + result.links = {"internal": [], "external": []} + yield result + return gen() + + strategy = BestFirstCrawlingStrategy(max_depth=2, max_pages=10, resume_state=saved_state) + mock_crawler = MagicMock(arun_many=mock_arun_many) + results = [ + result.url + async for result in strategy._arun_stream("https://example.com", mock_crawler, create_mock_config(stream=True)) + ] + + assert results[:2] == ["https://example.com/high", "https://example.com/low"] + + class TestCrossStrategyResume: """Tests that apply to all strategies."""