Python package for in-memory database utilities supporting Redis, Memcached, Dragonfly, and Valkey. Provides both synchronous and asynchronous connection interfaces for high-performance caching and fast-access storage.
- Multiple Database Support: Redis, Memcached, Dragonfly, and Valkey
- Sync & Async APIs: Full support for both synchronous and asynchronous operations
- Auto-Detection: Automatic database type detection from connection URLs
- Redis Sentinel Support: High availability with Redis Sentinel configurations
- Memory Efficient: Uses `__slots__` for optimized memory usage
- Environment Configuration: Easy setup via environment variables or `.env` files
- Type Safety: Full type hints with `py.typed` marker
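To illustrate what detecting the database type from a connection URL can look like, here is a minimal sketch that reads the URL scheme with the standard library. The function name `detect_db_type` and the `SUPPORTED_SCHEMES` set are hypothetical and are not taken from the package; the real detection logic may differ.

```python
from urllib.parse import urlsplit

# Hypothetical set; the scheme names come from the supported databases above.
SUPPORTED_SCHEMES = {"redis", "memcached", "dragonfly", "valkey"}


def detect_db_type(url: str) -> str:
    """Return the database type implied by the URL scheme (illustrative only)."""
    scheme = urlsplit(url).scheme.lower()
    if scheme not in SUPPORTED_SCHEMES:
        raise ValueError(f"Unsupported URL scheme: {scheme!r}")
    return scheme


print(detect_db_type("redis://localhost:6379/0"))
print(detect_db_type("valkey://localhost:6381/0"))
```

Rejecting unknown schemes early gives a clearer error than a failed connection attempt later on.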
pip install mem-db-utils
Create a `.env` file or set environment variables:
# Required
DB_URL=redis://localhost:6379/0
# Optional
DB_TYPE=redis # Auto-detected from URL if not specified
REDIS_CONNECTION_TYPE=direct # or 'sentinel'
REDIS_MASTER_SERVICE=mymaster # Required for Sentinel
DB_TIMEOUT=30 # Connection timeout in seconds
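
As a rough picture of how these variables could be resolved into settings, the sketch below reads them from a mapping. The helper `load_settings`, the fallback to `direct`, and the default timeout of 30 are assumptions made for illustration; they are not confirmed package behavior.

```python
import os
from typing import Mapping, Optional
from urllib.parse import urlsplit


def load_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Resolve settings from environment-style variables (illustrative only)."""
    env = os.environ if env is None else env
    url = env.get("DB_URL")
    if not url:
        raise RuntimeError("DB_URL is required")
    # Assumed default: a direct connection unless Sentinel is requested.
    connection_type = env.get("REDIS_CONNECTION_TYPE", "direct")
    master = env.get("REDIS_MASTER_SERVICE")
    if connection_type == "sentinel" and not master:
        raise RuntimeError("REDIS_MASTER_SERVICE is required for Sentinel")
    return {
        "url": url,
        # DB_TYPE wins when set; otherwise fall back to the URL scheme.
        "db_type": env.get("DB_TYPE") or urlsplit(url).scheme,
        "connection_type": connection_type,
        "master_service": master,
        # Assumed default of 30 seconds, mirroring the example value above.
        "timeout": int(env.get("DB_TIMEOUT", "30")),
    }


print(load_settings({"DB_URL": "redis://localhost:6379/0"}))
```

Passing a plain dictionary instead of `os.environ` keeps the sketch easy to test without touching the real environment.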
from mem_db_utils import MemDBConnector
# Initialize connector
connector = MemDBConnector()
# Connect to database
conn = connector.connect(db=0)
# Basic operations (Redis-compatible databases)
conn.ping() # Test connection
conn.set("key", "value") # Set a key
value = conn.get("key") # Get a key
conn.delete("key") # Delete a key
import asyncio
from mem_db_utils.asyncio import MemDBConnector
async def main():
    # Initialize async connector
    connector = MemDBConnector()
    # Connect to database
    conn = await connector.connect(db=0)
    # Basic async operations (Redis-compatible databases)
    await conn.ping()  # Test connection
    await conn.set("key", "value")  # Set a key
    value = await conn.get("key")  # Get a key
    await conn.delete("key")  # Delete a key
    # Always close async connections
    await conn.aclose()

# Run async function
asyncio.run(main())
Database | URL Scheme | Sync Support | Async Support | Database Selection | Notes |
---|---|---|---|---|---|
Redis | redis:// | ✅ | ✅ | ✅ | Full featured |
Dragonfly | dragonfly:// | ✅ | ✅ | ✅ | Redis-compatible |
Valkey | valkey:// | ✅ | ✅ | ✅ | Redis-compatible |
Memcached | memcached:// | ✅ | ❌ | ❌ | Basic connection only |
DB_URL=redis://localhost:6379/0
DB_URL=redis://:password@localhost:6379/0
DB_URL=redis://username:password@localhost:6379/0
DB_URL=redis://sentinel-host:26379
REDIS_CONNECTION_TYPE=sentinel
REDIS_MASTER_SERVICE=mymaster
# Dragonfly
DB_URL=dragonfly://localhost:6380/0
# Valkey
DB_URL=valkey://localhost:6381/0
# Memcached
DB_URL=memcached://localhost:11211
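
The URL formats above all share the same shape: scheme, optional credentials, host, port, and an optional database index in the path. The sketch below splits a URL into those parts with `urllib.parse.urlsplit`. The helper `parse_db_url` is hypothetical and only illustrates the format; it is not how the package is known to parse URLs.

```python
from urllib.parse import urlsplit


def parse_db_url(url: str) -> dict:
    """Split a connection URL into its components (illustrative only)."""
    parts = urlsplit(url)
    db_text = parts.path.lstrip("/")
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        # An empty username (as in redis://:password@host) is normalized to None.
        "username": parts.username or None,
        "password": parts.password,
        # Memcached URLs carry no database index, so this may be None.
        "db": int(db_text) if db_text else None,
    }


print(parse_db_url("redis://:password@localhost:6379/0"))
print(parse_db_url("memcached://localhost:11211"))
```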
Feature | Sync API | Async API |
---|---|---|
Import | `from mem_db_utils import MemDBConnector` | `from mem_db_utils.asyncio import MemDBConnector` |
Connect | `conn = connector.connect()` | `conn = await connector.connect()` |
Operations | `conn.set("key", "value")` | `await conn.set("key", "value")` |
Close | Automatic | `await conn.aclose()` |
Concurrency | Thread-based | Native async/await |
Performance | Good for blocking I/O workloads | Excellent for high concurrency |
from mem_db_utils import MemDBConnector
connector = MemDBConnector()
conn = connector.connect()
conn.set("key", "value")
result = conn.get("key")
import asyncio
from mem_db_utils.asyncio import MemDBConnector
async def main():
    connector = MemDBConnector()
    conn = await connector.connect()
    await conn.set("key", "value")
    result = await conn.get("key")
    await conn.aclose()  # Important: close async connections

asyncio.run(main())
- Import Change: Use `mem_db_utils.asyncio` for the async connector
- Async/Await: Add `await` to all connection operations
- Function Definition: Use `async def` for functions that use the async connector
- Connection Closing: Always call `await conn.aclose()` for async connections
- Event Loop: Use `asyncio.run()` or an existing event loop
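
One way to make the connection-closing rule hard to forget is to wrap the connect and close calls in an async context manager. The sketch below is an assumption-laden illustration: `managed_connection` is a hypothetical helper, and `FakeConnector` and `FakeConnection` are stand-ins so the example runs without a database. With the real package, the async `MemDBConnector` would take the place of `FakeConnector`.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeConnection:
    """Stand-in for a real async connection; only aclose() matters here."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


class FakeConnector:
    """Stand-in for the async connector."""

    async def connect(self, **kwargs):
        return FakeConnection()


@asynccontextmanager
async def managed_connection(connector, **kwargs):
    """Yield a connection and close it on exit, even if the body raises."""
    conn = await connector.connect(**kwargs)
    try:
        yield conn
    finally:
        await conn.aclose()


async def demo():
    async with managed_connection(FakeConnector(), db=0) as conn:
        assert not conn.closed  # still open inside the block
    return conn.closed


closed_after_exit = asyncio.run(demo())
print(closed_after_exit)
```

On Python 3.10 and later, `contextlib.aclosing` offers similar behavior for any object that exposes `aclose()`.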
import asyncio
from mem_db_utils.asyncio import MemDBConnector
async def worker(connector, worker_id):
    """Worker function for concurrent operations."""
    conn = await connector.connect(db=0)
    try:
        # Perform operations
        await conn.set(f"worker_{worker_id}", f"data_{worker_id}")
        result = await conn.get(f"worker_{worker_id}")
        await conn.delete(f"worker_{worker_id}")
        return f"Worker {worker_id}: {result}"
    finally:
        await conn.aclose()

async def concurrent_example():
    connector = MemDBConnector()
    # Run 5 concurrent workers
    tasks = [worker(connector, i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(concurrent_example())
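
When the number of workers grows, it can help to cap how many run at once so the server is not flooded with connections. The sketch below shows one common approach using `asyncio.Semaphore`. The helper `bounded_gather` is hypothetical, and the job uses `asyncio.sleep` as a stand-in for a database round trip so the example runs without a server.

```python
import asyncio


async def bounded_gather(limit, coroutine_factories):
    """Run the given coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(run_one(f) for f in coroutine_factories))


async def demo(limit=2, jobs=5):
    active = 0
    peak = 0

    async def job(i):
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)  # stands in for a database round trip
        active -= 1
        return f"Worker {i}: done"

    factories = [lambda i=i: job(i) for i in range(jobs)]
    results = await bounded_gather(limit, factories)
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

Factories are passed instead of coroutine objects so that no job starts before it acquires the semaphore.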
from mem_db_utils import MemDBConnector
# Override defaults via constructor
connector = MemDBConnector(
    redis_type="sentinel",  # Force Sentinel mode
    master_service="my-master",  # Custom master service name
)
# Connect with custom options
conn = connector.connect(
    db=1,  # Different database number
    decode_response=False,  # Get bytes instead of strings
    timeout=60,  # Custom timeout
)
from mem_db_utils import MemDBConnector
import redis.exceptions
try:
    connector = MemDBConnector()
    conn = connector.connect(db=0)
    conn.ping()
except redis.exceptions.ConnectionError:
    print("Failed to connect to database")
except redis.exceptions.ResponseError:
    print("Invalid database operation")
except Exception as e:
    print(f"Unexpected error: {e}")
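
Connection errors are often transient, so a caller may prefer to retry before giving up. The sketch below is a generic retry helper with exponential backoff. It is not part of the package: `retry` and `flaky_ping` are hypothetical names, and the built-in `ConnectionError` is used only so the example runs without a server.

```python
import time


def retry(operation, exceptions, attempts=3, base_delay=0.1):
    """Call operation(), retrying on `exceptions` with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except exceptions:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


def flaky_ping():
    """Hypothetical operation that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "PONG"


result = retry(flaky_ping, ConnectionError, attempts=3, base_delay=0.01)
print(result, calls["count"])
```

With a Redis-compatible backend, the exception type passed in would presumably be `redis.exceptions.ConnectionError`, as in the error-handling example above.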
import asyncio
from mem_db_utils.asyncio import MemDBConnector
import redis.exceptions
async def async_error_example():
    connector = MemDBConnector()
    conn = None
    try:
        conn = await connector.connect(db=0)
        await conn.ping()
    except redis.exceptions.ConnectionError:
        print("Failed to connect to database")
    except redis.exceptions.ResponseError:
        print("Invalid database operation")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        if conn:
            await conn.aclose()

asyncio.run(async_error_example())
# Sync - Redis/Dragonfly/Valkey only
connector = MemDBConnector()
conn = connector.connect(db=2) # Connect to database 2
# Async - Redis/Dragonfly/Valkey only
async def connect_to_db():
    connector = MemDBConnector()
    conn = await connector.connect(db=2)  # Connect to database 2
    await conn.aclose()
# Test if package imports correctly
from mem_db_utils import MemDBConnector
from mem_db_utils.asyncio import MemDBConnector as AsyncMemDBConnector
print("Import successful!")
from mem_db_utils import MemDBConnector
try:
    connector = MemDBConnector()
    conn = connector.connect(db=0)
    # Test Redis-compatible databases
    if hasattr(conn, 'ping'):
        result = conn.ping()
        print(f"Connection successful: {result}")
    else:
        print("Connection established (Memcached)")
except Exception as e:
    print(f"Connection failed: {e}")
import asyncio
from mem_db_utils.asyncio import MemDBConnector
async def test_async_connection():
    try:
        connector = MemDBConnector()
        conn = await connector.connect(db=0)
        # Test Redis-compatible databases
        if hasattr(conn, 'ping'):
            result = await conn.ping()
            print(f"Async connection successful: {result}")
            await conn.aclose()
        else:
            print("Async connection established")
    except Exception as e:
        print(f"Async connection failed: {e}")

asyncio.run(test_async_connection())
Use Sync API when:
- Simple scripts or applications with low concurrency
- Blocking I/O is acceptable
- Working within existing synchronous codebases
- Single-threaded applications
Use Async API when:
- High concurrency requirements (>100 concurrent operations)
- Non-blocking I/O is important
- Building async applications with FastAPI, aiohttp, etc.
- Maximum throughput is needed
Both connectors use `__slots__` for memory efficiency, reducing per-instance memory usage by restricting attribute storage.
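
To show what restricting attribute storage means in practice, the sketch below compares a regular class with one that declares `__slots__`. Both classes are hypothetical stand-ins and do not reflect the connector's actual attributes.

```python
class WithDict:
    """Regular class: every instance carries its own __dict__."""

    def __init__(self, url, timeout):
        self.url = url
        self.timeout = timeout


class WithSlots:
    """Slotted class: attributes live in fixed slots, with no per-instance dict."""

    __slots__ = ("url", "timeout")

    def __init__(self, url, timeout):
        self.url = url
        self.timeout = timeout


plain = WithDict("redis://localhost:6379/0", 30)
slotted = WithSlots("redis://localhost:6379/0", 30)

print(hasattr(plain, "__dict__"))    # True
print(hasattr(slotted, "__dict__"))  # False

try:
    slotted.extra = 1  # not declared in __slots__
except AttributeError:
    print("slotted instances reject undeclared attributes")
```

The trade-off is flexibility: attributes cannot be added to a slotted instance at runtime.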
- Import Error: Ensure the `DB_URL` environment variable is set

      export DB_URL=redis://localhost:6379/0

- Connection Timeout: Increase the timeout value

      export DB_TIMEOUT=60

- Async Connection Not Closed: Always call `await conn.aclose()`

      conn = await connector.connect()
      try:
          ...  # operations
      finally:
          await conn.aclose()

- Database Selection Not Supported: Only Redis-compatible databases support database selection

      # Works: Redis, Dragonfly, Valkey
      conn = connector.connect(db=1)
      # Not supported: Memcached
The package automatically loads environment variables from `.env` files using `python-dotenv`. Ensure your `.env` file is in the working directory or one of its parent directories.
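
The lookup in the working directory and its parents can be pictured with the standard-library sketch below. The helper `find_env_file` is hypothetical and only illustrates the search order. `python-dotenv` ships its own `find_dotenv()` function, and whether this package relies on it is not stated here.

```python
from pathlib import Path
from typing import Optional


def find_env_file(start: Path, name: str = ".env") -> Optional[Path]:
    """Return the nearest file called `name` in `start` or its ancestors."""
    start = start.resolve()
    for directory in (start, *start.parents):
        candidate = directory / name
        if candidate.is_file():
            return candidate  # the closest match wins
    return None


print(find_env_file(Path.cwd()))
```

If this prints `None` for your project directory, a similar upward search would find nothing either, which is a quick way to check file placement.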
Contributions are welcome! Please ensure your code passes all tests and follows the existing code style.
# Install development dependencies
pip install -e .
pip install coverage pre-commit pytest pytest-cov pytest-dotenv ruff pytest-asyncio
# Run tests
python -m pytest tests/ -v
# Check linting
ruff check . && ruff format --check .
This project is licensed under the MIT License - see the LICENSE file for details.