Building a robust trading bot requires more than just implementing clever strategies. You need to ensure your application can handle errors gracefully and respect the rate limits imposed by the Dmarket API. In this article, we'll explore comprehensive error handling techniques and strategies for optimizing your API usage to create a reliable trading system.
Understanding API Errors
When working with the Dmarket API, you may encounter various types of errors, including:
- Authentication Errors: Issues with API keys or signatures
- Validation Errors: Invalid parameters or request formats
- Rate Limit Errors: Too many requests in a short time period
- Server Errors: Internal issues on Dmarket's side
- Network Errors: Connection issues between your application and Dmarket
Each type of error requires a different handling approach to ensure your trading bot continues operating effectively.
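One way to make these distinctions concrete is a small lookup from HTTP status code to error category and retry decision. The sketch below is only an illustration: the names `ErrorPolicy` and `classify_status` are invented for this example, and the mapping follows general HTTP conventions (including treating 403 as an authentication problem and every 5xx as retryable), not anything documented by Dmarket.

```python
from typing import NamedTuple, Optional


class ErrorPolicy(NamedTuple):
    category: str    # human-readable error class
    retryable: bool  # whether repeating the same request can succeed


def classify_status(status_code: Optional[int]) -> ErrorPolicy:
    """Map an HTTP status code (None means no response at all) to a policy."""
    if status_code is None:
        # Connection refused, DNS failure, timeout, ...
        return ErrorPolicy("network", True)
    if status_code in (401, 403):
        # Assumption: 403 is grouped with authentication failures
        return ErrorPolicy("authentication", False)
    if status_code == 429:
        return ErrorPolicy("rate_limit", True)
    if 400 <= status_code < 500:
        return ErrorPolicy("validation", False)
    if status_code >= 500:
        return ErrorPolicy("server", True)
    return ErrorPolicy("ok", False)
```

Keeping the retry decision in one place like this makes it easy to see which failures the bot will retry and which it will surface immediately.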
Implementing Robust Error Handling
Let's start by improving our API client with more sophisticated error handling:
# Enhanced API client with comprehensive error handling
import time
import logging
import requests
import json
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout


class DmarketAPIError(Exception):
    """Base exception for Dmarket API errors"""
    def __init__(self, message, status_code=None, response=None):
        self.message = message
        self.status_code = status_code
        self.response = response
        super().__init__(self.message)


class AuthenticationError(DmarketAPIError):
    """Raised when there are issues with authentication"""
    pass


class ValidationError(DmarketAPIError):
    """Raised when the request contains invalid parameters"""
    pass


class RateLimitError(DmarketAPIError):
    """Raised when rate limits are exceeded"""
    pass


class ServerError(DmarketAPIError):
    """Raised when Dmarket's servers encounter an error"""
    pass
class EnhancedDmarketAPI:
    def __init__(self, public_key, secret_key, api_url="https://api.dmarket.com", logger=None):
        self.public_key = public_key
        self.secret_key = secret_key.encode('utf-8')
        self.api_url = api_url
        self.logger = logger or logging.getLogger(__name__)
        # Set up session for connection pooling
        self.session = requests.Session()

    def make_request(self, method, endpoint, params=None, body=None, retry_count=3,
                     retry_delay=1, retry_backoff=2, retry_on_status=None):
        """
        Make an API request with comprehensive error handling and retry logic

        Args:
            method: HTTP method (GET, POST, etc.)
            endpoint: API endpoint
            params: Query parameters
            body: Request body for POST/PUT requests
            retry_count: Maximum number of attempts, including the first one
            retry_delay: Delay before the first retry (seconds)
            retry_backoff: Multiplier applied to the delay after each attempt
            retry_on_status: List of HTTP status codes to retry on (default: [429, 500, 502, 503, 504])

        Returns:
            Parsed JSON response

        Raises:
            AuthenticationError: If authentication fails
            ValidationError: If request validation fails
            RateLimitError: If rate limits are exceeded
            ServerError: If server encounters an error
            DmarketAPIError: For other API-related errors
            Exception: For unexpected errors
        """
        url = self.api_url + endpoint
        if retry_on_status is None:
            retry_on_status = [429, 500, 502, 503, 504]

        for attempt in range(retry_count):
            is_last_attempt = attempt == retry_count - 1
            # Exponential backoff: retry_delay, retry_delay * retry_backoff, ...
            sleep_time = retry_delay * (retry_backoff ** attempt)
            try:
                self.logger.debug(f"Making {method} request to {endpoint} (Attempt {attempt+1}/{retry_count})")
                # Sign each attempt separately in case the signature is time-sensitive
                headers = self._generate_signature(method, endpoint, body)
                if method == 'GET':
                    response = self.session.get(url, headers=headers, params=params, timeout=10)
                elif method == 'POST':
                    response = self.session.post(url, headers=headers, json=body, timeout=10)
                elif method == 'PUT':
                    response = self.session.put(url, headers=headers, json=body, timeout=10)
                elif method == 'DELETE':
                    response = self.session.delete(url, headers=headers, timeout=10)
                else:
                    raise ValueError(f"Unsupported HTTP method: {method}")

                # If we get a successful response, parse and return it
                if response.status_code < 400:
                    return self._parse_response(response)

                # Raises a specific exception for 400, 401, 429 and 5xx responses
                self._handle_error_response(response)

                # Any other status (e.g. 403, 404) has no specific handler and is not retried
                raise DmarketAPIError(
                    f"Request failed with status code {response.status_code}",
                    status_code=response.status_code,
                    response=response.text
                )
            except (AuthenticationError, ValidationError):
                # Don't retry auth/validation errors as they'll likely fail again
                raise
            except RateLimitError:
                if is_last_attempt or 429 not in retry_on_status:
                    raise
                # Use a longer delay for rate limit errors
                sleep_time *= 2
                self.logger.warning(f"Rate limit exceeded. Retrying in {sleep_time} seconds...")
            except ServerError as e:
                if is_last_attempt or e.status_code not in retry_on_status:
                    raise
                self.logger.warning(f"Request failed with status {e.status_code}. "
                                    f"Retrying in {sleep_time} seconds...")
            except (ConnectionError, Timeout) as e:
                # Network-related errors are generally retryable
                if is_last_attempt:
                    self.logger.error(f"Request failed after {retry_count} attempts due to network errors")
                    raise DmarketAPIError(f"Network error: {str(e)}") from e
                self.logger.warning(f"Network error: {str(e)}. Retrying in {sleep_time} seconds...")
            except DmarketAPIError:
                # Non-retryable API error raised above
                raise
            except Exception as e:
                # Unexpected errors
                self.logger.error(f"Unexpected error during API request: {str(e)}")
                raise

            # Only reached when the attempt failed with a retryable error
            time.sleep(sleep_time)
    def _generate_signature(self, method, endpoint, body=None):
        # Implementation of signature generation (as in previous examples)
        # ...
        raise NotImplementedError("Signature generation is covered in the previous examples")

    def _parse_response(self, response):
        """Parse API response, handling JSON decode errors"""
        try:
            return response.json()
        except ValueError as e:
            # Decode errors from json and simplejson both subclass ValueError
            self.logger.error(f"Failed to parse JSON response: {str(e)}")
            self.logger.debug(f"Response text: {response.text}")
            raise DmarketAPIError(f"Invalid JSON response: {str(e)}")

    def _handle_error_response(self, response):
        """Raise a specific exception for known error status codes"""
        status_code = response.status_code
        try:
            error_data = response.json()
        except ValueError:
            error_data = {"message": response.text}
        if not isinstance(error_data, dict):
            # Guard against error bodies that are valid JSON but not an object
            error_data = {"message": str(error_data)}
        error_message = error_data.get("message", "Unknown error")

        if status_code == 401:
            raise AuthenticationError(
                f"Authentication failed: {error_message}",
                status_code=status_code,
                response=error_data
            )
        elif status_code == 400:
            raise ValidationError(
                f"Invalid request: {error_message}",
                status_code=status_code,
                response=error_data
            )
        elif status_code == 429:
            raise RateLimitError(
                f"Rate limit exceeded: {error_message}",
                status_code=status_code,
                response=error_data
            )
        elif status_code >= 500:
            raise ServerError(
                f"Server error: {error_message}",
                status_code=status_code,
                response=error_data
            )
Implementing Exponential Backoff
When encountering rate limits or server errors, implementing an exponential backoff strategy can help your application recover gracefully:
import random


def exponential_backoff(base_delay, retry_count, jitter=0.1):
    """
    Calculate delay time using exponential backoff with jitter

    Args:
        base_delay: Base delay in seconds
        retry_count: Current retry attempt (0-based)
        jitter: Random jitter factor (0-1) to add randomness to delay

    Returns:
        Delay time in seconds
    """
    # Calculate exponential delay: base_delay * 2^retry_count
    delay = base_delay * (2 ** retry_count)
    # Add random jitter to prevent thundering herd problem
    if jitter > 0:
        delay = delay * (1 + random.uniform(-jitter, jitter))
    return delay
Pro Tip
Adding a small random jitter to your backoff times helps prevent the "thundering herd" problem, where multiple clients might retry at exactly the same moment after an error.
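To show how a backoff calculation might be wired into an actual retry loop, here is a minimal sketch. It uses the "full jitter" variant (a uniformly random delay between zero and a capped exponential ceiling) instead of the proportional jitter shown earlier; the helper name `retry_with_backoff` and all of its parameters are assumptions made for this example.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func(), retrying on the given exceptions with capped, jittered backoff."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential growth, capped so long outages don't produce huge waits
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            # Full jitter: pick uniformly in [0, ceiling]
            sleep(random.uniform(0, ceiling))
    raise RuntimeError("max_attempts must be at least 1")
```

The `sleep` parameter exists so the delay can be replaced in tests; in production the default `time.sleep` is used. Exceptions that are not listed in `retryable` propagate immediately without any delay.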
Understanding Dmarket's Rate Limits
Dmarket imposes rate limits to ensure fair API usage and system stability. While the exact limits may change over time, here are some general guidelines:
- Most endpoints are limited to 60-120 requests per minute
- Some high-volume endpoints (like market data) may have higher limits
- Trading endpoints (buy/sell) often have lower limits
- Rate limits are typically calculated on a rolling window basis
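Because the exact limits can change, it can help to let the server tell you how long to wait whenever it offers that information. The sketch below parses the standard HTTP `Retry-After` header, which may carry either a number of seconds or an HTTP date. Whether Dmarket actually sends this header is an assumption here, so treat the result as optional and fall back to your own backoff when it is `None`. The function name `retry_after_seconds` is made up for this example.

```python
import time
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def retry_after_seconds(headers: Mapping[str, str],
                        now: Optional[float] = None) -> Optional[float]:
    """Return the wait suggested by a Retry-After header, or None if absent/invalid."""
    # Note: requests' response.headers is case-insensitive; a plain dict is not.
    value = headers.get("Retry-After")
    if value is None:
        return None
    value = value.strip()
    if value.isdigit():
        # Delay-seconds form, e.g. "Retry-After: 30"
        return float(value)
    try:
        # HTTP-date form, e.g. "Retry-After: Wed, 21 Oct 2015 07:28:00 GMT"
        target = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return None
    now = time.time() if now is None else now
    # A date in the past means no further waiting is needed
    return max(0.0, target.timestamp() - now)
```

A caller could use `retry_after_seconds(response.headers)` after a 429 response and sleep for the returned value when it is not `None`.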
Implementing a Rate Limiter
To avoid hitting rate limits, we can implement a rate limiter that manages our API request frequency:
import time
import threading
from collections import deque
from functools import wraps


class RateLimiter:
    """
    Rate limiter that tracks API calls and enforces limits
    """
    def __init__(self, max_calls, time_frame):
        """
        Initialize rate limiter

        Args:
            max_calls: Maximum number of calls allowed in the time frame
            time_frame: Time frame in seconds
        """
        self.max_calls = max_calls
        self.time_frame = time_frame
        self.calls = deque()
        self.lock = threading.Lock()

    def add_call(self):
        """Record a new API call"""
        current_time = time.time()
        # Acquire lock to make this thread-safe
        with self.lock:
            # Remove calls outside the time frame
            while self.calls and self.calls[0] < current_time - self.time_frame:
                self.calls.popleft()
            # Add the new call
            self.calls.append(current_time)

    def wait_if_needed(self):
        """
        Check if we need to wait before making another call
        If needed, sleep until a call slot becomes available

        Note: checking and recording are separate steps, so several threads
        calling this at the same moment can briefly exceed the limit.

        Returns:
            Time waited in seconds (0 if no wait was needed)
        """
        current_time = time.time()
        with self.lock:
            # Clean up old calls first
            while self.calls and self.calls[0] < current_time - self.time_frame:
                self.calls.popleft()
            if len(self.calls) < self.max_calls:
                # We're under the limit, no need to wait
                return 0
            # Calculate required wait time: when the oldest call will expire
            # plus a small buffer to account for processing time
            wait_time = (self.calls[0] + self.time_frame) - current_time + 0.01
        # Wait outside the lock to avoid blocking other threads
        if wait_time > 0:
            time.sleep(wait_time)
            return wait_time
        return 0

    def __call__(self, func):
        """
        Decorator to rate-limit a function

        Example usage:
            @rate_limiter
            def api_call():
                # function body
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            self.wait_if_needed()
            try:
                return func(*args, **kwargs)
            finally:
                # Failed calls still count against the limit
                self.add_call()
        return wrapper
Here's how to integrate the rate limiter with our API client:
class RateLimitedDmarketAPI(EnhancedDmarketAPI):
    """API client with built-in rate limiting"""
    def __init__(self, public_key, secret_key, api_url="https://api.dmarket.com", logger=None):
        super().__init__(public_key, secret_key, api_url, logger)
        # Create rate limiters for different endpoint types
        self.market_rate_limiter = RateLimiter(max_calls=100, time_frame=60)   # 100 per minute
        self.trading_rate_limiter = RateLimiter(max_calls=30, time_frame=60)   # 30 per minute
        self.account_rate_limiter = RateLimiter(max_calls=60, time_frame=60)   # 60 per minute
        # Map endpoint prefixes to their respective rate limiters
        self.endpoint_limiters = {
            # Market data endpoints
            "/exchange/v1/market/items": self.market_rate_limiter,
            "/exchange/v1/market/items/": self.market_rate_limiter,
            # Trading endpoints
            "/exchange/v1/offers/buy": self.trading_rate_limiter,
            "/exchange/v1/offers/create": self.trading_rate_limiter,
            # Account endpoints
            "/account/v1/balance": self.account_rate_limiter,
            "/inventory/v1/user/items": self.account_rate_limiter
        }

    def make_request(self, method, endpoint, params=None, body=None, **kwargs):
        """Override make_request to apply appropriate rate limiting"""
        # Find the right rate limiter for this endpoint
        limiter = None
        for prefix, rate_limiter in self.endpoint_limiters.items():
            if endpoint.startswith(prefix):
                limiter = rate_limiter
                break
        # Default to market rate limiter if not found
        limiter = limiter or self.market_rate_limiter

        # Apply rate limiting
        wait_time = limiter.wait_if_needed()
        if wait_time > 0:
            self.logger.debug(f"Rate limiting: waited {wait_time:.2f}s before calling {endpoint}")

        try:
            # Make the actual request (retries inside count as a single call here)
            return super().make_request(method, endpoint, params, body, **kwargs)
        finally:
            # Record the call even when it fails, since it still used up quota
            limiter.add_call()
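
The sliding-window limiter above blocks once the window is full, which can produce a burst of requests followed by a long pause. A token bucket is a common alternative that spreads requests more evenly over time. The following is a minimal sketch, not a drop-in replacement: the `TokenBucket` class, its method names, and the injectable `clock` parameter are all assumptions made for this example.

```python
import threading
import time
from typing import Callable


class TokenBucket:
    """Allow bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = float(capacity)
        self._updated = clock()
        self._lock = threading.Lock()

    def try_acquire(self) -> float:
        """Take one token if available.

        Returns 0.0 on success, otherwise the number of seconds until a
        token will be available (no token is taken in that case).
        """
        with self._lock:
            now = self._clock()
            # Refill according to elapsed time, never beyond capacity
            elapsed = now - self._updated
            self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
            self._updated = now
            if self._tokens >= 1:
                self._tokens -= 1
                return 0.0
            return (1 - self._tokens) / self.rate

    def acquire(self) -> None:
        """Block until a token has been taken."""
        while True:
            wait = self.try_acquire()
            if wait == 0.0:
                return
            time.sleep(wait)
```

For a limit of 30 requests per minute, `TokenBucket(rate=0.5, capacity=5)` would allow a short burst of five calls and then settle at one call every two seconds. Because checking and taking a token happen under the same lock, concurrent threads cannot overshoot the budget.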
Optimizing API Usage
Beyond error handling and rate limiting, there are several strategies to optimize your API usage:
1. Implement Caching
Cache API responses to avoid redundant requests, especially for relatively static data:
import time
from functools import wraps
from typing import Dict, Any, Callable, Optional


class CacheEntry:
    """Entry in the cache with expiration time"""
    def __init__(self, data: Any, expiry: float):
        self.data = data
        self.expiry = expiry

    def is_expired(self) -> bool:
        """Check if this cache entry has expired"""
        return time.time() > self.expiry


class ResponseCache:
    """Cache for API responses"""
    def __init__(self):
        self.cache: Dict[str, CacheEntry] = {}

    def get(self, key: str) -> Optional[Any]:
        """Get item from cache if it exists and is not expired"""
        if key in self.cache:
            entry = self.cache[key]
            if not entry.is_expired():
                return entry.data
            # Remove expired entry
            del self.cache[key]
        return None

    def set(self, key: str, data: Any, ttl: int = 60) -> None:
        """
        Add item to cache with expiration time

        Args:
            key: Cache key
            data: Data to cache
            ttl: Time to live in seconds
        """
        expiry = time.time() + ttl
        self.cache[key] = CacheEntry(data, expiry)

    def clear(self) -> None:
        """Clear the entire cache"""
        self.cache.clear()

    def remove_expired(self) -> int:
        """Remove all expired entries from cache"""
        expired_keys = [k for k, v in self.cache.items() if v.is_expired()]
        for key in expired_keys:
            del self.cache[key]
        return len(expired_keys)


def cached(cache: ResponseCache, ttl: int = 60, key_func: Optional[Callable] = None):
    """
    Decorator to cache function results

    Args:
        cache: Cache instance to use
        ttl: Time to live in seconds
        key_func: Function to generate cache key from args and kwargs
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Generate cache key
            if key_func:
                cache_key = key_func(*args, **kwargs)
            else:
                # Default key is function name + stringified args + kwargs
                args_str = ','.join(str(arg) for arg in args)
                kwargs_str = ','.join(f"{k}={v}" for k, v in sorted(kwargs.items()))
                cache_key = f"{func.__name__}({args_str};{kwargs_str})"
            # Check cache
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            # Execute function and cache result
            result = func(*args, **kwargs)
            cache.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator
# Example usage in API client
# The decorator is evaluated when the class is defined, so it needs a cache
# object that already exists; a module-level cache is used for that reason.
market_cache = ResponseCache()


class CachedDmarketAPI(RateLimitedDmarketAPI):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = market_cache

    @cached(market_cache, ttl=60)  # Cache market data for 60 seconds
    def get_market_items(self, game_id, title=None, limit=100):
        """Get items available on the market (with caching)"""
        endpoint = "/exchange/v1/market/items"
        params = {
            'gameId': game_id,
            'limit': limit
        }
        if title:
            params['title'] = title
        return self.make_request('GET', endpoint, params=params)
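
The default key built by the `cached` decorator depends on how the arguments happen to be passed, so two equivalent calls can end up with different keys. One possible remedy is to derive the key from the request itself. The sketch below is an assumption-laden illustration: `make_cache_key` is a hypothetical helper, and hashing the canonical JSON form of the parameters is just one reasonable choice.

```python
import hashlib
import json
from typing import Any, Mapping, Optional


def make_cache_key(method: str, endpoint: str,
                   params: Optional[Mapping[str, Any]] = None) -> str:
    """Build a deterministic cache key for a request."""
    # sort_keys makes the key independent of dict insertion order;
    # default=str keeps values that are not JSON-serializable from raising.
    canonical = json.dumps(params or {}, sort_keys=True, default=str)
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{method.upper()}:{endpoint}:{digest}"
```

A `key_func` passed to `cached` could call this helper with the endpoint and the parameters the method is about to send, so that identical requests always share one cache entry.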
2. Batch Requests
Combine multiple operations into a single API request when possible:
def get_multiple_item_prices(self, item_ids, game_id):
    """
    Get prices for multiple items in a single request

    Args:
        item_ids: List of item IDs
        game_id: Game ID

    Returns:
        Dictionary mapping item IDs to their prices
    """
    if not item_ids:
        return {}
    # If API supports batch requests, use it
    endpoint = "/exchange/v1/market/items/batch"
    body = {
        'gameId': game_id,
        'itemIds': item_ids
    }
    result = self.make_request('POST', endpoint, body=body)
    # Process the results into a dictionary
    prices = {}
    for item in result.get('items', []):
        item_id = item.get('itemId')
        price = float(item.get('price', {}).get('amount', 0))
        prices[item_id] = price
    return prices
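
Batch endpoints usually cap how many IDs a single request may carry, so a long list has to be split before it is sent. The helper below is a small sketch of that step; the name `chunked` is invented for this example, and the appropriate chunk size for any particular endpoint is something you would need to confirm yourself.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items` with at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

Each chunk can then be passed to a batch call in turn and the resulting dictionaries merged, which keeps every request within the cap while still using far fewer calls than one request per item.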
3. Use Pagination Efficiently
When retrieving large sets of data, use pagination to process results in chunks:
def get_all_inventory_items(self, game_id, batch_size=100):
    """
    Get all inventory items using pagination

    Args:
        game_id: Game ID
        batch_size: Number of items to fetch per request

    Returns:
        List of all inventory items
    """
    all_items = []
    offset = 0
    while True:
        endpoint = "/inventory/v1/user/items"
        params = {
            'gameId': game_id,
            'limit': batch_size,
            'offset': offset
        }
        batch = self.make_request('GET', endpoint, params=params)
        # Add items from this batch
        items = batch.get('objects', [])
        all_items.extend(items)
        # Stop on a short page, or once the reported total (if present) is reached.
        # A missing 'total' must not end the loop after the first page.
        total = batch.get('total')
        if len(items) < batch_size or (total is not None and total <= offset + batch_size):
            break
        # Update offset for next batch
        offset += batch_size
    return all_items
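
The loop above collects the whole inventory in memory before returning. When the caller only needs to scan items, or may stop early, a generator that fetches pages lazily can save both memory and requests. The sketch below assumes a hypothetical `fetch_page(limit, offset)` callable that returns a dictionary with an `objects` list, mirroring the response shape used above; `iter_pages` and `max_pages` are names chosen for this example.

```python
from typing import Any, Callable, Dict, Iterator


def iter_pages(fetch_page: Callable[[int, int], Dict[str, Any]],
               page_size: int = 100,
               max_pages: int = 1000) -> Iterator[Any]:
    """Yield items one at a time, fetching pages only when needed.

    fetch_page(limit, offset) must return a dict with an 'objects' list.
    max_pages is a safety stop in case the server never returns a short page.
    """
    offset = 0
    for _ in range(max_pages):
        page = fetch_page(page_size, offset)
        items = page.get("objects", [])
        yield from items
        if len(items) < page_size:
            return  # short (or empty) page: nothing left to fetch
        offset += len(items)
```

With an API client, `fetch_page` could be a small lambda that calls `make_request` with the given `limit` and `offset`. If the consumer stops iterating after the first few items, no further pages are requested.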
Monitoring and Debugging
To effectively manage your API usage, implement comprehensive logging and monitoring:
import logging
import time
from functools import wraps

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("api_activity.log"),
        logging.StreamHandler()
    ]
)

# Create loggers
api_logger = logging.getLogger('dmarket.api')
perf_logger = logging.getLogger('dmarket.performance')


def log_api_call(func):
    """Decorator to log API calls with timing information"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        # Extract method and endpoint (args[0] is self); fall back to keyword arguments
        method = args[1] if len(args) > 1 else kwargs.get('method', 'UNKNOWN')
        endpoint = args[2] if len(args) > 2 else kwargs.get('endpoint', 'UNKNOWN')
        api_logger.info(f"API Call: {method} {endpoint}")
        try:
            result = func(*args, **kwargs)
            # Log success
            elapsed = time.time() - start_time
            perf_logger.info(f"API Call completed: {method} {endpoint} in {elapsed:.3f}s")
            return result
        except Exception as e:
            # Log failure
            elapsed = time.time() - start_time
            api_logger.error(f"API Call failed: {method} {endpoint} in {elapsed:.3f}s - {str(e)}")
            # Re-raise the exception
            raise
    return wrapper


# Apply to API client's make_request method
EnhancedDmarketAPI.make_request = log_api_call(EnhancedDmarketAPI.make_request)
Track These Metrics
- Request Success Rate: Percentage of successful API calls
- Response Times: Average and percentile response times
- Rate Limit Usage: How close you are to hitting limits
- Error Frequency: Types and frequency of errors
- Retry Statistics: How often retries are needed
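Several of these metrics can be gathered with a few in-memory counters. The class below is a minimal sketch under stated assumptions: `ApiMetrics` and its method names are invented for this example, latencies are kept in a plain list (fine for a single bot, not for long-running high-volume services), and percentiles use the simple nearest-rank method. Rate limit usage is not covered here, since it depends on the limiter in use.

```python
import math
from collections import Counter
from typing import List, Optional


class ApiMetrics:
    """In-memory counters for API call outcomes and latencies."""

    def __init__(self) -> None:
        self.latencies: List[float] = []
        self.errors: Counter = Counter()
        self.successes = 0
        self.retries = 0

    def record(self, elapsed: float, error: Optional[str] = None,
               retries: int = 0) -> None:
        """Record one logical call; `error` is an error-type label or None."""
        self.latencies.append(elapsed)
        self.retries += retries
        if error is None:
            self.successes += 1
        else:
            self.errors[error] += 1

    @property
    def total(self) -> int:
        return len(self.latencies)

    def success_rate(self) -> float:
        """Fraction of recorded calls that succeeded (0.0 when nothing is recorded)."""
        return self.successes / self.total if self.total else 0.0

    def percentile(self, pct: float) -> float:
        """Nearest-rank percentile of recorded latencies (pct in 0-100)."""
        if not self.latencies:
            return 0.0
        ordered = sorted(self.latencies)
        rank = max(1, math.ceil(pct / 100 * len(ordered)))
        return ordered[rank - 1]
```

A logging decorator such as `log_api_call` could call `record()` with the elapsed time and, on failure, the exception's class name, which would make the success rate and error breakdown available without parsing log files.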
Conclusion
Robust error handling and intelligent rate limiting are essential components of a reliable trading bot. By implementing the techniques outlined in this article, you can create a resilient application that gracefully handles errors, respects rate limits, and optimizes API usage.
Remember that even the best error handling can't compensate for poor API usage patterns. Always design your bot to be considerate of API resources, caching where appropriate, batching requests when possible, and distributing load across time to avoid hitting rate limits.
In our next article, we'll cover security best practices to protect your API keys and trading assets.