
API Integration Guide

Complete guide for integrating third-party applications with Synthetic Data Studio's REST API, including authentication, error handling, and best practices.

Authentication

JWT Token Authentication​

All API requests except registration and login require a JWT access token, sent in the Authorization header as a Bearer token.

Obtaining Access Tokens​

# Register a new user
curl -X POST "http://localhost:8000/auth/register" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "api-user@example.com",
    "password": "secure-password"
  }'

# Login to get tokens
curl -X POST "http://localhost:8000/auth/login" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "api-user@example.com",
    "password": "secure-password"
  }'

Response:

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "expires_in": 1800
}

Using Tokens in Requests​

# Include token in Authorization header
curl -X GET "http://localhost:8000/datasets/" \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

Token Management​

Token Expiration​

  • Access tokens expire in 30 minutes by default
  • Implement automatic token refresh for long-running integrations
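One way to approach automatic refresh is to record when the current token will expire and renew it slightly early. The sketch below is a minimal illustration, not part of the API: `TokenStore`, `leeway_seconds`, and the `renew` callback are hypothetical names, and it assumes the login response carries `access_token` and `expires_in` (in seconds) as in the example response above.

```python
import time
from typing import Callable, Optional


class TokenStore:
    """Tracks an access token and when it should be renewed (hypothetical helper)."""

    def __init__(self, leeway_seconds: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._leeway = leeway_seconds
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def store(self, login_response: dict) -> None:
        """Record the token and its expiry from a login or refresh response."""
        self._token = login_response["access_token"]
        # expires_in is a lifetime in seconds (1800 in the example response).
        self._expires_at = self._clock() + float(login_response["expires_in"])

    def needs_renewal(self) -> bool:
        """True when there is no token or it expires within the leeway window."""
        return self._token is None or self._clock() >= self._expires_at - self._leeway

    def get(self, renew: Callable[[], dict]) -> Optional[str]:
        """Return a usable token, calling `renew` first when renewal is due."""
        if self.needs_renewal():
            self.store(renew())
        return self._token
```

In a real integration, `renew` would call `/auth/login` (or `/auth/refresh`, if enabled) and return the parsed JSON body.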

Refresh Tokens (Optional)​

# Use refresh token to get new access token
curl -X POST "http://localhost:8000/auth/refresh" \
  -H "Authorization: Bearer refresh_token_here"

Authentication Errors​

{
  "detail": "Not authenticated",
  "type": "authentication_error",
  "status_code": 401
}

{
  "detail": "Token has expired",
  "type": "token_expired",
  "status_code": 401
}

Client Libraries

Python Client​

Installation​

pip install requests pydantic

Basic Usage​

import requests
from typing import Optional, Dict, Any

class SynthStudioClient:
    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url
        self.token: Optional[str] = None

    def login(self, email: str, password: str) -> Dict[str, Any]:
        """Authenticate and store access token."""
        response = requests.post(
            f"{self.base_url}/auth/login",
            json={"email": email, "password": password}
        )
        response.raise_for_status()
        data = response.json()
        self.token = data["access_token"]
        return data

    def _headers(self) -> Dict[str, str]:
        """Get headers with authentication."""
        headers = {"Content-Type": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        return headers

    def upload_dataset(self, file_path: str) -> Dict[str, Any]:
        """Upload a dataset file."""
        with open(file_path, "rb") as f:
            # Do not set Content-Type here: requests builds the multipart header itself
            response = requests.post(
                f"{self.base_url}/datasets/upload",
                files={"file": f},
                headers={"Authorization": f"Bearer {self.token}"}
            )
        response.raise_for_status()
        return response.json()

    def generate_synthetic_data(
        self,
        dataset_id: str,
        generator_type: str = "ctgan",
        num_rows: int = 1000
    ) -> Dict[str, Any]:
        """Generate synthetic data."""
        response = requests.post(
            f"{self.base_url}/generators/dataset/{dataset_id}/generate",
            json={
                "generator_type": generator_type,
                "num_rows": num_rows
            },
            headers=self._headers()
        )
        response.raise_for_status()
        return response.json()

# Usage example
client = SynthStudioClient()
client.login("user@example.com", "password")

# Upload dataset
dataset = client.upload_dataset("data.csv")
dataset_id = dataset["id"]

# Generate synthetic data
result = client.generate_synthetic_data(dataset_id, "dp-ctgan", 500)
print(f"Generation started: {result}")

JavaScript/Node.js Client​

Installation​

npm install axios form-data

Basic Usage​

const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs');

class SynthStudioClient {
  constructor(baseURL = 'http://localhost:8000') {
    this.client = axios.create({ baseURL });
    this.token = null;
  }

  async login(email, password) {
    const response = await this.client.post('/auth/login', {
      email,
      password
    });
    this.token = response.data.access_token;
    // Send the token with every later request made through this axios instance
    this.client.defaults.headers.common['Authorization'] = `Bearer ${this.token}`;
    return response.data;
  }

  async uploadDataset(filePath) {
    const form = new FormData();
    form.append('file', fs.createReadStream(filePath));

    const response = await this.client.post('/datasets/upload', form, {
      headers: {
        ...form.getHeaders(),
        'Authorization': `Bearer ${this.token}`
      }
    });
    return response.data;
  }

  async generateSyntheticData(datasetId, options = {}) {
    // Caller-supplied options override the defaults
    const defaultOptions = {
      generator_type: 'ctgan',
      num_rows: 1000,
      ...options
    };

    const response = await this.client.post(
      `/generators/dataset/${datasetId}/generate`,
      defaultOptions
    );
    return response.data;
  }

  async getEvaluation(evaluationId) {
    const response = await this.client.get(`/evaluations/${evaluationId}`);
    return response.data;
  }
}

// Usage (wrapped in an async function because CommonJS modules do not support top-level await)
async function main() {
  const client = new SynthStudioClient();
  await client.login('user@example.com', 'password');

  const dataset = await client.uploadDataset('data.csv');
  const result = await client.generateSyntheticData(dataset.id, {
    generator_type: 'dp-ctgan',
    num_rows: 500
  });
  console.log('Generation started:', result);
}

main().catch(console.error);

Asynchronous Operations​

Background Job Handling​

Many operations (data generation, evaluation) run asynchronously.

Polling for Completion​

import time

import requests

def wait_for_completion(client, generator_id, timeout=300):
    """Wait for generation to complete."""
    start_time = time.time()

    while time.time() - start_time < timeout:
        # SynthStudioClient has no generic get(), so call requests directly
        response = requests.get(
            f"{client.base_url}/generators/{generator_id}",
            headers=client._headers()
        )
        response.raise_for_status()
        job = response.json()

        if job["status"] == "completed":
            return job
        elif job["status"] == "failed":
            raise RuntimeError(f"Generation failed: {job}")

        time.sleep(5)  # Wait 5 seconds

    raise TimeoutError("Operation timed out")

# Usage (generator_id comes from the response to the generate call)
result = wait_for_completion(client, generator_id)
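Fixed 5-second polling is simple, but short jobs wait longer than needed and long jobs generate many requests. As a rough alternative, the sketch below doubles the delay up to a cap and keeps the loop independent of HTTP so it can be tested offline. `poll_until_done`, `fetch_job`, and the delay parameters are hypothetical names; the only assumption about the API is a `status` field that reaches `completed` or `failed`, as in the example above.

```python
import time
from typing import Any, Callable, Dict


def poll_until_done(
    fetch_job: Callable[[], Dict[str, Any]],
    timeout: float = 300.0,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Call fetch_job() until the job completes, fails, or the timeout would be exceeded."""
    deadline = clock() + timeout
    delay = initial_delay
    while True:
        job = fetch_job()
        if job["status"] == "completed":
            return job
        if job["status"] == "failed":
            raise RuntimeError(f"job failed: {job}")
        # Give up if the next sleep would run past the deadline.
        if clock() + delay > deadline:
            raise TimeoutError("job did not finish before the timeout")
        sleep(delay)
        # Double the delay each round, capped at max_delay.
        delay = min(delay * 2, max_delay)
```

In practice `fetch_job` would wrap the `GET /generators/{generator_id}` request and return its parsed JSON.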

Webhook Notifications (Future Feature)​

# Configure webhook endpoint
webhook_config = {
    "url": "https://your-app.com/webhooks/synth-studio",
    "events": ["generation.completed", "evaluation.completed"],
    "secret": "your-webhook-secret"
}

# Register webhook (endpoint not available yet; shown for planning only)
requests.post(
    f"{client.base_url}/webhooks/register",
    json=webhook_config,
    headers=client._headers()
)
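The `secret` in the configuration suggests that deliveries will be signed, but the signing scheme is not specified here. Purely as an illustration of how a receiver typically checks such a signature, the sketch below assumes a hex-encoded HMAC-SHA256 over the raw request body. That scheme, and the function names, are assumptions and may not match the final feature.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw request body (assumed scheme, not confirmed by the API)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, received_signature: str) -> bool:
    """Compare the expected and received signatures in constant time."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(expected, received_signature)
```

Always verify against the raw bytes received, before any JSON parsing, since re-serializing the payload can change whitespace and break the comparison.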

Data Synchronization​

Batch Operations​

Bulk Dataset Upload​

def upload_multiple_datasets(client, file_paths):
    """Upload multiple datasets."""
    results = []
    for file_path in file_paths:
        try:
            result = client.upload_dataset(file_path)
            results.append({"file": file_path, "success": True, "data": result})
        except Exception as e:
            # Record the failure and continue with the remaining files
            results.append({"file": file_path, "success": False, "error": str(e)})
    return results

# Usage
files = ["dataset1.csv", "dataset2.csv", "dataset3.csv"]
results = upload_multiple_datasets(client, files)

Batch Evaluation​

def evaluate_multiple_generators(client, generator_ids, dataset_id):
    """Start an evaluation for each generator against the original dataset."""
    evaluations = []
    for gen_id in generator_ids:
        try:
            # Start evaluation
            response = requests.post(
                f"{client.base_url}/evaluations/run",
                json={"generator_id": gen_id, "dataset_id": dataset_id},
                headers=client._headers()
            )
            response.raise_for_status()
            eval_result = response.json()

            evaluations.append({
                "generator_id": gen_id,
                "evaluation_id": eval_result["evaluation_id"],
                "status": "started"
            })
        except Exception as e:
            evaluations.append({
                "generator_id": gen_id,
                "error": str(e)
            })
    return evaluations

Incremental Sync​

Change Detection​

def get_dataset_changes(client, last_sync_timestamp):
    """Get datasets modified since last sync."""
    response = requests.get(
        f"{client.base_url}/datasets/",
        params={
            "modified_after": last_sync_timestamp.isoformat(),
            "limit": 100
        },
        headers=client._headers()
    )
    response.raise_for_status()
    return response.json()

def sync_datasets(client, last_sync):
    """Synchronize datasets with local system."""
    changes = get_dataset_changes(client, last_sync)

    for dataset in changes:
        # Process each changed dataset.
        # download_dataset and update_local_record are placeholders for your own storage logic.
        local_copy = download_dataset(client, dataset["id"])
        update_local_record(dataset)

    return len(changes)

Error Handling​

HTTP Status Codes​

Status Code   Meaning             Action
200           Success             Process response
201           Created             Resource created successfully
400           Bad Request         Check request parameters
401           Unauthorized        Refresh token or re-authenticate
403           Forbidden           Check permissions
404           Not Found           Verify resource exists
422           Validation Error    Check data format
429           Too Many Requests   Implement rate limiting
500           Server Error        Retry with exponential backoff
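The table above can be folded into a single dispatch function so that every call site reacts to a status code the same way. This is a minimal sketch: the `Action` enum and `action_for_status` are hypothetical names, and the grouping of codes is one reasonable reading of the table rather than behavior defined by the API.

```python
from enum import Enum


class Action(Enum):
    PROCESS = "process"
    REAUTHENTICATE = "reauthenticate"
    WAIT_AND_RETRY = "wait_and_retry"
    RETRY_WITH_BACKOFF = "retry_with_backoff"
    FIX_REQUEST = "fix_request"


def action_for_status(status_code: int) -> Action:
    """Map an HTTP status code to the client-side action suggested by the table."""
    if 200 <= status_code < 300:
        return Action.PROCESS
    if status_code == 401:
        return Action.REAUTHENTICATE
    if status_code == 429:
        return Action.WAIT_AND_RETRY
    if status_code >= 500:
        return Action.RETRY_WITH_BACKOFF
    # 400, 403, 404, 422 and anything else: retrying unchanged will not help.
    return Action.FIX_REQUEST
```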

Error Response Format​

{
  "detail": "Dataset not found",
  "type": "resource_not_found",
  "status_code": 404,
  "timestamp": "2025-11-27T10:30:00Z"
}

{
  "detail": "Validation failed",
  "type": "validation_error",
  "status_code": 422,
  "errors": [
    {
      "field": "num_rows",
      "message": "Must be between 100 and 100000",
      "code": "range_error"
    }
  ]
}
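Turning these payloads into a typed exception lets callers branch on `type` and inspect per-field errors instead of parsing strings. The sketch below assumes the fields shown in the two examples above (`detail`, `type`, `status_code`, and an optional `errors` list); `ApiError`, `FieldError`, and `parse_error` are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FieldError:
    field: str
    message: str
    code: str


class ApiError(Exception):
    """Exception carrying the structured error returned by the API."""

    def __init__(self, status_code: int, error_type: str, detail: str,
                 errors: Optional[List[FieldError]] = None):
        super().__init__(f"{status_code} {error_type}: {detail}")
        self.status_code = status_code
        self.error_type = error_type
        self.detail = detail
        self.errors = errors or []


def parse_error(body: str) -> ApiError:
    """Build an ApiError from a JSON error body."""
    payload = json.loads(body)
    # "errors" is only present on validation failures in the examples above.
    errors = [
        FieldError(e["field"], e["message"], e["code"])
        for e in payload.get("errors", [])
    ]
    return ApiError(payload["status_code"], payload["type"], payload["detail"], errors)
```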

Retry Logic​

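import time
import random

import requests

def retry_request(func, max_retries=3, backoff_factor=2):
    """Retry API requests with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return func()
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise

            # Exponential backoff with jitter
            delay = backoff_factor ** attempt + random.uniform(0, 1)
            time.sleep(delay)

# Usage: raise_for_status() turns 4xx/5xx responses into exceptions,
# otherwise only connection-level failures would be retried
def list_datasets():
    response = requests.get(f"{client.base_url}/datasets/", headers=client._headers())
    response.raise_for_status()
    return response.json()

result = retry_request(list_datasets)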

Rate Limiting​

Understanding Limits​

  • Authenticated requests: 1000 per minute
  • File uploads: 10 per minute
  • Data generation: 5 concurrent jobs
  • API calls: 5000 per hour
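Staying under these limits on the client side avoids 429 responses in the first place. The following is a minimal sliding-window sketch; `SlidingWindowLimiter` is a hypothetical helper, and the numbers passed to it (for example 10 calls per 60 seconds for uploads) are taken from the list above, not discovered from the server.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allows at most max_calls within any period_seconds window."""

    def __init__(self, max_calls: int, period_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self._max_calls = max_calls
        self._period = period_seconds
        self._clock = clock
        self._calls: Deque[float] = deque()

    def wait_time(self) -> float:
        """Seconds to wait before the next call is allowed (0.0 if allowed now)."""
        now = self._clock()
        # Drop calls that have left the window.
        while self._calls and now - self._calls[0] >= self._period:
            self._calls.popleft()
        if len(self._calls) < self._max_calls:
            return 0.0
        return self._period - (now - self._calls[0])

    def record(self) -> None:
        """Register that a call was just made."""
        self._calls.append(self._clock())
```

A caller would sleep for `wait_time()` seconds, make the request, then call `record()`.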

Rate Limit Headers​

X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 999
X-RateLimit-Reset: 1638360000
X-RateLimit-Retry-After: 60
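Reading these headers on every response lets a client slow down before it hits the limit. The sketch below parses them into a small record. It assumes `X-RateLimit-Reset` is a Unix timestamp in seconds (the sample value looks like one, but the source does not say so), and `RateLimitInfo` and `parse_rate_limit_headers` are hypothetical names.

```python
import time
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class RateLimitInfo:
    limit: int
    remaining: int
    reset_epoch: int            # assumed: Unix time in seconds
    retry_after: Optional[int]  # seconds; only expected on 429 responses

    def seconds_until_reset(self, now: Optional[float] = None) -> float:
        """Time left until the window resets, never negative."""
        current = time.time() if now is None else now
        return max(0.0, self.reset_epoch - current)


def parse_rate_limit_headers(headers: Mapping[str, str]) -> Optional[RateLimitInfo]:
    """Return rate limit details, or None if the headers are missing or malformed."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    try:
        retry_after = lowered.get("x-ratelimit-retry-after")
        return RateLimitInfo(
            limit=int(lowered["x-ratelimit-limit"]),
            remaining=int(lowered["x-ratelimit-remaining"]),
            reset_epoch=int(lowered["x-ratelimit-reset"]),
            retry_after=int(retry_after) if retry_after is not None else None,
        )
    except (KeyError, ValueError):
        return None
```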

Handling Rate Limits​

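import time

import requests

def handle_rate_limit(response):
    """Sleep and return True if the response was rate limited (HTTP 429)."""
    if response.status_code == 429:
        retry_after = int(response.headers.get("X-RateLimit-Retry-After", 60))
        print(f"Rate limited. Retry after {retry_after} seconds.")
        time.sleep(retry_after)
        return True
    return False

# Usage in client
def make_request_with_retry(self, method, url, max_attempts=5, **kwargs):
    # Bound the number of attempts so a persistent 429 cannot loop forever
    for _ in range(max_attempts):
        response = requests.request(method, url, **kwargs)
        if not handle_rate_limit(response):
            return response
    return response  # Still rate limited; let the caller decide what to do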

Monitoring Integration​

Health Checks​

import requests

def check_api_health(base_url):
    """Check API availability."""
    try:
        response = requests.get(f"{base_url}/health", timeout=5)
        return {
            "available": response.status_code == 200,
            "response_time": response.elapsed.total_seconds(),
            "status": response.json().get("status")
        }
    except Exception as e:
        return {
            "available": False,
            "error": str(e)
        }

# Usage
health = check_api_health("http://localhost:8000")
if not health["available"]:
    # "error" is only set when the request itself failed, so fall back to the reported status
    print(f"API unavailable: {health.get('error', health.get('status'))}")

Metrics Collection​

import time

import requests

class APIMetrics:
    def __init__(self):
        self.requests_total = 0
        self.requests_failed = 0
        self.response_times = []

    def record_request(self, response_time, success=True):
        self.requests_total += 1
        if not success:
            self.requests_failed += 1
        self.response_times.append(response_time)

    def get_metrics(self):
        # max(..., 1) avoids division by zero before any request is recorded
        return {
            "total_requests": self.requests_total,
            "success_rate": (self.requests_total - self.requests_failed) / max(self.requests_total, 1),
            "avg_response_time": sum(self.response_times) / max(len(self.response_times), 1),
            "error_rate": self.requests_failed / max(self.requests_total, 1)
        }

# Usage
metrics = APIMetrics()

# In your client methods
url = "http://localhost:8000/health"
start_time = time.time()
response = requests.get(url)
response_time = time.time() - start_time

metrics.record_request(response_time, response.status_code < 400)
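An average can hide slow outliers, and an unbounded list of response times grows for the life of the process. As one possible refinement, the sketch below keeps only the most recent samples and reports a nearest-rank percentile. `LatencyWindow` is a hypothetical helper and is not part of `APIMetrics`.

```python
import math
from collections import deque


class LatencyWindow:
    """Keeps the last max_samples response times and reports percentiles."""

    def __init__(self, max_samples: int = 1000):
        # deque(maxlen=...) silently discards the oldest sample when full.
        self._samples = deque(maxlen=max_samples)

    def record(self, seconds: float) -> None:
        self._samples.append(seconds)

    def percentile(self, p: float) -> float:
        """Nearest-rank percentile, with p in the range (0, 100]."""
        if not self._samples:
            raise ValueError("no samples recorded")
        if not 0 < p <= 100:
            raise ValueError("p must be in (0, 100]")
        ordered = sorted(self._samples)
        # Nearest-rank: the smallest value with at least p% of samples at or below it.
        rank = math.ceil(p * len(ordered) / 100)
        return ordered[rank - 1]
```

Calling `percentile(95)` alongside the average gives a quick view of tail latency.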

Advanced Integration Patterns​

Streaming Large Datasets​

import requests

def download_large_dataset(client, dataset_id, chunk_size=8192):
    """Download large datasets in chunks."""
    output_path = f"dataset_{dataset_id}.csv"

    # stream=True avoids loading the whole file into memory
    with requests.get(
        f"{client.base_url}/datasets/{dataset_id}/download",
        headers=client._headers(),
        stream=True
    ) as response:
        response.raise_for_status()

        with open(output_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)

    return output_path

Real-time Progress Monitoring​

import asyncio
import json

import websockets

async def monitor_generation_progress(generator_id):
    """Monitor generation progress in real-time."""
    uri = f"ws://localhost:8000/ws/generation/{generator_id}"

    async with websockets.connect(uri) as websocket:
        while True:
            message = await websocket.recv()
            data = json.loads(message)

            print(f"Progress: {data['progress']}% - {data['status']}")

            if data["status"] in ["completed", "failed"]:
                break

# Usage
asyncio.run(monitor_generation_progress("gen-123"))

Service Mesh Integration​

Istio Integration​

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: synth-studio-api
spec:
  hosts:
    - synth-studio # Assumed to match the destination host; adjust for your mesh
  http:
    - match:
        - uri:
            prefix: "/api"
      route:
        - destination:
            host: synth-studio
      timeout: 300s # For long-running generations
      retries:
        attempts: 3
        perTryTimeout: 60s

Enterprise Integration

SSO Integration​

SAML 2.0 (Future)​

# SAML authentication flow
def saml_login(saml_response):
    """Process SAML authentication."""
    # Validate SAML response
    # Extract user information
    # Create/update user account
    # Generate JWT token
    pass

OAuth 2.0​

def oauth_callback(code, state):
    """Handle OAuth callback."""
    # Exchange code for tokens
    # Get user info from provider
    # Create/update user account
    # Generate JWT token
    pass

Audit Logging​

import time
from datetime import datetime, timezone

# get_client_ip, get_user_agent, get_current_user_id and send_to_audit_system
# are placeholders for your own implementations.

def log_api_activity(user_id, action, resource, details):
    """Log API activities for compliance."""
    audit_entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "action": action,
        "resource": resource,
        "details": details,
        "ip_address": get_client_ip(),
        "user_agent": get_user_agent()
    }

    # Send to audit system
    send_to_audit_system(audit_entry)

# Usage in API endpoints (app is assumed to be a FastAPI/Starlette application)
@app.middleware("http")
async def audit_middleware(request, call_next):
    start_time = time.time()
    response = await call_next(request)
    duration = time.time() - start_time

    log_api_activity(
        user_id=get_current_user_id(request),
        action=f"{request.method} {request.url.path}",
        resource=request.url.path,
        details={
            "status_code": response.status_code,
            "duration": duration,
            "user_agent": request.headers.get("user-agent")
        }
    )

    return response

SDKs and Libraries​

Official SDKs (Planned)​

  • Python SDK: pip install synth-studio-sdk
  • JavaScript SDK: npm install synth-studio-sdk
  • Go SDK: go get github.com/synth-studio/sdk-go

Community Libraries​

  • R Integration: install.packages("synthstudio")
  • Java SDK: Maven/Gradle dependency
  • .NET SDK: NuGet package

Troubleshooting Integration Issues​

Common Problems​

Connection Timeouts

Cause: Large datasets, slow networks
Solution: Increase timeout, use streaming, compress data

Authentication Failures

Cause: Expired tokens, clock skew
Solution: Implement token refresh, synchronize clocks

Rate Limit Exceeded

Cause: Too many requests
Solution: Implement queuing, exponential backoff

Data Format Issues

Cause: Incompatible file formats
Solution: Validate formats before upload, use conversion tools
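For the authentication failures listed above, it can help to inspect a token's expiry locally before blaming the server. The sketch below decodes the payload segment of a JWT without verifying its signature, which is suitable for debugging only, and compares the `exp` claim to the local clock with some leeway for skew. It assumes the access tokens carry a standard `exp` claim; `jwt_expiry` and `is_expired` are hypothetical helper names.

```python
import base64
import json
import time
from typing import Optional


def jwt_expiry(token: str) -> Optional[int]:
    """Return the `exp` claim (Unix seconds) from a JWT, without verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a JWT: expected three dot-separated segments")
    payload_b64 = parts[1]
    # JWT segments are base64url without padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    return claims.get("exp")


def is_expired(token: str, now: Optional[float] = None, leeway: float = 30.0) -> bool:
    """True if the token expires within `leeway` seconds of `now`."""
    exp = jwt_expiry(token)
    if exp is None:
        return False  # No exp claim, so expiry cannot be determined locally.
    current = time.time() if now is None else now
    return current >= exp - leeway
```

If `is_expired` reports a fresh token as expired, the local clock is likely ahead of the server's.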

Debug Mode​

# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Add request/response logging (requests sends its traffic through urllib3)
import http.client
http.client.HTTPConnection.debuglevel = 1
urllib3_logger = logging.getLogger("urllib3")
urllib3_logger.setLevel(logging.DEBUG)

Integration Testing​

import requests

def test_integration():
    """Test full integration workflow."""
    client = SynthStudioClient()

    # Test authentication
    assert "access_token" in client.login("test@example.com", "password")

    # Test dataset upload
    dataset = client.upload_dataset("test_data.csv")
    assert "id" in dataset

    # Test generation
    result = client.generate_synthetic_data(dataset["id"])
    assert "generator_id" in result

    # Test evaluation (SynthStudioClient has no evaluate_generator(), so call the endpoint directly)
    response = requests.post(
        f"{client.base_url}/evaluations/run",
        json={"generator_id": result["generator_id"], "dataset_id": dataset["id"]},
        headers=client._headers()
    )
    response.raise_for_status()
    assert "evaluation_id" in response.json()

    print("All integration tests passed")

# Run tests
test_integration()

Support and Resources​

Getting Help​

Example Applications​

Webinars and Tutorials​

  • API Integration Basics: Step-by-step video tutorial
  • Advanced Patterns: Webhooks, streaming, batch operations
  • Enterprise Integration: SSO, audit logging, monitoring

Ready to integrate? Start with our Quick Start Tutorial and explore the API documentation at http://localhost:8000/docs.