Deployment Guide

This guide covers deploying ServicePytan applications in production environments, focusing on security, reliability, and best practices for internal use.

Production Deployment Overview

Deployment Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Application   │────│   ServicePytan  │────│  ServiceTitan   │
│    Server       │    │    Library      │    │      API        │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Configuration  │    │    Logging &    │    │   Rate Limits   │
│  & Secrets      │    │   Monitoring    │    │  & Quotas       │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Key Components

  1. Application Layer - Your business logic and ServicePytan integration

  2. Configuration Management - Secure credential and environment management

  3. Monitoring & Logging - Observability and debugging capabilities

  4. Error Handling & Retry - Resilience against API failures

  5. Security Controls - Secure credential storage and access controls

Environment Configuration

Production Environment Setup

# production_config.py
import os
from servicepytan.auth import servicepytan_connect, ApiEnvironment

def get_production_connection():
    """Production connection with proper error handling"""
    try:
        return servicepytan_connect(
            # Use environment variables for production
            client_id=os.environ['SERVICETITAN_CLIENT_ID'],
            client_secret=os.environ['SERVICETITAN_CLIENT_SECRET'],
            app_key=os.environ['SERVICETITAN_APP_KEY'],
            tenant_id=os.environ['SERVICETITAN_TENANT_ID'],
            timezone=os.environ.get('SERVICETITAN_TIMEZONE', 'UTC'),
            api_environment=ApiEnvironment.PRODUCTION
        )
    except KeyError as e:
        raise EnvironmentError(f"Missing required environment variable: {e}")

def get_integration_connection():
    """Integration environment for testing"""
    return servicepytan_connect(
        client_id=os.environ['SERVICETITAN_INTEGRATION_CLIENT_ID'],
        client_secret=os.environ['SERVICETITAN_INTEGRATION_CLIENT_SECRET'],
        app_key=os.environ['SERVICETITAN_INTEGRATION_APP_KEY'],
        tenant_id=os.environ['SERVICETITAN_INTEGRATION_TENANT_ID'],
        api_environment=ApiEnvironment.INTEGRATION
    )

Environment Variables

Required Production Variables:

# ServiceTitan Production Credentials
SERVICETITAN_CLIENT_ID=cid.production_client_id
SERVICETITAN_CLIENT_SECRET=cs2.production_client_secret
SERVICETITAN_APP_KEY=ak1.production_app_key
SERVICETITAN_TENANT_ID=production_tenant_id

# Optional Configuration
SERVICETITAN_TIMEZONE=America/New_York
SERVICETITAN_API_ENVIRONMENT=production

# Application Configuration
LOG_LEVEL=INFO
RETRY_ATTEMPTS=3
BATCH_SIZE=200

Integration/Testing Variables:

# ServiceTitan Integration Credentials  
SERVICETITAN_INTEGRATION_CLIENT_ID=cid.integration_client_id
SERVICETITAN_INTEGRATION_CLIENT_SECRET=cs2.integration_client_secret
SERVICETITAN_INTEGRATION_APP_KEY=ak1.integration_app_key
SERVICETITAN_INTEGRATION_TENANT_ID=integration_tenant_id

Security Best Practices

Credential Management

1. Use Secrets Management Systems

AWS Secrets Manager:

import boto3
import json
from botocore.exceptions import ClientError

def get_servicetitan_credentials(secret_name, region_name="us-east-1"):
    """Retrieve ServiceTitan credentials from AWS Secrets Manager"""
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)
    
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
        secret = json.loads(get_secret_value_response['SecretString'])
        
        return servicepytan_connect(
            client_id=secret['SERVICETITAN_CLIENT_ID'],
            client_secret=secret['SERVICETITAN_CLIENT_SECRET'],
            app_key=secret['SERVICETITAN_APP_KEY'],
            tenant_id=secret['SERVICETITAN_TENANT_ID'],
            api_environment="production"
        )
    except ClientError as e:
        raise Exception(f"Failed to retrieve credentials: {e}")

# Usage
conn = get_servicetitan_credentials("prod/servicetitan/api")

Azure Key Vault:

from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential

