Building a robust trading bot requires more than just implementing clever strategies. You need to ensure your application can handle errors gracefully and respect the rate limits imposed by the Dmarket API. In this article, we'll explore comprehensive error handling techniques and strategies for optimizing your API usage to create a reliable trading system.

Understanding API Errors

When working with the Dmarket API, you may encounter various types of errors, including:

  • Authentication Errors: Issues with API keys or signatures
  • Validation Errors: Invalid parameters or request formats
  • Rate Limit Errors: Too many requests in a short time period
  • Server Errors: Internal issues on Dmarket's side
  • Network Errors: Connection issues between your application and Dmarket

Each type of error requires a different handling approach to ensure your trading bot continues operating effectively.

Implementing Robust Error Handling

Let's start by improving our API client with more sophisticated error handling:


# Enhanced API client with comprehensive error handling
import time
import logging
import requests
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout

class DmarketAPIError(Exception):
    """Base exception for Dmarket API errors"""
    def __init__(self, message, status_code=None, response=None):
        self.message = message
        self.status_code = status_code
        self.response = response
        super().__init__(self.message)

class AuthenticationError(DmarketAPIError):
    """Raised when there are issues with authentication"""
    pass

class ValidationError(DmarketAPIError):
    """Raised when the request contains invalid parameters"""
    pass

class RateLimitError(DmarketAPIError):
    """Raised when rate limits are exceeded"""
    pass

class ServerError(DmarketAPIError):
    """Raised when Dmarket's servers encounter an error"""
    pass

