Authentication API

🔐 JWT-Based Authentication

Quantbot uses JWTs (JSON Web Tokens) for API authentication. Authentication is stateless: a short-lived access token authorizes each request, and a longer-lived refresh token is used to obtain new tokens once the access token expires.

Base URL

Authentication endpoints share the following prefix:

/api/v1/auth/

The exception is user registration, which is served from /api/v1/users/register.

Authentication Flow

1. User Registration

First-time users must register an account:

POST /api/v1/users/register

2. User Login

Authenticate with email and password to receive JWT tokens:

POST /api/v1/auth/login

3. API Access

Use the access token in the Authorization header for all API requests:

Authorization: Bearer <access_token>

4. Token Refresh

When the access token expires, use the refresh token to get new tokens:

POST /api/v1/auth/refresh

Endpoints

User Registration

Register a new user account.

POST /api/v1/users/register

Request Body

{
  "email": "user@example.com",
  "password": "securepassword123",
  "full_name": "John Doe"
}

Response

Status Code: 200 OK

{
  "id": 1,
  "email": "user@example.com",
  "full_name": "John Doe",
  "is_active": true,
  "created_at": "2025-01-15T10:30:00Z",
  "updated_at": null
}

Error Responses

400 Bad Request - Validation errors or email already exists

{
  "detail": "Email already registered"
}


User Login

Authenticate user and receive JWT tokens.

POST /api/v1/auth/login

Request Body (Form Data)

Content-Type: application/x-www-form-urlencoded

username=user@example.com&password=securepassword123

Note: Despite the field name "username", you should provide the user's email address.
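
Because the login body is form-encoded, characters such as "+", "@", and "&" in an email address or password should be percent-encoded before sending. The sketch below shows one way to build the body with the Python standard library; the helper name build_login_body is hypothetical and not part of the Quantbot API.

```python
from urllib.parse import urlencode


def build_login_body(email: str, password: str) -> str:
    """Return an application/x-www-form-urlencoded body for the login endpoint."""
    # The form field is named "username", but it carries the email address.
    return urlencode({"username": email, "password": password})


body = build_login_body("user+alerts@example.com", "p&ss word")
print(body)
```

Libraries such as requests perform the same encoding automatically when a dict is passed as data=..., so manual encoding is mainly relevant when the body is assembled as a string.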

Response

Status Code: 200 OK

{
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "refresh_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "token_type": "bearer"
}

Error Responses

401 Unauthorized - Invalid credentials

{
  "detail": "Incorrect username or password"
}

400 Bad Request - Inactive user

{
  "detail": "Inactive user"
}


Token Refresh

Refresh the access token using a valid refresh token.

POST /api/v1/auth/refresh

Request Body (Form Data)

Content-Type: application/x-www-form-urlencoded

refresh_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...

Response

Status Code: 200 OK

{
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "refresh_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9...",
  "token_type": "bearer"
}

Error Responses

401 Unauthorized - Invalid refresh token

{
  "detail": "Invalid refresh token"
}


Get Current User

Get information about the currently authenticated user.

GET /api/v1/auth/me

Headers

Authorization: Bearer <access_token>

Response

Status Code: 200 OK

{
  "id": 1,
  "email": "user@example.com",
  "full_name": "John Doe",
  "is_active": true,
  "created_at": "2025-01-15T10:30:00Z",
  "updated_at": null
}

User Logout

Logout and revoke the current refresh token.

POST /api/v1/auth/logout

Headers

Authorization: Bearer <access_token>

Response

Status Code: 200 OK

{
  "message": "Successfully logged out"
}

Logout All Devices

Logout from all devices by revoking all refresh tokens for the user.

POST /api/v1/auth/logout-all

Headers

Authorization: Bearer <access_token>

Response

Status Code: 200 OK

{
  "message": "Successfully logged out from all devices"
}
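
As an illustration of the request shape, the following sketch builds (without sending) the logout-all request using only the Python standard library. The helper name build_logout_all_request is an assumption made for this example, and the base URL is the local development address used elsewhere on this page.

```python
import urllib.request


def build_logout_all_request(base_url: str, access_token: str) -> urllib.request.Request:
    """Build (but do not send) the POST request that revokes every refresh token."""
    return urllib.request.Request(
        url=f"{base_url}/api/v1/auth/logout-all",
        method="POST",
        headers={"Authorization": f"Bearer {access_token}"},
    )


request = build_logout_all_request("http://localhost:8000", "<access_token>")
# Sending is left to the caller, e.g. urllib.request.urlopen(request, timeout=10)
```

After a successful call, a client would typically also discard its locally stored access and refresh tokens, since the refresh token can no longer be used.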

Token Information

Access Tokens

  • Purpose: Authenticate API requests
  • Expiration: 30 minutes (configurable)
  • Usage: Include in Authorization: Bearer <token> header
  • Storage: Store securely in memory or secure storage

Refresh Tokens

  • Purpose: Generate new access tokens
  • Expiration: 7 days (configurable)
  • Usage: Submit to the /api/v1/auth/refresh endpoint
  • Storage: Store securely (HTTP-only cookies recommended)

Security Best Practices

Token Storage

// ✅ Good: Store in memory or secure storage
class AuthManager {
  constructor() {
    this.accessToken = null;
    this.refreshToken = null;
  }

  setTokens(accessToken, refreshToken) {
    this.accessToken = accessToken;
    // Store refresh token in HTTP-only cookie or secure storage
    this.refreshToken = refreshToken;
  }

  getAuthHeader() {
    return this.accessToken ? `Bearer ${this.accessToken}` : null;
  }
}

// ❌ Bad: Don't store in localStorage for production
localStorage.setItem('access_token', token); // Vulnerable to XSS

Automatic Token Refresh

class APIClient {
  constructor() {
    this.authManager = new AuthManager();
  }

  async makeRequest(url, options = {}) {
    // Add the auth header only when a token is available
    const authHeader = this.authManager.getAuthHeader();
    options.headers = {
      ...options.headers,
      ...(authHeader ? { 'Authorization': authHeader } : {})
    };

    let response = await fetch(url, options);

    // Handle token expiration
    if (response.status === 401) {
      const refreshed = await this.refreshToken();
      if (refreshed) {
        // Retry with new token
        options.headers['Authorization'] = this.authManager.getAuthHeader();
        response = await fetch(url, options);
      }
    }

    return response;
  }

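  async refreshToken() {
    // Nothing to refresh with; the caller must log in again
    if (!this.authManager.refreshToken) return false;

    try {
      const response = await fetch('/api/v1/auth/refresh', {
        method: 'POST',
        headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
        body: `refresh_token=${encodeURIComponent(this.authManager.refreshToken)}`
      });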

      if (response.ok) {
        const tokens = await response.json();
        this.authManager.setTokens(tokens.access_token, tokens.refresh_token);
        return true;
      }
    } catch (error) {
      console.error('Token refresh failed:', error);
    }

    return false;
  }
}
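
The same refresh-once-and-retry pattern can be expressed in Python. The sketch below is deliberately transport-agnostic: the send, get_token, and refresh callables are hypothetical stand-ins for real HTTP calls, and FakeSession is an in-memory substitute for the API so the logic can be exercised without a server.

```python
from typing import Callable, Optional


def request_with_refresh(
    send: Callable[[Optional[str]], int],
    get_token: Callable[[], Optional[str]],
    refresh: Callable[[], bool],
) -> int:
    """Send a request; on 401, refresh the tokens once and retry once.

    `send` takes the current access token and returns an HTTP status code.
    """
    status = send(get_token())
    if status == 401 and refresh():
        # Retry exactly once with the refreshed token to avoid retry loops.
        status = send(get_token())
    return status


class FakeSession:
    """In-memory stand-in for the API: only the token 'new' is accepted."""

    def __init__(self) -> None:
        self.token = "old"
        self.calls = 0

    def send(self, token: Optional[str]) -> int:
        self.calls += 1
        return 200 if token == "new" else 401

    def refresh(self) -> bool:
        self.token = "new"
        return True


session = FakeSession()
status = request_with_refresh(session.send, lambda: session.token, session.refresh)
print(status, session.calls)
```

Limiting the retry to a single attempt means a rejected refresh token surfaces as a 401 to the caller instead of triggering repeated refresh requests.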

Integration Examples

Python Example

import requests
from datetime import datetime, timedelta

class QuantbotClient:
    def __init__(self, base_url="http://localhost:8000"):
        self.base_url = base_url
        self.access_token = None
        self.refresh_token = None
        self.token_expires_at = None

    def register(self, email, password, full_name):
        """Register a new user"""
        response = requests.post(f"{self.base_url}/api/v1/users/register", json={
            "email": email,
            "password": password,
            "full_name": full_name
        })
        response.raise_for_status()
        return response.json()

    def login(self, email, password):
        """Login and store tokens"""
        response = requests.post(f"{self.base_url}/api/v1/auth/login", data={
            "username": email,  # OAuth2 form uses 'username' for email
            "password": password
        })
        response.raise_for_status()

        tokens = response.json()
        self.access_token = tokens["access_token"]
        self.refresh_token = tokens["refresh_token"]
        # The response does not include an expiry time, so assume the default
        # 30-minute lifetime and schedule a refresh 5 minutes early
        self.token_expires_at = datetime.now() + timedelta(minutes=25)

        return tokens

    def refresh_access_token(self):
        """Refresh the access token"""
        if not self.refresh_token:
            raise ValueError("No refresh token available")

        response = requests.post(f"{self.base_url}/api/v1/auth/refresh", data={
            "refresh_token": self.refresh_token
        })
        response.raise_for_status()

        tokens = response.json()
        self.access_token = tokens["access_token"]
        self.refresh_token = tokens["refresh_token"]
        self.token_expires_at = datetime.now() + timedelta(minutes=25)

        return tokens

    def _get_headers(self):
        """Get headers with authentication"""
        if not self.access_token:
            raise ValueError("Not authenticated - call login() first")

        # Auto-refresh if token is about to expire
        if self.token_expires_at and datetime.now() >= self.token_expires_at:
            self.refresh_access_token()

        return {"Authorization": f"Bearer {self.access_token}"}

    def get_current_user(self):
        """Get current user information"""
        response = requests.get(
            f"{self.base_url}/api/v1/auth/me",
            headers=self._get_headers()
        )
        response.raise_for_status()
        return response.json()

    def logout(self):
        """Logout from current session"""
        try:
            if self.access_token:
                requests.post(
                    f"{self.base_url}/api/v1/auth/logout",
                    headers=self._get_headers()
                )
        finally:
            # Always clear local tokens, even if the logout request fails
            self.access_token = None
            self.refresh_token = None
            self.token_expires_at = None

# Usage example
client = QuantbotClient()

# Register new user
user = client.register("john@example.com", "password123", "John Doe")
print(f"Registered user: {user['email']}")

# Login
tokens = client.login("john@example.com", "password123")
print("✅ Logged in successfully")

# Get user info
user_info = client.get_current_user()
print(f"Current user: {user_info['full_name']}")

# Logout
client.logout()
print("✅ Logged out")

JavaScript/TypeScript Example

interface User {
  id: number;
  email: string;
  full_name: string;
  is_active: boolean;
  created_at: string;
  updated_at: string | null;
}

interface Tokens {
  access_token: string;
  refresh_token: string;
  token_type: string;
}

class QuantbotClient {
  private baseURL: string;
  private accessToken: string | null = null;
  private refreshToken: string | null = null;

  constructor(baseURL: string = 'http://localhost:8000') {
    this.baseURL = baseURL;
  }

  async register(email: string, password: string, fullName: string): Promise<User> {
    const response = await fetch(`${this.baseURL}/api/v1/users/register`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        email,
        password,
        full_name: fullName
      })
    });

    if (!response.ok) {
      throw new Error(`Registration failed: ${response.statusText}`);
    }

    return await response.json();
  }

  async login(email: string, password: string): Promise<Tokens> {
    const response = await fetch(`${this.baseURL}/api/v1/auth/login`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: `username=${encodeURIComponent(email)}&password=${encodeURIComponent(password)}`
    });

    if (!response.ok) {
      throw new Error(`Login failed: ${response.statusText}`);
    }

    const tokens: Tokens = await response.json();
    this.accessToken = tokens.access_token;
    this.refreshToken = tokens.refresh_token;

    return tokens;
  }

  async refreshAccessToken(): Promise<Tokens> {
    if (!this.refreshToken) {
      throw new Error('No refresh token available');
    }

    const response = await fetch(`${this.baseURL}/api/v1/auth/refresh`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: `refresh_token=${encodeURIComponent(this.refreshToken)}`
    });

    if (!response.ok) {
      throw new Error(`Token refresh failed: ${response.statusText}`);
    }

    const tokens: Tokens = await response.json();
    this.accessToken = tokens.access_token;
    this.refreshToken = tokens.refresh_token;

    return tokens;
  }

  private getAuthHeaders(): Record<string, string> {
    if (!this.accessToken) {
      throw new Error('Not authenticated - call login() first');
    }

    return {
      'Authorization': `Bearer ${this.accessToken}`,
      'Content-Type': 'application/json'
    };
  }

  async getCurrentUser(): Promise<User> {
    const response = await fetch(`${this.baseURL}/api/v1/auth/me`, {
      headers: this.getAuthHeaders()
    });

    if (response.status === 401) {
      // Try to refresh token and retry
      await this.refreshAccessToken();
      const retryResponse = await fetch(`${this.baseURL}/api/v1/auth/me`, {
        headers: this.getAuthHeaders()
      });

      if (!retryResponse.ok) {
        throw new Error(`Failed to get user info: ${retryResponse.statusText}`);
      }

      return await retryResponse.json();
    }

    if (!response.ok) {
      throw new Error(`Failed to get user info: ${response.statusText}`);
    }

    return await response.json();
  }

  async logout(): Promise<void> {
    if (this.accessToken) {
      try {
        await fetch(`${this.baseURL}/api/v1/auth/logout`, {
          method: 'POST',
          headers: this.getAuthHeaders()
        });
      } catch (error) {
        console.warn('Logout request failed:', error);
      }
    }

    this.accessToken = null;
    this.refreshToken = null;
  }

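  async logoutAllDevices(): Promise<void> {
    const response = await fetch(`${this.baseURL}/api/v1/auth/logout-all`, {
      method: 'POST',
      headers: this.getAuthHeaders()
    });

    // Keep local tokens if the server did not confirm the revocation
    if (!response.ok) {
      throw new Error(`Logout from all devices failed: ${response.statusText}`);
    }

    this.accessToken = null;
    this.refreshToken = null;
  }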

  // Helper method for making authenticated API calls
  async apiCall(endpoint: string, options: RequestInit = {}): Promise<Response> {
    const url = `${this.baseURL}/api/v1${endpoint}`;
    const headers = {
      ...this.getAuthHeaders(),
      ...options.headers
    };

    let response = await fetch(url, { ...options, headers });

    // Handle token expiration
    if (response.status === 401 && this.refreshToken) {
      try {
        await this.refreshAccessToken();
        // Retry with new token
        response = await fetch(url, {
          ...options,
          headers: { ...this.getAuthHeaders(), ...options.headers }
        });
      } catch (error) {
        throw new Error('Authentication failed - please login again');
      }
    }

    return response;
  }
}

// Usage example
const client = new QuantbotClient();

async function example() {
  try {
    // Register
    const user = await client.register('jane@example.com', 'password123', 'Jane Doe');
    console.log('✅ Registered:', user.email);

    // Login
    await client.login('jane@example.com', 'password123');
    console.log('✅ Logged in successfully');

    // Get current user
    const currentUser = await client.getCurrentUser();
    console.log('Current user:', currentUser.full_name);

    // Make an API call using the helper method
    const response = await client.apiCall('/portfolio');
    const portfolio = await response.json();

    // Logout
    await client.logout();
    console.log('✅ Logged out');

  } catch (error) {
    console.error('Error:', error);
  }
}

cURL Examples

# Register a new user
curl -X POST "http://localhost:8000/api/v1/users/register" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "user@example.com",
    "password": "password123",
    "full_name": "John Doe"
  }'

# Login to get tokens
curl -X POST "http://localhost:8000/api/v1/auth/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=user@example.com&password=password123"

# Use access token for API calls
curl -X GET "http://localhost:8000/api/v1/auth/me" \
  -H "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."

# Refresh access token
curl -X POST "http://localhost:8000/api/v1/auth/refresh" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "refresh_token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."

# Logout
curl -X POST "http://localhost:8000/api/v1/auth/logout" \
  -H "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."

Error Handling

Common Error Responses

401 Unauthorized - Invalid or expired token

{
  "detail": "Could not validate credentials"
}

403 Forbidden - Inactive user account

{
  "detail": "Inactive user"
}

422 Unprocessable Entity - Validation errors

{
  "detail": [
    {
      "loc": ["body", "email"],
      "msg": "field required",
      "type": "value_error.missing"
    }
  ]
}
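
Note that detail is a plain string in the 401 and 403 responses but a list of objects in the 422 response. A client that displays errors may want to normalize both shapes. The following is a minimal sketch under that assumption; the function name error_messages is hypothetical.

```python
from typing import Any, List


def error_messages(body: dict) -> List[str]:
    """Normalize an error response body into a flat list of messages."""
    detail: Any = body.get("detail")
    if detail is None:
        return []
    if isinstance(detail, str):
        # 400/401/403 style: a single human-readable message
        return [detail]
    messages = []
    for item in detail:
        # 422 style: one entry per failed field, with its location path
        loc = ".".join(str(part) for part in item.get("loc", []))
        msg = item.get("msg", "Invalid value")
        messages.append(f"{loc}: {msg}" if loc else msg)
    return messages


print(error_messages({"detail": "Inactive user"}))
print(error_messages({"detail": [
    {"loc": ["body", "email"], "msg": "field required", "type": "value_error.missing"}
]}))
```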

Error Handling Best Practices

import requests
from requests.exceptions import RequestException

def handle_api_error(response):
    """Handle API error responses"""
    if response is None:
        # No HTTP response at all (connection error, timeout, invalid URL)
        print("❌ Request failed before the server responded")
    elif response.status_code == 401:
        print("❌ Authentication failed - check your credentials")
    elif response.status_code == 403:
        print("❌ Access forbidden - account may be inactive")
    elif response.status_code == 422:
        errors = response.json().get('detail', [])
        print("❌ Validation errors:")
        for error in errors:
            field = error.get('loc', ['unknown'])[-1]
            message = error.get('msg', 'Invalid value')
            print(f"  - {field}: {message}")
    else:
        print(f"❌ API error: {response.status_code} - {response.text}")

# Usage
login_data = {"username": "user@example.com", "password": "securepassword123"}
try:
    # requests needs an absolute URL, including scheme and host
    response = requests.post("http://localhost:8000/api/v1/auth/login", data=login_data)
    response.raise_for_status()
    tokens = response.json()
except RequestException as e:
    # e.response is None when the request never reached the server
    handle_api_error(e.response)

Development Notes

  • Environment: This API runs locally on http://localhost:8000 for development
  • Production: Update base URLs and implement HTTPS for production deployment
  • Token Security: Store refresh tokens securely (HTTP-only cookies recommended)
  • CORS: Configure CORS settings for your frontend domain
  • Rate Limiting: Implement rate limiting in production environments
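
One way to keep the development and production base URLs separate is to read the URL from the environment and reject plain HTTP for anything other than a local host. The sketch below illustrates this; the environment variable name QUANTBOT_BASE_URL and the function resolve_base_url are assumptions made for the example, not settings defined by Quantbot.

```python
import os
from typing import Mapping
from urllib.parse import urlparse

LOCAL_HOSTS = {"localhost", "127.0.0.1"}


def resolve_base_url(env: Mapping[str, str] = os.environ) -> str:
    """Return the API base URL, requiring HTTPS for non-local hosts."""
    # QUANTBOT_BASE_URL is a hypothetical variable name used for illustration.
    url = env.get("QUANTBOT_BASE_URL", "http://localhost:8000").rstrip("/")
    parsed = urlparse(url)
    if parsed.scheme != "https" and parsed.hostname not in LOCAL_HOSTS:
        raise ValueError(f"Refusing non-HTTPS base URL for remote host: {url}")
    return url


print(resolve_base_url({}))
print(resolve_base_url({"QUANTBOT_BASE_URL": "https://api.example.com/"}))
```

The returned value could then be passed as the base_url argument of the client classes shown earlier.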