How do you isolate tenant data in a single PostgreSQL cluster?
Three patterns exist:
| Approach | Isolation | Overhead |
|---|---|---|
| Separate databases | Strongest | Highest — one connection pool per tenant |
Shared tables with a tenant_id column |
Weakest | Easy to query wrong data, needs RLS |
| Separate schemas in one database | Strong | One pool, native PG namespacing |
We chose schema-per-tenant. You get hard namespace isolation without the operational complexity of hundreds of databases. PostgreSQL's search_path becomes your tenant routing key. Let's get into it.
The Schema Layout
Our PostgreSQL instance holds three distinct schema categories:
┌─────────────────────────────────────────────────────────────┐
│ PostgreSQL 16 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌────────────┐ ┌──────────┐ │
│ │ shared │ │ internal │ │ tenant_abc │ │tenant_xyz│ │
│ ├──────────┤ ├──────────┤ ├────────────┤ ├──────────┤ │
│ │ user │ │ sysconfig│ │ contact │ │ contact │ │
│ │ profile │ │ auditlog │ │ campaign │ │ campaign │ │
│ │ tenant │ │ apiusage │ │ wabaaccount│ │wabaaccount│ │
│ │ business │ │ │ │ phonenumber│ │phonenumber│ │
│ │ aisensy_ │ │ │ │ template │ │ template │ │
│ │ business│ │ │ │ message │ │ message │ │
│ └──────────┘ └──────────┘ └────────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
shared — Cross-tenant identity data. User, Profile, Tenant, Business, AiSensyBusiness. These tables have a schema argument of "shared" in SQLAlchemy and are accessed from any session via the search_path.
internal — Platform-level system tables. Never touches tenant data. Admin routes and health checks live here.
tenant_<slug> — One schema per customer. Created at the moment the tenant completes onboarding. Every row in these tables belongs exclusively to that tenant — no tenant_id discriminator column needed anywhere.
The slug derives directly from the business name at signup. "Acme Corp" becomes acme-corp (slug) and tenant_acme_corp (schema name):
# app/core/utils.py
def generate_slug_from_business_name(business_name: str) -> str:
# Strips legal suffixes, normalises unicode, replaces spaces with hyphens
# "Acme Corp Pvt. Ltd." → "acme-corp"
And in the onboarding route:
# app/accounts/routes/onboarding.py
tenant = Tenant(
schema_name=f"tenant_{slug_name.replace('-', '_')}",
slug=slug_name,
)
The slug is the public-facing URL identifier. The schema name is the PostgreSQL namespace. Hyphens become underscores because PostgreSQL schema names cannot contain hyphens without quoting.
The Data Models
The Tenant model lives in shared and is the single source of truth for tenant identity:
class Tenant(Base):
__tablename__ = "tenant"
__table_args__ = {"schema": "shared"}
id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
slug: Mapped[str] = mapped_column(String, index=True, unique=True)
schema_name: Mapped[str] = mapped_column(String, index=True, unique=True)
onboarding_complete: Mapped[bool] = mapped_column(Boolean, default=False)
# ...
@property
def tenant_url(self) -> str:
return f"{settings.FRONTEND_HOST}/{self.slug}"
User and Profile sit in shared too. A Profile links to exactly one Tenant via a nullable tenant_id FK. Before a user completes onboarding there's no tenant yet — that FK being nullable was an intentional design choice.
Tenant-specific models carry no tenant_id column at all:
class Contact(Base):
__tablename__ = "contact"
__table_args__ = {"schema": "tenant"} # ← "tenant" is a placeholder
# No tenant_id column. The schema IS the tenant boundary.
Wait — why schema="tenant" and not schema="tenant_acme_corp"? Because SQLAlchemy model definitions are singletons loaded once at process start. The actual schema target is controlled at runtime via search_path, not at model definition time. The string "tenant" in __table_args__ is just metadata used by Alembic (more on that below).
The Middleware: Extracting Tenant from URL
All tenant-specific API calls travel through URL paths of the form:
/{tenant-slug}/api/v1/contacts
/{tenant-slug}/api/v1/campaigns
Our TenantMiddleware (a Starlette BaseHTTPMiddleware) intercepts every request, extracts the slug from the first path segment, looks up the corresponding Tenant record, and stores it in request.state.tenant.
class TenantMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
path = request.url.path
# Public API routes resolve tenant from API key, not URL
if path.startswith("/api/v1/public"):
request.state.tenant = None
return await call_next(request)
tenant_slug = self.extract_tenant_from_path(path)
if tenant_slug:
tenant = await self.get_tenant_by_slug(tenant_slug)
if tenant:
request.state.tenant = tenant
# Rewrite path: /acme-corp/api/v1/contacts → /api/v1/contacts
new_path = self.remove_tenant_from_path(path, tenant_slug)
request.scope["path"] = new_path
else:
request.state.tenant = None
return await call_next(request)
The path rewrite is critical. FastAPI's router has no idea about tenant slugs — it was registered with routes like /api/v1/contacts. If we didn't strip the prefix, every tenant URL would return 404. We mutate request.scope["path"] in-place; that's Starlette's documented mechanism for path rewriting.
Slug Validation to Prevent Route Conflicts
A naive approach just takes the first path segment as the slug. That would accidentally treat /docs, /api, /static as tenant slugs. Our validator rejects them:
def is_valid_tenant_slug(self, slug: str) -> bool:
if not (3 <= len(slug) <= 63):
return False
if not re.match(r'^[a-zA-Z0-9][a-zA-Z0-9\-_]*[a-zA-Z0-9]$', slug):
return False
reserved_routes = {
'api', 'www', 'docs', 'redoc', 'static', 'assets',
'health', 'metrics', 'auth', 'login', 'public', ...
}
return slug.lower() not in reserved_routes
TTL Cache: Don't Hit Postgres on Every Request
Tenant lookups happen on every single HTTP request. We cannot afford a database round-trip for each one. We built a simple thread-safe TTL cache:
class _TenantCache:
def __init__(self, ttl: int = 60):
self._ttl = ttl
self._store: dict[str, tuple[float, Tenant | None]] = {}
self._lock = Lock()
def get(self, slug: str) -> Tenant | None:
with self._lock:
entry = self._store.get(slug)
if entry and (monotonic() - entry[0]) < self._ttl:
return entry[1]
return None
def put(self, slug: str, tenant: Tenant | None) -> None:
with self._lock:
self._store[slug] = (monotonic(), tenant)
def invalidate(self, slug: str) -> None:
with self._lock:
self._store.pop(slug, None)
_tenant_cache = _TenantCache(ttl=60)
The cache is module-level, so it persists across requests within a worker process. A 60-second TTL means a newly created or deactivated tenant takes up to a minute to reflect. Acceptable for our use case; Redis-backed caching would be the natural next step.
The Session Dependencies: search_path as a Routing Key
The real magic happens here. PostgreSQL's search_path tells the engine which schemas to search when an unqualified table name is used. Setting search_path = tenant_acme_corp, shared, public means:
- First look in
tenant_acme_corp— all the tenant's own tables are found here - Fall back to
shared— soshared.user,shared.tenantetc. are still accessible - Fall back to
public— for extensions
We have three session dependency types defined in app/core/deps.py:
TenantSessionDep — For Tenant-Scoped Routes
async def get_tenant_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
tenant = getattr(request.state, "tenant", None)
if not tenant:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Tenant context required. Include tenant slug in URL path: /{tenant}/api/v1/..."
)
schema_name = _validate_schema_name(tenant.schema_name)
session.info["tenant_schema"] = schema_name
session.info["tenant_id"] = tenant.id
await session.execute(
text('SET search_path TO "' + schema_name + '", shared, public')
)
yield session
TenantSessionDep = Annotated[AsyncSession, Depends(get_tenant_db)]
Notice session.info["tenant_schema"] = schema_name. That's not just metadata bookkeeping — it's the key to one of the trickiest problems we ran into.
InternalSessionDep — For Cross-Tenant Routes
async def get_internal_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
async with async_session_factory() as session:
await session.execute(
text('SET search_path TO shared, internal, public')
)
yield session
InternalSessionDep = Annotated[AsyncSession, Depends(get_internal_db)]
Authentication, registration, onboarding, webhooks from external systems — all use this. No tenant context, no schema switching.
PublicApiSessionDep — For External API Keys
The public REST API authenticates via API keys rather than JWT. The API key middleware resolves the tenant and stores tenant_schema and tenant_id in request.state before the dependency runs:
async def get_public_api_db(request: Request) -> AsyncGenerator[AsyncSession, None]:
tenant_schema = getattr(request.state, "tenant_schema", None)
tenant_id = getattr(request.state, "tenant_id", None)
if not tenant_schema or not tenant_id:
raise HTTPException(status_code=403, detail="Tenant required")
schema_name = _validate_schema_name(tenant_schema)
async with async_session_factory() as session:
session.info["tenant_schema"] = schema_name
session.info["tenant_id"] = tenant_id
await session.execute(
text('SET search_path TO "' + schema_name + '", shared, public')
)
yield session
The Hardest Bug: search_path Resets After Every Commit
This took us a while to fully understand. Here's what happens in PostgreSQL:
SET search_pathapplies to a connection, not a transaction. When you commit and SQLAlchemy returns the connection to the pool, the next statement that checks out that connection gets whateversearch_pathwas set on it — which might be a completely different tenant's schema.
In our case: tenant A's request commits, returns the connection to the pool, tenant B's request gets that same connection, we set B's search path, all good. But within a single long request that does multiple commits (e.g., onboarding), after each commit the connection goes back to the pool, a new checkout might return a different physical connection, and the search_path we set earlier is gone.
SQLAlchemy's Session event system is the correct fix. We register a listener on Session.after_begin:
@event.listens_for(Session, "after_begin")
def _reapply_tenant_search_path(session, transaction, connection):
"""
Re-apply the tenant search_path every time a transaction begins.
SET search_path lasts only for the physical connection it ran on.
A session returns its connection to the pool on every commit, so the
next statement may run on a different connection that never got the SET.
This listener fires on every begin (including the implicit one after a
commit) and re-applies it, keyed off the schema name stashed in session.info.
"""
schema = session.info.get("tenant_schema")
if schema:
connection.exec_driver_sql(
f'SET search_path TO "{schema}", shared, public'
)
This fires every time a transaction begins — including the implicit begin after every commit. Because we stash tenant_schema into session.info at session creation time, the listener always knows which schema to apply even after a pool checkout returns a different physical connection.
The session.info dict is part of the Session object and survives commits. The physical connection doesn't, but session.info does. That's the key insight.
SQL Injection in Schema Names
We were paranoid about this one. A tenant with a malicious slug could potentially inject SQL through the schema name in our SET search_path statement. We added a strict validator at every entry point:
_SCHEMA_RE = re.compile(r'^[a-zA-Z0-9_-]+$')
def _validate_schema_name(name: str) -> str:
"""Validate that a schema name is safe for SQL identifier use."""
if not name or not _SCHEMA_RE.match(name):
raise ValueError(f"Invalid schema name: {name}")
return name
The schema name is always double-quoted in SQL (SET search_path TO "tenant_acme_corp"), which prevents identifier injection. The regex validation on top ensures only [a-zA-Z0-9_-] characters get through. The onboarding endpoint also validates schema names against a compiled regex before attempting CREATE SCHEMA:
_SCHEMA_NAME_RE = re.compile(r"^[a-z0-9_]+$")
# In complete_onboarding:
if not _SCHEMA_NAME_RE.match(schema_name):
logger.error(f"Invalid schema name rejected: {schema_name}")
return APIResponse.error(message="Invalid tenant configuration", ...)
Tenant Provisioning: The Onboarding Flow
When a new business signs up, their schema doesn't exist yet. We create it at the end of onboarding as an atomic, idempotent operation:
@router.post("/complete-onboarding")
async def complete_onboarding(session: InternalSessionDep, current_user: CurrentUser):
# Step 1: Check if schema already exists (idempotency)
existed = (await session.execute(
text("SELECT 1 FROM information_schema.schemata WHERE schema_name = :s"),
{"s": schema_name},
)).scalar() is not None
created_schema_this_attempt = False
if not existed:
conn = await session.connection()
await conn.execute(sa.schema.CreateSchema(schema_name))
await session.commit()
created_schema_this_attempt = True
# Step 2: Run Alembic migrations in a worker thread (Alembic is sync)
try:
await asyncio.to_thread(_run_tenant_alembic_upgrade, schema_name)
except Exception:
# Step 2a: Rollback schema if WE created it and migration failed
if created_schema_this_attempt:
conn = await session.connection()
await conn.execute(sa.schema.DropSchema(schema_name, cascade=True))
await session.commit()
return APIResponse.server_error(message="Failed to create tenant schema. Please retry.")
# Step 3: Verify sentinel tables exist (guard against silent partial migrations)
for table_name in ("wabaaccount", "phonenumber"):
present = (await session.execute(
text("SELECT 1 FROM information_schema.tables WHERE table_schema = :s AND table_name = :t"),
{"s": schema_name, "t": table_name},
)).scalar() is not None
if not present:
return APIResponse.server_error(message="Tenant schema is incomplete. Please retry.")
# Step 4: Mark onboarding complete
tenant.onboarding_complete = True
session.add(tenant)
await session.commit()
Several design decisions worth calling out:
Idempotency. We check information_schema.schemata before creating. If the network drops after schema creation but before the response is delivered, the user can retry and it works correctly.
Migration in a thread. Alembic's command.upgrade is synchronous Python. Running it directly in an async def handler would block the entire event loop. asyncio.to_thread hands it to the thread pool.
Rollback only what we created. If we ran this endpoint on a pre-existing schema (e.g., admin re-running onboarding), we must not drop it on migration failure. The created_schema_this_attempt flag tracks whether this specific invocation created the schema.
Sentinel table verification. Alembic can "succeed" silently with a partially applied migration in edge cases (e.g., empty versions table, config mismatch). Checking that wabaaccount and phonenumber exist before marking onboarding_complete = True prevents us from giving a user a broken dashboard.
The Alembic Configuration: Three Independent Migration Tracks
Managing schema migrations across three schema categories (shared, internal, and N tenant schemas) required a multi-section alembic.ini:
[DEFAULT]
script_location = app/alembic
[internal]
version_locations = app/alembic/internal/versions
[shared]
version_locations = app/alembic/shared/versions
[tenant]
version_locations = app/alembic/tenant/versions
The env.py branches based on which section is active:
current_schema = context.config.attributes.get("schema") \
or context.get_x_argument(as_dictionary=True).get("schema")
if current_schema == "internal":
from app.core.db import Base
target_metadata = Base.metadata
def include_object(object, name, type_, reflected, compare_to):
return type_ == "table" and object.schema == "internal"
elif current_schema == "shared":
from app.accounts.models import *
from app.publicapi.models import *
from app.core.db import Base
target_metadata = Base.metadata
def include_object(object, name, type_, reflected, compare_to):
return type_ == "table" and object.schema == "shared"
else:
# tenant — runs against an actual tenant schema name
from app.accounts.models import *
from app.contacts.models import *
from app.whatsapp.models import *
from app.campaigns.models import *
from app.core.db import Base
target_metadata = Base.metadata
def include_object(object, name, type_, reflected, compare_to):
return type_ == "table" and (object.schema == "tenant" or object.schema is None)
And the connection setup in run_migrations_online applies the target schema to search_path:
connection.execute(text('set search_path to "%s"' % current_schema))
connection.commit()
context.configure(
connection=connection,
target_metadata=target_metadata,
include_object=include_object,
include_schemas=True,
)
Running migrations:
# Bootstrap shared schema (users, tenants, businesses)
alembic --name shared upgrade head
# Bootstrap internal schema
alembic --name internal upgrade head
# Migrate one specific tenant (used by onboarding code)
alembic --name tenant -x schema=tenant_acme_corp upgrade head
The -x schema=... flag passes the schema name as an Alembic -x argument. env.py reads it via context.get_x_argument(as_dictionary=True).get("schema").
_run_tenant_alembic_upgrade — Programmatic API
The onboarding route doesn't shell out. It uses Alembic's Python API, which is cleaner and doesn't require subprocess:
def _run_tenant_alembic_upgrade(schema_name: str) -> None:
cfg = AlembicConfig(str(config.BASE_DIR / "alembic.ini"), ini_section="tenant")
cfg.attributes["schema"] = schema_name
alembic_command.upgrade(cfg, "head")
cfg.attributes["schema"] is how we pass the schema name when calling programmatically (vs the -x schema=... CLI approach). env.py reads from context.config.attributes.get("schema") first, so both paths work.
Bulk Tenant Migrations: migrate_tenants.py
Adding a new column to the contacts table means running Alembic against every existing tenant schema. We have a dedicated script for this:
def _get_tenant_schema_names() -> Sequence[str]:
engine = create_engine(str(settings.SQLALCHEMY_DATABASE_URI))
with engine.connect() as conn:
rows = conn.execute(
text("SELECT schema_name FROM shared.tenant ORDER BY schema_name")
).fetchall()
return [row[0] for row in rows]
def _migrate_single_tenant(schema_name: str) -> TenantMigrationResult:
"""Apply alembic upgrade head to one tenant schema. Never raises."""
try:
cfg = AlembicConfig("alembic.ini", ini_section="tenant")
cfg.attributes["schema"] = schema_name
alembic_command.upgrade(cfg, "head")
return TenantMigrationResult(schema_name=schema_name, success=True)
except Exception as exc:
logger.error("tenant_migration.failed schema=%s error=%s", schema_name, exc, exc_info=True)
return TenantMigrationResult(schema_name=schema_name, success=False, error=str(exc))
def migrate_all_tenants() -> MigrationResult:
schema_names = _get_tenant_schema_names()
result = MigrationResult()
for schema_name in schema_names:
r = _migrate_single_tenant(schema_name)
result.succeeded.append(r) if r.success else result.failed.append(r)
return result
Key design choices here:
- One failure never blocks others. Each tenant is migrated independently.
_migrate_single_tenantwraps everything in a try/except and returns a result object instead of raising. Tenant B's migration proceeds even if Tenant A's fails. - Returns a structured result. Callers (Celery beat, CI scripts, CLI) can inspect
result.has_failuresand decide what to do. - Ordered by schema name. Deterministic ordering makes log analysis easier.
In production, we run this via a Celery beat task on every deploy, before the new app code starts serving traffic. A failed migration surfaces in structured logs tagged tenant_migration.failed and gets picked up by our Sentry integration.
Celery Integration: Sync Sessions for Background Workers
Celery workers are not async. FastAPI's session dependencies use AsyncSession and are tied to an HTTP request. Neither applies to a Celery task. We provide two context managers for non-request code:
@asynccontextmanager
async def tenant_session(tenant_id: UUID | str) -> AsyncIterator[AsyncSession]:
"""Tenant-scoped AsyncSession for non-request async code."""
async with async_session_factory() as session:
tenant = await session.get(Tenant, tenant_id)
schema_name = _validate_schema_name(tenant.schema_name)
session.info["tenant_schema"] = schema_name
session.info["tenant_id"] = tenant_id
await session.execute(
text('SET search_path TO "' + schema_name + '", shared, public')
)
yield session
@contextmanager
def tenant_session_sync(tenant_id: UUID | str) -> Iterator[Session]:
"""Tenant-scoped sync Session for Celery tasks."""
with Session(engine) as session:
schema_name = _resolve_schema_name_sync(session, tenant_id)
session.info["tenant_schema"] = schema_name
session.info["tenant_id"] = tenant_id
session.execute(
text('SET search_path TO "' + schema_name + '", shared, public')
)
yield session
The sync variant uses the engine (not async_engine). We keep a dedicated sync SQLAlchemy engine running alongside the async one:
# Sync engine — Celery workers, Alembic, management scripts
engine = create_engine(
str(config.settings.SQLALCHEMY_DATABASE_URI),
pool_size=20,
max_overflow=10,
pool_recycle=300,
pool_pre_ping=False, # psycopg3 pre-ping needs greenlet context; disabled here
)
# Async engine — FastAPI request handlers
async_engine = create_async_engine(
str(config.settings.SQLALCHEMY_DATABASE_URI),
pool_size=20,
max_overflow=10,
pool_recycle=300,
pool_pre_ping=True,
)
Note pool_pre_ping=False on the sync engine. psycopg3's pre-ping mechanism needs a greenlet context when running inside an asyncio event loop. Celery workers don't run in an event loop, and we don't want to pay the overhead of greenlet setup here. We rely on pool_recycle=300 and retry logic instead.
A typical Celery task looks like:
@celery_app.task
def send_campaign_batch(tenant_id: str, campaign_id: str, contact_ids: list[str]):
with tenant_session_sync(tenant_id) as session:
campaign = session.get(Campaign, campaign_id)
contacts = session.query(Contact).filter(Contact.id.in_(contact_ids)).all()
# ... process
The _reapply_tenant_search_path event listener fires here too, so even if a sync commit internally recycles the connection to a new one, the search path is re-applied.
The Dual Engine Architecture
The migration from sync SQLModel to async SQLAlchemy 2.0 gave us one non-obvious constraint: you cannot freely mix sync and async code paths with a single engine. Specifically:
- FastAPI route handlers are
async def, useAsyncSession, and mustawaiteverything - Celery task functions are plain
def, useSession, and cannotawait - Alembic is entirely synchronous
So we run two engines from the same database URI:
┌─────────────────────────────────────────────────────────────┐
│ Database Connection Layer │
│ │
│ ┌────────────────────────┐ ┌────────────────────────┐ │
│ │ async_engine │ │ engine │ │
│ │ (psycopg3 async) │ │ (psycopg3 sync) │ │
│ │ pool_size=20 │ │ pool_size=20 │ │
│ │ pool_pre_ping=True │ │ pool_pre_ping=False │ │
│ └───────────┬────────────┘ └────────────┬────────────┘ │
│ │ │ │
│ FastAPI handlers Celery workers │
│ TenantSessionDep tenant_session_sync │
│ InternalSessionDep Alembic migrations │
│ PublicApiSessionDep Management scripts │
└─────────────────────────────────────────────────────────────┘
Both engines share the same PostgreSQL connection pool from the database side. From the application side they're completely independent; no session object ever crosses the async/sync boundary.
The Request Lifecycle: End to End
Putting it all together, here's what happens when a user GETs /acme-corp/api/v1/contacts:
1. Request arrives at uvicorn
2. CORSMiddleware → passes through
3. AuthenticationMiddleware
→ Decodes JWT → sets request.user
4. TenantMiddleware
→ Extracts "acme-corp" from path
→ Hits TTL cache → miss → async DB lookup
→ Finds Tenant(slug="acme-corp", schema_name="tenant_acme_corp")
→ request.state.tenant = tenant
→ request.scope["path"] = "/api/v1/contacts" (prefix stripped)
5. FastAPI router matches /api/v1/contacts → contacts.list_contacts()
6. get_tenant_db() dependency runs
→ Creates AsyncSession via async_session_factory()
→ Validates schema name: "tenant_acme_corp" ✓
→ session.info["tenant_schema"] = "tenant_acme_corp"
→ await session.execute('SET search_path TO "tenant_acme_corp", shared, public')
7. list_contacts() handler executes
→ SELECT * FROM contact WHERE ... LIMIT 20
→ PostgreSQL resolves "contact" → "tenant_acme_corp"."contact"
→ Returns only acme-corp's contacts
8. Session commits → connection returned to pool
→ _reapply_tenant_search_path fires on next transaction begin
9. Response serialized and returned
Tenant A can never see Tenant B's contacts. The search_path ensures that SELECT * FROM contact only ever hits the correct schema.
Tradeoffs and What We'd Do Differently
What worked extremely well:
- Zero
tenant_idcolumns in tenant-schema tables. Queries are cleaner. There's noWHERE tenant_id = ?clause that a developer could accidentally forget. - Native PostgreSQL schema isolation is rock-solid. There's no application-layer RLS policy to get wrong.
- Alembic's multi-section
alembic.inihandles schema-specific migration tracks elegantly. - The
after_beginevent listener pattern for re-applyingsearch_pathwas the cleanest solution we found.
Sharp edges:
- Schema proliferation. With hundreds of tenants you accumulate hundreds of schemas.
pg_catalog.pg_namespacegrows linearly.\dnin psql becomes noisy. This is manageable at our current scale but becomes a concern at thousands of tenants. - Bulk migrations are slow. Running Alembic against 500 schemas sequentially takes time. We're exploring parallelising
migrate_all_tenants()with a thread pool. - Alembic autogenerate is tricky. When you run
alembic --name tenant revision --autogenerate, it compares against whichever tenant schema you point it at. Theinclude_objectfilter is critical to prevent it from generating migrations for shared tables. - Cross-tenant queries are impossible. If you want aggregated analytics across all tenants, you need to either use a separate data warehouse or write a custom query that
UNION ALLs across schemas dynamically. We keep analytics out of the main database for now. - No built-in tenant isolation at the Postgres level. If application code does something wrong (e.g., a raw
text()query without validating the schema), it could theoretically access the wrong schema. Row-Level Security would add an extra safety net, but we haven't implemented it yet.
The Middleware Stack in main.py
The order of middleware registration matters enormously. Here's ours:
middlewares = [
Middleware(CORSMiddleware, allow_origins=settings.all_cors_origins, ...),
Middleware(AuthenticationMiddleware, backend=OAuth2PasswordBearerAuthenticationBackend()),
Middleware(TenantMiddleware),
]
# Public API middleware added after (innermost = last to run)
app.add_middleware(RateLimitMiddleware)
app.add_middleware(IdempotencyMiddleware)
app.add_middleware(RequestIdMiddleware)
TenantMiddleware runs after auth so the user identity is available in request.user when the tenant lookup happens. Rate limiting is scoped to public API keys and runs before request processing but after tenant context is set.
One critical setting:
app = FastAPI(
...
redirect_slashes=False, # Prevent 307 redirects that lose context in multi-tenant apps
)
FastAPI by default sends a 307 redirect when you hit /acme-corp/api/v1/contacts/ instead of /acme-corp/api/v1/contacts. The redirect drops the tenant prefix because the path rewriting in TenantMiddleware has already happened. Disabling slash redirects eliminates this entire class of bug.
Conclusion
Schema-per-tenant PostgreSQL multitenancy with FastAPI comes down to these core mechanisms:
Tenantmodel insharedstores slug → schema_name mappingTenantMiddlewareextracts slug, resolves Tenant via TTL cache, rewrites path- Session dependencies set
SET search_pathand stashtenant_schemainsession.info after_beginevent listener re-appliessearch_pathafter every commit/pool checkout- Multi-section
alembic.inimanages three independent migration tracks complete_onboardingprovisions schemas viaCREATE SCHEMA+ Alembic programmatic APImigrate_all_tenantsdrives per-deploy bulk schema migrations, failure-isolated per tenant- Dual engine (async for FastAPI, sync for Celery) with
session.infopropagation
The system has been running in production without a cross-tenant data leak since launch. The complexity is concentrated in a handful of files (deps.py, middlewares.py, env.py, onboarding.py) and the rest of the application code just picks the right session dependency type.
If you're starting a similar project today, this architecture scales comfortably to hundreds of tenants on a single PostgreSQL instance. Beyond that, you'll want to evaluate PostgreSQL logical replication to secondary clusters, or re-evaluate whether separate databases per large enterprise customer makes sense for your tiered pricing model.
The Lynkist engineering team — May 2026