class EnhancedDmarketAPI:
    def __init__(self, public_key, secret_key, api_url="https://api.dmarket.com", logger=None):
        self.public_key = public_key
        self.secret_key = secret_key.encode('utf-8')
        self.api_url = api_url
        self.logger = logger or logging.getLogger(__name__)
        
        # Set up session for connection pooling
        self.session = requests.Session()
    
    def make_request(self, method, endpoint, params=None, body=None, retry_count=3, 
                    retry_delay=1, retry_backoff=2, retry_on_status=None):
        """
        Make an API request with comprehensive error handling and retry logic
        
        Args:
            method: HTTP method (GET, POST, etc.)
            endpoint: API endpoint
            params: Query parameters
            body: Request body for POST/PUT requests
            retry_count: Number of retries for retryable errors
            retry_delay: Initial delay between retries (seconds)
            retry_backoff: Multiplier for retry delay after each attempt
            retry_on_status: List of HTTP status codes to retry on (default: [429, 500, 502, 503, 504])
        
        Returns:
            Parsed JSON response
            
        Raises:
            AuthenticationError: If authentication fails
            ValidationError: If request validation fails
            RateLimitError: If rate limits are exceeded
            ServerError: If server encounters an error
            DmarketAPIError: For other API-related errors
            Exception: For unexpected errors
        """
        url = self.api_url + endpoint
        headers = self._generate_signature(method, endpoint, body)
        
        if retry_on_status is None:
            retry_on_status = [429, 500, 502, 503, 504]
        
        attempt = 0
        last_exception = None
        
        while attempt < retry_count:
            try:
                self.logger.debug(f"Making {method} request to {endpoint} (Attempt {attempt+1}/{retry_count})")
                
                if method == 'GET':
                    response = self.session.get(url, headers=headers, params=params, timeout=10)
                elif method == 'POST':
                    response = self.session.post(url, headers=headers, json=body, timeout=10)
                elif method == 'PUT':
                    response = self.session.put(url, headers=headers, json=body, timeout=10)
                elif method == 'DELETE':
                    response = self.session.delete(url, headers=headers, timeout=10)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")
                
                # If we get a successful response, parse and return it
                if response.status_code < 400:
                    return self._parse_response(response)
                
                # Handle specific error status codes
                self._handle_error_response(response)
                
                # If we reach here, the error wasn't handled specifically
                # Check if we should retry based on status code
                if response.status_code in retry_on_status:
                    attempt += 1
                    if attempt < retry_count:
                        sleep_time = retry_delay * (retry_backoff ** attempt)
                        self.logger.warning(f"Request failed with status {response.status_code}. "
                                          f"Retrying in {sleep_time} seconds...")
                        time.sleep(sleep_time)
                        continue
                
                # If we shouldn't retry or have exhausted retries, raise a generic error
                raise DmarketAPIError(
                    f"Request failed with status code {response.status_code}",
                    status_code=response.status_code,
                    response=response.text
                )
                
            except (ConnectionError, Timeout) as e:
                # Network-related errors are generally retryable
                attempt += 1
                last_exception = e
                if attempt < retry_count:
                    sleep_time = retry_delay * (retry_backoff ** attempt)
                    self.logger.warning(f"Network error: {str(e)}. Retrying in {sleep_time} seconds...")
                    time.sleep(sleep_time)
                else:
                    self.logger.error(f"Request failed after {retry_count} attempts due to network errors")
                    raise DmarketAPIError(f"Network error: {str(e)}")
            
            except (AuthenticationError, ValidationError) as e:
                # Don't retry auth/validation errors as they'll likely fail again
                raise e
            
            except RateLimitError as e:
                # Rate limit errors need a longer backoff
                attempt += 1
                if attempt < retry_count:
                    # Use a longer delay for rate limit errors
                    sleep_time = retry_delay * (retry_backoff ** attempt) * 2
                    self.logger.warning(f"Rate limit exceeded. Retrying in {sleep_time} seconds...")
                    time.sleep(sleep_time)
                else:
                    raise e
            
            except Exception as e:
                # Unexpected errors
                self.logger.error(f"Unexpected error during API request: {str(e)}")
                raise
        
        # If we've exhausted retries, raise the last exception
        if last_exception:
            raise DmarketAPIError(f"Request failed after {retry_count} attempts: {str(last_exception)}")
    
    def _generate_signature(self, method, endpoint, body=None):
        # Implementation of signature generation (as in previous examples)
        # ...
        
    def _parse_response(self, response):
        """Parse API response, handling JSON decode errors"""
        try:
            return response.json()
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse JSON response: {str(e)}")
            self.logger.debug(f"Response text: {response.text}")
            raise DmarketAPIError(f"Invalid JSON response: {str(e)}")
    
    def _handle_error_response(self, response):
        """Handle specific error responses from the API"""
        status_code = response.status_code
        
        try:
            error_data = response.json()
        except json.JSONDecodeError:
            error_data = {"message": response.text}
        
        error_message = error_data.get("message", "Unknown error")
        
        if status_code == 401:
            raise AuthenticationError(
                f"Authentication failed: {error_message}",
                status_code=status_code,
                response=error_data
            )
        
        elif status_code == 400:
            raise ValidationError(
                f"Invalid request: {error_message}",
                status_code=status_code,
                response=error_data
            )
        
        elif status_code == 429:
            raise RateLimitError(
                f"Rate limit exceeded: {error_message}",
                status_code=status_code,
                response=error_data
            )
        
        elif status_code >= 500:
            raise ServerError(
                f"Server error: {error_message}",
                status_code=status_code,
                response=error_data
            )
                    

Implementing Exponential Backoff

When encountering rate limits or server errors, implementing an exponential backoff strategy can help your application recover gracefully:


def exponential_backoff(base_delay, retry_count, jitter=0.1):
    """
    Calculate delay time using exponential backoff with jitter
    
    Args:
        base_delay: Base delay in seconds
        retry_count: Current retry attempt (0-based)
        jitter: Random jitter factor (0-1) to add randomness to delay
    
    Returns:
        Delay time in seconds
    """
    import random
    
    # Calculate exponential delay: base_delay * 2^retry_count
    delay = base_delay * (2 ** retry_count)
    
    # Add random jitter to prevent thundering herd problem
    if jitter > 0:
        delay = delay * (1 + random.uniform(-jitter, jitter))
    
    return delay
                    

