viveka-kosha v0.1.0
Async Database Library
Spring JPA-inspired async ORM for Python. Declarative entities, generic repositories, transactions, and cache decorators — built on SQLAlchemy async.
Overview
viveka-kosha maps familiar Spring JPA concepts to idiomatic Python decorators:
| Spring JPA | viveka-kosha |
|---|---|
| @Entity + @Table | @entity + @table |
| JpaRepository<T, ID> | DbRepo[T] |
| @Repository | @repository |
| @Transactional | @transactional |
| @Cacheable | @cacheable(key_prefix, ttl) |
| @CacheEvict | @cache_evict(key_prefix) |
| OpenSessionInView filter | DbMiddleware |
| ThreadLocal session | ContextVar session |
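The last row of the table is the least obvious mapping, so here is a minimal sketch of why a ContextVar can play the role of Spring's ThreadLocal in async code. It uses only the standard library; `current_session` and `handle` are hypothetical names for illustration, not part of viveka-kosha.

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

# Hypothetical stand-in for the library's per-request session slot.
current_session: ContextVar[Optional[str]] = ContextVar("current_session", default=None)


async def handle(request_id: str) -> str:
    # Each asyncio task runs in its own copy of the context, so this
    # assignment is invisible to the other concurrently running tasks.
    current_session.set(f"session-for-{request_id}")
    await asyncio.sleep(0)  # yield so the tasks interleave
    return current_session.get()


async def main() -> list:
    # gather() wraps each coroutine in a task with its own context copy.
    return await asyncio.gather(handle("a"), handle("b"), handle("c"))
```

Running `asyncio.run(main())` returns each task's own value even though all three tasks share one thread, which is the property a thread-local cannot provide under asyncio.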
Installation
Install with your database driver:
Add to config.ini:
[database]
url = postgresql+asyncpg://user:pass@localhost/mydb
pool_size = 10
max_overflow = 20
echo = false
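To make the value types explicit, the following sketch parses the same [database] section with the standard library's configparser. How ConfigService actually reads the file is not documented here, so `load_engine_options` is a hypothetical helper, not the library's implementation.

```python
import configparser

# The same [database] section shown above, inlined so the sketch is runnable.
CONFIG_TEXT = """
[database]
url = postgresql+asyncpg://user:pass@localhost/mydb
pool_size = 10
max_overflow = 20
echo = false
"""


def load_engine_options(text: str) -> dict:
    """Parse the [database] section into typed values (hypothetical helper)."""
    # interpolation=None keeps a '%' in a password from being read as a placeholder.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    db = parser["database"]
    return {
        "url": db["url"],
        "pool_size": db.getint("pool_size"),
        "max_overflow": db.getint("max_overflow"),
        "echo": db.getboolean("echo"),
    }


options = load_engine_options(CONFIG_TEXT)
```

The resulting keys match parameters that SQLAlchemy's create_async_engine accepts (the URL plus pool_size, max_overflow and echo), which is presumably where the library forwards them.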
Quickstart
from fastapi import FastAPI
from sqlalchemy import Column, Integer, String
from viveka_db import (
    VivekaDbBase, DbRepo, DatabaseSessionFactory, DbMiddleware,
    entity, table, repository, transactional, cacheable, cache_evict
)
from viveka_common.config import ConfigService

# 1. Startup
config = ConfigService()
DatabaseSessionFactory(config_service=config)

# 2. App + middleware
app = FastAPI()
app.add_middleware(DbMiddleware, engine=DatabaseSessionFactory.get_instance().engine)

# 3. Entity
@entity
@table("users")
class User(VivekaDbBase):
    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    email = Column(String(255), unique=True)

# 4. Repository
@repository
class UserRepo(DbRepo[User]):
    def __init__(self):
        super().__init__(User)

    @transactional
    @cacheable(key_prefix="user", ttl=3600)
    async def get(self, user_id: int):
        return await self.get_by_id(user_id)

    @transactional
    async def save(self, user: User) -> User:
        return (await self.insert_all([user]))[0]
@entity
Marks a class as a mapped database entity and registers it in the global entity registry. The class must inherit from VivekaDbBase.
from viveka_db import entity, table, VivekaDbBase
from sqlalchemy import Column, Integer, String, DateTime
@entity
@table("products")
class Product(VivekaDbBase):
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(200), nullable=False)
    description = Column(String(1000))
    price = Column(Integer, nullable=False)
Place @entity above @table. Python applies decorators bottom-up, so @table runs first to set __tablename__, then @entity registers the class.
@table(name) & VivekaDbBase
@table(name) sets __tablename__ and triggers SQLAlchemy's declarative mapping.
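The bottom-up decorator order can be seen in a minimal sketch with plain classes. The `table`, `entity` and `ENTITY_REGISTRY` definitions below are hypothetical simplifications: the real @table also triggers SQLAlchemy's declarative mapping, which this sketch leaves out.

```python
from typing import Callable, Dict, Type

# Hypothetical registry and decorators; the library's internals may differ.
ENTITY_REGISTRY: Dict[str, Type] = {}


def table(name: str) -> Callable[[Type], Type]:
    def decorate(cls: Type) -> Type:
        cls.__tablename__ = name  # applied first (closest to the class)
        return cls
    return decorate


def entity(cls: Type) -> Type:
    # Applied second, so __tablename__ is already available here.
    ENTITY_REGISTRY[cls.__tablename__] = cls
    return cls


@entity
@table("products")
class Product:
    pass
```

In this sketch, swapping the two decorators would make `entity` fail with an AttributeError, because `__tablename__` would not exist yet when registration runs.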
VivekaDbBase is the shared DeclarativeBase all entities must inherit from. This ensures all table definitions share one metadata object, allowing VivekaDbBase.metadata.create_all(engine) to create every table in dependency order.
# Create all tables at startup
async with engine.begin() as conn:
    await conn.run_sync(VivekaDbBase.metadata.create_all)
@repository
Marks a class for auto-discovery. No runtime behaviour — it's a marker analogous to Spring's @Repository.
@repository
class OrderRepo(DbRepo[Order]):
    def __init__(self):
        super().__init__(Order)
DbRepo[T]
Generic CRUD base class. Extend it and call super().__init__(YourModel).
Every built-in method must be called from a method decorated with @transactional, because each one uses self.db, which the decorator injects.
Built-in Methods
| Method | Returns | Description |
|---|---|---|
| insert_all(entities) | List[T] | Bulk insert, flushes immediately |
| get_by_id(id) | Optional[T] | Fetch by primary key |
| get_all(skip, limit) | List[T] | Paginated fetch — defaults skip=0, limit=100 |
| update(entity) | T | Merge entity state and flush |
| delete(id) | bool | Delete by primary key — True if found and deleted |
| delete_entity(entity) | bool | Delete a loaded entity instance |
| exists(id) | bool | Check existence by primary key |
| count() | int | Total row count for the entity table |
| find_by_field(field, value) | Optional[T] | First match on any field by name |
| find_all_by_field(field, value) | List[T] | All matches on any field by name |
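To illustrate the return conventions in the table (Optional for single lookups, bool for deletes, skip/limit paging), here is a small in-memory stand-in that mirrors a few of the method names. `InMemoryRepo` is a hypothetical test double written for this example. It is not part of viveka-kosha, and it assumes entities expose an `id` attribute.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Generic, List, Optional, TypeVar

T = TypeVar("T")


class InMemoryRepo(Generic[T]):
    """Hypothetical test double; assumes entities expose an `id` attribute."""

    def __init__(self) -> None:
        self._rows: Dict[Any, T] = {}

    async def insert_all(self, entities: List[T]) -> List[T]:
        for e in entities:
            self._rows[e.id] = e
        return entities

    async def get_by_id(self, id: Any) -> Optional[T]:
        return self._rows.get(id)

    async def get_all(self, skip: int = 0, limit: int = 100) -> List[T]:
        return list(self._rows.values())[skip:skip + limit]

    async def delete(self, id: Any) -> bool:
        # True only if a row with this key existed and was removed.
        return self._rows.pop(id, None) is not None

    async def find_by_field(self, field: str, value: Any) -> Optional[T]:
        # First row whose attribute named `field` equals `value`, else None.
        return next((r for r in self._rows.values() if getattr(r, field) == value), None)


@dataclass
class User:
    id: int
    name: str


async def demo() -> tuple:
    repo: InMemoryRepo[User] = InMemoryRepo()
    await repo.insert_all([User(1, "Ada"), User(2, "Grace"), User(3, "Edsger")])
    page = await repo.get_all(skip=1, limit=1)
    found = await repo.find_by_field("name", "Grace")
    deleted = await repo.delete(3)
    deleted_again = await repo.delete(3)
    return page, found, deleted, deleted_again
```

A stand-in like this can also be handy in unit tests for services that depend on a repository, since it needs neither a database nor a session.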
@transactional
Injects an AsyncSession into self.db before the method runs. Works in two modes automatically:
| Context | Behaviour |
|---|---|
| HTTP request (via DbMiddleware) | Reuses the existing request session. DbMiddleware commits or rolls back — @transactional does not. |
| Background task / async worker | Opens a dedicated session, commits on success, rolls back on exception, closes when done. Logs a warning so async usage is visible. |
from sqlalchemy import select

@transactional
async def save_order(self, order: Order) -> Order:
    # self.db is an AsyncSession here
    result = await self.insert_all([order])
    return result[0]

@transactional
async def custom_query(self, status: str):
    # Drop into raw SQLAlchemy when needed
    result = await self.db.execute(
        select(Order).where(Order.status == status)
    )
    return result.scalars().all()
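The two modes in the table above can be sketched with a ContextVar and a fake session that only records lifecycle calls. This is an illustration of the described behaviour under stated assumptions, not the library's source: `FakeSession`, `request_session` and this `transactional` are hypothetical, and the warning log is omitted.

```python
import asyncio
import functools
from contextvars import ContextVar
from typing import Optional


class FakeSession:
    """Stand-in for AsyncSession that only records lifecycle calls."""

    def __init__(self) -> None:
        self.events: list = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


# Hypothetical slot that a request middleware would populate.
request_session: ContextVar[Optional[FakeSession]] = ContextVar("request_session", default=None)


def transactional(method):
    @functools.wraps(method)
    async def wrapper(self, *args, **kwargs):
        existing = request_session.get()
        if existing is not None:
            # Request mode: reuse the session; commit/rollback is left to the middleware.
            self.db = existing
            return await method(self, *args, **kwargs)
        # Standalone mode: this call owns the whole session lifecycle.
        self.db = FakeSession()
        try:
            result = await method(self, *args, **kwargs)
            await self.db.commit()
            return result
        except Exception:
            await self.db.rollback()
            raise
        finally:
            await self.db.close()
    return wrapper


class Repo:
    @transactional
    async def work(self, fail: bool = False) -> str:
        if fail:
            raise ValueError("boom")
        return "ok"
```

Calling `Repo().work()` with no request session records a commit followed by a close, while a call made after `request_session.set(...)` leaves the shared session untouched.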
@cacheable(key_prefix, ttl)
Cache-aside read pattern. Checks the cache before calling the method. Stores the result on a miss. Cache key is {key_prefix}:{first_arg}.
@transactional
@cacheable(key_prefix="product", ttl=1800)
async def get_product(self, product_id: int):
    return await self.get_by_id(product_id)
Place @transactional above @cacheable. The cache is checked before the method body runs: on a hit, the DB session is never used; on a miss, the session is needed to fetch from the DB.
@cache_evict(key_prefix)
Runs the method first, then deletes all cache keys matching {key_prefix}:*. Use after any write operation.
@transactional
@cache_evict(key_prefix="product")
async def update_product(self, product: Product) -> Product:
    return await self.update(product)

@transactional
@cache_evict(key_prefix="product")
async def remove_product(self, product_id: int) -> bool:
    return await self.delete(product_id)
DatabaseSessionFactory
Singleton async SQLAlchemy engine and session factory. Initialised once at startup.
from viveka_db import DatabaseSessionFactory
from viveka_common.config import ConfigService
# Initialize at startup
DatabaseSessionFactory(config_service=ConfigService())
# Access engine anywhere
engine = DatabaseSessionFactory.get_instance().engine
# Open a standalone session (for background tasks)
async with DatabaseSessionFactory.get_instance().open_session() as session:
    await session.execute(...)
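The construct-once, then `get_instance()` usage shown above follows a common singleton shape, sketched below. `SessionFactory` is a hypothetical simplification. Whether the real DatabaseSessionFactory raises on a second construction or on early access is an assumption made for this example.

```python
from typing import Optional


class SessionFactory:
    """Hypothetical singleton mirroring the construct-once, get_instance() usage."""

    _instance: Optional["SessionFactory"] = None

    def __init__(self, url: str) -> None:
        if SessionFactory._instance is not None:
            # Assumption: a second initialisation is treated as a programming error.
            raise RuntimeError("SessionFactory is already initialised")
        self.url = url  # the real class would build an async engine here
        SessionFactory._instance = self

    @classmethod
    def get_instance(cls) -> "SessionFactory":
        if cls._instance is None:
            raise RuntimeError("call SessionFactory(...) once at startup first")
        return cls._instance


SessionFactory(url="sqlite+aiosqlite:///:memory:")
factory = SessionFactory.get_instance()
```

Failing loudly in both cases makes startup-order mistakes visible immediately instead of surfacing later as a missing engine.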
DbMiddleware
Starlette middleware that manages the database session lifecycle for every HTTP request. Analogous to Spring's OpenSessionInViewFilter.
from fastapi import FastAPI
from viveka_db import DbMiddleware, DatabaseSessionFactory
app = FastAPI()
app.add_middleware(
    DbMiddleware,
    engine=DatabaseSessionFactory.get_instance().engine
)
Session Lifecycle
Understanding when sessions are created, committed, and closed:
HTTP Request path:
  DbMiddleware → creates AsyncSession → stores in ContextVar
      ↓
  Route → Service → @transactional method
      detects session in ContextVar, uses it directly
      ↓
  DbMiddleware → commit on success / rollback on exception / close
Background Task path:
  @transactional method
      no session in ContextVar
      → opens session via DatabaseSessionFactory.open_session()
      → commit on success / rollback on exception / close
      → logs a warning (task usage is intentionally visible)
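The HTTP request path can be sketched without a web framework as a wrapper that publishes a session, runs the handler, and then commits or rolls back. `run_request`, `FakeSession` and `request_session` are hypothetical names for this example. The session is passed in only so the sketch can inspect it afterwards, whereas the real middleware creates it itself.

```python
import asyncio
from contextvars import ContextVar
from typing import Awaitable, Callable, List, Optional


class FakeSession:
    """Stand-in for AsyncSession that only records lifecycle calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


request_session: ContextVar[Optional[FakeSession]] = ContextVar("request_session", default=None)


async def run_request(handler: Callable[[], Awaitable[str]], session: FakeSession) -> str:
    """Hypothetical equivalent of the middleware's per-request wrapper."""
    token = request_session.set(session)  # publish the session for this request
    try:
        response = await handler()
        await session.commit()
        return response
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()
        request_session.reset(token)  # do not leak the session past the request


async def ok_handler() -> str:
    # Code anywhere below the wrapper sees the same session.
    assert request_session.get() is not None
    return "200 OK"


async def failing_handler() -> str:
    raise RuntimeError("handler failed")
```

A successful handler produces a commit followed by a close, a failing one produces a rollback followed by a close, and in both cases the ContextVar is reset once the request ends.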