Step-by-step tutorial for building a complete REST API for a blog
REST API for a blog with:
- Users (post authors)
- Posts (blog articles)
- Comments on posts
- Tags (many-to-many relationship)
- Search and filtering
- Transactions for atomic operations
| Topic | Description |
|---|---|
| Models | Defining tables with Model and Field |
| Migrations | Auto-generating and applying schema changes |
| Stub files | Auto-generating .pyi for typing and IDE support |
| CRUD | Create, Read, Update, Delete |
| Foreign Keys | One-to-many relationships with cascade deletion |
| Many-to-Many | Relationships through junction tables |
| Joins & Prefetch | Efficient loading of related data |
| Q expressions | Complex search conditions (OR, AND, NOT) |
| Pagination | Paginated output with metadata |
| Transactions | Atomic operations with auto-rollback |
- Python 3.10+
- SQLite 3.35+ (built into Python)
- Part 1: Project Setup
- Part 2: Users
- Part 3: Posts
- Part 4: Comments
- Part 5: Tags (M2M)
- Part 6: Search and Filtering
- Part 7: Transactions
- Summary
python -m venv venv
source venv/bin/activate # Linux/macOS
# or
venv\Scripts\activate # Windows

pip install oxyde fastapi uvicorn email-validator

Create the following structure:
project/
├── main.py            # FastAPI application + entry point
├── models.py          # Oxyde models
├── routes/
│   ├── __init__.py
│   └── users.py       # User routes
├── migrations/        # Will be created automatically
└── oxyde_config.py    # Will be created via oxyde init
Create folders and empty files:
mkdir -p routes
touch main.py models.py
touch routes/__init__.py routes/users.py

Run the interactive setup:

oxyde init

Answer the questions:
- Models module(s): models
- Dialect: sqlite
- Database URL: sqlite:///blog.db
- Migrations directory: migrations
This will create the file oxyde_config.py:
"""Oxyde ORM configuration."""
MODELS = ["models"]
DIALECT = "sqlite"
MIGRATIONS_DIR = "migrations"
DATABASES = {
    "default": f"sqlite:///blog.db",
}

Since we want the database file to live next to our application, we modify the config slightly, as the documentation recommends:
"""Oxyde ORM configuration."""
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent
MODELS = ["models"]
DIALECT = "sqlite"
MIGRATIONS_DIR = "migrations"
DATABASES = {
    "default": f"sqlite:///{BASE_DIR}/blog.db",
}

Open main.py and add:
import uvicorn
from fastapi import FastAPI
from oxyde import db
from oxyde_config import DATABASES

app = FastAPI(
    title="Blog API",
    description="REST API for blog with Oxyde ORM",
    version="0.1.0",
    lifespan=db.lifespan(**DATABASES)
)

@app.get("/")
async def root():
    return {"message": "Blog API is running!"}

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)

What's happening here:
- db.lifespan(**DATABASES): FastAPI lifecycle context manager. It automatically:
  - Opens the database connection on application startup
  - Closes the connection on shutdown
- **DATABASES unpacks the dictionary from the config as default="sqlite:///..."
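
To make the startup/shutdown behaviour concrete, here is a minimal standard-library sketch of the same pattern. It is not Oxyde's implementation: `toy_lifespan` and the `events` list are hypothetical names, and an in-memory `sqlite3` connection stands in for the real database connection. It only illustrates how `**DATABASES` turns dictionary keys into keyword arguments and how an async context manager opens resources before the application runs and closes them afterwards.

```python
import asyncio
import sqlite3
from contextlib import asynccontextmanager

DATABASES = {"default": ":memory:"}  # hypothetical stand-in for the tutorial's config
events = []  # records the order of lifecycle steps for illustration


def toy_lifespan(**databases):
    """Build a lifespan-style context manager from alias=location keyword arguments."""

    @asynccontextmanager
    async def lifespan(app):
        # Startup: open one connection per configured alias.
        connections = {alias: sqlite3.connect(path) for alias, path in databases.items()}
        events.append(f"opened {sorted(connections)}")
        try:
            yield connections  # the application would serve requests here
        finally:
            # Shutdown: close every connection, even if the app raised.
            for conn in connections.values():
                conn.close()
            events.append("closed")

    return lifespan


async def main():
    lifespan = toy_lifespan(**DATABASES)  # same as toy_lifespan(default=":memory:")
    async with lifespan(app=None) as connections:
        events.append("running")
        return connections["default"].execute("SELECT 1").fetchone()[0]


result = asyncio.run(main())
```

After the run, `events` holds the three steps in order: open, running, closed.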
Start the server:
python main.py

Open in browser: http://localhost:8000
Expected result:
{"message": "Blog API is running!"}

Swagger UI is available at: http://localhost:8000/docs
Open models.py and add:
from datetime import datetime
from oxyde import Model, Field

class User(Model):
    """Blog user."""
    id: int | None = Field(default=None, db_pk=True)
    username: str = Field(db_unique=True, db_index=True)
    email: str = Field(db_unique=True)
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")

    class Meta:
        is_table = True
        table_name = "users"

Code breakdown:
| Element | Description |
|---|---|
| `Model` | Base class for all Oxyde models. Inherits from Pydantic BaseModel |
| `Field(db_pk=True)` | Primary key. id will be auto-incrementing |
| `Field(db_unique=True)` | Creates a UNIQUE constraint in the database |
| `Field(db_index=True)` | Creates an index for fast lookup |
| `Field(db_default="CURRENT_TIMESTAMP")` | Default value at the database level |
| `int \| None = Field(default=None)` | Nullable field. None means the database will assign the value |
| `class Meta: is_table = True` | Required! Indicates this is a database table |
| `table_name = "users"` | Table name. If not specified, the class name is used |
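
For orientation, the field options above correspond roughly to the table definition below. This is a sketch using Python's built-in `sqlite3` module; the exact SQL that Oxyde writes into the migration (column types, index names such as the hypothetical `ix_users_username`) may differ, so treat the DDL as an assumption rather than the generated output.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,     -- db_pk=True
        username TEXT NOT NULL UNIQUE,            -- db_unique=True
        email TEXT NOT NULL UNIQUE,               -- db_unique=True
        created_at TEXT DEFAULT CURRENT_TIMESTAMP -- db_default="CURRENT_TIMESTAMP"
    );
    CREATE INDEX ix_users_username ON users (username);  -- db_index=True
    """
)

# id and created_at are omitted, so the database assigns both.
conn.execute(
    "INSERT INTO users (username, email) VALUES (?, ?)",
    ("alice", "alice@example.com"),
)
row = conn.execute("SELECT id, username, created_at FROM users").fetchone()

# A second row with the same username violates the UNIQUE constraint.
try:
    conn.execute(
        "INSERT INTO users (username, email) VALUES (?, ?)",
        ("alice", "other@example.com"),
    )
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
```

This is why the model declares `id` and `created_at` as optional with `default=None`: the application leaves them empty and the database fills them in.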
oxyde makemigrations

Output:
📝 Creating migrations...
0️⃣ Loading models...
✅ Imported 1 module(s)
1️⃣ Extracting schema from models...
✅ Found 1 table(s): users
2️⃣ Replaying existing migrations...
📁 Creating migrations directory: /path/migrations
✅ Replayed 0 migration(s)
3️⃣ Computing diff...
✅ Found 1 operation(s):
- Create table: users
4️⃣ Generating migration file...
✅ Created: migrations/0001_create_users_table.py
5️⃣ Generating type stubs...
Generated stub: /path/models.pyi
✅ Generated 1 stub file(s)
Check the created file migrations/0001_create_users_table.py — it contains SQL for creating the table.
Besides the migration, Oxyde creates a models.pyi file — this is a stub file with types for your IDE:
models.pyi
Contents:
from datetime import datetime

class User:
    id: int | None
    username: str
    email: str
    created_at: datetime | None
    ...

Why this is useful:
- Autocompletion in IDE (PyCharm, VS Code)
- Hints for FK fields (e.g., post.author_id for FK author)
- Static type checking (mypy, pyright)
Stub files are updated automatically with each oxyde makemigrations.
oxyde migrate

Output:
⏳ Applying migrations...
Found 1 pending migration(s):
- 0001_create_users_table
Migrating to latest...
✅ Applied 1 migration(s)
- 0001_create_users_table
The users table is now created in blog.db.
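
One way to confirm that the migration ran is to list the tables in the database file with Python's built-in `sqlite3` module. The sketch below is illustrative: `list_tables` is a hypothetical helper, and a temporary database stands in for blog.db so the example runs on its own. Against the real file you would call `list_tables("blog.db")` from the project directory.

```python
import os
import sqlite3
import tempfile


def list_tables(db_path: str) -> list[str]:
    """Return the names of all tables in a SQLite database file."""
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        ).fetchall()
    finally:
        conn.close()
    return [name for (name,) in rows]


# Stand-in for blog.db so the sketch is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.db")
    conn = sqlite3.connect(demo_path)
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
    conn.commit()
    conn.close()
    tables = list_tables(demo_path)
```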
Open routes/users.py:
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, EmailStr
from models import User
router = APIRouter(prefix="/users", tags=["users"])

# --- Request Schemas ---
class UserCreate(BaseModel):
    """Schema for creating a user."""
    username: str
    email: EmailStr

class UserUpdate(BaseModel):
    """Schema for updating a user."""
    username: str | None = None
    email: EmailStr | None = None

# --- Endpoints ---
@router.get("")
async def list_users():
    """Get list of all users."""
    return await User.objects.all()

@router.get("/{user_id}")
async def get_user(user_id: int):
    """Get user by ID."""
    user = await User.objects.get_or_none(id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@router.post("", status_code=201)
async def create_user(data: UserCreate):
    """Create a new user."""
    # Check username uniqueness
    if await User.objects.filter(username=data.username).exists():
        raise HTTPException(status_code=400, detail="Username already taken")
    # Check email uniqueness
    if await User.objects.filter(email=data.email).exists():
        raise HTTPException(status_code=400, detail="Email already taken")
    return await User.objects.create(**data.model_dump())

@router.patch("/{user_id}")
async def update_user(user_id: int, data: UserUpdate):
    """Update user."""
    user = await User.objects.get_or_none(id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    # Update only provided fields
    update_data = data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(user, field, value)
    await user.save()
    return user

@router.delete("/{user_id}", status_code=204)
async def delete_user(user_id: int):
    """Delete user."""
    count = await User.objects.filter(id=user_id).delete()
    if not count:
        raise HTTPException(status_code=404, detail="User not found")

Key Oxyde methods:
| Method | Description |
|---|---|
| `User.objects.all()` | Get all records |
| `User.objects.get_or_none(id=1)` | Get by ID or None |
| `User.objects.filter(username="john")` | Filtering (returns QuerySet) |
| `User.objects.filter(...).exists()` | Check existence |
| `User.objects.create(**data)` | Create record |
| `user.save()` | Save changes |
| `User.objects.filter(...).delete()` | Delete records (returns count) |
Update main.py:
import uvicorn
from fastapi import FastAPI
from oxyde import db
from oxyde_config import DATABASES
from routes import users

app = FastAPI(
    title="Blog API",
    description="REST API for blog with Oxyde ORM",
    version="0.1.0",
    lifespan=db.lifespan(**DATABASES)
)
app.include_router(users.router)

@app.get("/")
async def root():
    return {"message": "Blog API is running!"}

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)

Restart the server (or let it restart automatically, since reload=True is set).
Open Swagger UI: http://localhost:8000/docs
POST /users with body:
{"username": "alice", "email": "alice@example.com"}
Expected response (201):
{
  "id": 1,
  "username": "alice",
  "email": "alice@example.com",
  "created_at": "2024-01-15T10:30:00.123456"
}
Create another one:
{"username": "bob", "email": "bob@example.com"}

GET /users
Expected response:
[
  {"id": 1, "username": "alice", "email": "alice@example.com", "created_at": "..."},
  {"id": 2, "username": "bob", "email": "bob@example.com", "created_at": "..."}
]

GET /users/1
Expected response:
{"id": 1, "username": "alice", "email": "alice@example.com", "created_at": "..."}

PATCH /users/1 with body:
{"username": "alice_updated"}

POST /users with body:
{"username": "alice_updated", "email": "new@example.com"}
Expected response (400):
{"detail": "Username already taken"}

Update models.py:
from datetime import datetime
from oxyde import Model, Field

class User(Model):
    """Blog user."""
    id: int | None = Field(default=None, db_pk=True)
    username: str = Field(db_unique=True, db_index=True)
    email: str = Field(db_unique=True)
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")

    class Meta:
        is_table = True
        table_name = "users"

class Post(Model):
    """Blog post."""
    id: int | None = Field(default=None, db_pk=True)
    title: str
    content: str
    published: bool = Field(default=False)
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    updated_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    # Foreign Key to User
    author: User | None = Field(default=None, db_on_delete="CASCADE")

    class Meta:
        is_table = True
        table_name = "posts"

New concepts:
| Element | Description |
|---|---|
| `author: User \| None` | Foreign Key to User model |
| `db_on_delete="CASCADE"` | When a User is deleted, all their Posts are deleted |
| `Field(default=False)` | Default value at Python level |

How FK works in Oxyde:
- A column author_id is created in the database (field name + _id)
- When creating a post, pass author_id=1, not author=user
- When loading with join("author"), the author field is populated with a User object
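
At the SQL level, the three points above look roughly like this. The sketch uses Python's built-in `sqlite3` module with hand-written DDL, which is an assumption about the schema and not the migration Oxyde generates. Note that SQLite only enforces foreign keys on connections where `PRAGMA foreign_keys = ON` has been issued; the tutorial does not say whether Oxyde does this for you.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL);
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        title TEXT NOT NULL,
        author_id INTEGER REFERENCES users (id) ON DELETE CASCADE
    );
    INSERT INTO users (id, username) VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO posts (title, author_id) VALUES ('First', 1), ('Second', 1), ('Other', 2);
    """
)

# LEFT JOIN: a single query returns the post together with its author.
joined = conn.execute(
    "SELECT p.title, u.username FROM posts p "
    "LEFT JOIN users u ON u.id = p.author_id WHERE p.id = 1"
).fetchone()

# Deleting the user removes their posts through ON DELETE CASCADE.
conn.execute("DELETE FROM users WHERE id = 1")
remaining = [title for (title,) in conn.execute("SELECT title FROM posts ORDER BY id")]
```

Only bob's post survives the delete, which is the behaviour `db_on_delete="CASCADE"` asks for.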
oxyde makemigrations

Output:
📝 Creating migrations...
0️⃣ Loading models...
✅ Imported 1 module(s)
1️⃣ Extracting schema from models...
✅ Found 2 table(s): users, posts
2️⃣ Replaying existing migrations...
✅ Replayed 1 migration(s)
3️⃣ Computing diff...
✅ Found 1 operation(s):
- Create table: posts
4️⃣ Generating migration file...
✅ Created: migrations/0002_create_posts_table.py
5️⃣ Generating type stubs...
Generated stub: /path/models.pyi
✅ Generated 1 stub file(s)
oxyde migrate

Create routes/posts.py:
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from models import Post
router = APIRouter(prefix="/posts", tags=["posts"])
# --- Schemas ---
class PostCreate(BaseModel):
    title: str
    content: str
    author_id: int
    published: bool = False

class PostUpdate(BaseModel):
    title: str | None = None
    content: str | None = None
    published: bool | None = None

# --- Endpoints ---
@router.get("")
async def list_posts(published: bool | None = None, author_id: int | None = None):
    """Get list of posts."""
    query = Post.objects
    if published is not None:
        query = query.filter(published=published)
    if author_id is not None:
        query = query.filter(author_id=author_id)
    return await query.all()

@router.get("/{post_id}")
async def get_post(post_id: int):
    """Get post by ID."""
    post = await Post.objects.get_or_none(id=post_id)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    return post

@router.post("", status_code=201)
async def create_post(data: PostCreate):
    """Create a new post."""
    return await Post.objects.create(**data.model_dump())

@router.patch("/{post_id}")
async def update_post(post_id: int, data: PostUpdate):
    """Update post."""
    post = await Post.objects.get_or_none(id=post_id)
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    for field, value in data.model_dump(exclude_unset=True).items():
        setattr(post, field, value)
    await post.save()
    return post

@router.delete("/{post_id}", status_code=204)
async def delete_post(post_id: int):
    """Delete post."""
    count = await Post.objects.filter(id=post_id).delete()
    if not count:
        raise HTTPException(status_code=404, detail="Post not found")

Key methods:
| Method | Description |
|---|---|
| `.all()` | All records |
| `.filter(field=value)` | Filtering |
| `.get_or_none(id=...)` | Single record or None |
| `.create(**data)` | Create record |
| `.save()` | Save changes |
| `.delete()` | Delete (returns count) |
Add to routes/users.py:
from models import User, Post
# ... existing code ...
@router.get("/{user_id}/posts")
async def list_user_posts(user_id: int):
    """Get all posts by user."""
    if not await User.objects.filter(id=user_id).exists():
        raise HTTPException(status_code=404, detail="User not found")
    return await Post.objects.filter(author_id=user_id).order_by("-created_at").all()

Update main.py:
import uvicorn
from fastapi import FastAPI
from oxyde import db
from oxyde_config import DATABASES
from routes import users, posts

app = FastAPI(
    title="Blog API",
    description="REST API for blog with Oxyde ORM",
    version="0.1.0",
    lifespan=db.lifespan(**DATABASES)
)
app.include_router(users.router)
app.include_router(posts.router)

@app.get("/")
async def root():
    return {"message": "Blog API is running!"}

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)

POST /posts with body:
{
  "title": "My first post",
  "content": "Hello, this is my first blog post!",
  "author_id": 1,
  "published": true
}
Expected response (201):
{
  "id": 1,
  "title": "My first post",
  "content": "Hello, this is my first blog post!",
  "published": true,
  "created_at": "...",
  "updated_at": "...",
  "author_id": 1,
  "author": null
}

GET /posts
GET /posts?published=true
GET /posts?author_id=1
GET /users/1/posts
All posts by user with id=1.
Add to models.py:
class Comment(Model):
    """Comment on a post."""
    id: int | None = Field(default=None, db_pk=True)
    content: str
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    # FK to post
    post: Post | None = Field(default=None, db_on_delete="CASCADE")
    # FK to comment author
    author: User | None = Field(default=None, db_on_delete="CASCADE")

    class Meta:
        is_table = True
        table_name = "comments"

To load comments together with a post, add a reverse relationship to the Post model:
class Post(Model):
    """Blog post."""
    id: int | None = Field(default=None, db_pk=True)
    title: str
    content: str
    published: bool = Field(default=False)
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    updated_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    author: User | None = Field(default=None, db_on_delete="CASCADE")
    # Reverse relationship: list of comments
    comments: list["Comment"] = Field(default=[], db_reverse_fk="post")

    class Meta:
        is_table = True
        table_name = "posts"

What is db_reverse_fk:
- This is a virtual field; it doesn't create a column in the database
- db_reverse_fk="post" points to the post field in the Comment model
- Used with prefetch("comments") to load related records
oxyde makemigrations
oxyde migrate

Create routes/comments.py:
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from models import Comment, Post, User
router = APIRouter(tags=["comments"])

class CommentCreate(BaseModel):
    content: str
    author_id: int

class CommentUpdate(BaseModel):
    content: str

# --- Comments on a Post ---
@router.get("/posts/{post_id}/comments")
async def list_post_comments(post_id: int):
    """Get comments on a post."""
    if not await Post.objects.filter(id=post_id).exists():
        raise HTTPException(status_code=404, detail="Post not found")
    return await Comment.objects.filter(post_id=post_id).join("author").order_by("created_at").all()

@router.post("/posts/{post_id}/comments", status_code=201)
async def create_comment(post_id: int, data: CommentCreate):
    """Add comment to a post."""
    if not await Post.objects.filter(id=post_id).exists():
        raise HTTPException(status_code=404, detail="Post not found")
    if not await User.objects.filter(id=data.author_id).exists():
        raise HTTPException(status_code=400, detail="Author not found")
    return await Comment.objects.create(
        content=data.content,
        post_id=post_id,
        author_id=data.author_id
    )

# --- Operations on Specific Comment ---
@router.get("/comments/{comment_id}")
async def get_comment(comment_id: int):
    """Get comment by ID."""
    comment = await Comment.objects.filter(id=comment_id).join("author", "post").first()
    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")
    return comment

@router.patch("/comments/{comment_id}")
async def update_comment(comment_id: int, data: CommentUpdate):
    """Update comment."""
    comment = await Comment.objects.get_or_none(id=comment_id)
    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")
    comment.content = data.content
    await comment.save()
    return comment

@router.delete("/comments/{comment_id}", status_code=204)
async def delete_comment(comment_id: int):
    """Delete comment."""
    count = await Comment.objects.filter(id=comment_id).delete()
    if not count:
        raise HTTPException(status_code=404, detail="Comment not found")

New features:
| Method | Description |
|---|---|
| `.join("author", "post")` | Multiple JOIN: loads both author and post |
| `Comment.objects.create(post_id=..., author_id=...)` | Creating with FK using the _id suffix |
Add a new endpoint to routes/posts.py:
@router.get("/{post_id}/full")
async def get_post_full(post_id: int):
    """Get post with all comments."""
    post = await Post.objects.filter(id=post_id).join("author").prefetch("comments").first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    return post

How prefetch() works:
- Main query is executed (SELECT * FROM posts WHERE id=...)
- Second query is executed (SELECT * FROM comments WHERE post_id IN (...))
- Results are combined in Python

Difference between join and prefetch:
| Method | How it works | When to use |
|---|---|---|
| `join()` | LEFT JOIN in one query | FK (single object) |
| `prefetch()` | Separate query + Python merge | Reverse FK, M2M (list of objects) |
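
The three prefetch steps can be sketched with plain `sqlite3` as follows. This is an illustration of the general two-query pattern, not Oxyde's internal code; the table contents and variable names are made up for the example.

```python
import sqlite3
from collections import defaultdict

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.executescript(
    """
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT);
    CREATE TABLE comments (id INTEGER PRIMARY KEY, content TEXT, post_id INTEGER);
    INSERT INTO posts VALUES (1, 'My first post'), (2, 'Second post');
    INSERT INTO comments VALUES (1, 'Great post!', 1), (2, 'I agree!', 1);
    """
)

# Query 1: the main rows.
posts = [dict(row) for row in conn.execute("SELECT id, title FROM posts ORDER BY id")]

# Query 2: all related rows for those ids in a single IN (...) query.
post_ids = [post["id"] for post in posts]
placeholders = ", ".join("?" for _ in post_ids)
rows = conn.execute(
    f"SELECT id, content, post_id FROM comments "
    f"WHERE post_id IN ({placeholders}) ORDER BY id",
    post_ids,
)

# Step 3: group the children by parent id and attach them in Python.
by_post = defaultdict(list)
for row in rows:
    by_post[row["post_id"]].append(dict(row))
for post in posts:
    post["comments"] = by_post.get(post["id"], [])
```

The number of queries stays at two no matter how many posts are loaded, which is the reason to prefer this pattern over one comments query per post.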
Update main.py:
import uvicorn
from fastapi import FastAPI
from oxyde import db
from oxyde_config import DATABASES
from routes import users, posts, comments

app = FastAPI(
    title="Blog API",
    description="REST API for blog with Oxyde ORM",
    version="0.1.0",
    lifespan=db.lifespan(**DATABASES)
)
app.include_router(users.router)
app.include_router(posts.router)
app.include_router(comments.router)

@app.get("/")
async def root():
    return {"message": "Blog API is running!"}

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)

POST /posts/1/comments with body:
{"content": "Great post!", "author_id": 2}
Expected response (201):
{
  "id": 1,
  "content": "Great post!",
  "created_at": "...",
  "post_id": 1,
  "author_id": 2,
  "post": null,
  "author": null
}
Add a few more comments.

GET /posts/1/comments
All comments on the post with authors.

GET /posts/1/full
Expected response:
{
  "id": 1,
  "title": "My first post",
  "content": "...",
  "published": true,
  "created_at": "...",
  "updated_at": "...",
  "author_id": 1,
  "author": {"id": 1, "username": "alice_updated", ...},
  "comments": [
    {"id": 1, "content": "Great post!", "author_id": 2, ...},
    {"id": 2, "content": "I agree!", "author_id": 1, ...}
  ]
}

GET /comments/1
Comment with nested author and post.
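Important: The Tag class must be defined above the Post class in models.py, otherwise list[Tag] will raise a NameError. PostTag, in turn, references Post in its own annotations, so it must be defined below Post.
Add to models.py (Tag between User and Post, PostTag after Post, as in the final version of models.py):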
class Tag(Model):
    """Tag for posts."""
    id: int | None = Field(default=None, db_pk=True)
    name: str = Field(db_unique=True)
    slug: str = Field(db_unique=True, db_index=True)

    class Meta:
        is_table = True
        table_name = "tags"

# Place PostTag below the Post class: it references Post in its annotations.
class PostTag(Model):
    """Many-to-many relationship between Post and Tag."""
    id: int | None = Field(default=None, db_pk=True)
    post: Post | None = Field(default=None, db_on_delete="CASCADE")
    tag: Tag | None = Field(default=None, db_on_delete="CASCADE")

    class Meta:
        is_table = True
        table_name = "post_tags"
        unique_together = [("post", "tag")]  # Unique pair

What's new here:
| Element | Description |
|---|---|
| `unique_together` | Composite UNIQUE constraint. A tag can only be added to a post once |
| `PostTag` | Junction table for M2M relationship |
Update the Post model:
class Post(Model):
    """Blog post."""
    id: int | None = Field(default=None, db_pk=True)
    title: str
    content: str
    published: bool = Field(default=False)
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    updated_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    author: User | None = Field(default=None, db_on_delete="CASCADE")
    comments: list["Comment"] = Field(default=[], db_reverse_fk="post")
    # Many-to-Many relationship with tags
    tags: list[Tag] = Field(default=[], db_m2m=True, db_through="PostTag")

    class Meta:
        is_table = True
        table_name = "posts"

M2M in Oxyde:
- db_m2m=True indicates this is an M2M relationship
- db_through="PostTag" is the name of the junction model
- The field is virtual, used with prefetch("tags")
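
In relational terms, the junction model and its composite constraint amount to something like the following. The DDL is a hand-written approximation for illustration (run here with Python's built-in `sqlite3`), not the SQL from the generated migration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT);
    CREATE TABLE tags (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
    CREATE TABLE post_tags (
        id INTEGER PRIMARY KEY,
        post_id INTEGER NOT NULL,
        tag_id INTEGER NOT NULL,
        UNIQUE (post_id, tag_id)  -- unique_together = [("post", "tag")]
    );
    INSERT INTO posts VALUES (1, 'My first post');
    INSERT INTO tags VALUES (1, 'FastAPI'), (2, 'Oxyde'), (3, 'Python');
    INSERT INTO post_tags (post_id, tag_id) VALUES (1, 1), (1, 3);
    """
)

# Tags of post 1, resolved through the junction table.
tag_names = [
    name
    for (name,) in conn.execute(
        "SELECT t.name FROM tags t JOIN post_tags pt ON pt.tag_id = t.id "
        "WHERE pt.post_id = ? ORDER BY t.name",
        (1,),
    )
]

# Attaching the same tag twice violates the composite UNIQUE constraint.
try:
    conn.execute("INSERT INTO post_tags (post_id, tag_id) VALUES (1, 1)")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True
```

The `tags` field on Post stores nothing itself; every tag assignment is a row in `post_tags`.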
oxyde makemigrations
oxyde migrate

Create routes/tags.py:
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
import re
from models import Tag, Post, PostTag
router = APIRouter(prefix="/tags", tags=["tags"])

class TagCreate(BaseModel):
    name: str

def slugify(text: str) -> str:
    """Simple function to create slug."""
    text = text.lower().strip()
    text = re.sub(r'[^\w\s-]', '', text)
    text = re.sub(r'[\s_-]+', '-', text)
    return text

@router.get("")
async def list_tags():
    """Get all tags."""
    return await Tag.objects.order_by("name").all()

@router.post("", status_code=201)
async def create_tag(data: TagCreate):
    """Create tag."""
    slug = slugify(data.name)
    if await Tag.objects.filter(slug=slug).exists():
        raise HTTPException(status_code=400, detail="Tag already exists")
    return await Tag.objects.create(name=data.name, slug=slug)

@router.get("/{slug}")
async def get_tag(slug: str):
    """Get tag by slug."""
    tag = await Tag.objects.get_or_none(slug=slug)
    if not tag:
        raise HTTPException(status_code=404, detail="Tag not found")
    return tag

@router.delete("/{slug}", status_code=204)
async def delete_tag(slug: str):
    """Delete tag."""
    count = await Tag.objects.filter(slug=slug).delete()
    if not count:
        raise HTTPException(status_code=404, detail="Tag not found")

@router.get("/{slug}/posts")
async def list_posts_by_tag(slug: str):
    """Get all posts with this tag."""
    tag = await Tag.objects.get_or_none(slug=slug)
    if not tag:
        raise HTTPException(status_code=404, detail="Tag not found")
    # Get post_id through junction table
    post_tags = await PostTag.objects.filter(tag_id=tag.id).all()
    post_ids = [pt.post_id for pt in post_tags]
    if not post_ids:
        return []
    return await Post.objects.filter(id__in=post_ids).join("author").all()

Add to routes/posts.py:
from models import Post, User, Tag, PostTag
# ... existing code ...

class PostTagUpdate(BaseModel):
    tag_ids: list[int]

@router.put("/{post_id}/tags")
async def set_post_tags(post_id: int, data: PostTagUpdate):
    """Set tags for post (replaces existing)."""
    if not await Post.objects.filter(id=post_id).exists():
        raise HTTPException(status_code=404, detail="Post not found")
    # Check that all tags exist
    existing_tags = await Tag.objects.filter(id__in=data.tag_ids).all()
    if len(existing_tags) != len(data.tag_ids):
        raise HTTPException(status_code=400, detail="One or more tags not found")
    # Remove old relationships
    await PostTag.objects.filter(post_id=post_id).delete()
    # Create new ones
    for tag_id in data.tag_ids:
        await PostTag.objects.create(post_id=post_id, tag_id=tag_id)
    return {"post_id": post_id, "tag_ids": data.tag_ids}

@router.post("/{post_id}/tags/{tag_id}", status_code=201)
async def add_tag_to_post(post_id: int, tag_id: int):
    """Add tag to post."""
    if not await Post.objects.filter(id=post_id).exists():
        raise HTTPException(status_code=404, detail="Post not found")
    if not await Tag.objects.filter(id=tag_id).exists():
        raise HTTPException(status_code=404, detail="Tag not found")
    # Check if already added
    if await PostTag.objects.filter(post_id=post_id, tag_id=tag_id).exists():
        raise HTTPException(status_code=400, detail="Tag already added to post")
    return await PostTag.objects.create(post_id=post_id, tag_id=tag_id)

@router.delete("/{post_id}/tags/{tag_id}", status_code=204)
async def remove_tag_from_post(post_id: int, tag_id: int):
    """Remove tag from post."""
    count = await PostTag.objects.filter(post_id=post_id, tag_id=tag_id).delete()
    if not count:
        raise HTTPException(status_code=404, detail="Relationship not found")

New lookups:
| Lookup | Description |
|---|---|
| `id__in=[1, 2, 3]` | SQL: id IN (1, 2, 3) |
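
One detail of the existence check in set_post_tags is worth a closer look: an IN query returns each matching row once, so comparing `len(existing_tags)` with `len(data.tag_ids)` reports "not found" whenever the request repeats an id. A possible alternative, sketched below in plain Python, compares sets instead. `missing_tag_ids` is a hypothetical helper, not part of Oxyde or of the tutorial's code.

```python
def missing_tag_ids(requested: list[int], existing: list[int]) -> list[int]:
    """Return requested ids that are absent from the ids found in the database."""
    found = set(existing)
    # dict.fromkeys removes duplicates while keeping the request order.
    return [tag_id for tag_id in dict.fromkeys(requested) if tag_id not in found]


# Duplicates in the request make a plain length comparison report a missing tag.
requested = [1, 1, 3]
existing = [1, 3]  # ids returned by the id__in query
length_check_fails = len(existing) != len(requested)

missing = missing_tag_ids(requested, existing)
missing_with_unknown = missing_tag_ids([2, 5, 5], existing=[2])
```

If you adopt this, it also makes sense to create the PostTag rows from the de-duplicated ids, since the composite UNIQUE constraint rejects a repeated (post, tag) pair.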
Update list_posts in routes/posts.py to load tags. The new signature uses Query, so extend the FastAPI import as well:
from fastapi import APIRouter, HTTPException, Query

@router.get("")
async def list_posts(
    published: bool | None = Query(None, description="Filter by publication status"),
    author_id: int | None = Query(None, description="Filter by author"),
):
    """Get list of posts."""
    query = Post.objects.join("author").prefetch("tags").order_by("-created_at")
    if published is not None:
        query = query.filter(published=published)
    if author_id is not None:
        query = query.filter(author_id=author_id)
    return await query.all()

Update get_post and get_post_full in routes/posts.py to return tags:
@router.get("/{post_id}")
async def get_post(post_id: int):
    """Get post by ID."""
    post = await Post.objects.filter(id=post_id).prefetch("tags").first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    return post

@router.get("/{post_id}/full")
async def get_post_full(post_id: int):
    """Get post with all comments."""
    post = await Post.objects.filter(id=post_id).join("author").prefetch("comments", "tags").first()
    if not post:
        raise HTTPException(status_code=404, detail="Post not found")
    return post

Update main.py:
import uvicorn
from fastapi import FastAPI
from oxyde import db
from oxyde_config import DATABASES
from routes import users, posts, comments, tags

app = FastAPI(
    title="Blog API",
    description="REST API for blog with Oxyde ORM",
    version="0.1.0",
    lifespan=db.lifespan(**DATABASES)
)
app.include_router(users.router)
app.include_router(posts.router)
app.include_router(comments.router)
app.include_router(tags.router)

@app.get("/")
async def root():
    return {"message": "Blog API is running!"}

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)

POST /tags
Send three requests, one per tag, in this order so that the ids match the examples below:
{"name": "FastAPI"}
{"name": "Oxyde"}
{"name": "Python"}

GET /tags
[
  {"id": 1, "name": "FastAPI", "slug": "fastapi"},
  {"id": 2, "name": "Oxyde", "slug": "oxyde"},
  {"id": 3, "name": "Python", "slug": "python"}
]

POST /posts/1/tags/1 (adds the FastAPI tag)
POST /posts/1/tags/3 (adds the Python tag)

GET /posts/1
{
  "id": 1,
  "title": "My first post",
  "...",
  "tags": [
    {"id": 1, "name": "FastAPI", "slug": "fastapi"},
    {"id": 3, "name": "Python", "slug": "python"}
  ]
}

GET /tags/python/posts
All posts with the Python tag.

PUT /posts/1/tags
{"tag_ids": [2, 3]}
Now the post has the Oxyde and Python tags (FastAPI removed).
Add to routes/posts.py. Declare this route above the /{post_id} route: FastAPI matches routes in declaration order, so otherwise GET /posts/search is handled by get_post and fails validation because "search" is not an integer.
from oxyde import Q

@router.get("/search")
async def search_posts(
    q: str = Query(..., min_length=2, description="Search query"),
):
    """Search posts by title and content."""
    posts = await Post.objects.filter(
        Q(title__icontains=q) | Q(content__icontains=q)
    ).join("author").prefetch("tags").order_by("-created_at").all()
    return posts

Q expressions:
| Operator | Description |
|---|---|
| `Q(...) \| Q(...)` | OR: at least one condition |
| `Q(...) & Q(...)` | AND: both conditions |
| `~Q(...)` | NOT: negation |
| `__icontains` | ILIKE '%...%': case-insensitive search |
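
To see how `|`, `&` and `~` can compose conditions, here is a toy version built on Python's operator overloading. `ToyQ` and `icontains` are hypothetical names for this sketch; it filters dictionaries in memory, whereas Oxyde's Q is translated to SQL, so this mirrors only the composition rules and not the implementation.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ToyQ:
    """A predicate over a dict row; a toy stand-in, not Oxyde's Q."""

    test: Callable[[dict], bool]

    def __or__(self, other: "ToyQ") -> "ToyQ":
        return ToyQ(lambda row: self.test(row) or other.test(row))

    def __and__(self, other: "ToyQ") -> "ToyQ":
        return ToyQ(lambda row: self.test(row) and other.test(row))

    def __invert__(self) -> "ToyQ":
        return ToyQ(lambda row: not self.test(row))


def icontains(field: str, needle: str) -> ToyQ:
    """Case-insensitive substring match, mirroring the __icontains lookup."""
    return ToyQ(lambda row: needle.lower() in row[field].lower())


posts = [
    {"title": "My FIRST post", "content": "Hello"},
    {"title": "Notes", "content": "The first draft"},
    {"title": "Unrelated", "content": "Nothing here"},
]

either = icontains("title", "first") | icontains("content", "first")
neither = ~either
both = icontains("title", "post") & icontains("content", "hello")

titles_either = [p["title"] for p in posts if either.test(p)]
titles_neither = [p["title"] for p in posts if neither.test(p)]
titles_both = [p["title"] for p in posts if both.test(p)]
```

The `either` condition has the same shape as the filter in search_posts: a match in the title or in the content is enough.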
Update list_posts for more flexible filtering:
from datetime import datetime

@router.get("")
async def list_posts(
    published: bool | None = Query(None, description="Filter by publication status"),
    author_id: int | None = Query(None, description="Filter by author"),
    created_after: datetime | None = Query(None, description="Posts after date"),
    created_before: datetime | None = Query(None, description="Posts before date"),
    page: int = Query(1, ge=1, description="Page number"),
    per_page: int = Query(10, ge=1, le=100, description="Records per page"),
):
    """Get list of posts with filtering and pagination."""
    query = Post.objects.join("author").prefetch("tags")
    # Filters
    if published is not None:
        query = query.filter(published=published)
    if author_id is not None:
        query = query.filter(author_id=author_id)
    if created_after is not None:
        query = query.filter(created_at__gte=created_after)
    if created_before is not None:
        query = query.filter(created_at__lt=created_before)
    # Count total
    total = await query.count()
    # Pagination
    offset = (page - 1) * per_page
    posts = await query.order_by("-created_at").offset(offset).limit(per_page).all()
    return {
        "items": posts,
        "total": total,
        "page": page,
        "per_page": per_page,
        "pages": (total + per_page - 1) // per_page
    }

Date lookups:
| Lookup | SQL | Description |
|---|---|---|
| `created_at__gte=date` | `>= date` | After or equal |
| `created_at__lt=date` | `< date` | Before |
| `created_at__year=2024` | `EXTRACT(YEAR ...)` | By year |
| `created_at__between=[d1, d2]` | `BETWEEN` | In range |
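
The pagination arithmetic in list_posts is easy to check in isolation. The helper below is a hypothetical extraction of those two expressions (`page_window` is not part of the tutorial's code); it uses the same offset formula and the same ceiling division for the page count.

```python
def page_window(total: int, page: int, per_page: int) -> dict:
    """Compute OFFSET/LIMIT values and the page count for a result set."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be >= 1")
    return {
        "offset": (page - 1) * per_page,  # rows to skip
        "limit": per_page,  # rows to return
        "pages": (total + per_page - 1) // per_page,  # ceiling division
    }


first = page_window(total=12, page=1, per_page=5)
last = page_window(total=12, page=3, per_page=5)
empty = page_window(total=0, page=1, per_page=10)
```

With 12 posts and 5 per page there are 3 pages, and the last page starts at offset 10 and holds the remaining 2 posts.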
Add a statistics endpoint (like /search, declare it above the /{post_id} route so that "stats" is not parsed as a post ID):
@router.get("/stats")
async def posts_stats():
    """Post statistics."""
    total = await Post.objects.count()
    published = await Post.objects.filter(published=True).count()
    drafts = await Post.objects.filter(published=False).count()
    return {
        "total": total,
        "published": published,
        "drafts": drafts
    }

GET /posts/search?q=first
Will find posts containing "first" in title or content.

GET /posts?page=1&per_page=5
{
  "items": [...],
  "total": 12,
  "page": 1,
  "per_page": 5,
  "pages": 3
}

GET /posts?created_after=2024-01-01T00:00:00
GET /posts?published=true&author_id=1&page=1&per_page=10

GET /posts/stats
{"total": 5, "published": 3, "drafts": 2}

Add to routes/posts.py:
from oxyde.db import transaction

class PostCreateWithTags(BaseModel):
    title: str
    content: str
    author_id: int
    published: bool = False
    tag_ids: list[int] = []

@router.post("/with-tags", status_code=201)
async def create_post_with_tags(data: PostCreateWithTags):
    """
    Create post with tags atomically.
    If something goes wrong, everything is rolled back.
    """
    # Checks before transaction
    if not await User.objects.filter(id=data.author_id).exists():
        raise HTTPException(status_code=400, detail="Author not found")
    if data.tag_ids:
        existing_tags = await Tag.objects.filter(id__in=data.tag_ids).all()
        if len(existing_tags) != len(data.tag_ids):
            raise HTTPException(status_code=400, detail="One or more tags not found")
    # Atomic operation
    async with transaction.atomic():
        # Create post
        post = await Post.objects.create(
            title=data.title,
            content=data.content,
            author_id=data.author_id,
            published=data.published
        )
        # Add tags
        for tag_id in data.tag_ids:
            await PostTag.objects.create(post_id=post.id, tag_id=tag_id)
    return post

Transactions in Oxyde:
from oxyde.db import transaction

async with transaction.atomic():
    # All operations inside: one transaction
    # On exception: automatic ROLLBACK
    # On success: automatic COMMIT
    ...

A nested atomic() block acts as a savepoint:
async with transaction.atomic():
    user = await User.objects.create(username="alice", email="alice@test.com")
    try:
        async with transaction.atomic():  # Savepoint
            await Post.objects.create(title="Test", content="Content", author_id=user.id)
            raise ValueError("Error!")
    except ValueError:
        pass  # Only Post is rolled back
# User will still be created

Add an endpoint for user statistics in routes/users.py:
from models import User, Post, Comment

@router.get("/{user_id}/stats")
async def get_user_stats(user_id: int):
    """User statistics."""
    user = await User.objects.get_or_none(id=user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    posts_count = await Post.objects.filter(author_id=user_id).count()
    comments_count = await Comment.objects.filter(author_id=user_id).count()
    published_posts = await Post.objects.filter(author_id=user_id, published=True).count()
    return {
        "user": user,
        "posts_count": posts_count,
        "published_posts_count": published_posts,
        "comments_count": comments_count
    }

For reference, the final version of models.py:
from datetime import datetime

from oxyde import Model, Field


class User(Model):
    """Blog user."""
    id: int | None = Field(default=None, db_pk=True)
    username: str = Field(db_unique=True, db_index=True)
    email: str = Field(db_unique=True)
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")

    class Meta:
        is_table = True
        table_name = "users"


class Tag(Model):
    """Tag for posts."""
    id: int | None = Field(default=None, db_pk=True)
    name: str = Field(db_unique=True)
    slug: str = Field(db_unique=True, db_index=True)

    class Meta:
        is_table = True
        table_name = "tags"


class Post(Model):
    """Blog post."""
    id: int | None = Field(default=None, db_pk=True)
    title: str
    content: str
    published: bool = Field(default=False)
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    updated_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    author: User | None = Field(default=None, db_on_delete="CASCADE")
    comments: list["Comment"] = Field(default=[], db_reverse_fk="post")
    tags: list[Tag] = Field(default=[], db_m2m=True, db_through="PostTag")

    class Meta:
        is_table = True
        table_name = "posts"


class Comment(Model):
    """Comment on a post."""
    id: int | None = Field(default=None, db_pk=True)
    content: str
    created_at: datetime | None = Field(default=None, db_default="CURRENT_TIMESTAMP")
    post: Post | None = Field(default=None, db_on_delete="CASCADE")
    author: User | None = Field(default=None, db_on_delete="CASCADE")

    class Meta:
        is_table = True
        table_name = "comments"


class PostTag(Model):
    """Many-to-many relationship between Post and Tag."""
    id: int | None = Field(default=None, db_pk=True)
    post: Post | None = Field(default=None, db_on_delete="CASCADE")
    tag: Tag | None = Field(default=None, db_on_delete="CASCADE")

    class Meta:
        is_table = True
        table_name = "post_tags"
        unique_together = [("post", "tag")]

POST /posts/with-tags
{
  "title": "New post with tags",
  "content": "This post was created atomically with tags",
  "author_id": 1,
  "published": true,
  "tag_ids": [1, 2]
}

GET /users/1/stats
{
  "user": {"id": 1, "username": "alice_updated", ...},
  "posts_count": 3,
  "published_posts_count": 2,
  "comments_count": 1
}

- Models: defining with Model, Field, Meta
- Migrations: oxyde makemigrations and oxyde migrate
- Stub files: auto-generating .pyi for typing and IDE autocompletion
- CRUD: create(), all(), get_or_none(), save(), delete()
- Foreign Keys: db_on_delete, automatic _id columns
- Joins: join() for eager loading related models
- Reverse FK: db_reverse_fk and prefetch()
- Many-to-Many: db_m2m, db_through, junction tables
- Filtering: lookups (__gte, __icontains, __in)
- Q expressions: complex conditions with OR, AND, NOT
- Pagination: offset(), limit(), count()
- Transactions: transaction.atomic() for atomic operations
- SQL defaults: db_default="CURRENT_TIMESTAMP"
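
The savepoint behavior of nested transactions can be reproduced without Oxyde. The sketch below uses only the standard-library sqlite3 module. The `atomic` helper is a hypothetical stand-in written for this illustration, not Oxyde's implementation: the outermost block issues BEGIN/COMMIT, and a nested block issues SAVEPOINT/RELEASE, so an error inside the inner block undoes only the inner work.

```python
import sqlite3
from contextlib import contextmanager
from itertools import count

_savepoint_ids = count(1)  # generates unique savepoint names


@contextmanager
def atomic(conn: sqlite3.Connection):
    """Hypothetical helper: transaction at the top level, savepoint when nested."""
    if not conn.in_transaction:
        conn.execute("BEGIN")
        try:
            yield
        except BaseException:
            conn.execute("ROLLBACK")
            raise
        else:
            conn.execute("COMMIT")
    else:
        name = f"sp_{next(_savepoint_ids)}"
        conn.execute(f"SAVEPOINT {name}")
        try:
            yield
        except BaseException:
            # Undo only the work done since the savepoint, then discard it
            conn.execute(f"ROLLBACK TO SAVEPOINT {name}")
            conn.execute(f"RELEASE SAVEPOINT {name}")
            raise
        else:
            conn.execute(f"RELEASE SAVEPOINT {name}")


# isolation_level=None disables sqlite3's implicit BEGIN, so we control it ourselves
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, author_id INTEGER)")

with atomic(conn):
    user_id = conn.execute("INSERT INTO users (username) VALUES ('alice')").lastrowid
    try:
        with atomic(conn):  # nested block: savepoint
            conn.execute(
                "INSERT INTO posts (title, author_id) VALUES ('Test', ?)", (user_id,)
            )
            raise ValueError("Error!")
    except ValueError:
        pass  # only the post insert is rolled back

users_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
posts_count = conn.execute("SELECT COUNT(*) FROM posts").fetchone()[0]
print(users_count, posts_count)  # 1 0
```

The user row survives because the outer block still commits; only the statements issued after the savepoint are discarded.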

| Method | Path | Description |
|---|---|---|
| GET | /users | List of users |
| GET | /users/{id} | User by ID |
| POST | /users | Create user |
| PATCH | /users/{id} | Update user |
| DELETE | /users/{id} | Delete user |
| GET | /users/{id}/posts | User's posts |
| GET | /users/{id}/stats | User statistics |

| Method | Path | Description |
|---|---|---|
| GET | /posts | List of posts (with pagination) |
| GET | /posts/search?q= | Search posts |
| GET | /posts/stats | Post statistics |
| GET | /posts/{id} | Post by ID |
| GET | /posts/{id}/full | Post with comments |
| POST | /posts | Create post |
| POST | /posts/with-tags | Create post with tags |
| PATCH | /posts/{id} | Update post |
| DELETE | /posts/{id} | Delete post |
| PUT | /posts/{id}/tags | Set tags |
| POST | /posts/{id}/tags/{tag_id} | Add tag |
| DELETE | /posts/{id}/tags/{tag_id} | Remove tag |
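
For the paginated post list, the metadata returned next to the items can be derived from three numbers: the total row count, the requested page, and the page size. The sketch below is plain Python and makes no Oxyde calls. `PageMeta` and `page_meta` are hypothetical names chosen for this illustration; the computed `offset` and `per_page` are the values one would pass to offset() and limit(), and `total` would come from count().

```python
from dataclasses import dataclass
from math import ceil


@dataclass
class PageMeta:
    """Hypothetical pagination metadata; field names are illustrative."""
    page: int
    per_page: int
    total: int
    pages: int
    offset: int
    has_next: bool
    has_prev: bool


def page_meta(total: int, page: int = 1, per_page: int = 10) -> PageMeta:
    """Compute the offset and page counters for a 1-based page number."""
    if page < 1 or per_page < 1:
        raise ValueError("page and per_page must be >= 1")
    pages = ceil(total / per_page)  # 0 when there are no rows
    return PageMeta(
        page=page,
        per_page=per_page,
        total=total,
        pages=pages,
        offset=(page - 1) * per_page,
        has_next=page < pages,
        has_prev=page > 1,
    )


meta = page_meta(total=25, page=2, per_page=10)
print(meta.offset, meta.pages, meta.has_next)  # 10 3 True
```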

| Method | Path | Description |
|---|---|---|
| GET | /posts/{id}/comments | Comments on post |
| POST | /posts/{id}/comments | Add comment |
| GET | /comments/{id} | Comment by ID |
| PATCH | /comments/{id} | Update comment |
| DELETE | /comments/{id} | Delete comment |

| Method | Path | Description |
|---|---|---|
| GET | /tags | List of tags |
| POST | /tags | Create tag |
| GET | /tags/{slug} | Tag by slug |
| DELETE | /tags/{slug} | Delete tag |
| GET | /tags/{slug}/posts | Posts with tag |
MIT License