Pro Tip

Adding a small random jitter to your backoff times helps prevent the "thundering herd" problem, where multiple clients might retry at exactly the same moment after an error.

Understanding Dmarket's Rate Limits

Dmarket imposes rate limits to ensure fair API usage and system stability. While the exact limits may change over time, here are some general guidelines:

  • Most endpoints are limited to 60-120 requests per minute
  • Some high-volume endpoints (like market data) may have higher limits
  • Trading endpoints (buy/sell) often have lower limits
  • Rate limits are typically calculated on a rolling window basis

Implementing a Rate Limiter

To avoid hitting rate limits, we can implement a rate limiter that manages our API request frequency:


import time
import threading
from collections import deque

class RateLimiter:
    """
    Rate limiter that tracks API calls and enforces limits
    """
    def __init__(self, max_calls, time_frame):
        """
        Initialize rate limiter
        
        Args:
            max_calls: Maximum number of calls allowed in the time frame
            time_frame: Time frame in seconds
        """
        self.max_calls = max_calls
        self.time_frame = time_frame
        self.calls = deque()
        self.lock = threading.Lock()
    
    def add_call(self):
        """Record a new API call"""
        current_time = time.time()
        
        # Acquire lock to make this thread-safe
        with self.lock:
            # Remove calls outside the time frame
            while self.calls and self.calls[0] < current_time - self.time_frame:
                self.calls.popleft()
            
            # Add the new call
            self.calls.append(current_time)
    
    def wait_if_needed(self):
        """
        Check if we need to wait before making another call
        If needed, sleep until a call slot becomes available
        
        Returns:
            Time waited in seconds (0 if no wait was needed)
        """
        current_time = time.time()
        
        with self.lock:
            # Clean up old calls first
            while self.calls and self.calls[0] < current_time - self.time_frame:
                self.calls.popleft()
            
            if len(self.calls) < self.max_calls:
                # We're under the limit, no need to wait
                return 0
            
            # Calculate required wait time: when the oldest call will expire
            # plus a small buffer to account for processing time
            wait_time = (self.calls[0] + self.time_frame) - current_time + 0.01
            
        # Wait outside the lock to avoid blocking other threads
        if wait_time > 0:
            time.sleep(wait_time)
            return wait_time
        
        return 0
    
    def __call__(self, func):
        """
        Decorator to rate-limit a function
        
        Example usage:
            @rate_limiter
            def api_call():
                # function body
        """
        def wrapper(*args, **kwargs):
            self.wait_if_needed()
            result = func(*args, **kwargs)
            self.add_call()
            return result
        
        return wrapper
                    

Here's how to integrate the rate limiter with our API client:


class RateLimitedDmarketAPI(EnhancedDmarketAPI):
    """API client with built-in rate limiting"""
    
    def __init__(self, public_key, secret_key, api_url="https://api.dmarket.com", logger=None):
        super().__init__(public_key, secret_key, api_url, logger)
        
        # Create rate limiters for different endpoint types
        self.market_rate_limiter = RateLimiter(max_calls=100, time_frame=60)  # 100 per minute
        self.trading_rate_limiter = RateLimiter(max_calls=30, time_frame=60)   # 30 per minute
        self.account_rate_limiter = RateLimiter(max_calls=60, time_frame=60)   # 60 per minute
        
        # Map endpoints to their respective rate limiters
        self.endpoint_limiters = {
            # Market data endpoints
            "/exchange/v1/market/items": self.market_rate_limiter,
            "/exchange/v1/market/items/": self.market_rate_limiter,
            
            # Trading endpoints
            "/exchange/v1/offers/buy": self.trading_rate_limiter,
            "/exchange/v1/offers/create": self.trading_rate_limiter,
            
            # Account endpoints
            "/account/v1/balance": self.account_rate_limiter,
            "/inventory/v1/user/items": self.account_rate_limiter
        }
    
    def make_request(self, method, endpoint, params=None, body=None, **kwargs):
        """Override make_request to apply appropriate rate limiting"""
        # Find the right rate limiter for this endpoint
        limiter = None
        for prefix, rate_limiter in self.endpoint_limiters.items():
            if endpoint.startswith(prefix):
                limiter = rate_limiter
                break
        
        # Default to market rate limiter if not found
        limiter = limiter or self.market_rate_limiter
        
        # Apply rate limiting
        wait_time = limiter.wait_if_needed()
        if wait_time > 0:
            self.logger.debug(f"Rate limiting: waited {wait_time:.2f}s before calling {endpoint}")
        
        # Make the actual request
        result = super().make_request(method, endpoint, params, body, **kwargs)
        
        # Record this call
        limiter.add_call()
        
        return result
                    