def get_servicetitan_credentials_azure(vault_url):
    """Retrieve ServiceTitan credentials from Azure Key Vault"""
    credential = DefaultAzureCredential()
    client = SecretClient(vault_url=vault_url, credential=credential)
    
    try:
        client_id = client.get_secret("servicetitan-client-id").value
        client_secret = client.get_secret("servicetitan-client-secret").value
        app_key = client.get_secret("servicetitan-app-key").value
        tenant_id = client.get_secret("servicetitan-tenant-id").value
        
        return servicepytan_connect(
            client_id=client_id,
            client_secret=client_secret,
            app_key=app_key,
            tenant_id=tenant_id,
            api_environment="production"
        )
    except Exception as e:
        raise Exception(f"Failed to retrieve credentials: {e}")

2. Environment Separation

# config.py
import os
from enum import Enum

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"

class Config:
    def __init__(self):
        self.environment = Environment(os.environ.get('ENVIRONMENT', 'development'))
        self.debug = self.environment != Environment.PRODUCTION
        
    def get_servicetitan_connection(self):
        if self.environment == Environment.PRODUCTION:
            return get_production_connection()
        elif self.environment == Environment.STAGING:
            return get_staging_connection()
        else:
            return get_development_connection()

# Usage
config = Config()
conn = config.get_servicetitan_connection()

Access Controls

1. Principle of Least Privilege

class ServiceTitanService:
    """Wrapper service with role-based access"""
    
    def __init__(self, user_role, conn):
        self.conn = conn
        self.user_role = user_role
        
        # Define role permissions
        self.permissions = {
            'read_only': ['get_one', 'get_all', 'get_page'],
            'operator': ['get_one', 'get_all', 'get_page', 'patch'],
            'admin': ['get_one', 'get_all', 'get_page', 'post', 'patch', 'delete']
        }
    
    def _check_permission(self, operation):
        allowed_ops = self.permissions.get(self.user_role, [])
        if operation not in allowed_ops:
            raise PermissionError(f"Role '{self.user_role}' not allowed to perform '{operation}'")
    
    def get_jobs(self, filters=None):
        self._check_permission('get_all')
        endpoint = servicepytan.Endpoint("jpm", "jobs", conn=self.conn)
        return endpoint.get_all(filters or {})
    
    def update_job(self, job_id, data):
        self._check_permission('patch')
        endpoint = servicepytan.Endpoint("jpm", "jobs", conn=self.conn)
        return endpoint.patch(job_id, data)

# Usage
service = ServiceTitanService('read_only', conn)
jobs = service.get_jobs({'active': 'true'})

2. API Key Rotation

import datetime
import logging

class CredentialRotationService:
    def __init__(self, secrets_manager):
        self.secrets_manager = secrets_manager
        self.logger = logging.getLogger(__name__)
    
    def check_credential_age(self, secret_name):
        """Check if credentials need rotation"""
        metadata = self.secrets_manager.describe_secret(SecretId=secret_name)
        last_changed = metadata['LastChangedDate']
        age_days = (datetime.datetime.now(tz=last_changed.tzinfo) - last_changed).days
        
        if age_days > 90:  # Rotate every 90 days
            self.logger.warning(f"Credentials for {secret_name} are {age_days} days old")
            return True
        return False
    
    def rotate_credentials(self, secret_name):
        """Initiate credential rotation"""
        self.logger.info(f"Starting credential rotation for {secret_name}")
        # Implement rotation logic based on your infrastructure
        pass

Monitoring and Logging

Application Logging

import logging
import structlog
from datetime import datetime

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        structlog.processors.add_logger_name,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.dev.ConsoleRenderer()
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    logger_factory=structlog.WriteLoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

class ServiceTitanLogger:
    """Enhanced logging for ServiceTitan API interactions"""
    
    def __init__(self, service_name):
        self.logger = structlog.get_logger().bind(service=service_name)
    
    def log_api_call(self, endpoint, method, params=None, duration=None, error=None):
        """Log API call details"""
        log_data = {
            'endpoint': f"{endpoint.folder}/{endpoint.endpoint}",
            'method': method,
            'timestamp': datetime.utcnow().isoformat(),
        }
        
        if params:
            log_data['params'] = params
        if duration:
            log_data['duration_ms'] = duration * 1000
        if error:
            log_data['error'] = str(error)
            self.logger.error("API call failed", **log_data)
        else:
            self.logger.info("API call successful", **log_data)

# Usage
api_logger = ServiceTitanLogger("data_sync_service")

