Progressive SQLModel extension with visual ER diagrams.
Start from ORM entities, extend with non-ORM relationships, and generate GraphQL for interaction. Use DefineSubset to declaratively construct response DTOs — field selection, relationship loading, and derived fields all driven by declarations, not imperative code. Visualize your entity graph and data flow through interactive ER diagrams.
flowchart LR
p1["P1: ER Diagram<br/>SQLModel entities + non-ORM relationships<br/>+ visual ER diagram<br/><hr>Focus: Business Modeling"]
--> p2["P2: GraphQL API<br/>@query / @mutation<br/>SDL auto-generation + DataLoader batching<br/><hr>Focus: Business Queries"]
--> p3["P3: Declarative Assembly<br/>DefineSubset DTOs + implicit auto-loading<br/>resolve_* / post_*<br/><hr>Focus: Use Case Construction"]
A model-driven development workflow — from LLM-assisted modeling to production API:
SQLModel Entities → GraphQL (validation) → FastAPI Endpoints → openapi.json → TypeScript SDK
GraphQL for validation, REST for delivery.
During early development, GraphQL's flexible queries let you rapidly verify entity relationships and data shapes. When the model stabilizes, auto-generated FastAPI endpoints produce a typed REST API (OpenAPI spec), which feeds directly into TypeScript SDK generation for frontend integration.
| Stage | What you get |
|---|---|
| Model & Validate | @query / @mutation on SQLModel → auto-generated GraphQL API with DataLoader |
| Build REST APIs | DefineSubset DTOs + ErManager → N+1-safe FastAPI endpoints |
| Deliver to Frontend | openapi.json → TypeScript SDK |
| AI Integration | MCP server exposing the same models to AI assistants |
Roadmap: Auto-generation of FastAPI endpoints from the same SQLModel definitions is in progress. Today, GraphQL auto-generation and Core API (manual DTO) modes are production-ready.
One command to start all demo services:
bash start_all.sh| Service | Port | URL |
|---|---|---|
| demo GraphQL | 8000 | http://localhost:8000/graphql |
| demo CoreAPI | 8001 | http://localhost:8001/api/sprints |
| auth GraphQL | 8002 | http://localhost:8002/graphql |
| auth MCP | 8003 | http://localhost:8003/mcp |
| multi-app MCP | 8004 | http://localhost:8004/mcp |
| demo paginated | 8005 | http://localhost:8005/graphql |
| demo RPC MCP | 8006 | http://localhost:8006/mcp |
| demo RPC FastAPI | 8007 | http://localhost:8007/api/sprints |
Press Ctrl+C to stop all services.
To start individual services:
We reuse one example throughout: Sprint → Task → User.
- A
Sprinthas manyTasks - A
Taskhas oneowner(aUser) - The API also wants derived fields such as
task_countandcontributors
The concepts appear in this order on purpose:
- GraphQL Mode — the fastest path from SQLModel to a running API
- Core API Mode — DefineSubset DTOs for REST endpoints, progressing from implicit auto-loading to
resolve_*,post_*, and cross-layer data flow - MCP Server — expose the same models to AI assistants
- RPC Services — business service classes shared by MCP and web frameworks
| Need | What you write | What the framework does |
|---|---|---|
| GraphQL API | @query / @mutation on SQLModel methods |
Auto-generates SDL, resolves relationships via DataLoader |
| REST / use-case DTOs | DefineSubset + field declarations |
Implicit auto-loading, N+1 prevention, ORM→DTO conversion |
| Derived fields | post_* methods |
Runs after all nested data is resolved |
| Cross-layer data flow | ExposeAs, SendTo, Collector |
Pass context down or aggregate values up |
| Non-ORM relationships | Relationship(...) on entity |
Same DataLoader infra, same auto-loading |
| AI-ready APIs | config_simple_mcp_server(base=...) |
Progressive-disclosure MCP tools |
| Business services | RpcService subclass with async classmethods |
Auto-discovery, SDL introspection, MCP + FastAPI dual serving |
pip install sqlmodel-nexus
pip install sqlmodel-nexus[fastmcp] # with MCP supportThe fastest path: SQLModel + @query decorator → running GraphQL API.
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from sqlmodel import SQLModel, Field, Relationship, select
from sqlmodel_nexus import query, mutation, GraphQLHandler
class User(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
email: str
class Post(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
title: str
author_id: int = Field(foreign_key="user.id")
author: User | None = Relationship(back_populates="posts")
@query
async def get_all(cls, limit: int = 10) -> list['Post']:
"""Get all posts."""
async with get_session() as session:
return (await session.exec(select(cls).limit(limit))).all()
@mutation
async def create(cls, title: str, author_id: int) -> 'Post':
"""Create a new post."""
async with get_session() as session:
post = cls(title=title, author_id=author_id)
session.add(post)
await session.commit()
await session.refresh(post)
return post
handler = GraphQLHandler(base=SQLModel, session_factory=async_session)
class GraphQLRequest(BaseModel):
query: str
app = FastAPI()
@app.get("/graphql", response_class=HTMLResponse)
async def graphiql():
return handler.get_graphiql_html()
@app.post("/graphql")
async def graphql(req: GraphQLRequest):
return await handler.execute(req.query)Run uvicorn app:app and visit http://localhost:8000/graphql.
Relationships are resolved automatically via DataLoader. No selectinload, no manual joins:
{
postGetAll(limit: 5) {
id
title
author { name email }
}
}The framework walks the GraphQL selection tree level-by-level, collects FK values, and batch-loads via DataLoader. One query per relationship, regardless of result size.
Add order_by to list relationships for automatic pagination support:
class User(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str
posts: list["Post"] = Relationship(
back_populates="author",
sa_relationship_kwargs={"order_by": "Post.id"},
)
handler = GraphQLHandler(base=SQLModel, session_factory=async_session, enable_pagination=True){
userGetAll {
name
posts(limit: 3, offset: 0) {
items { title }
pagination { has_more total_count }
}
}
}Skip @query decorators entirely — let the framework generate by_id and by_filter for every entity:
from sqlmodel_nexus import GraphQLHandler, AutoQueryConfig
handler = GraphQLHandler(
base=SQLModel,
session_factory=async_session,
auto_query_config=AutoQueryConfig(session_factory=async_session),
){ userById(id: 1) { name email } }
{ userByFilter(filter: { name: "Alice" }, limit: 5) { id name } }Use Core API mode when you want the same DataLoader-based batching outside GraphQL — for FastAPI REST endpoints, service-layer response assembly, or any use-case DTO.
The concepts progress in order: auto-loading → resolve_* → post_* → cross-layer flow.
The simplest Core API case: select fields from SQLModel entities, declare relationship fields — they load automatically.
from sqlmodel import SQLModel
from sqlmodel_nexus import DefineSubset, ErManager
class UserDTO(DefineSubset):
__subset__ = (User, ("id", "name"))
class TaskDTO(DefineSubset):
__subset__ = (Task, ("id", "title", "owner_id"))
owner: UserDTO | None = None # name matches Task.owner relationship → auto-loaded
class SprintDTO(DefineSubset):
__subset__ = (Sprint, ("id", "name"))
tasks: list[TaskDTO] = [] # name matches Sprint.tasks relationship → auto-loaded
# App startup — once
er = ErManager(base=SQLModel, session_factory=async_session)
Resolver = er.create_resolver()
# Per request
async def get_sprints():
async with async_session() as session:
sprints = (await session.exec(select(Sprint))).all()
dtos = [SprintDTO(id=s.id, name=s.name) for s in sprints]
return await Resolver().resolve(dtos)How it works:
ErManagerdiscovers all SQLModel entities and their ORM relationshipscreate_resolver()returns a Resolver class bound to that entity graph- When resolving, if a field name matches a relationship and the DTO type is compatible with the target entity, it's loaded via DataLoader automatically
- FK fields (like
owner_id) are hidden from serialization output but available internally
This is the Core API equivalent of GraphQL's relationship resolution — same DataLoader batching, zero resolve_* methods needed for standard relationships.
Use resolve_* when implicit auto-loading doesn't fit: the field name doesn't match a relationship, or you need custom logic.
from pydantic_resolve import Loader
async def comments_loader(task_ids: list[int]) -> list[list[Comment]]:
"""Batch load comments for multiple tasks."""
...
class TaskDTO(DefineSubset):
__subset__ = (Task, ("id", "title", "owner_id"))
owner: UserDTO | None = None # implicit — matches Task.owner
comments: list[CommentDTO] = [] # custom — no matching relationship
comment_count: int = 0
def resolve_comments(self, loader=Loader(comments_loader)):
"""Load comments via a custom batch function."""
return loader.load(self.id)
def post_comment_count(self):
return len(self.comments)Loader accepts a DataLoader class or an async batch function:
# By DataLoader class
def resolve_tags(self, loader=Loader(TagLoader)):
return loader.load(self.id)
# By async batch function
async def load_permissions(user_ids):
...
def resolve_permissions(self, loader=Loader(load_permissions)):
return loader.load(self.owner_id)A useful mental model: resolve_* means "this field needs data from outside the current node."
post_* runs after all resolve_* and auto-loading completes for the current subtree. Use it for counts, aggregations, formatting — anything that depends on already-loaded data.
class SprintDTO(DefineSubset):
__subset__ = (Sprint, ("id", "name"))
tasks: list[TaskDTO] = []
task_count: int = 0
contributor_names: list[str] = []
def post_task_count(self):
return len(self.tasks)
def post_contributor_names(self):
return sorted({t.owner.name for t in self.tasks if t.owner})Execution order for one SprintDTO:
- Implicit auto-load →
tasksfilled with TaskDTOs - Each TaskDTO → implicit auto-load →
ownerfilled post_task_count→len(self.tasks)= 2post_contributor_names→ extract unique owner names
| Question | resolve_* |
post_* |
|---|---|---|
| Needs external IO? | Yes | Usually no |
| Runs before descendants are ready? | Yes | No |
| Good for counts, sums, labels? | Sometimes | Yes |
Reach for these tools only when parent and child nodes need to coordinate.
ExposeAs: send ancestor data downward (parent → descendant)SendTo+Collector: send child data upward (descendant → ancestor)
from typing import Annotated
from sqlmodel_nexus import ExposeAs, SendTo, Collector
class SprintDTO(DefineSubset):
__subset__ = (Sprint, ("id", "name"))
name: Annotated[str, ExposeAs('sprint_name')] # expose to descendants
tasks: list[TaskDTO] = []
contributors: list[UserDTO] = []
def post_contributors(self, collector=Collector('contributors')):
return collector.values() # collect from descendants
class TaskDTO(DefineSubset):
__subset__ = (Task, ("id", "title", "owner_id"))
owner: Annotated[UserDTO | None, SendTo('contributors')] = None # send upward
full_title: str = ""
def post_full_title(self, ancestor_context):
return f"{ancestor_context['sprint_name']} / {self.title}" # read from ancestorUse this only when the shape of the tree matters:
- A child needs ancestor context (sprint name, permissions)
- A parent needs to aggregate values from many descendants (contributors, tags)
For relationships that aren't in the ORM (cross-service calls, computed edges), declare them on the entity:
from sqlmodel_nexus import Relationship
async def tags_loader(task_ids: list[int]) -> list[list[Tag]]:
"""Batch load tags for multiple tasks."""
...
class Task(SQLModel, table=True):
__relationships__ = [
Relationship(fk="id", target=list[Tag], name="tags", loader=tags_loader)
]
id: int | None = Field(default=None, primary_key=True)
title: str
class TagDTO(DefineSubset):
__subset__ = (Tag, ("id", "name"))
class TaskDTO(DefineSubset):
__subset__ = (Task, ("id", "title"))
tags: list[TagDTO] = [] # name matches custom relationship → auto-loadedCustom relationships use the same DataLoader infrastructure and work with implicit auto-loading.
Expose your SQLModel APIs to AI assistants with one function call.
from sqlmodel_nexus.mcp import config_simple_mcp_server
mcp = config_simple_mcp_server(base=SQLModel, name="My API")
mcp.run() # stdio modeTools: get_schema(), graphql_query(query), graphql_mutation(mutation).
from sqlmodel_nexus.mcp import create_mcp_server
mcp = create_mcp_server(
apps=[
{"name": "blog", "base": BlogBase, "description": "Blog API"},
{"name": "shop", "base": ShopBase, "description": "Shop API"},
],
name="Multi-App API",
)
mcp.run()Tools include list_apps(), list_queries(app_name), get_query_schema(name, app_name), graphql_query(query, app_name), etc.
pip install sqlmodel-nexus[fastmcp]Define business logic as service classes, expose them to both MCP and web frameworks from a single source of truth.
RpcService subclasses declare async classmethods. The metaclass auto-discovers public methods.
from sqlmodel_nexus.rpc import RpcService
class SprintService(RpcService):
"""Sprint management service."""
@classmethod
async def list_sprints(cls) -> list[SprintSummary]:
"""Get all sprints with task counts."""
stmt = build_dto_select(SprintSummary)
async with async_session() as session:
rows = (await session.exec(stmt)).all()
dtos = [SprintSummary(**dict(row._mapping)) for row in rows]
return await Resolver().resolve(dtos)
@classmethod
async def get_sprint(cls, sprint_id: int) -> SprintSummary | None:
"""Get a sprint by ID."""
stmt = build_dto_select(SprintSummary, where=Sprint.id == sprint_id)
async with async_session() as session:
rows = (await session.exec(stmt)).all()
if not rows:
return None
dto = SprintSummary(**dict(rows[0]._mapping))
return await Resolver().resolve(dto)Three-layer progressive disclosure: discover → inspect → execute.
from sqlmodel_nexus.rpc import create_rpc_mcp_server, RpcServiceConfig
mcp = create_rpc_mcp_server(
services=[
RpcServiceConfig(name="sprint", service=SprintService, description="Sprint ops"),
],
name="Project RPC API",
)
mcp.run() # stdio modeMCP tools provided:
| Tool | Purpose |
|---|---|
list_services() |
Discover available services and method counts |
describe_service(service_name) |
Method signatures (SDL format) + DTO type definitions |
call_rpc(service_name, method_name, params) |
Execute a method |
describe_service returns SDL-style signatures and type definitions:
{
"name": "sprint",
"methods": [
{"name": "list_sprints", "signature_sdl": "list_sprints(): [SprintSummary!]!"},
{"name": "get_sprint", "signature_sdl": "get_sprint(sprint_id: Int!): SprintSummary"}
],
"types": "type SprintSummary {\n id: Int\n name: String!\n tasks: [TaskSummary!]!\n ...\n}"
}The same service classes integrate directly into FastAPI or any async web framework. Routes are thin wrappers — business logic stays in the service.
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/api/sprints", tags=[SprintService.get_tag_name()])
async def get_sprints():
return await SprintService.list_sprints()
@app.get("/api/sprints/{sprint_id}", tags=[SprintService.get_tag_name()])
async def get_sprint(sprint_id: int):
result = await SprintService.get_sprint(sprint_id=sprint_id)
if result is None:
raise HTTPException(status_code=404, detail="Sprint not found")
return resultget_tag_name() returns an OpenAPI-compatible tag name (e.g. SprintService → "sprint"), so /docs groups routes by service automatically.
One service class, two serving modes:
RpcService subclass ──┬── MCP server (AI agents, progressive disclosure)
└── FastAPI routes (REST API, OpenAPI docs)
# GraphQL playground
uv run python -m demo.app
# visit localhost:8000/graphql
# Core API (REST)
uv run uvicorn demo.core_api.app:app --reload
# visit /docs
# MCP server
uv run --with fastmcp python -m demo.mcp_server
# RPC MCP server (stdio or HTTP)
uv run --with fastmcp python -m demo.rpc_mcp_server # stdio
uv run --with fastmcp python -m demo.rpc_mcp_server --http # HTTP on port 8006
# RPC FastAPI — same services served as REST endpoints
uv run uvicorn demo.rpc_fastapi:app --port 8007
# visit /docs to see routes grouped by service tagMIT License