Optimizing API Usage

Beyond error handling and rate limiting, there are several strategies to optimize your API usage:

1. Implement Caching

Cache API responses to avoid redundant requests, especially for relatively static data:


import time
from functools import wraps
from typing import Dict, Any, Callable, Optional

class CacheEntry:
    """Entry in the cache with expiration time"""
    def __init__(self, data: Any, expiry: float):
        self.data = data
        self.expiry = expiry
    
    def is_expired(self) -> bool:
        """Check if this cache entry has expired"""
        return time.time() > self.expiry

class ResponseCache:
    """Cache for API responses"""
    def __init__(self):
        self.cache: Dict[str, CacheEntry] = {}
    
    def get(self, key: str) -> Optional[Any]:
        """Get item from cache if it exists and is not expired"""
        if key in self.cache:
            entry = self.cache[key]
            if not entry.is_expired():
                return entry.data
            # Remove expired entry
            del self.cache[key]
        return None
    
    def set(self, key: str, data: Any, ttl: int = 60) -> None:
        """
        Add item to cache with expiration time
        
        Args:
            key: Cache key
            data: Data to cache
            ttl: Time to live in seconds
        """
        expiry = time.time() + ttl
        self.cache[key] = CacheEntry(data, expiry)
    
    def clear(self) -> None:
        """Clear the entire cache"""
        self.cache.clear()
    
    def remove_expired(self) -> int:
        """Remove all expired entries from cache"""
        expired_keys = [k for k, v in self.cache.items() if v.is_expired()]
        for key in expired_keys:
            del self.cache[key]
        return len(expired_keys)