def get_jobs_with_logging(conn, filters):
    import time
    
    endpoint = servicepytan.Endpoint("jpm", "jobs", conn=conn)
    start_time = time.time()
    
    try:
        result = endpoint.get_all(filters)
        duration = time.time() - start_time
        
        api_logger.log_api_call(
            endpoint=endpoint,
            method="get_all",
            params=filters,
            duration=duration
        )
        
        return result
        
    except Exception as e:
        duration = time.time() - start_time
        api_logger.log_api_call(
            endpoint=endpoint,
            method="get_all",
            params=filters,
            duration=duration,
            error=e
        )
        raise

Performance Monitoring

import time
from functools import wraps
from dataclasses import dataclass
from typing import Dict, List
import statistics

@dataclass
class APIMetrics:
    endpoint: str
    call_count: int
    total_duration: float
    avg_duration: float
    error_count: int
    success_rate: float

class PerformanceMonitor:
    def __init__(self):
        self.metrics: Dict[str, List[float]] = {}
        self.errors: Dict[str, int] = {}
        self.calls: Dict[str, int] = {}
    
    def record_call(self, endpoint, duration, success=True):
        """Record API call metrics"""
        key = f"{endpoint.folder}/{endpoint.endpoint}"
        
        if key not in self.metrics:
            self.metrics[key] = []
            self.errors[key] = 0
            self.calls[key] = 0
        
        self.metrics[key].append(duration)
        self.calls[key] += 1
        
        if not success:
            self.errors[key] += 1
    
    def get_metrics(self) -> Dict[str, APIMetrics]:
        """Get aggregated metrics"""
        result = {}
        
        for endpoint, durations in self.metrics.items():
            if durations:
                avg_duration = statistics.mean(durations)
                total_duration = sum(durations)
                call_count = self.calls[endpoint]
                error_count = self.errors[endpoint]
                success_rate = (call_count - error_count) / call_count if call_count > 0 else 0
                
                result[endpoint] = APIMetrics(
                    endpoint=endpoint,
                    call_count=call_count,
                    total_duration=total_duration,
                    avg_duration=avg_duration,
                    error_count=error_count,
                    success_rate=success_rate
                )
        
        return result

# Global monitor instance
monitor = PerformanceMonitor()

def monitored_api_call(func):
    """Decorator to monitor API calls"""
    @wraps(func)
    def wrapper(endpoint, *args, **kwargs):
        start_time = time.time()
        success = True
        
        try:
            result = func(endpoint, *args, **kwargs)
            return result
        except Exception as e:
            success = False
            raise
        finally:
            duration = time.time() - start_time
            monitor.record_call(endpoint, duration, success)
    
    return wrapper

# Apply to ServicePytan methods
servicepytan.Endpoint.get_all = monitored_api_call(servicepytan.Endpoint.get_all)
servicepytan.Endpoint.get_one = monitored_api_call(servicepytan.Endpoint.get_one)

Error Handling and Resilience

Retry Strategies

import time
import random
from functools import wraps
from requests.exceptions import RequestException, Timeout, ConnectionError

class RetryConfig:
    def __init__(self, max_attempts=3, base_delay=1, max_delay=60, backoff_factor=2):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.backoff_factor = backoff_factor

def retry_with_backoff(config: RetryConfig = None):
    """Retry decorator with exponential backoff and jitter"""
    if config is None:
        config = RetryConfig()
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            
            for attempt in range(config.max_attempts):
                try:
                    return func(*args, **kwargs)
                
                except (ConnectionError, Timeout, RequestException) as e:
                    last_exception = e
                    
                    if attempt == config.max_attempts - 1:
                        logger.error(f"Max retry attempts reached for {func.__name__}")
                        raise
                    
                    # Calculate delay with exponential backoff and jitter
                    delay = min(
                        config.base_delay * (config.backoff_factor ** attempt),
                        config.max_delay
                    )
                    jitter = random.uniform(0, 0.1) * delay
                    total_delay = delay + jitter
                    
                    logger.warning(
                        f"Attempt {attempt + 1} failed for {func.__name__}, "
                        f"retrying in {total_delay:.2f} seconds: {e}"
                    )
                    time.sleep(total_delay)
            
            raise last_exception
        
        return wrapper
    return decorator

