- Introduction
- Installation
- Basic Concepts
- API Reference
- Query Operators
- Update Operators
- Aggregation Pipeline
- Indexing
- String Encryption
- Media Handling
- Thread Safety
- PyMongo Compatibility
- Performance Tips
- Examples
MainyDB is an embedded MongoDB-like database that stores all data in a single .mdb file. It provides a PyMongo-compatible API, allowing you to use MongoDB-style queries, updates, and aggregations in a lightweight, file-based database.
MainyDB is designed to be ultra-fast, production-ready, and a drop-in replacement for MongoDB in scenarios where a full MongoDB server is not needed or desired.
```bash
# Clone the repository
git clone https://github.com/dddevid/MainyDB.git

# Install dependencies
cd MainyDB
pip install -r requirements.txt
```

MainyDB stores all data in a single `.mdb` file. This file contains:
- Database metadata
- Collection definitions
- Document data
- Indexes
Documents in MainyDB are JSON-like objects (Python dictionaries) with the following characteristics:
- Each document has a unique `_id` field (automatically generated if not provided)
- Documents can contain nested objects and arrays
- Documents can store binary data (automatically encoded as base64)
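Since binary values are stored as base64, a write/read round-trip is lossless. This small standalone sketch (plain Python, not MainyDB internals) shows the kind of encoding applied under the hood:

```python
import base64

# Raw binary payload, e.g. the first bytes of a PNG file
raw = b"\x89PNG\r\n\x1a\n"

# Stored form: base64 text, safe to embed in a JSON-like document
stored = base64.b64encode(raw).decode("ascii")
print(stored)  # iVBORw0KGgo=

# Reading decodes back to the original bytes exactly
restored = base64.b64decode(stored)
assert restored == raw
```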
Collections are groups of documents. They are similar to tables in relational databases but without a fixed schema.
A MainyDB instance can contain multiple databases, each with its own collections.
`MainyDB(file_path, **kwargs)`

Creates or opens a MainyDB database.

Parameters:

- `file_path` (str): Path to the .mdb file
- `kwargs` (dict, optional): Additional options

Methods:

- `list_collection_names()`: Returns a list of collection names in the database
- `drop_collection(name)`: Drops a collection
- `stats()`: Returns database statistics
- `close()`: Closes the database connection

Properties:

- `<database_name>`: Access a database by attribute

Example:

```python
db = MainyDB("my_database.mdb")
my_app_db = db.my_app  # Access the 'my_app' database
db.close()  # Close the database when done
```

The `Database` class represents a database within MainyDB.
Methods:
- `list_collection_names()`: Returns a list of collection names in the database
- `drop_collection(name)`: Drops a collection
- `create_collection(name, options=None)`: Creates a new collection
- `stats()`: Returns database statistics
Properties:
- `<collection_name>`: Access a collection by attribute
Example:
```python
db = MainyDB("my_database.mdb")
my_app = db.my_app  # Get the 'my_app' database

# List collections
collections = my_app.list_collection_names()

# Create a collection
my_app.create_collection("users")

# Access a collection
users = my_app.users
```

The `Collection` class represents a collection of documents.

Methods:

- `create_collection(name, options=None)`: Creates a new collection
- `drop()`: Drops the collection
- `renameCollection(newName)`: Renames the collection
- `stats()`: Returns collection statistics
- `count_documents(query)`: Counts documents matching the query
- `create_index(keys, **kwargs)`: Creates an index on the specified fields
- `insert_one(document)`: Inserts a single document
  - Returns: `InsertOneResult` with `inserted_id` property
- `insert_many(documents)`: Inserts multiple documents
  - Returns: `InsertManyResult` with `inserted_ids` property
- `find(query=None, projection=None)`: Finds documents matching the query
  - Returns: `Cursor` object
- `find_one(query=None, projection=None)`: Finds a single document
  - Returns: Document or `None`
- `update_one(filter, update, options=None)`: Updates a single document
  - Returns: `UpdateResult` with `matched_count` and `modified_count` properties
- `update_many(filter, update, options=None)`: Updates multiple documents
  - Returns: `UpdateResult` with `matched_count` and `modified_count` properties
- `replace_one(filter, replacement, options=None)`: Replaces a document
  - Returns: `UpdateResult` with `matched_count` and `modified_count` properties
- `delete_one(filter)`: Deletes a single document
  - Returns: `DeleteResult` with `deleted_count` property
- `delete_many(filter)`: Deletes multiple documents
  - Returns: `DeleteResult` with `deleted_count` property
- `bulk_write(operations)`: Performs bulk write operations
  - Returns: `BulkWriteResult` with operation counts
- `distinct(field, query=None)`: Returns distinct values for a field
  - Returns: List of distinct values
- `aggregate(pipeline)`: Performs an aggregation pipeline
  - Returns: `Cursor` object with aggregation results
Example:
```python
db = MainyDB("my_database.mdb")
users = db.my_app.users

# Insert a document
result = users.insert_one({"name": "John", "age": 30})
user_id = result.inserted_id

# Find documents
for user in users.find({"age": {"$gt": 25}}):
    print(user["name"])

# Update a document
users.update_one({"_id": user_id}, {"$set": {"age": 31}})

# Delete a document
users.delete_one({"_id": user_id})
```

The `bulk_write()` method allows you to perform multiple write operations in a single call, improving performance for batch operations.
Method:
- `bulk_write(operations, ordered=True)`: Executes multiple write operations
  - `operations`: List of operation dictionaries
  - `ordered`: If `True`, operations execute in order and stop on the first error
  - Returns: Dict with `inserted_count`, `matched_count`, `modified_count`, `deleted_count`, `upserted_count`, and `upserted_ids`
Supported Operations:
- `insert_one`: `{"insert_one": {"document": {...}}}`
- `update_one`: `{"update_one": {"filter": {...}, "update": {...}, "upsert": False}}`
- `update_many`: `{"update_many": {"filter": {...}, "update": {...}, "upsert": False}}`
- `replace_one`: `{"replace_one": {"filter": {...}, "replacement": {...}, "upsert": False}}`
- `delete_one`: `{"delete_one": {"filter": {...}}}`
- `delete_many`: `{"delete_many": {"filter": {...}}}`
Example:
```python
db = MainyDB("my_database.mdb")
users = db.my_app.users

# Perform bulk operations
result = users.bulk_write([
    {"insert_one": {"document": {"name": "Alice", "age": 25}}},
    {"insert_one": {"document": {"name": "Bob", "age": 30}}},
    {"update_many": {"filter": {"age": {"$lt": 30}}, "update": {"$set": {"status": "young"}}}},
    {"delete_one": {"filter": {"name": "OldUser"}}}
])
print(f"Inserted: {result['inserted_count']}")
print(f"Modified: {result['modified_count']}")
print(f"Deleted: {result['deleted_count']}")
```

Methods:
- `drop()`: Deletes the entire collection and all its documents
  - Returns: `True` on success
- `rename(new_name)`: Renames the collection
  - `new_name`: New name for the collection
  - Returns: `True` on success
- `stats()`: Returns collection statistics
  - Returns: Dict with `ns`, `count`, `size`, `avgObjSize`, `storageSize`, `nindexes`, `indexNames`, and `ok`
- `create_index(keys, **kwargs)`: Creates an index on specified fields
  - `keys`: Field name (string), list of fields, or list of (field, direction) tuples
  - Returns: Index name
- `drop_index(index_name)`: Drops a specific index
  - `index_name`: Name of the index to drop
- `drop_indexes()`: Drops all indexes on the collection
Example:
```python
db = MainyDB("my_database.mdb")
users = db.my_app.users

# Create indexes
idx1 = users.create_index("email")
idx2 = users.create_index([("age", 1), ("name", 1)])

# Get collection statistics
stats = users.stats()
print(f"Documents: {stats['count']}")
print(f"Storage: {stats['storageSize']} bytes")
print(f"Indexes: {stats['indexNames']}")

# Drop a specific index
users.drop_index(idx1)

# Rename collection
users.rename("users_backup")

# Drop collection
users.drop()
```

The `Cursor` class represents a database cursor for iterating over query results. Cursors support method chaining for building complex queries.
Methods:
- `sort(key_or_list, direction=None)`: Sorts the results
  - Single field: `sort("age", -1)` or `sort("age", 1)`
  - Multiple fields: `sort([("age", -1), ("name", 1)])`
- `skip(count)`: Skips the first `count` results
- `limit(count)`: Limits the number of results
- `count()`: Returns the count of documents in the result set
- `distinct(field)`: Returns distinct values for a field in the result set
- `to_list()`: Converts cursor results to a list
- `__iter__()`: Allows iteration over the cursor
- `next()` / `__next__()`: Returns the next document
Example:
```python
db = MainyDB("my_database.mdb")
users = db.my_app.users

# Method chaining
cursor = users.find({"age": {"$gt": 25}}).sort("age", -1).skip(10).limit(5)

# Iterate over results
for user in cursor:
    print(user["name"])

# Get all results as a list
results = users.find({"status": "active"}).to_list()

# Get count without iterating
count = users.find({"age": {"$gt": 30}}).count()

# Get distinct values from cursor results
ages = users.find({"status": "active"}).distinct("age")
```

The `ObjectId` class represents a unique identifier for documents. ObjectIds are automatically generated for documents that don't have an `_id` field.
Methods:
- `__init__(oid=None)`: Creates a new ObjectId
  - If `oid` is `None`, generates a new UUID-based ID
  - Otherwise, uses the provided ID
- `__str__()`: Returns a string representation of the ObjectId
- `__repr__()`: Returns a detailed representation: `ObjectId('...')`
- `__eq__(other)`: Compares two ObjectIds for equality
- `__hash__()`: Returns a hash value for use in sets and dicts
Example:
```python
from MainyDB import ObjectId

# Create a new ObjectId (auto-generated)
obj_id = ObjectId()
print(obj_id)  # e.g., "5f8d7e6b-5e4d-3c2b-1a09-8765432109ab"

# Create an ObjectId from a string
obj_id = ObjectId("5f8d7e6b-5e4d-3c2b-1a09-8765")
print(repr(obj_id))  # ObjectId('5f8d7e6b-5e4d-3c2b-1a09-8765')

# Compare ObjectIds
obj_id1 = ObjectId("5f8d7e6b-5e4d-3c2b-1a09-8765")
obj_id2 = ObjectId("5f8d7e6b-5e4d-3c2b-1a09-8765")
if obj_id1 == obj_id2:
    print("The ObjectIds are equal")

# Use in sets (thanks to __hash__)
unique_ids = {ObjectId(), ObjectId(), ObjectId()}

# Use as dictionary keys
id_map = {obj_id: "user_data"}
```

MainyDB supports all standard MongoDB query operators:
Comparison operators:

- `$eq`: Equals. `{"age": {"$eq": 30}}` is equivalent to `{"age": 30}`.
- `$ne`: Not equals. `{"age": {"$ne": 30}}` matches documents whose age is not 30.
- `$gt`: Greater than. `{"age": {"$gt": 30}}`
- `$gte`: Greater than or equal. `{"age": {"$gte": 30}}`
- `$lt`: Less than. `{"age": {"$lt": 30}}`
- `$lte`: Less than or equal. `{"age": {"$lte": 30}}`
- `$in`: In array. `{"age": {"$in": [25, 30, 35]}}` matches age 25, 30, or 35.
- `$nin`: Not in array. `{"age": {"$nin": [25, 30, 35]}}` matches any other age.

Logical operators:

- `$and`: Logical AND. `{"$and": [{"age": {"$gt": 25}}, {"name": "John"}]}` matches age > 25 AND name "John".
- `$or`: Logical OR. `{"$or": [{"age": {"$gt": 30}}, {"name": "John"}]}` matches age > 30 OR name "John".
- `$not`: Logical NOT. `{"age": {"$not": {"$gt": 30}}}` matches age not greater than 30.
- `$nor`: Logical NOR. `{"$nor": [{"age": 30}, {"name": "John"}]}` matches age not 30 AND name not "John".

Array operators:

- `$all`: All elements match. `{"tags": {"$all": ["mongodb", "database"]}}` matches tags containing both "mongodb" and "database".
- `$elemMatch`: Element matches. `{"comments": {"$elemMatch": {"author": "John", "score": {"$gt": 5}}}}` matches comments containing an element with author "John" and score > 5.
- `$size`: Array size. `{"tags": {"$size": 3}}` matches a tags array with exactly 3 elements.
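To make the operator semantics concrete, here is a tiny pure-Python matcher (an illustrative sketch only, not MainyDB's actual implementation) that evaluates a few of the operators above against a document:

```python
def matches(doc, query):
    """Evaluate a small subset of MongoDB-style operators (sketch only)."""
    for field, cond in query.items():
        value = doc.get(field)
        if isinstance(cond, dict):  # operator form, e.g. {"$gt": 30}
            for op, arg in cond.items():
                if op == "$eq" and value != arg:
                    return False
                if op == "$ne" and value == arg:
                    return False
                if op == "$gt" and not (value is not None and value > arg):
                    return False
                if op == "$in" and value not in arg:
                    return False
        elif value != cond:  # shorthand equality, e.g. {"age": 30}
            return False
    return True

doc = {"name": "John", "age": 31}
print(matches(doc, {"age": {"$gt": 25}}))            # True
print(matches(doc, {"age": {"$in": [25, 30, 35]}}))  # False
print(matches(doc, {"name": "John", "age": 31}))     # True
```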
MainyDB supports all standard MongoDB update operators:
Field operators:

- `$set`: Sets field values. `{"$set": {"age": 31, "updated": True}}`
- `$unset`: Removes fields. `{"$unset": {"temporary_field": ""}}`
- `$inc`: Increments field values. `{"$inc": {"age": 1, "count": 5}}`
- `$mul`: Multiplies field values. `{"$mul": {"price": 1.1}}` applies a 10% increase.
- `$rename`: Renames fields. `{"$rename": {"old_field": "new_field"}}`
- `$min`: Updates only if the new value is less than the current one. `{"$min": {"lowest_score": 50}}`
- `$max`: Updates only if the new value is greater than the current one. `{"$max": {"highest_score": 95}}`
- `$currentDate`: Sets a field to the current date. `{"$currentDate": {"last_updated": True}}`

Array operators:

- `$push`: Adds elements to arrays. `{"$push": {"tags": "new_tag"}}`
- `$pop`: Removes the first or last element. `{"$pop": {"tags": 1}}` removes the last element; `{"$pop": {"tags": -1}}` removes the first.
- `$pull`: Removes matching elements. `{"$pull": {"tags": "old_tag"}}`
- `$pullAll`: Removes all listed elements. `{"$pullAll": {"tags": ["tag1", "tag2"]}}`
- `$addToSet`: Adds an element only if it is not already present. `{"$addToSet": {"tags": "unique_tag"}}`
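The same idea works for updates. This pure-Python sketch (illustrative only, not MainyDB's implementation) applies a few of the operators above to a document in place:

```python
def apply_update(doc, update):
    """Apply a small subset of MongoDB-style update operators (sketch only)."""
    for field, value in update.get("$set", {}).items():
        doc[field] = value
    for field, amount in update.get("$inc", {}).items():
        doc[field] = doc.get(field, 0) + amount
    for field, element in update.get("$push", {}).items():
        doc.setdefault(field, []).append(element)
    for field, element in update.get("$addToSet", {}).items():
        array = doc.setdefault(field, [])
        if element not in array:
            array.append(element)
    return doc

doc = {"age": 30, "tags": ["a"]}
apply_update(doc, {"$set": {"status": "active"}, "$inc": {"age": 1}})
apply_update(doc, {"$push": {"tags": "b"}})
apply_update(doc, {"$addToSet": {"tags": "a"}})  # "a" already present, no change
print(doc)  # {'age': 31, 'tags': ['a', 'b'], 'status': 'active'}
```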
MainyDB supports MongoDB-style aggregation pipelines with the following stages:
- `$match`: Filters documents. `{"$match": {"age": {"$gt": 30}}}`
- `$project`: Reshapes documents. `{"$project": {"name": 1, "age": 1, "_id": 0}}` includes only name and age and excludes `_id`.
- `$group`: Groups documents. `{"$group": {"_id": "$city", "count": {"$sum": 1}, "avg_age": {"$avg": "$age"}}}` groups by city, counts documents, and averages age.
- `$sort`: Sorts documents. `{"$sort": {"age": -1}}` sorts by age in descending order.
- `$limit`: Limits the number of documents. `{"$limit": 10}`
- `$skip`: Skips documents. `{"$skip": 10}` skips the first 10 documents.
- `$unwind`: Deconstructs arrays. `{"$unwind": "$tags"}` creates one document per element of the tags array.
- `$addFields`: Adds fields. `{"$addFields": {"full_name": {"$concat": ["$first_name", " ", "$last_name"]}}}`
- `$lookup`: Performs a left outer join. `{"$lookup": {"from": "comments", "localField": "_id", "foreignField": "post_id", "as": "comments"}}`
- `$count`: Counts documents. `{"$count": "total"}` stores the count in a `total` field.
Example:
```python
db = MainyDB("my_database.mdb")
users = db.my_app.users

# Group users by city and calculate average age
result = users.aggregate([
    {"$match": {"age": {"$gte": 18}}},  # Only include adults
    {"$group": {
        "_id": "$city",
        "count": {"$sum": 1},
        "avg_age": {"$avg": "$age"}
    }},
    {"$sort": {"count": -1}}  # Sort by count in descending order
])
for city_stats in result:
    print(f"{city_stats['_id']}: {city_stats['count']} users, avg age: {city_stats['avg_age']:.1f}")
```

MainyDB supports in-memory indexes for fast queries.
```python
# Create a single-field index
users.create_index([("email", 1)])  # 1 for ascending, -1 for descending

# Create a compound index
users.create_index([("city", 1), ("age", -1)])
```

Index options:

- `unique`: Ensures index keys are unique: `users.create_index([("email", 1)], unique=True)`
- `name`: Custom name for the index: `users.create_index([("email", 1)], name="email_index")`
MainyDB supports automatic encryption of string fields before storing them in the database. This feature provides two encryption methods:
- SHA256: One-way hashing (ideal for passwords and data that should never be decrypted)
- AES256: Reversible encryption (ideal for sensitive data like emails, SSN, credit cards)
To use encryption features, install the required dependency:
```bash
pip install pycryptodome
```

Use `create_encryption_config()` to define which fields should be encrypted:
```python
from MainyDB import create_encryption_config, EncryptionManager

# Create encryption config
config = create_encryption_config(
    sha256_fields=["password"],     # One-way hash
    aes256_fields=["email", "ssn"]  # Reversible encryption
)
```

The `EncryptionManager` handles all encryption/decryption operations:
```python
# Create encryption manager with AES key
encryption_manager = EncryptionManager(
    config,
    aes_key="your-secret-key-here"  # Required for AES256
)
```

The encryption manager supports three methods for providing the AES key (in priority order):
1. Explicit parameter (recommended for production):
   `encryption_manager = EncryptionManager(config, aes_key="your-secret-key")`
2. Environment variable:
   `export MAINYDB_ENCRYPTION_KEY="your-secret-key"`
   `encryption_manager = EncryptionManager(config)  # Uses env variable`
3. Auto-generated (displays a warning):
   `encryption_manager = EncryptionManager(config)  # Generates random key`
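The priority order above can be sketched as a small standalone helper (illustrative only; MainyDB's actual key handling may differ in details such as the warning):

```python
import base64
import os
import secrets

def resolve_aes_key(explicit_key=None):
    """Resolve an AES key using the documented priority order (sketch)."""
    if explicit_key is not None:  # 1. explicit parameter wins
        return explicit_key
    env_key = os.environ.get("MAINYDB_ENCRYPTION_KEY")
    if env_key:                   # 2. environment variable
        return env_key
    # 3. auto-generated random key (a real manager would warn here)
    return base64.b64encode(secrets.token_bytes(32)).decode("ascii")

print(resolve_aes_key("explicit-key"))  # explicit-key
os.environ["MAINYDB_ENCRYPTION_KEY"] = "env-key"
print(resolve_aes_key())  # env-key
```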
Apply encryption at the database level to encrypt all collections:
```python
from MainyDB import MainyDB
from MainyDB.core import Database

db = MainyDB("app.mdb")

# Create encryption config
config = create_encryption_config(
    sha256_fields=["password"],
    aes256_fields=["email"]
)
encryption_manager = EncryptionManager(config, aes_key="secret-key")

# Create database with encryption
users_db = Database(db, "users", encryption_manager=encryption_manager)
users = users_db.create_collection("users")

# Insert user - password and email are automatically encrypted
users.insert_one({
    "username": "john_doe",
    "password": "secret123",     # Will be hashed with SHA256
    "email": "john@example.com"  # Will be encrypted with AES256
})
```

Apply encryption at the collection level for fine-grained control:
```python
db = MainyDB("app.mdb")
users_db = Database(db, "users")

# Create encryption config for this collection only
config = create_encryption_config(sha256_fields=["password"])
encryption_manager = EncryptionManager(config, aes_key="secret-key")

# Create collection with encryption
users = users_db.create_collection("users", encryption_manager=encryption_manager)
```

SHA256 creates a one-way hash that cannot be decrypted. Perfect for passwords:
```python
config = create_encryption_config(sha256_fields=["password"])
encryption_manager = EncryptionManager(config)
users = users_db.create_collection("users", encryption_manager=encryption_manager)

# Insert user with password
users.insert_one({
    "username": "alice",
    "password": "mySecretPassword"
})

# Find user
user = users.find_one({"username": "alice"})
# user["password"] is now a dict: {"hash": "...", "salt": "...", "algorithm": "sha256"}

# Verify password
is_valid = encryption_manager.verify_sha256_field(
    "password",
    "mySecretPassword",  # User input
    user["password"]     # Stored hash
)
# is_valid == True
```

AES256 encrypts data that can be decrypted later. Perfect for sensitive data:
```python
config = create_encryption_config(aes256_fields=["email", "ssn"])
encryption_manager = EncryptionManager(config, aes_key="secret-key")
users = users_db.create_collection("users", encryption_manager=encryption_manager)

# Insert user with sensitive data
users.insert_one({
    "username": "bob",
    "email": "bob@example.com",
    "ssn": "123-45-6789"
})

# Find user - AES256 fields are automatically decrypted
user = users.find_one({"username": "bob"})
print(user["email"])  # "bob@example.com" (decrypted automatically)
print(user["ssn"])    # "123-45-6789" (decrypted automatically)
```

Combine both SHA256 and AES256 for different fields:
```python
config = create_encryption_config(
    sha256_fields=["password", "security_answer"],  # One-way
    aes256_fields=["email", "phone", "address"]     # Reversible
)
encryption_manager = EncryptionManager(config, aes_key="secret-key")
users = users_db.create_collection("users", encryption_manager=encryption_manager)

users.insert_one({
    "username": "charlie",
    "password": "secret",            # SHA256 hashed
    "email": "charlie@example.com",  # AES256 encrypted
    "phone": "+1-555-0100",          # AES256 encrypted
    "security_answer": "blue"        # SHA256 hashed
})

# Find returns decrypted AES256 fields, SHA256 fields remain hashed
user = users.find_one({"username": "charlie"})
print(user["email"])     # "charlie@example.com" (decrypted)
print(user["password"])  # {"hash": "...", "salt": "...", "algorithm": "sha256"}
```

Encrypted fields are automatically re-encrypted when updated:
```python
# Update email (AES256 field)
users.update_one(
    {"username": "bob"},
    {"$set": {"email": "bob.new@example.com"}}
)

# Update password (SHA256 field)
users.update_one(
    {"username": "alice"},
    {"$set": {"password": "newPassword123"}}
)
```
- Key Storage: Never hardcode encryption keys in source code. Use environment variables or secure key management systems.
- SHA256 for Passwords: Always use SHA256 for passwords and other data that should never be decrypted.
- AES256 for Sensitive Data: Use AES256 for data you need to decrypt (emails, SSN, credit cards).
- Key Rotation: Periodically rotate your AES encryption keys and re-encrypt data.
- Secure Key Generation: Use strong, random keys:

  ```python
  import secrets
  import base64

  key = base64.b64encode(secrets.token_bytes(32)).decode('utf-8')
  ```

- Access Control: Limit access to encryption keys and encrypted databases.
- Encryption adds overhead to insert and update operations
- Decryption adds overhead to find operations
- SHA256 is faster than AES256 but cannot be reversed
- Consider indexing non-encrypted fields for query performance
All encryption operations are thread-safe and can be used in multi-threaded environments.
MainyDB can store and retrieve binary data like images and videos. Binary data is automatically encoded as base64 when stored and decoded when retrieved.
```python
# Store an image
with open("image.jpg", "rb") as f:
    image_data = f.read()

media = db.my_app.media
media.insert_one({
    "name": "profile_pic.jpg",
    "data": image_data  # Automatically encoded as base64
})

# Retrieve the image
stored_media = media.find_one({"name": "profile_pic.jpg"})
image_data = stored_media["data"]  # Automatically decoded from base64

# Save the image
with open("retrieved_image.jpg", "wb") as f:
    f.write(image_data)
```

MainyDB supports uploading and reading images in the following formats: `.png`, `.jpg`, `.jpeg`, `.webp`, `.tiff`, `.heic`, `.gif`. Binary data is stored as base64 and automatically decoded on read.
Beyond raw bytes, you can insert the image file path directly in the document. On insert/update, MainyDB reads the file, converts it to base64, and stores it.
```python
# Insert via file path (direct upload)
images = db.my_app.images
doc = {
    "_id": "sample1",
    "filename": "avatar.png",
    "image": "./assets/avatar.png"  # image file path
}
images.insert_one(doc)

# Read: find_one immediately returns bytes
stored = images.find_one({"_id": "sample1"})
img_bytes = stored["image"]  # image bytes
with open("avatar_copy.png", "wb") as f:
    f.write(img_bytes)

# Read: find returns a lazy decoder for media fields
cur = images.find({"_id": "sample1"})
item = next(iter(cur))
decoder = item["image"]  # call to obtain bytes
img_bytes_lazy = decoder()
```

`find_one` returns bytes directly for media fields. `find` returns a decoder function for media fields (lazy) to reduce decoding overhead on large datasets.
`update_one` and `update_many` automatically apply media encoding:
```python
# Update with bytes
new_bytes = b"\x89PNG..."  # image bytes
images.update_one({"_id": "sample1"}, {"$set": {"image": new_bytes}})

# Update with file path
images.update_many({"filename": {"$eq": "avatar.png"}}, {"$set": {"image": "./assets/avatar.webp"}})

# Read verification
updated = images.find_one({"_id": "sample1"})
assert isinstance(updated["image"], (bytes, bytearray))
```

MainyDB automatically caches decoded media for 2 hours to improve performance when the same media is accessed multiple times.
MainyDB is thread-safe and can be accessed from multiple threads. It uses locks to ensure that concurrent operations don't interfere with each other.
```python
import threading

from MainyDB import MainyDB

db = MainyDB("my_database.mdb")
counters = db.my_app.counters
# Set up the shared counter document (assumed initial state)
counters.insert_one({"_id": "counter1", "value": 0})

def update_counter(thread_id):
    for i in range(1000):
        # Atomic update operation
        counters.update_one(
            {"_id": "counter1"},
            {"$inc": {"value": 1}}
        )

# Create threads
threads = []
for i in range(10):
    thread = threading.Thread(target=update_counter, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()
```

MainyDB can be used as a drop-in replacement for PyMongo:
```python
from MainyDB import MongoClient

# Connect to a "server" (actually uses the file-based database)
client = MongoClient()

# Get a database
db = client.my_app

# Get a collection
users = db.users

# Use the same PyMongo API
users.insert_one({"name": "Jane Smith"})
```
Use Indexes: Create indexes on fields that are frequently used in queries.
-
Limit Query Results: Use
limit()to restrict the number of documents returned. -
Project Only Needed Fields: Use projection to return only the fields you need.
-
Use Bulk Operations: Use
bulk_write()for multiple operations. -
Close the Database: Always call
close()when you're done to ensure data is properly saved.
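To see why projecting only needed fields pays off, this standalone sketch (plain Python, not MainyDB code) shows what an inclusion projection does: every field not requested is dropped before the document is returned:

```python
def project(doc, projection):
    """Inclusion-style projection sketch: keep only fields mapped to 1."""
    wanted = {field for field, flag in projection.items() if flag == 1}
    return {field: value for field, value in doc.items() if field in wanted}

doc = {"_id": 7, "name": "Ann", "age": 30, "bio": "a very long string..."}
print(project(doc, {"name": 1, "age": 1}))  # {'name': 'Ann', 'age': 30}
```

(Unlike MongoDB's projection, this sketch does not keep `_id` by default.)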
See the examples directory for more detailed examples:
- `basic_usage.py`: Basic CRUD operations
- `advanced_usage.py`: Advanced queries, aggregations, and concurrency
- `encryption_example.py`: String encryption with SHA256 and AES256
MainyDB supports querying and updating nested fields using dot notation:
```python
db = MainyDB("my_database.mdb")
users = db.my_app.users

# Insert document with nested structure
users.insert_one({
    "name": "Alice",
    "address": {
        "street": "123 Main St",
        "city": "New York",
        "zip": "10001",
        "coordinates": {"lat": 40.7128, "lng": -74.0060}
    },
    "contacts": {
        "email": "alice@example.com",
        "phone": "+1-555-0123"
    }
})

# Query nested fields
user = users.find_one({"address.city": "New York"})
user = users.find_one({"address.coordinates.lat": {"$gt": 40}})

# Update nested fields
users.update_one(
    {"name": "Alice"},
    {"$set": {
        "address.street": "456 Park Ave",
        "contacts.phone": "+1-555-9999"
    }}
)

# Increment nested numeric fields
users.update_one(
    {"name": "Alice"},
    {"$inc": {"address.coordinates.lat": 0.001}}
)
```
```python
db = MainyDB("my_database.mdb")
sales = db.analytics.sales

# Insert sample data
sales.insert_many([
    {"product": "Laptop", "category": "Electronics", "price": 1200, "quantity": 5},
    {"product": "Mouse", "category": "Electronics", "price": 25, "quantity": 50},
    {"product": "Desk", "category": "Furniture", "price": 300, "quantity": 10},
    {"product": "Chair", "category": "Furniture", "price": 150, "quantity": 20}
])

# Complex aggregation: calculate revenue by category, sorted by total revenue
pipeline = [
    {"$project": {
        "category": 1,
        "revenue": {"$multiply": ["$price", "$quantity"]}
    }},
    {"$group": {
        "_id": "$category",
        "total_revenue": {"$sum": "$revenue"},
        "avg_revenue": {"$avg": "$revenue"},
        "product_count": {"$sum": 1}
    }},
    {"$sort": {"total_revenue": -1}}
]
results = sales.aggregate(pipeline)
for category_stats in results:
    print(f"{category_stats['_id']}: ${category_stats['total_revenue']:.2f}")
```
```python
db = MainyDB("my_database.mdb")
users = db.my_app.users

# Prepare bulk operations
operations = []

# Batch inserts
for i in range(100):
    operations.append({
        "insert_one": {
            "document": {"name": f"User{i}", "index": i}
        }
    })

# Batch updates
for i in range(50):
    operations.append({
        "update_one": {
            "filter": {"index": i},
            "update": {"$set": {"status": "active"}}
        }
    })

# Batch deletes
for i in range(90, 100):
    operations.append({
        "delete_one": {
            "filter": {"index": i}
        }
    })

# Execute all operations in one call
result = users.bulk_write(operations)
print(f"Inserted: {result['inserted_count']}, "
      f"Modified: {result['modified_count']}, "
      f"Deleted: {result['deleted_count']}")
```
```python
db = MainyDB("my_database.mdb")
products = db.shop.products

# Insert test data
products.insert_many([
    {"name": "Product A", "category": "Electronics", "price": 100, "stock": 50},
    {"name": "Product B", "category": "Books", "price": 20, "stock": 100},
    # ... more products
])

# Create indexes for common queries
email_idx = products.create_index("email")
compound_idx = products.create_index([("category", 1), ("price", -1)])

# Get collection statistics
stats = products.stats()
print(f"Total documents: {stats['count']}")
print(f"Indexes: {stats['indexNames']}")
print(f"Average document size: {stats['avgObjSize']:.2f} bytes")

# Query with index (much faster for large collections)
electronics = products.find({"category": "Electronics", "price": {"$lt": 200}})

# Drop specific index when not needed
products.drop_index(email_idx)

# Drop all indexes
products.drop_indexes()
```
```python
import datetime

from MainyDB import MainyDB, create_encryption_config, EncryptionManager
from MainyDB.core import Database

# Setup database with encryption
db = MainyDB("auth_system.mdb")

# Configure encryption for sensitive fields
config = create_encryption_config(
    sha256_fields=["password"],       # Passwords are hashed (cannot be decrypted)
    aes256_fields=["email", "phone"]  # PII is encrypted (can be decrypted)
)
encryption_manager = EncryptionManager(config, aes_key="your-secret-key-here")

# Create database with encryption
users_db = Database(db, "auth", encryption_manager=encryption_manager)
users = users_db.create_collection("users")

# Create indexes
users.create_index("username", unique=True)

# User registration
def register_user(username, password, email):
    try:
        result = users.insert_one({
            "username": username,
            "password": password,  # Automatically hashed with SHA256
            "email": email,        # Automatically encrypted with AES256
            "created_at": datetime.datetime.now(),
            "active": True
        })
        return {"success": True, "user_id": str(result.inserted_id)}
    except Exception as e:
        return {"success": False, "error": str(e)}

# User login
def login_user(username, password):
    user = users.find_one({"username": username})
    if not user:
        return {"success": False, "error": "User not found"}

    # Verify password (compares hash)
    is_valid = encryption_manager.verify_sha256_field(
        "password", password, user["password"]
    )
    if is_valid:
        return {"success": True, "user": {
            "username": user["username"],
            "email": user["email"]  # Automatically decrypted
        }}
    else:
        return {"success": False, "error": "Invalid password"}

# Usage
register_user("alice", "secure_password123", "alice@example.com")
result = login_user("alice", "secure_password123")
print(result)  # {"success": True, "user": {...}}
```
```python
import datetime

from MainyDB import MainyDB

db = MainyDB("analytics.mdb")
events = db.tracking.events

# Track user events
events.insert_many([
    {"user_id": "user1", "event": "page_view", "page": "/home", "timestamp": datetime.datetime.now()},
    {"user_id": "user1", "event": "click", "element": "buy_button", "timestamp": datetime.datetime.now()},
    {"user_id": "user2", "event": "page_view", "page": "/product", "timestamp": datetime.datetime.now()},
])

# Create index for better query performance
events.create_index([("user_id", 1), ("timestamp", -1)])

# Generate daily report
pipeline = [
    {"$match": {"event": "page_view"}},
    {"$group": {
        "_id": "$page",
        "views": {"$sum": 1},
        "unique_users": {"$addToSet": "$user_id"}
    }},
    {"$project": {
        "page": "$_id",
        "views": 1,
        "unique_users": {"$size": "$unique_users"}
    }},
    {"$sort": {"views": -1}},
    {"$limit": 10}
]
top_pages = events.aggregate(pipeline).to_list()
for page in top_pages:
    print(f"{page['page']}: {page['views']} views from {page['unique_users']} users")
```