Async Database Library

Spring JPA-inspired async ORM for Python. Declarative entities, generic repositories, transactions, and cache decorators — built on SQLAlchemy async.

Overview

viveka-kosha maps familiar Spring JPA concepts to idiomatic Python decorators:

| Spring JPA | viveka-kosha |
| --- | --- |
| @Entity + @Table | @entity + @table |
| JpaRepository<T, ID> | DbRepo[T] |
| @Repository | @repository |
| @Transactional | @transactional |
| @Cacheable | @cacheable(key_prefix, ttl) |
| @CacheEvict | @cache_evict(key_prefix) |
| OpenSessionInView filter | DbMiddleware |
| ThreadLocal session | ContextVar session |

Requires viveka-grantha: viveka-kosha depends on viveka-grantha for logging and cache management. You can install viveka-grantha first; otherwise pip pulls it in automatically as a dependency.
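The last row of the table swaps a ThreadLocal for a ContextVar because async requests share one thread. The sketch below uses only the standard library to show why that works; the variable name current_session and the string "sessions" are illustrative assumptions, not the library's internals.

```python
import asyncio
from contextvars import ContextVar
from typing import List, Optional

# Hypothetical holder; the library's actual variable is not documented here.
current_session: ContextVar[Optional[str]] = ContextVar("current_session", default=None)


async def handle_request(name: str) -> Optional[str]:
    # gather() wraps each coroutine in a task, and each task runs in a copy
    # of the context, so this set() is invisible to the other task.
    current_session.set(f"session-{name}")
    await asyncio.sleep(0)  # yield so the two tasks interleave
    return current_session.get()


async def main() -> List[Optional[str]]:
    return await asyncio.gather(handle_request("a"), handle_request("b"))


results = asyncio.run(main())
```

Each task reads back its own value even though both run on the same thread, which is the isolation a ThreadLocal gives to thread-per-request servers.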

Installation

$ pip install viveka-kosha

Install with your database driver:

$ pip install "viveka-kosha[postgres]"
$ pip install "viveka-kosha[mysql]"

Add to config.ini:

[database]
url          = postgresql+asyncpg://user:pass@localhost/mydb
pool_size    = 10
max_overflow = 20
echo         = false
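To check a [database] section before starting the app, the values can be read with the standard library. This is a minimal sketch that assumes the file is ordinary INI syntax; how ConfigService itself parses the file is not described here.

```python
import configparser

CONFIG_TEXT = """
[database]
url          = postgresql+asyncpg://user:pass@localhost/mydb
pool_size    = 10
max_overflow = 20
echo         = false
"""

# interpolation=None keeps a literal '%' (for example in a URL-encoded
# password) from being treated as configparser interpolation syntax.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(CONFIG_TEXT)
db = parser["database"]

settings = {
    "url": db["url"],
    "pool_size": db.getint("pool_size"),
    "max_overflow": db.getint("max_overflow"),
    "echo": db.getboolean("echo"),
}
```

The key names pool_size, max_overflow, and echo match SQLAlchemy engine options of the same names, which suggests they are passed through to the engine, although that is an inference rather than something stated above.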

Quickstart

from fastapi import FastAPI
from sqlalchemy import Column, Integer, String
from viveka_db import (
    VivekaDbBase, DbRepo, DatabaseSessionFactory, DbMiddleware,
    entity, table, repository, transactional, cacheable, cache_evict
)
from viveka_common.config import ConfigService

# 1. Startup
config = ConfigService()
DatabaseSessionFactory(config_service=config)

# 2. App + middleware
app = FastAPI()
app.add_middleware(DbMiddleware, engine=DatabaseSessionFactory.get_instance().engine)

# 3. Entity
@entity
@table("users")
class User(VivekaDbBase):
    id    = Column(Integer, primary_key=True)
    name  = Column(String(100))
    email = Column(String(255), unique=True)

# 4. Repository
@repository
class UserRepo(DbRepo[User]):
    def __init__(self): super().__init__(User)

    @transactional
    @cacheable(key_prefix="user", ttl=3600)
    async def get(self, user_id: int):
        return await self.get_by_id(user_id)

    @transactional
    async def save(self, user: User) -> User:
        return (await self.insert_all([user]))[0]

@entity

Marks a class as a mapped database entity and registers it in the global entity registry. The class must inherit from VivekaDbBase.

from viveka_db import entity, table, VivekaDbBase
from sqlalchemy import Column, Integer, String

@entity
@table("products")
class Product(VivekaDbBase):
    id          = Column(Integer, primary_key=True, autoincrement=True)
    name        = Column(String(200), nullable=False)
    description = Column(String(1000))
    price       = Column(Integer, nullable=False)

Always place @entity above @table. Python applies decorators bottom-up, so @table runs first and sets __tablename__, then @entity registers the class.
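The ordering rule can be verified with two toy decorators. These are stand-ins written for illustration only; they record call order and do not reproduce what the real @entity and @table do.

```python
calls = []


def table(name):
    """Toy stand-in: records the call and sets __tablename__."""
    def decorator(cls):
        calls.append("table")
        cls.__tablename__ = name
        return cls
    return decorator


def entity(cls):
    """Toy stand-in: records the call and the table name visible to it."""
    calls.append("entity")
    cls.seen_tablename = getattr(cls, "__tablename__", None)
    return cls


@entity
@table("products")
class Product:
    pass
```

After the class statement runs, calls is ["table", "entity"] and Product.seen_tablename is "products". Reversing the two decorators would leave seen_tablename as None, because the registering decorator would run before the name is set.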

@table(name) & VivekaDbBase

@table(name) sets __tablename__ and triggers SQLAlchemy's declarative mapping.

VivekaDbBase is the shared DeclarativeBase all entities must inherit from. This ensures all table definitions share one metadata object, so VivekaDbBase.metadata.create_all can create every table in dependency order. With an async engine, run it through conn.run_sync:

# Create all tables at startup
engine = DatabaseSessionFactory.get_instance().engine
async with engine.begin() as conn:
    await conn.run_sync(VivekaDbBase.metadata.create_all)

@repository

Marks a class for auto-discovery. No runtime behaviour — it's a marker analogous to Spring's @Repository.

@repository
class OrderRepo(DbRepo[Order]):
    def __init__(self): super().__init__(Order)

DbRepo[T]

Generic CRUD base class. Extend it and call super().__init__(YourModel).

Every built-in method must be called from a method decorated with @transactional, because the built-ins read self.db, which the decorator injects.

Built-in Methods

| Method | Returns | Description |
| --- | --- | --- |
| insert_all(entities) | List[T] | Bulk insert, flushes immediately |
| get_by_id(id) | Optional[T] | Fetch by primary key |
| get_all(skip, limit) | List[T] | Paginated fetch; defaults skip=0, limit=100 |
| update(entity) | T | Merge entity state and flush |
| delete(id) | bool | Delete by primary key; True if found and deleted |
| delete_entity(entity) | bool | Delete a loaded entity instance |
| exists(id) | bool | Check existence by primary key |
| count() | int | Total row count for the entity table |
| find_by_field(field, value) | Optional[T] | First match on any field by name |
| find_all_by_field(field, value) | List[T] | All matches on any field by name |
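The return contracts in the table can be mirrored by an in-memory test double, which may be handy for unit-testing services without a database. InMemoryRepo and the User dataclass below are hypothetical and not part of viveka-kosha; only a subset of the methods is sketched, with semantics taken from the descriptions above.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class User:
    id: int
    name: str


class InMemoryRepo:
    """Hypothetical test double that follows the documented return types."""

    def __init__(self) -> None:
        self._rows: Dict[int, User] = {}

    async def insert_all(self, entities: List[User]) -> List[User]:
        for e in entities:
            self._rows[e.id] = e
        return entities

    async def get_all(self, skip: int = 0, limit: int = 100) -> List[User]:
        return list(self._rows.values())[skip:skip + limit]

    async def delete(self, id: int) -> bool:
        # True only if a row with this key existed and was removed.
        return self._rows.pop(id, None) is not None

    async def count(self) -> int:
        return len(self._rows)

    async def find_by_field(self, field: str, value: Any) -> Optional[User]:
        return next((r for r in self._rows.values() if getattr(r, field) == value), None)


async def demo():
    repo = InMemoryRepo()
    await repo.insert_all([User(i, f"user{i}") for i in range(1, 6)])
    page = await repo.get_all(skip=2, limit=2)
    first_delete = await repo.delete(3)
    second_delete = await repo.delete(3)
    match = await repo.find_by_field("name", "user5")
    return [u.id for u in page], first_delete, second_delete, await repo.count(), match


page_ids, first_delete, second_delete, remaining, match = asyncio.run(demo())
```

Here skip=2, limit=2 returns the third and fourth rows, and the second delete of the same key returns False.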

@transactional

Injects an AsyncSession into self.db before the method runs. Works in two modes automatically:

| Context | Behaviour |
| --- | --- |
| HTTP request (via DbMiddleware) | Reuses the existing request session. DbMiddleware commits or rolls back; @transactional does not. |
| Background task / async worker | Opens a dedicated session, commits on success, rolls back on exception, closes when done. Logs a warning so async usage is visible. |

@transactional
async def save_order(self, order: Order) -> Order:
    # self.db is an AsyncSession here
    result = await self.insert_all([order])
    return result[0]

@transactional
async def custom_query(self, status: str):
    # Drop into raw SQLAlchemy when needed (requires: from sqlalchemy import select)
    result = await self.db.execute(
        select(Order).where(Order.status == status)
    )
    return result.scalars().all()
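The two modes can be pictured with a small stand-alone decorator. This is a sketch of the described behaviour under stated assumptions, not the library's code: FakeSession, request_session, and the opened list are invented for the example, and the warning log is omitted.

```python
import asyncio
import functools
from contextvars import ContextVar
from typing import List, Optional


class FakeSession:
    """Stand-in for AsyncSession that records lifecycle calls."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.events: List[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


request_session: ContextVar[Optional[FakeSession]] = ContextVar("request_session", default=None)
opened: List[FakeSession] = []  # sessions the decorator had to open itself


def transactional(fn):
    @functools.wraps(fn)
    async def wrapper(self, *args, **kwargs):
        existing = request_session.get()
        if existing is not None:
            # Request mode: reuse the session; the middleware owns commit/rollback.
            self.db = existing
            return await fn(self, *args, **kwargs)
        # Standalone mode: own the whole lifecycle.
        session = FakeSession("standalone")
        opened.append(session)
        self.db = session
        try:
            result = await fn(self, *args, **kwargs)
            await session.commit()
            return result
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
    return wrapper


class Repo:
    @transactional
    async def work(self, fail: bool = False) -> str:
        if fail:
            raise ValueError("boom")
        return self.db.label


async def demo():
    repo = Repo()
    req = FakeSession("request")
    token = request_session.set(req)
    try:
        in_request = await repo.work()
    finally:
        request_session.reset(token)
    standalone = await repo.work()
    try:
        await repo.work(fail=True)
    except ValueError:
        pass
    return in_request, req.events, standalone, [s.events for s in opened]


in_request, request_events, standalone, standalone_events = asyncio.run(demo())
```

The request-mode call leaves the shared session untouched, while each standalone call ends in commit or rollback followed by close.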

@cacheable(key_prefix, ttl)

Cache-aside read pattern. Checks the cache before calling the method. Stores the result on a miss. Cache key is {key_prefix}:{first_arg}.

@transactional
@cacheable(key_prefix="product", ttl=1800)
async def get_product(self, product_id: int):
    return await self.get_by_id(product_id)

Stack @transactional above @cacheable. With @transactional outermost, self.db is injected before the cache lookup runs. On a hit the method body is skipped and the session goes unused; on a miss the session is already in place for the DB fetch.

@cache_evict(key_prefix)

Runs the method first, then deletes all cache keys matching {key_prefix}:*. Use after any write operation.

@transactional
@cache_evict(key_prefix="product")
async def update_product(self, product: Product) -> Product:
    return await self.update(product)

@transactional
@cache_evict(key_prefix="product")
async def remove_product(self, product_id: int) -> bool:
    return await self.delete(product_id)
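A dictionary-backed sketch of the run-then-evict order follows. The cache dict and both functions are hypothetical, and skipping eviction when the method raises is an assumption of this sketch, not documented behaviour.

```python
import asyncio
import functools

# Stand-in for the real cache backend.
cache = {"product:1": "a", "product:2": "b", "user:1": "c"}


def cache_evict(key_prefix: str):
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            result = await fn(*args, **kwargs)  # run the write first
            # Then drop every key under the prefix, i.e. "{key_prefix}:*".
            for key in [k for k in cache if k.startswith(f"{key_prefix}:")]:
                del cache[key]
            return result
        return wrapper
    return decorator


@cache_evict(key_prefix="product")
async def update_product(product_id: int) -> int:
    return product_id


@cache_evict(key_prefix="product")
async def failing_update() -> None:
    raise RuntimeError("write failed")


async def demo():
    await update_product(1)
    after_success = dict(cache)
    cache["product:9"] = "fresh"
    try:
        await failing_update()
    except RuntimeError:
        pass
    return after_success, dict(cache)


after_success, after_failure = asyncio.run(demo())
```

Updating product 1 also evicts product:2, since the whole prefix is cleared. That keeps invalidation simple at the cost of extra misses on unrelated ids.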

DatabaseSessionFactory

Singleton async SQLAlchemy engine and session factory. Initialised once at startup.

from viveka_db import DatabaseSessionFactory
from viveka_common.config import ConfigService

# Initialize at startup
DatabaseSessionFactory(config_service=ConfigService())

# Access engine anywhere
engine = DatabaseSessionFactory.get_instance().engine

# Open a standalone session (for background tasks)
async with DatabaseSessionFactory.get_instance().open_session() as session:
    await session.execute(...)
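The construct-once, fetch-anywhere usage above follows a common singleton shape, sketched here with a plain class. SessionFactory is a hypothetical stand-in, and raising on a second construction or on early access is an assumption; the library may behave differently in both cases.

```python
class SessionFactory:
    """Hypothetical stand-in for the construct-once singleton pattern."""

    _instance = None

    def __init__(self, url: str) -> None:
        if SessionFactory._instance is not None:
            raise RuntimeError("already initialised")
        self.url = url
        SessionFactory._instance = self

    @classmethod
    def get_instance(cls) -> "SessionFactory":
        if cls._instance is None:
            raise RuntimeError("not initialised; construct once at startup")
        return cls._instance


# Startup: construct exactly once.
factory = SessionFactory(url="postgresql+asyncpg://user:pass@localhost/mydb")

# Anywhere else: fetch the same object.
same_object = SessionFactory.get_instance() is factory
```

Failing loudly on early access makes a missing startup call easier to spot than a lazily created engine with default settings would.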

DbMiddleware

Starlette middleware that manages the database session lifecycle for every HTTP request. Analogous to Spring's OpenSessionInViewFilter.

from fastapi import FastAPI
from viveka_db import DbMiddleware, DatabaseSessionFactory

app = FastAPI()
app.add_middleware(
    DbMiddleware,
    engine=DatabaseSessionFactory.get_instance().engine
)

Session Lifecycle

The two paths below show when sessions are created, committed, and closed:

HTTP Request path:
  DbMiddleware → creates AsyncSession → stores in ContextVar
      ↓
  Route → Service → @transactional method
              detects session in ContextVar, uses it directly
      ↓
  DbMiddleware → commit on success / rollback on exception / close


Background Task path:
  @transactional method
      no session in ContextVar
      → opens session via DatabaseSessionFactory.open_session()
      → commit on success / rollback on exception / close
      → logs a warning (task usage is intentionally visible)
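The HTTP request path can be traced with a small wrapper that plays the middleware's role. This is a sketch under stated assumptions: FakeSession, current_session, and run_request are invented names, and a real Starlette middleware would wrap the ASGI call instead of a bare coroutine.

```python
import asyncio
from contextvars import ContextVar
from typing import Awaitable, Callable, List, Optional


class FakeSession:
    """Stand-in for AsyncSession that records lifecycle calls."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


current_session: ContextVar[Optional[FakeSession]] = ContextVar("current_session", default=None)
sessions: List[FakeSession] = []  # kept only so the demo can inspect them


async def run_request(handler: Callable[[], Awaitable[str]]) -> str:
    session = FakeSession()
    sessions.append(session)
    token = current_session.set(session)  # visible to everything the handler calls
    try:
        result = await handler()
        await session.commit()
        return result
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()
        current_session.reset(token)  # no session leaks past the request


async def ok_handler() -> str:
    return "ok" if current_session.get() is not None else "no session"


async def failing_handler() -> str:
    raise ValueError("boom")


async def demo():
    good = await run_request(ok_handler)
    try:
        await run_request(failing_handler)
    except ValueError:
        pass
    return good, [s.events for s in sessions], current_session.get()


good, lifecycle, leftover = asyncio.run(demo())
```

The successful request records commit then close, the failing one records rollback then close, and the ContextVar is empty again after both.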