# Apply retry logic to ServicePytan calls
class ResilientServiceTitanClient:
    def __init__(self, conn, retry_config=None):
        self.conn = conn
        self.retry_config = retry_config or RetryConfig()
    
    @retry_with_backoff()
    def get_jobs(self, filters):
        endpoint = servicepytan.Endpoint("jpm", "jobs", conn=self.conn)
        return endpoint.get_all(filters)
    
    @retry_with_backoff()
    def get_customers(self, filters):
        endpoint = servicepytan.Endpoint("crm", "customers", conn=self.conn)
        return endpoint.get_all(filters)

Circuit Breaker Pattern

import time
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60, reset_timeout=30):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.reset_timeout = reset_timeout
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self._lock = Lock()
    
    def call(self, func, *args, **kwargs):
        with self._lock:
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time > self.reset_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.failure_count = 0
                else:
                    raise Exception("Circuit breaker is OPEN")
            
            try:
                result = func(*args, **kwargs)
                
                if self.state == CircuitState.HALF_OPEN:
                    self.state = CircuitState.CLOSED
                
                self.failure_count = 0
                return result
                
            except Exception as e:
                self.failure_count += 1
                self.last_failure_time = time.time()
                
                if self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
                
                raise

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=60)

def protected_api_call(endpoint, method, *args, **kwargs):
    """API call protected by circuit breaker"""
    return circuit_breaker.call(getattr(endpoint, method), *args, **kwargs)

Deployment Patterns

Docker Deployment

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first for better caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd --create-home --shell /bin/bash app \
    && chown -R app:app /app
USER app

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python health_check.py

EXPOSE 8000

CMD ["python", "main.py"]
# docker-compose.yml
version: '3.8'

services:
  servicepytan-app:
    build: .
    environment:
      - ENVIRONMENT=production
      - SERVICETITAN_CLIENT_ID=${SERVICETITAN_CLIENT_ID}
      - SERVICETITAN_CLIENT_SECRET=${SERVICETITAN_CLIENT_SECRET}
      - SERVICETITAN_APP_KEY=${SERVICETITAN_APP_KEY}
      - SERVICETITAN_TENANT_ID=${SERVICETITAN_TENANT_ID}
      - LOG_LEVEL=INFO
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "python", "health_check.py"]
      interval: 30s
      timeout: 10s
      retries: 3

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: servicepytan-app
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: servicepytan-app
  template:
    metadata:
      labels:
        app: servicepytan-app
    spec:
      containers:
      - name: servicepytan-app
        image: your-registry/servicepytan-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: ENVIRONMENT
          value: "production"
        - name: SERVICETITAN_CLIENT_ID
          valueFrom:
            secretKeyRef:
              name: servicetitan-secrets
              key: client-id
        - name: SERVICETITAN_CLIENT_SECRET
          valueFrom:
            secretKeyRef:
              name: servicetitan-secrets
              key: client-secret
        - name: SERVICETITAN_APP_KEY
          valueFrom:
            secretKeyRef:
              name: servicetitan-secrets
              key: app-key
        - name: SERVICETITAN_TENANT_ID
          valueFrom:
            secretKeyRef:
              name: servicetitan-secrets
              key: tenant-id
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Secret
metadata:
  name: servicetitan-secrets
  namespace: production
type: Opaque
data:
  client-id: <base64-encoded-client-id>
  client-secret: <base64-encoded-client-secret>
  app-key: <base64-encoded-app-key>
  tenant-id: <base64-encoded-tenant-id>

Performance Optimization

Connection Pooling

from threading import Lock
from queue import Queue
import threading

class ConnectionPool:
    def __init__(self, max_connections=10):
        self.max_connections = max_connections
        self.pool = Queue(maxsize=max_connections)
        self.created_connections = 0
        self.lock = Lock()
    
    def get_connection(self):
        """Get connection from pool or create new one"""
        try:
            # Try to get existing connection
            return self.pool.get_nowait()
        except:
            # Create new connection if pool is empty
            with self.lock:
                if self.created_connections < self.max_connections:
                    conn = servicepytan.auth.servicepytan_connect()
                    self.created_connections += 1
                    return conn
                else:
                    # Wait for available connection
                    return self.pool.get()
    
    def return_connection(self, conn):
        """Return connection to pool"""
        try:
            self.pool.put_nowait(conn)
        except:
            # Pool is full, discard connection
            pass

# Global connection pool
connection_pool = ConnectionPool(max_connections=5)

