Skip to content
Open
7 changes: 7 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

Changes in Version 4.18.0 (2026/XX/XX)
--------------------------------------
PyMongo 4.18 brings a number of changes including:

- Improved connection pool throughput under concurrent load by replacing the
single pool lock with fine-grained locks to reduce lock contention.

Changes in Version 4.17.0 (2026/04/20)
--------------------------------------

Expand Down
1 change: 1 addition & 0 deletions doc/contributors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,4 @@ The following is a list of people who have contributed to
- Steven Silvester (blink1073)
- Noah Stapp (NoahStapp)
- Cal Jacobson (cj81499)
- Sophia Yang (sophiayangDB)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yay!! first pymongo change!!

159 changes: 97 additions & 62 deletions pymongo/asynchronous/pool.py

Large diffs are not rendered by default.

159 changes: 97 additions & 62 deletions pymongo/synchronous/pool.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion test/asynchronous/test_load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def __init__(self, pool):
self.unlock = create_async_event()

async def lock_pool(self):
async with self.pool.lock:
async with self.pool._conns_lock:
self.locked.set()
# Wait for the unlock flag.
unlock_pool = await self.wait(self.unlock, 10)
Expand Down
2 changes: 1 addition & 1 deletion test/asynchronous/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ async def _create_entity(self, entity_spec, uri=None):
while True:
if (time.monotonic() - t0) > spec["awaitMinPoolSizeMS"] * 1000:
raise ValueError("Test timed out during awaitMinPoolSize")
async with pool.lock:
async with pool._conns_lock:
if len(pool.conns) + pool.active_sockets >= pool.opts.min_pool_size:
break
await asyncio.sleep(0.1)
Expand Down
56 changes: 55 additions & 1 deletion test/performance/async_perf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,12 @@

sys.path[0:0] = [""]

from test.asynchronous import AsyncPyMongoTestCase, async_client_context, unittest
from test.asynchronous import AsyncPyMongoTestCase, async_client_context, get_loop, unittest

from bson import encode
from gridfs import AsyncGridFSBucket
from pymongo import (
AsyncMongoClient,
DeleteOne,
InsertOne,
ReplaceOne,
Expand Down Expand Up @@ -98,6 +99,9 @@ def tearDownModule():
else:
print(output)

if getattr(async_client_context, "client", None):
get_loop().run_until_complete(async_client_context.client.close())


class Timer:
def __enter__(self):
Expand Down Expand Up @@ -297,6 +301,56 @@ async def do_task(self):
await asyncio.gather(*[find_one({"_id": _id}) for _id in self.inserted_ids])


class SmallReadTest(PerformanceTest):
dataset = "small_doc.json"
n_tasks = 1

async def asyncSetUp(self):
await super().asyncSetUp()
with open( # noqa: ASYNC101
os.path.join(TEST_PATH, os.path.join("single_and_multi_document", self.dataset))
) as data:
self.document = json.loads(data.read())

self.data_size = len(encode(self.document)) * NUM_DOCS

client_options = dict(async_client_context.client_options)
client_options["minPoolSize"] = self.n_tasks
self.client = AsyncMongoClient(**client_options)

await self.client.drop_database("perftest")
self.corpus = self.client.perftest.corpus
result = await self.corpus.insert_many([self.document.copy() for _ in range(NUM_DOCS)])
self.inserted_ids = result.inserted_ids

await self.client.admin.command("ping")
await self.corpus.find_one({"_id": self.inserted_ids[0]})

async def asyncTearDown(self):
try:
await super().asyncTearDown()
await self.client.drop_database("perftest")
finally:
await self.client.close()

async def before(self):
pass

async def after(self):
pass


class TestSmallReadFindOneByID(SmallReadTest, AsyncPyMongoTestCase):
async def do_task(self):
find_one = self.corpus.find_one
for _id in self.inserted_ids:
await find_one({"_id": _id})


class TestSmallReadFindOneByID8Tasks(TestSmallReadFindOneByID):
n_tasks = 8


class SmallDocInsertTest(TestDocument):
dataset = "small_doc.json"

Expand Down
71 changes: 71 additions & 0 deletions test/performance/perf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ def tearDownModule():
else:
print(output)

if getattr(client_context, "client", None):
client_context.client.close()


class Timer:
def __enter__(self):
Expand Down Expand Up @@ -394,6 +397,74 @@ class TestFindOneByID8Threads(TestFindOneByID):
n_threads = 8


class SmallReadTest(PerformanceTest):
dataset = "small_doc.json"
n_threads = 1

def setUp(self):
super().setUp()
with open(
os.path.join(TEST_PATH, os.path.join("single_and_multi_document", self.dataset))
) as data:
self.document = json.loads(data.read())

self.data_size = len(encode(self.document)) * NUM_DOCS

client_options = dict(client_context.client_options)
client_options["minPoolSize"] = self.n_threads
self.client = MongoClient(**client_options)

self.client.drop_database("perftest")
self.corpus = self.client.perftest.corpus
result = self.corpus.insert_many([self.document.copy() for _ in range(NUM_DOCS)])
self.inserted_ids = result.inserted_ids

self.client.admin.command("ping")
self.corpus.find_one({"_id": self.inserted_ids[0]})

def tearDown(self):
try:
super().tearDown()
self.client.drop_database("perftest")
finally:
self.client.close()

def before(self):
pass

def after(self):
pass


class TestSmallReadFindOneByID(SmallReadTest, unittest.TestCase):
def do_task(self):
find_one = self.corpus.find_one
for _id in self.inserted_ids:
find_one({"_id": _id})


class TestSmallReadFindOneByID8Threads(TestSmallReadFindOneByID):
n_threads = 8


class TestSmallReadFindOneByID8ThreadsPerClient(SmallReadTest, unittest.TestCase):
n_threads = 8

def do_task(self):
client_options = dict(client_context.client_options)
client = MongoClient(**client_options)
try:
corpus = client.perftest.corpus
client.admin.command("ping")
corpus.find_one({"_id": self.inserted_ids[0]})

find_one = corpus.find_one
for _id in self.inserted_ids:
find_one({"_id": _id})
finally:
client.close()


class SmallDocInsertTest(TestDocument):
dataset = "small_doc.json"

Expand Down
2 changes: 1 addition & 1 deletion test/test_load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def __init__(self, pool):
self.unlock = create_event()

def lock_pool(self):
with self.pool.lock:
with self.pool._conns_lock:
self.locked.set()
# Wait for the unlock flag.
unlock_pool = self.wait(self.unlock, 10)
Expand Down
2 changes: 1 addition & 1 deletion test/unified_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def _create_entity(self, entity_spec, uri=None):
while True:
if (time.monotonic() - t0) > spec["awaitMinPoolSizeMS"] * 1000:
raise ValueError("Test timed out during awaitMinPoolSize")
with pool.lock:
with pool._conns_lock:
if len(pool.conns) + pool.active_sockets >= pool.opts.min_pool_size:
break
time.sleep(0.1)
Expand Down
Loading