System Architecture
This document provides a comprehensive overview of Synthetic Data Studio's system architecture, design principles, and technical implementation.
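High-Level Architecture
System Overview
Synthetic Data Studio is a FastAPI web application with a modular, service-oriented architecture. The diagram below shows its three tiers: the outward-facing services (web API, background workers, AI/LLM services, storage), the application layer (business services, data models, repositories, core services), and the database layer.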
┌─────────────────────────────────────────────────────────────┐
│                    SYNTHETIC DATA STUDIO                    │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ Web API     │ │ Background  │ │ AI/LLM      │ │ Storage │ │
│ │ (FastAPI)   │ │ Workers     │ │ Services    │ │ Service │ │
│ │             │ │ (Celery)    │ │ (Gemini)    │ │ (S3)    │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────┐ │
│ │ Business    │ │ Data        │ │ Repository  │ │ Core    │ │
│ │ Services    │ │ Models      │ │ Layer       │ │ Services│ │
│ │             │ │ (SQLModel)  │ │ (CRUD)      │ │         │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────┘ │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────┐ │
│ │                     DATABASE LAYER                      │ │
│ │     ┌─────────────┐ ┌─────────────┐ ┌─────────────┐     │ │
│ │     │ PostgreSQL  │ │ SQLite      │ │ Redis       │     │ │
│ │     │ (Primary)   │ │ (Dev/Test)  │ │ (Caching)   │     │ │
│ │     └─────────────┘ └─────────────┘ └─────────────┘     │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Architectural Principles
1. Modular Design
- Separation of Concerns: Each module has a single responsibility
- Dependency Injection: Clean interfaces between components
- Plugin Architecture: Extensible service implementations
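One way to read the Dependency Injection and Plugin Architecture bullets together is a registry that maps a name to an implementation behind a shared interface. The sketch below is illustrative only: `Synthesizer`, `register`, `build`, and `ShuffleSynthesizer` are hypothetical names chosen for this example, not identifiers from the Synthetic Data Studio codebase.

```python
import random
from typing import Callable, Dict, List, Protocol


class Synthesizer(Protocol):
    """Hypothetical interface every synthesis plugin would implement."""

    def generate(self, num_samples: int) -> List[int]: ...


# Registry mapping a plugin name to a factory for that plugin.
_REGISTRY: Dict[str, Callable[..., Synthesizer]] = {}


def register(name: str):
    """Class decorator that adds an implementation to the registry."""
    def decorator(cls):
        _REGISTRY[name] = cls
        return cls
    return decorator


@register("shuffle")
class ShuffleSynthesizer:
    """Toy plugin: resamples values from the data it was given."""

    def __init__(self, data: List[int], seed: int = 0):
        self._data = data
        self._rng = random.Random(seed)

    def generate(self, num_samples: int) -> List[int]:
        return [self._rng.choice(self._data) for _ in range(num_samples)]


def build(name: str, **kwargs) -> Synthesizer:
    """Look up a plugin by name; callers depend only on the interface."""
    try:
        factory = _REGISTRY[name]
    except KeyError:
        raise ValueError(f"Unknown synthesizer: {name!r}") from None
    return factory(**kwargs)


synth = build("shuffle", data=[1, 2, 3])
samples = synth.generate(5)
```

With this shape, adding a new implementation means registering one more class; the calling code that invokes `build` does not change.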
2. API-First Design
- RESTful APIs: Consistent, versioned endpoints
- OpenAPI Specification: Auto-generated API documentation
- Type Safety: Pydantic models for request/response validation
3. Privacy by Design
- Differential Privacy: Mathematical privacy guarantees built-in
- Safety Validation: Multi-layer privacy checks
- Audit Trail: Comprehensive logging for compliance
4. Scalability & Performance
- Asynchronous Processing: Background jobs for long-running tasks
- Caching Strategy: Redis for session and result caching
- Resource Optimization: GPU support for ML workloads
Directory Structure
Root Level
backend/
├── app/                  # Main application code
├── docs/                 # Documentation
├── tests/                # Test suites
├── scripts/              # Utility scripts
├── requirements*.txt     # Python dependencies
├── pytest.ini            # Test configuration
├── Dockerfile            # Container definition
└── docker-compose.yml    # Local development stack
Application Structure (app/)
app/
├── main.py               # FastAPI application entry point
├── api.py                # API router aggregation
├── core/                 # Core functionality
│   ├── config.py         # Configuration management
│   ├── dependencies.py   # Dependency injection
│   ├── exceptions.py     # Custom exceptions
│   ├── security.py       # Authentication & authorization
│   ├── utils.py          # Utility functions
│   └── validators.py     # Input validation
├── auth/                 # Authentication module
│   ├── models.py         # User models
│   ├── repositories.py   # User data access
│   ├── routes.py         # Auth endpoints
│   ├── schemas.py        # Auth request/response models
│   └── services.py       # Auth business logic
├── datasets/             # Dataset management
├── generators/           # Synthesis orchestration
├── evaluations/          # Quality assessment
├── llm/                  # AI features
├── models/               # ML model management
├── compliance/           # Compliance endpoints
├── jobs/                 # Background job processing
├── projects/             # Project management
├── services/             # Business logic services
│   ├── synthesis/        # ML synthesis implementations
│   ├── privacy/          # Privacy validation & reporting
│   └── llm/              # AI service integrations
├── database/             # Database layer
│   ├── database.py       # Connection management
│   ├── models/           # Base models
│   └── migrations/       # Schema migrations
└── storage/              # File storage abstraction
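Core Components
FastAPI Application Layer
Main Application (main.py)
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
# Assumed to be Starlette's AuthenticationMiddleware; the import is not shown in the source
from starlette.middleware.authentication import AuthenticationMiddleware

from app import api

# Lifespan management: defined before the app so it can be passed to FastAPI()
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialize database, cache, etc.
    yield
    # Shutdown: clean up resources

# Application initialization
app = FastAPI(
    title="Synthetic Data Studio API",
    description="Backend API for Synthetic Data Studio",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    openapi_url="/openapi.json",
    lifespan=lifespan,  # without this argument the handler is never invoked
)

# Middleware stack
app.add_middleware(CORSMiddleware, ...)
app.add_middleware(AuthenticationMiddleware, ...)

# Router inclusion
app.include_router(api.router)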
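API Router Aggregation (api.py)
import importlib
import logging

from fastapi import APIRouter

logger = logging.getLogger(__name__)
router = APIRouter()

# Centralized router loading
modules_to_load = [
    "app.auth.routes",
    "app.datasets.routes",
    "app.generators.routes",
    # ... other modules
]

for module in modules_to_load:
    try:
        m = importlib.import_module(module)
        if hasattr(m, "router"):
            router.include_router(m.router)
    except Exception as e:
        # A failing module is logged and skipped so the remaining routers still load
        logger.error(f"Failed to load {module}: {e}")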
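The load-and-skip behavior of the loop above can be tried without FastAPI. The following is a minimal sketch using only the standard library; `load_attribute` is a hypothetical helper written for this example, and it collects an arbitrary attribute (here `dumps`) where the application collects `router`.

```python
import importlib
import logging
from typing import Any, Dict, List

logger = logging.getLogger("router_loader")


def load_attribute(module_names: List[str], attr: str) -> Dict[str, Any]:
    """Import each module and collect `attr` from those that define it.

    Modules that fail to import are logged and skipped, mirroring the
    try/except in the router aggregation loop.
    """
    found: Dict[str, Any] = {}
    for name in module_names:
        try:
            module = importlib.import_module(name)
        except Exception as exc:  # broad on purpose: any import-time error
            logger.error("Failed to load %s: %s", name, exc)
            continue
        if hasattr(module, attr):
            found[name] = getattr(module, attr)
    return found


# "json" and "math" are real stdlib modules; the third name does not exist.
loaded = load_attribute(["json", "math", "no_such_module_xyz"], "dumps")
```

Only `json` ends up in `loaded`: `math` imports but has no `dumps`, and the missing module is logged rather than aborting startup. The trade-off of this design is that a broken module silently removes its endpoints, so the error log deserves monitoring.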
Data Models & Persistence
SQLModel Integration
from datetime import datetime
from typing import Dict, Optional
from uuid import UUID, uuid4

from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel

# Base model with common fields
class BaseModel(SQLModel):
    id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: Optional[datetime] = Field(default=None)

# Example domain model
class Generator(BaseModel, table=True):
    name: str
    type: str = Field(index=True)
    status: str = Field(default="pending")
    parameters_json: Optional[str] = Field(default=None)
    privacy_config: Optional[Dict] = Field(default=None, sa_column=Column(JSON))
    # ... other fields
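Repository Pattern
from datetime import datetime
from typing import Optional
from uuid import UUID

from sqlmodel import Session

# Generator is the domain model defined in the previous section
class GeneratorRepository:
    def __init__(self, session: Session):
        self.session = session

    def create(self, generator: Generator) -> Generator:
        self.session.add(generator)
        self.session.commit()
        self.session.refresh(generator)
        return generator

    def get_by_id(self, generator_id: UUID) -> Optional[Generator]:
        return self.session.get(Generator, generator_id)

    def update_status(self, generator_id: UUID, status: str) -> bool:
        # Returns False when no generator with this ID exists
        generator = self.session.get(Generator, generator_id)
        if generator is None:
            return False
        generator.status = status
        generator.updated_at = datetime.utcnow()
        self.session.add(generator)
        self.session.commit()
        return True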
Service Layer Architecture
Business Logic Services
class GeneratorService:
    def __init__(self, repository: GeneratorRepository):
        self.repository = repository

    async def create_generator(self, request: GeneratorCreateRequest) -> Generator:
        # Validation
        # Business logic
        # Persistence
        pass

    async def start_generation(self, generator_id: UUID) -> GenerationResponse:
        # Orchestration logic
        # Background job scheduling
        pass
Synthesis Services
class CTGANService:
    def __init__(self, config: DPConfig):
        self.config = config
        self.model = None

    def train(self, data: pd.DataFrame) -> TrainingResult:
        # Model training logic
        # Privacy accounting
        # Progress tracking
        pass

    def generate(self, num_samples: int) -> pd.DataFrame:
        # Sample generation
        # Post-processing
        pass
Background Processing
Celery Integration
# Task definition
@celery_app.task(bind=True)
def generate_synthetic_data(self, generator_id: str):
    """Background task for data generation."""
    # Progress updates use a custom state. Celery records SUCCESS and FAILURE
    # itself when the task returns or raises, so those states are not set by hand.
    self.update_state(state='PROGRESS', meta={'progress': 25})
    # Generation logic
    result = perform_generation(generator_id)
    # Completion: the return value is stored as the task result
    return result
Job Management
class JobManager:
    def __init__(self, celery_app):
        self.celery = celery_app

    def submit_generation_job(self, generator_id: UUID) -> str:
        """Submit generation job to queue."""
        task = generate_synthetic_data.delay(str(generator_id))
        return task.id

    def get_job_status(self, job_id: str) -> JobStatus:
        """Get job execution status."""
        task = self.celery.AsyncResult(job_id)
        # task.info is the meta dict while in PROGRESS, but the return value on
        # success and the exception on failure, so check the type first
        info = task.info if isinstance(task.info, dict) else {}
        return JobStatus(
            id=job_id,
            status=task.status,
            progress=info.get('progress', 0),
        )
Security Architecture
Authentication & Authorization
JWT-Based Auth
from datetime import datetime, timedelta, timezone

from fastapi import HTTPException
# JWTError suggests python-jose; the import is not shown in the source
from jose import JWTError, jwt

class AuthService:
    def __init__(self, secret_key: str, algorithm: str):
        self.secret_key = secret_key
        self.algorithm = algorithm

    def create_access_token(self, data: dict) -> str:
        to_encode = data.copy()
        # Timezone-aware replacement for the deprecated datetime.utcnow()
        expire = datetime.now(timezone.utc) + timedelta(minutes=30)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            return payload
        except JWTError:
            raise HTTPException(status_code=401, detail="Invalid token")
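To make clear what `jwt.encode` and `jwt.decode` do for the HS256 algorithm, here is a standard-library sketch of the same steps: base64url-encode a header and payload, sign them with HMAC-SHA256, then verify the signature and the `exp` claim. `sign_hs256` and `verify_hs256` are hypothetical names for this illustration. It does not validate the header's `alg` field or other claims, so it is not a substitute for a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(claims: dict, secret: str) -> str:
    """Build header.payload.signature for the HS256 algorithm."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode("ascii")
    signature = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(signature)}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the claims if the signature matches and `exp` is in the future."""
    try:
        header, payload, signature = token.split(".")
    except ValueError:
        raise ValueError("Malformed token") from None
    signing_input = f"{header}.{payload}".encode("ascii")
    expected = hmac.new(secret.encode(), signing_input, hashlib.sha256).digest()
    # compare_digest avoids leaking information through timing differences
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise ValueError("Invalid signature")
    claims = json.loads(_b64url_decode(payload))
    current = time.time() if now is None else now
    if "exp" in claims and current >= claims["exp"]:
        raise ValueError("Token expired")
    return claims


# Example values only; `now` is injected so the result does not depend on the clock.
token = sign_hs256({"sub": "user-1", "exp": 2_000}, secret="dev-secret")
claims = verify_hs256(token, secret="dev-secret", now=1_000)
```

Verification fails with `ValueError` if the secret differs or if `now` is at or past `exp`, which corresponds to the 401 response raised by `verify_token`.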
Role-Based Access Control
class PermissionChecker:
    def __init__(self, user: User):
        self.user = user

    def can_access_dataset(self, dataset: Dataset) -> bool:
        """Check if user can access dataset."""
        if self.user.role == "admin":
            return True
        return dataset.created_by == self.user.id

    def can_modify_generator(self, generator: Generator) -> bool:
        """Check if user can modify generator."""
        return generator.created_by == self.user.id or self.user.role == "admin"
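Privacy & Compliance
Differential Privacy Framework
import torch

class DPFramework:
    def __init__(self, accountant: PrivacyAccountant):
        self.accountant = accountant

    def validate_config(self, config: DPConfig) -> ValidationResult:
        """Validate DP configuration for safety."""
        # Check sampling rate
        # Validate noise multiplier
        # Verify privacy budget
        pass

    def apply_noise(self, tensor: torch.Tensor, noise_multiplier: float) -> torch.Tensor:
        """Apply calibrated noise for DP."""
        # randn_like matches the input's device and dtype; torch.normal with a
        # bare shape would allocate the noise on the default device.
        # Assumes the input is already clipped to unit sensitivity; otherwise
        # the standard deviation should also be scaled by the clipping norm.
        noise = torch.randn_like(tensor) * noise_multiplier
        return tensor + noise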
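The three checks named in `validate_config` (sampling rate, noise multiplier, privacy budget) could look roughly like the sketch below. Everything here is an assumption made for illustration: the `DPConfig` field names, the plain list of error strings in place of `ValidationResult`, and the `1/dataset_size` bound on delta, which is a common rule of thumb rather than a rule stated in this document.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DPConfig:
    """Hypothetical config; field names are assumptions for this sketch."""
    target_epsilon: float
    target_delta: float
    noise_multiplier: float
    batch_size: int
    dataset_size: int


def validate_config(config: DPConfig) -> List[str]:
    """Return human-readable problems; an empty list means the config passed."""
    errors: List[str] = []

    # Check sampling rate: the fraction of the dataset seen per step
    if config.dataset_size <= 0 or config.batch_size <= 0:
        errors.append("batch_size and dataset_size must be positive")
    elif config.batch_size > config.dataset_size:
        errors.append("batch_size exceeds dataset_size (sampling rate > 1)")

    # Validate noise multiplier: zero noise gives no privacy guarantee
    if config.noise_multiplier <= 0:
        errors.append("noise_multiplier must be > 0")

    # Verify privacy budget
    if config.target_epsilon <= 0:
        errors.append("target_epsilon must be > 0")
    if not 0 < config.target_delta < 1:
        errors.append("target_delta must be in (0, 1)")
    elif config.dataset_size > 0 and config.target_delta >= 1 / config.dataset_size:
        # Rule of thumb (assumption): delta should be below 1/dataset_size
        errors.append("target_delta should be smaller than 1/dataset_size")

    return errors


ok = validate_config(DPConfig(10.0, 1e-5, 1.1, 500, 10_000))
bad = validate_config(DPConfig(10.0, 1e-3, 0.0, 500, 1_000))
```

Returning every problem at once, rather than raising on the first, lets an API response list all the fields a user needs to correct.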
Audit Logging
class AuditLogger:
    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def log_privacy_event(self, event: PrivacyEvent):
        """Log privacy-related events for compliance."""
        self.logger.info(
            "Privacy Event",
            extra={
                "event_type": event.type,
                "user_id": event.user_id,
                "resource_id": event.resource_id,
                "privacy_params": event.privacy_params,
                "timestamp": event.timestamp.isoformat()
            }
        )
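Fields passed through `extra` only appear in the output if the handler's formatter emits them; the default formatter drops them. The sketch below shows one possible JSON formatter for the audit fields used above. `JsonAuditFormatter`, the logger name, and the sample values are hypothetical.

```python
import io
import json
import logging

# Fields the audit logger passes through `extra` (taken from the class above)
AUDIT_FIELDS = ("event_type", "user_id", "resource_id", "privacy_params", "timestamp")


class JsonAuditFormatter(logging.Formatter):
    """Hypothetical formatter: serializes the message plus the audit fields."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {"level": record.levelname, "message": record.getMessage()}
        for field in AUDIT_FIELDS:
            # Keys passed via `extra` become attributes on the log record
            if hasattr(record, field):
                entry[field] = getattr(record, field)
        return json.dumps(entry, default=str)


stream = io.StringIO()  # stands in for a file or log shipper
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonAuditFormatter())

audit_logger = logging.getLogger("audit_sketch")
audit_logger.setLevel(logging.INFO)
audit_logger.addHandler(handler)
audit_logger.propagate = False

audit_logger.info(
    "Privacy Event",
    extra={
        "event_type": "generation_started",
        "user_id": "user-1",
        "resource_id": "generator-1",
        "privacy_params": {"epsilon": 10.0},
        "timestamp": "2024-01-01T00:00:00",
    },
)
logged = json.loads(stream.getvalue())
```

Each event becomes one JSON object per line, which matches the structured-logging approach the document lists under monitoring.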
Data Flow Architecture
Synthesis Pipeline
Raw Data Input
      ↓
Data Validation & Profiling
      ↓
Privacy Configuration Validation
      ↓
Background Job Submission
      ↓
ML Model Training (GPU/CPU)
      ↓
Privacy Accounting & Validation
      ↓
Synthetic Data Generation
      ↓
Quality Evaluation
      ↓
Result Storage & Notification
API Request Flow
HTTP Request → Middleware (CORS, Auth) → Route Handler → Service Layer → Repository → Database
                                                                                        ↓
Response ← JSON Serialization ← Pydantic Models ← Business Logic ← Data Access
Background Job Flow
API Request → Job Submission → Queue (Redis) → Worker (Celery) → Task Execution → Result Storage → Notification
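The flow above can be imitated in a single process to show the hand-off between the API and the worker. In this sketch, `queue.Queue` stands in for the Redis queue, a thread stands in for a Celery worker, and a dict stands in for result storage. All names (`submit`, `worker`, `run_job`) are hypothetical, and the notification step is omitted.

```python
import queue
import threading
from typing import Dict, Optional, Tuple

Job = Optional[Tuple[str, int]]

jobs: "queue.Queue[Job]" = queue.Queue()   # stands in for the Redis queue
results: Dict[str, dict] = {}              # stands in for result storage


def run_job(num_rows: int) -> dict:
    """Placeholder for task execution (training and generation)."""
    if num_rows <= 0:
        raise ValueError("num_rows must be positive")
    return {"rows": num_rows}


def worker() -> None:
    """Stands in for a Celery worker: take a job, run it, store the outcome."""
    while True:
        job = jobs.get()
        if job is None:                    # sentinel: stop the worker
            jobs.task_done()
            return
        job_id, num_rows = job
        try:
            results[job_id] = {"status": "SUCCESS", **run_job(num_rows)}
        except Exception as exc:
            results[job_id] = {"status": "FAILURE", "error": str(exc)}
        finally:
            jobs.task_done()


def submit(job_id: str, num_rows: int) -> str:
    """API side: record the job as pending, enqueue it, return at once."""
    results[job_id] = {"status": "PENDING"}
    jobs.put((job_id, num_rows))
    return job_id


thread = threading.Thread(target=worker, daemon=True)
thread.start()
submit("job-1", 100)
submit("job-2", 0)
jobs.put(None)
jobs.join()                                # block until every job is processed
```

The point of the pattern is that `submit` returns immediately with an ID, so the HTTP request never waits on training; the client polls for status using that ID.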
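Configuration Management
Environment-Based Config
from typing import Optional

# Pydantic v1 style. In Pydantic v2, BaseSettings lives in the separate
# pydantic-settings package and Field(env=...) is no longer honored; there,
# fields are matched to environment variables by name.
from pydantic import BaseSettings, Field

class Settings(BaseSettings):
    # Database
    database_url: str = Field(..., env="DATABASE_URL")
    # Security
    secret_key: str = Field(..., env="SECRET_KEY")
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    # External Services
    redis_url: Optional[str] = Field(None, env="REDIS_URL")
    s3_bucket: Optional[str] = Field(None, env="S3_BUCKET")
    # Feature Flags
    enable_dp: bool = True
    enable_llm: bool = True

    class Config:
        env_file = ".env"
        case_sensitive = False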
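For readers unfamiliar with what `BaseSettings` automates, the following standard-library sketch performs the same steps by hand for a subset of the fields: read required variables, fail fast when one is missing, apply defaults, and parse booleans. `load_settings` and the `ENABLE_DP` variable name are assumptions for this example, and the values passed in are placeholders.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _to_bool(value: str) -> bool:
    """Interpret common truthy spellings; anything else is False."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    database_url: str
    secret_key: str
    redis_url: Optional[str] = None
    enable_dp: bool = True


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from a mapping; missing required variables fail fast."""
    missing = [name for name in ("DATABASE_URL", "SECRET_KEY") if name not in env]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return Settings(
        database_url=env["DATABASE_URL"],
        secret_key=env["SECRET_KEY"],
        redis_url=env.get("REDIS_URL"),
        enable_dp=_to_bool(env.get("ENABLE_DP", "true")),
    )


# A plain dict is passed instead of os.environ so the example is reproducible.
settings = load_settings({
    "DATABASE_URL": "sqlite:///./dev.db",
    "SECRET_KEY": "dev-only-secret",
    "ENABLE_DP": "false",
})
```

Accepting the environment as a parameter keeps the loader testable, since tests can pass a dict instead of mutating process-wide state.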
Dependency Injection
from typing import Iterator

from fastapi import Depends, HTTPException, status
from jose import JWTError, jwt

# Session, SessionLocal, oauth2_scheme, SECRET_KEY, ALGORITHM, User and
# get_user_by_id are assumed to be defined elsewhere in the application.

# Iterator is used for the return type because the name Generator is
# already taken by the domain model.
def get_db() -> Iterator[Session]:
    """Database session dependency."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Current user dependency."""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials"
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user_by_id(db, user_id)
    if user is None:
        raise credentials_exception
    return user
Scalability Considerations
Horizontal Scaling
- Stateless API: No server-side session storage
- Database Connection Pooling: Efficient connection management
- Background Job Distribution: Multiple worker processes
- Load Balancing: API gateway for traffic distribution
Performance Optimization
- Async/Await: Non-blocking I/O operations
- Caching: Redis for frequently accessed data
- Database Indexing: Optimized queries
- GPU Support: CUDA acceleration for ML workloads
Monitoring & Observability
- Structured Logging: JSON-formatted logs
- Metrics Collection: Prometheus-compatible metrics
- Health Checks: Application and dependency monitoring
- Distributed Tracing: Request flow tracking
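The Health Checks bullet covers both the application and its dependencies. A minimal sketch of such an aggregate check is shown below; `run_health_checks` and the check names are hypothetical, and a real endpoint would replace the lambdas with actual database and cache probes.

```python
from typing import Callable, Dict


def run_health_checks(checks: Dict[str, Callable[[], bool]]) -> dict:
    """Run each dependency check; any failure marks the service unhealthy."""
    report = {}
    for name, check in checks.items():
        try:
            report[name] = "ok" if check() else "failed"
        except Exception as exc:
            # A check that raises counts as a failure, not a crash
            report[name] = f"error: {exc}"
    healthy = all(state == "ok" for state in report.values())
    return {"status": "healthy" if healthy else "unhealthy", "checks": report}


def failing_cache_check() -> bool:
    """Simulates an unreachable cache for the example."""
    raise ConnectionError("cache unreachable")


report = run_health_checks({
    "database": lambda: True,
    "cache": failing_cache_check,
})
```

Reporting each dependency separately makes it possible to tell a failing cache from a failing database without reading logs.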
Testing Architecture
Test Pyramid
┌─────────────┐
│  E2E Tests  │  Few
├─────────────┤
│ Integration │  Some
│    Tests    │
├─────────────┤
│ Unit Tests  │  Many
│             │
└─────────────┘
Test Structure
tests/
├── unit/                  # Unit tests
│   ├── test_services/     # Service layer tests
│   ├── test_models/       # Model tests
│   └── test_utils/        # Utility tests
├── integration/           # Integration tests
│   ├── test_api/          # API endpoint tests
│   └── test_db/           # Database integration
├── e2e/                   # End-to-end tests
│   └── test_workflows/    # Complete workflow tests
└── conftest.py            # Test configuration
Deployment Architecture
Containerized Deployment
FROM python:3.9-slim
WORKDIR /app
# Install system dependencies; curl is needed by the HEALTHCHECK below
# because the slim base image does not ship it
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY app/ ./app/
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1
# Start application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Production Stack
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Load Balancer  │    │   API Gateway   │    │   Application   │
│     (Nginx)     │────│ (Kong/Traefik)  │────│    (FastAPI)    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │                      │
         │                      │                      │
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Redis Cache   │    │   PostgreSQL    │    │   Background    │
│                 │    │    Database     │    │     Workers     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
Integration Points
External Services
- AI Providers: Google Gemini, Groq for LLM features
- Cloud Storage: AWS S3, Google Cloud Storage
- Monitoring: Prometheus, Grafana for observability
- Logging: ELK stack for log aggregation
API Ecosystem
- REST API: Primary interface for web/mobile clients
- GraphQL: Optional for complex data requirements
- Webhooks: Event-driven integrations
- Streaming: Real-time progress updates
Further Reading
- API Examples: Code examples and API usage
- Development Setup: Local development environment
- Testing Guide: Testing strategies and procedures
- Deployment Guide: Production deployment options
Need help understanding the architecture? Check our Development Setup Guide to get started with local development.