class PooledServiceTitanClient:
    def __init__(self):
        self.local = threading.local()
    
    def _get_connection(self):
        if not hasattr(self.local, 'conn'):
            self.local.conn = connection_pool.get_connection()
        return self.local.conn
    
    def get_jobs(self, filters):
        conn = self._get_connection()
        endpoint = servicepytan.Endpoint("jpm", "jobs", conn=conn)
        return endpoint.get_all(filters)

Caching Strategy

import redis
import json
import hashlib
from datetime import datetime, timedelta

class ServiceTitanCache:
    def __init__(self, redis_url="redis://localhost:6379/0"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour
    
    def _generate_key(self, endpoint, method, params):
        """Generate cache key from request parameters"""
        key_data = f"{endpoint.folder}/{endpoint.endpoint}/{method}/{json.dumps(params, sort_keys=True)}"
        return f"servicepytan:{hashlib.md5(key_data.encode()).hexdigest()}"
    
    def get(self, endpoint, method, params):
        """Get cached response"""
        key = self._generate_key(endpoint, method, params)
        cached_data = self.redis_client.get(key)
        
        if cached_data:
            return json.loads(cached_data)
        return None
    
    def set(self, endpoint, method, params, data, ttl=None):
        """Cache response data"""
        key = self._generate_key(endpoint, method, params)
        ttl = ttl or self.default_ttl
        
        self.redis_client.setex(
            key,
            ttl,
            json.dumps(data, default=str)
        )

# Usage with caching
cache = ServiceTitanCache()

def cached_api_call(endpoint, method, params, cache_ttl=3600):
    """API call with caching"""
    # Try cache first
    cached_result = cache.get(endpoint, method, params)
    if cached_result:
        logger.info(f"Cache hit for {endpoint.folder}/{endpoint.endpoint}")
        return cached_result
    
    # Make API call
    result = getattr(endpoint, method)(params)
    
    # Cache result
    cache.set(endpoint, method, params, result, cache_ttl)
    logger.info(f"Cached result for {endpoint.folder}/{endpoint.endpoint}")
    
    return result

Health Checks and Monitoring

Application Health Check

# health_check.py
import sys
import time
import logging
from datetime import datetime, timedelta

def check_servicetitan_connection():
    """Check ServiceTitan API connectivity"""
    try:
        conn = servicepytan.auth.servicepytan_connect()
        endpoint = servicepytan.Endpoint("settings", "business-units", conn=conn)
        
        start_time = time.time()
        result = endpoint.get_all({"pageSize": 1})
        response_time = time.time() - start_time
        
        return {
            "status": "healthy",
            "response_time_ms": response_time * 1000,
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }

def check_credentials_age():
    """Check if credentials are approaching expiration"""
    # Implement based on your credential management system
    return {"status": "healthy", "days_until_expiration": 45}

def health_check():
    """Comprehensive health check"""
    checks = {
        "servicetitan_api": check_servicetitan_connection(),
        "credentials": check_credentials_age(),
        "timestamp": datetime.utcnow().isoformat()
    }
    
    overall_status = "healthy" if all(
        check["status"] == "healthy" 
        for check in checks.values() 
        if isinstance(check, dict) and "status" in check
    ) else "unhealthy"
    
    checks["overall_status"] = overall_status
    return checks

if __name__ == "__main__":
    result = health_check()
    print(json.dumps(result, indent=2))
    
    if result["overall_status"] != "healthy":
        sys.exit(1)

Deployment Checklist

Pre-deployment

  • Security Review

    • Credentials stored securely (not in code)

    • API permissions reviewed and minimized

    • Access controls implemented

    • Secrets rotation schedule established

  • Environment Setup

    • Production environment variables configured

    • Integration environment available for testing

    • Network access to ServiceTitan API confirmed

    • DNS resolution working

  • Monitoring Setup

    • Logging configured and tested

    • Health checks implemented

    • Performance monitoring enabled

    • Alert thresholds defined

Post-deployment

  • Validation

    • Health checks passing

    • API connectivity confirmed

    • Authentication working

    • Sample data retrieved successfully

  • Monitoring

    • Logs flowing correctly

    • Metrics being collected

    • Alerts configured

    • Dashboard created

  • Documentation

    • Deployment procedures documented

    • Troubleshooting runbook created

    • Contact information updated

    • Credential rotation procedures documented


Back to Troubleshooting | Back to Getting Started