def cached(cache: ResponseCache, ttl: int = 60, key_func: Optional[Callable] = None):
    """
    Decorator to cache function results
    
    Args:
        cache: Cache instance to use
        ttl: Time to live in seconds
        key_func: Function to generate cache key from args and kwargs
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key
            if key_func:
                cache_key = key_func(*args, **kwargs)
            else:
                # Default key is function name + stringified args + kwargs
                args_str = ','.join(str(arg) for arg in args)
                kwargs_str = ','.join(f"{k}={v}" for k, v in sorted(kwargs.items()))
                cache_key = f"{func.__name__}({args_str};{kwargs_str})"
            
            # Check cache
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            
            # Execute function and cache result
            result = func(*args, **kwargs)
            cache.set(cache_key, result, ttl)
            return result
        
        return wrapper
    
    return decorator

# Example usage in API client
class CachedDmarketAPI(RateLimitedDmarketAPI):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = ResponseCache()
    
    @cached(ttl=60)  # Cache market data for 60 seconds
    def get_market_items(self, game_id, title=None, limit=100):
        """Get items available on the market (with caching)"""
        endpoint = "/exchange/v1/market/items"
        params = {
            'gameId': game_id,
            'limit': limit
        }
        
        if title:
            params['title'] = title
        
        return self.make_request('GET', endpoint, params=params)
                    

2. Batch Requests

Combine multiple operations into a single API request when possible:


def get_multiple_item_prices(self, item_ids, game_id):
    """
    Get prices for multiple items in a single request
    
    Args:
        item_ids: List of item IDs
        game_id: Game ID
    
    Returns:
        Dictionary mapping item IDs to their prices
    """
    if not item_ids:
        return {}
    
    # If API supports batch requests, use it
    endpoint = "/exchange/v1/market/items/batch"
    body = {
        'gameId': game_id,
        'itemIds': item_ids
    }
    
    result = self.make_request('POST', endpoint, body=body)
    
    # Process the results into a dictionary
    prices = {}
    for item in result.get('items', []):
        item_id = item.get('itemId')
        price = float(item.get('price', {}).get('amount', 0))
        prices[item_id] = price
    
    return prices
                    

3. Use Pagination Efficiently

When retrieving large sets of data, use pagination to process results in chunks:


def get_all_inventory_items(self, game_id, batch_size=100):
    """
    Get all inventory items using pagination
    
    Args:
        game_id: Game ID
        batch_size: Number of items to fetch per request
    
    Returns:
        List of all inventory items
    """
    all_items = []
    offset = 0
    
    while True:
        endpoint = "/inventory/v1/user/items"
        params = {
            'gameId': game_id,
            'limit': batch_size,
            'offset': offset
        }
        
        batch = self.make_request('GET', endpoint, params=params)
        
        # Add items from this batch
        items = batch.get('objects', [])
        all_items.extend(items)
        
        # Check if we've retrieved all items
        if len(items) < batch_size or batch.get('total', 0) <= offset + batch_size:
            break
        
        # Update offset for next batch
        offset += batch_size
    
    return all_items
                    

Monitoring and Debugging

To effectively manage your API usage, implement comprehensive logging and monitoring:


import logging
import time
from functools import wraps

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("api_activity.log"),
        logging.StreamHandler()
    ]
)

# Create loggers
api_logger = logging.getLogger('dmarket.api')
perf_logger = logging.getLogger('dmarket.performance')

def log_api_call(func):
    """Decorator to log API calls with timing information"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        
        # Extract endpoint from args (assumes first arg is self, second is method, third is endpoint)
        method = args[1] if len(args) > 1 else 'UNKNOWN'
        endpoint = args[2] if len(args) > 2 else 'UNKNOWN'
        
        api_logger.info(f"API Call: {method} {endpoint}")
        
        try:
            result = func(*args, **kwargs)
            
            # Log success
            elapsed = time.time() - start_time
            perf_logger.info(f"API Call completed: {method} {endpoint} in {elapsed:.3f}s")
            
            return result
            
        except Exception as e:
            # Log failure
            elapsed = time.time() - start_time
            api_logger.error(f"API Call failed: {method} {endpoint} in {elapsed:.3f}s - {str(e)}")
            
            # Re-raise the exception
            raise
    
    return wrapper

# Apply to API client's make_request method
EnhancedDmarketAPI.make_request = log_api_call(EnhancedDmarketAPI.make_request)
                    

Track These Metrics

  • Request Success Rate: Percentage of successful API calls
  • Response Times: Average and percentile response times
  • Rate Limit Usage: How close you are to hitting limits
  • Error Frequency: Types and frequency of errors
  • Retry Statistics: How often retries are needed

Conclusion

Robust error handling and intelligent rate limiting are essential components of a reliable trading bot. By implementing the techniques outlined in this article, you can create a resilient application that gracefully handles errors, respects rate limits, and optimizes API usage.

Remember that even the best error handling can't compensate for poor API usage patterns. Always design your bot to be considerate of API resources, caching where appropriate, batching requests when possible, and distributing load across time to avoid hitting rate limits.

In our next article, we'll cover security best practices to protect your API keys and trading assets.