JSON-RPC Python Implementation Example
This example demonstrates how to implement JSON-RPC 2.0 clients and servers using Python. You'll learn to build robust, production-ready JSON-RPC applications with Flask, handle errors gracefully, and implement authentication.
What You'll Learn
- How to build a JSON-RPC server using Flask
- Creating a flexible Python client with requests library
- Error handling and validation techniques
- Authentication and security best practices
- Testing strategies for JSON-RPC applications
JSON-RPC Python Implementation
Build powerful JSON-RPC applications with Python, leveraging Flask for the server and requests for client communication.
Overview
Python provides excellent libraries for implementing JSON-RPC services. This guide shows how to create a complete JSON-RPC system using Flask for the server and the requests library for the client, with proper error handling and authentication.
Features We'll Build
- Flask-based JSON-RPC server
- Complete client implementation
- User management system
- Authentication middleware
- Comprehensive error handling
- Batch request support
Technologies Used
- Python 3.8+
- Flask web framework
- Requests library
- JWT for authentication
- SQLite for data storage
- Pytest for testing
Dependencies
First, let's set up our Python environment with the required dependencies.
requirements.txt
Flask==2.3.3
requests==2.31.0
PyJWT==2.8.0
cryptography==41.0.7
pytest==7.4.3
pytest-flask==1.3.0
python-dotenv==1.0.0
Installation: Run pip install -r requirements.txt
to install all dependencies.
Flask Server Implementation
Let's build a comprehensive JSON-RPC server with Flask that includes authentication, user management, and proper error handling.
server.py
import json
import jwt
import sqlite3
import hashlib
from datetime import datetime, timedelta
from functools import wraps
from flask import Flask, request, jsonify
from typing import Dict, Any, Optional, List
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-change-in-production'
# Initialize SQLite database
def init_db():
conn = sqlite3.connect('jsonrpc_example.db')
cursor = conn.cursor()
# Create users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
''')
# Create sessions table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
session_data TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Insert default admin user
admin_password = hashlib.sha256('admin123'.encode()).hexdigest()
cursor.execute('''
INSERT OR IGNORE INTO users (username, email, password_hash)
VALUES (?, ?, ?)
''', ('admin', '[email protected]', admin_password))
conn.commit()
conn.close()
# JSON-RPC Error Classes
class JsonRpcError(Exception):
def __init__(self, code: int, message: str, data: Any = None):
self.code = code
self.message = message
self.data = data
super().__init__(self.message)
class ParseError(JsonRpcError):
def __init__(self, data: Any = None):
super().__init__(-32700, "Parse error", data)
class InvalidRequest(JsonRpcError):
def __init__(self, data: Any = None):
super().__init__(-32600, "Invalid Request", data)
class MethodNotFound(JsonRpcError):
def __init__(self, method: str):
super().__init__(-32601, "Method not found", {"method": method})
class InvalidParams(JsonRpcError):
def __init__(self, data: Any = None):
super().__init__(-32602, "Invalid params", data)
class InternalError(JsonRpcError):
def __init__(self, data: Any = None):
super().__init__(-32603, "Internal error", data)
# Custom application errors
class AuthenticationError(JsonRpcError):
def __init__(self, message: str = "Authentication failed"):
super().__init__(-32001, message)
class PermissionError(JsonRpcError):
def __init__(self, message: str = "Permission denied"):
super().__init__(-32002, message)
class UserNotFoundError(JsonRpcError):
def __init__(self, user_id: int):
super().__init__(-32003, "User not found", {"user_id": user_id})
# Helper functions
def hash_password(password: str) -> str:
return hashlib.sha256(password.encode()).hexdigest()
def generate_token(user_id: int, username: str) -> str:
payload = {
'user_id': user_id,
'username': username,
'exp': datetime.utcnow() + timedelta(hours=24)
}
return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
def verify_token(token: str) -> Dict[str, Any]:
try:
payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token has expired")
except jwt.InvalidTokenError:
raise AuthenticationError("Invalid token")
def get_db_connection():
conn = sqlite3.connect('jsonrpc_example.db')
conn.row_factory = sqlite3.Row
return conn
# Authentication decorator
def require_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
raise AuthenticationError("Missing or invalid authorization header")
token = auth_header.split(' ')[1]
user_data = verify_token(token)
request.user = user_data
return f(*args, **kwargs)
return decorated_function
# JSON-RPC Method Registry
class MethodRegistry:
def __init__(self):
self.methods = {}
self.protected_methods = set()
def register(self, name: str, protected: bool = False):
def decorator(func):
self.methods[name] = func
if protected:
self.protected_methods.add(name)
return func
return decorator
def get_method(self, name: str):
if name not in self.methods:
raise MethodNotFound(name)
return self.methods[name]
def is_protected(self, name: str) -> bool:
return name in self.protected_methods
registry = MethodRegistry()
# Public methods (no authentication required)
@registry.register('ping')
def ping() -> str:
return 'pong'
@registry.register('get_server_time')
def get_server_time() -> str:
return datetime.utcnow().isoformat()
@registry.register('echo')
def echo(message: Any) -> Any:
return message
@registry.register('add')
def add(a: float, b: float) -> float:
if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
raise InvalidParams({"message": "Both parameters must be numbers"})
return a + b
@registry.register('authenticate')
def authenticate(username: str, password: str) -> Dict[str, str]:
if not username or not password:
raise InvalidParams({"message": "Username and password are required"})
conn = get_db_connection()
cursor = conn.cursor()
password_hash = hash_password(password)
cursor.execute('''
SELECT id, username, email FROM users
WHERE username = ? AND password_hash = ? AND is_active = 1
''', (username, password_hash))
user = cursor.fetchone()
conn.close()
if not user:
raise AuthenticationError("Invalid username or password")
token = generate_token(user[0], user[1])
return {
"token": token,
"user": {
"id": user[0],
"username": user[1],
"email": user[2]
}
}
# Protected methods (authentication required)
@registry.register('get_current_user', protected=True)
def get_current_user() -> Dict[str, Any]:
user_id = request.user['user_id']
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT id, username, email, created_at FROM users
WHERE id = ? AND is_active = 1
''', (user_id,))
user = cursor.fetchone()
conn.close()
if not user:
raise UserNotFoundError(user_id)
return {
"id": user[0],
"username": user[1],
"email": user[2],
"created_at": user[3]
}
@registry.register('get_users', protected=True)
def get_users(limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]:
if limit > 100:
raise InvalidParams({"message": "Limit cannot exceed 100"})
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT id, username, email, created_at FROM users
WHERE is_active = 1
ORDER BY created_at DESC
LIMIT ? OFFSET ?
''', (limit, offset))
users = cursor.fetchall()
conn.close()
return [
{
"id": user[0],
"username": user[1],
"email": user[2],
"created_at": user[3]
}
for user in users
]
@registry.register('create_user', protected=True)
def create_user(username: str, email: str, password: str) -> Dict[str, Any]:
if not username or not email or not password:
raise InvalidParams({"message": "Username, email, and password are required"})
if len(password) < 6:
raise InvalidParams({"message": "Password must be at least 6 characters"})
conn = get_db_connection()
cursor = conn.cursor()
try:
password_hash = hash_password(password)
cursor.execute('''
INSERT INTO users (username, email, password_hash)
VALUES (?, ?, ?)
''', (username, email, password_hash))
user_id = cursor.lastrowid
conn.commit()
cursor.execute('''
SELECT id, username, email, created_at FROM users WHERE id = ?
''', (user_id,))
user = cursor.fetchone()
conn.close()
return {
"id": user[0],
"username": user[1],
"email": user[2],
"created_at": user[3]
}
except sqlite3.IntegrityError as e:
conn.close()
if 'username' in str(e):
raise InvalidParams({"message": "Username already exists"})
elif 'email' in str(e):
raise InvalidParams({"message": "Email already exists"})
else:
raise InternalError({"message": "Database constraint violation"})
@registry.register('update_user', protected=True)
def update_user(user_id: int, **updates) -> Dict[str, Any]:
allowed_fields = {'username', 'email', 'password'}
update_fields = {k: v for k, v in updates.items() if k in allowed_fields and v}
if not update_fields:
raise InvalidParams({"message": "No valid fields to update"})
# Hash password if updating
if 'password' in update_fields:
update_fields['password'] = hash_password(update_fields['password'])
# Update field name to match database column
update_fields['password_hash'] = update_fields.pop('password')
conn = get_db_connection()
cursor = conn.cursor()
# Build dynamic update query
set_clause = ', '.join(f"{field} = ?" for field in update_fields.keys())
values = list(update_fields.values()) + [user_id]
try:
cursor.execute(f'''
UPDATE users SET {set_clause} WHERE id = ? AND is_active = 1
''', values)
if cursor.rowcount == 0:
conn.close()
raise UserNotFoundError(user_id)
conn.commit()
# Return updated user
cursor.execute('''
SELECT id, username, email, created_at FROM users WHERE id = ?
''', (user_id,))
user = cursor.fetchone()
conn.close()
return {
"id": user[0],
"username": user[1],
"email": user[2],
"created_at": user[3]
}
except sqlite3.IntegrityError as e:
conn.close()
if 'username' in str(e):
raise InvalidParams({"message": "Username already exists"})
elif 'email' in str(e):
raise InvalidParams({"message": "Email already exists"})
else:
raise InternalError({"message": "Database constraint violation"})
@registry.register('delete_user', protected=True)
def delete_user(user_id: int) -> Dict[str, bool]:
conn = get_db_connection()
cursor = conn.cursor()
# Soft delete - set is_active to 0
cursor.execute('''
UPDATE users SET is_active = 0 WHERE id = ? AND is_active = 1
''', (user_id,))
if cursor.rowcount == 0:
conn.close()
raise UserNotFoundError(user_id)
conn.commit()
conn.close()
return {"success": True}
# Request processing functions
def create_response(request_id: Any, result: Any = None, error: Dict = None) -> Dict:
response = {"jsonrpc": "2.0", "id": request_id}
if error:
response["error"] = error
else:
response["result"] = result
return response
def create_error_response(request_id: Any, error: JsonRpcError) -> Dict:
error_dict = {
"code": error.code,
"message": error.message
}
if error.data is not None:
error_dict["data"] = error.data
return create_response(request_id, error=error_dict)
def process_single_request(req: Dict) -> Optional[Dict]:
request_id = req.get('id')
is_notification = request_id is None
try:
# Validate JSON-RPC format
if req.get('jsonrpc') != '2.0':
raise InvalidRequest({"message": "Invalid JSON-RPC version"})
if 'method' not in req:
raise InvalidRequest({"message": "Missing method"})
method_name = req['method']
params = req.get('params', {})
# Get method
method = registry.get_method(method_name)
# Check authentication for protected methods
if registry.is_protected(method_name):
# For protected methods, we need to check authentication
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
raise AuthenticationError("Missing or invalid authorization header")
token = auth_header.split(' ')[1]
user_data = verify_token(token)
request.user = user_data
# Execute method
if isinstance(params, dict):
result = method(**params)
elif isinstance(params, list):
result = method(*params)
else:
result = method(params) if params is not None else method()
# Return response (None for notifications)
return None if is_notification else create_response(request_id, result)
except JsonRpcError as e:
if is_notification:
# Log error but don't respond for notifications
app.logger.error(f"Error in notification {req.get('method', 'unknown')}: {e}")
return None
return create_error_response(request_id, e)
except Exception as e:
if is_notification:
app.logger.error(f"Unexpected error in notification {req.get('method', 'unknown')}: {e}")
return None
app.logger.error(f"Unexpected error in method {req.get('method', 'unknown')}: {e}")
return create_error_response(request_id, InternalError())
# Main JSON-RPC endpoint
@app.route('/jsonrpc', methods=['POST'])
def jsonrpc_handler():
try:
# Parse JSON request
try:
data = request.get_json()
except Exception:
return jsonify(create_error_response(None, ParseError()))
if data is None:
return jsonify(create_error_response(None, InvalidRequest()))
# Handle batch requests
if isinstance(data, list):
if len(data) == 0:
return jsonify(create_error_response(None, InvalidRequest({"message": "Empty batch"})))
responses = []
for req in data:
response = process_single_request(req)
if response is not None: # Don't include notification responses
responses.append(response)
# If all requests were notifications, return 204 No Content
if len(responses) == 0:
return '', 204
return jsonify(responses)
# Handle single request
response = process_single_request(data)
# If it's a notification, return 204 No Content
if response is None:
return '', 204
return jsonify(response)
except Exception as e:
app.logger.error(f"Unexpected error in JSON-RPC handler: {e}")
return jsonify(create_error_response(None, InternalError()))
# Health check endpoint
@app.route('/health', methods=['GET'])
def health_check():
return jsonify({
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0.0"
})
# Initialize database and start server
if __name__ == '__main__':
init_db()
app.run(debug=True, host='0.0.0.0', port=5000)
Python Client Implementation
Now let's create a comprehensive Python client that can interact with our Flask server.
client.py
import requests
import json
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
import logging
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class JsonRpcError(Exception):
def __init__(self, code: int, message: str, data: Any = None):
self.code = code
self.message = message
self.data = data
super().__init__(f"JSON-RPC Error {code}: {message}")
class JsonRpcClient:
def __init__(self, url: str, timeout: int = 30):
self.url = url
self.timeout = timeout
self.session = requests.Session()
self.request_id = 1
self.auth_token = None
# Set default headers
self.session.headers.update({
'Content-Type': 'application/json',
'User-Agent': 'Python-JsonRPC-Client/1.0'
})
def _generate_id(self) -> int:
current_id = self.request_id
self.request_id += 1
return current_id
def set_auth_token(self, token: str):
"""Set authentication token for protected methods"""
self.auth_token = token
self.session.headers['Authorization'] = f'Bearer {token}'
def clear_auth_token(self):
"""Clear authentication token"""
self.auth_token = None
if 'Authorization' in self.session.headers:
del self.session.headers['Authorization']
def _send_request(self, payload: Union[Dict, List]) -> Union[Dict, List, None]:
"""Send HTTP request to JSON-RPC server"""
try:
response = self.session.post(
self.url,
json=payload,
timeout=self.timeout
)
# Handle 204 No Content (notifications)
if response.status_code == 204:
return None
# Check for HTTP errors
response.raise_for_status()
return response.json()
except requests.exceptions.Timeout:
raise JsonRpcError(-32000, "Request timeout")
except requests.exceptions.ConnectionError:
raise JsonRpcError(-32001, "Connection error")
except requests.exceptions.HTTPError as e:
raise JsonRpcError(-32002, f"HTTP error: {e}")
except json.JSONDecodeError:
raise JsonRpcError(-32003, "Invalid JSON response")
def call(self, method: str, params: Any = None) -> Any:
"""Make a JSON-RPC call and return the result"""
request_payload = {
'jsonrpc': '2.0',
'method': method,
'id': self._generate_id()
}
if params is not None:
request_payload['params'] = params
logger.info(f"Calling method: {method}")
response = self._send_request(request_payload)
if response is None:
raise JsonRpcError(-32004, "Unexpected empty response")
if 'error' in response:
error = response['error']
raise JsonRpcError(
error['code'],
error['message'],
error.get('data')
)
return response.get('result')
def notify(self, method: str, params: Any = None):
"""Send a notification (no response expected)"""
notification_payload = {
'jsonrpc': '2.0',
'method': method
}
if params is not None:
notification_payload['params'] = params
logger.info(f"Sending notification: {method}")
self._send_request(notification_payload)
def batch_call(self, requests: List[Dict]) -> List[Dict]:
"""Send a batch of requests"""
batch_payload = []
for req in requests:
request_payload = {
'jsonrpc': '2.0',
'method': req['method']
}
if 'params' in req:
request_payload['params'] = req['params']
# Add ID if it's not a notification
if not req.get('notification', False):
request_payload['id'] = self._generate_id()
batch_payload.append(request_payload)
logger.info(f"Sending batch request with {len(batch_payload)} items")
response = self._send_request(batch_payload)
if response is None:
return [] # All notifications
# Process batch response
results = []
for res in response:
if 'error' in res:
results.append({
'error': JsonRpcError(
res['error']['code'],
res['error']['message'],
res['error'].get('data')
)
})
else:
results.append({'result': res.get('result')})
return results
# Convenience methods for authentication
def authenticate(self, username: str, password: str) -> Dict[str, Any]:
"""Authenticate and store token"""
result = self.call('authenticate', {
'username': username,
'password': password
})
# Store the token for future requests
self.set_auth_token(result['token'])
return result
def logout(self):
"""Clear authentication token"""
self.clear_auth_token()
logger.info("Logged out successfully")
# Usage examples and demonstration
class JsonRpcDemo:
def __init__(self, server_url: str = 'http://localhost:5000/jsonrpc'):
self.client = JsonRpcClient(server_url)
self.current_user = None
def run_demo(self):
"""Run a comprehensive demonstration of JSON-RPC functionality"""
print("=" * 60)
print("JSON-RPC Python Client Demo")
print("=" * 60)
try:
# Test basic methods
self.test_basic_methods()
# Test authentication
self.test_authentication()
# Test user management
self.test_user_management()
# Test batch requests
self.test_batch_requests()
# Test error handling
self.test_error_handling()
except Exception as e:
print(f"Demo failed with error: {e}")
finally:
self.client.logout()
def test_basic_methods(self):
print("
--- Testing Basic Methods ---")
# Test ping
result = self.client.call('ping')
print(f"Ping result: {result}")
# Test server time
server_time = self.client.call('get_server_time')
print(f"Server time: {server_time}")
# Test echo
echo_data = {"message": "Hello, JSON-RPC!", "timestamp": datetime.now().isoformat()}
echo_result = self.client.call('echo', echo_data)
print(f"Echo result: {echo_result}")
# Test arithmetic
sum_result = self.client.call('add', {'a': 15, 'b': 25})
print(f"15 + 25 = {sum_result}")
def test_authentication(self):
print("
--- Testing Authentication ---")
try:
# Authenticate with default admin credentials
auth_result = self.client.authenticate('admin', 'admin123')
self.current_user = auth_result['user']
print(f"Authentication successful: {auth_result['user']}")
# Test protected method
current_user = self.client.call('get_current_user')
print(f"Current user details: {current_user}")
except JsonRpcError as e:
print(f"Authentication failed: {e}")
def test_user_management(self):
print("
--- Testing User Management ---")
if not self.current_user:
print("Skipping user management tests - not authenticated")
return
try:
# Get all users
users = self.client.call('get_users', {'limit': 5})
print(f"Current users: {len(users)} found")
for user in users:
print(f" - {user['username']} ({user['email']})")
# Create a new user
new_user_data = {
'username': f'testuser_{int(datetime.now().timestamp())}',
'email': f'test_{int(datetime.now().timestamp())}@example.com',
'password': 'testpassword123'
}
new_user = self.client.call('create_user', new_user_data)
print(f"Created new user: {new_user}")
# Update the user
updated_user = self.client.call('update_user', {
'user_id': new_user['id'],
'email': f'updated_{int(datetime.now().timestamp())}@example.com'
})
print(f"Updated user: {updated_user}")
# Delete the user
delete_result = self.client.call('delete_user', {'user_id': new_user['id']})
print(f"Deleted user: {delete_result}")
except JsonRpcError as e:
print(f"User management error: {e}")
def test_batch_requests(self):
print("
--- Testing Batch Requests ---")
# Create a batch of requests
batch_requests = [
{'method': 'ping'},
{'method': 'get_server_time'},
{'method': 'add', 'params': {'a': 10, 'b': 20}},
{'method': 'echo', 'params': 'Batch request test'},
{'method': 'ping', 'notification': True} # This is a notification
]
try:
results = self.client.batch_call(batch_requests)
print(f"Batch request completed. Results:")
for i, result in enumerate(results):
if 'error' in result:
print(f" Request {i+1}: Error - {result['error']}")
else:
print(f" Request {i+1}: {result['result']}")
except JsonRpcError as e:
print(f"Batch request error: {e}")
def test_error_handling(self):
print("
--- Testing Error Handling ---")
# Test method not found
try:
self.client.call('nonexistent_method')
except JsonRpcError as e:
print(f"Method not found error: {e}")
# Test invalid parameters
try:
self.client.call('add', {'a': 'not_a_number', 'b': 5})
except JsonRpcError as e:
print(f"Invalid params error: {e}")
# Test authentication error (without token)
old_token = self.client.auth_token
self.client.clear_auth_token()
try:
self.client.call('get_current_user')
except JsonRpcError as e:
print(f"Authentication error: {e}")
# Restore token
if old_token:
self.client.set_auth_token(old_token)
# Additional utility functions
def health_check(server_url: str = 'http://localhost:5000') -> bool:
"""Check if the JSON-RPC server is healthy"""
try:
response = requests.get(f"{server_url}/health", timeout=5)
response.raise_for_status()
health_data = response.json()
print(f"Server health: {health_data}")
return health_data.get('status') == 'healthy'
except Exception as e:
print(f"Health check failed: {e}")
return False
def run_performance_test(server_url: str = 'http://localhost:5000/jsonrpc', num_requests: int = 100):
"""Run a simple performance test"""
import time
client = JsonRpcClient(server_url)
print(f"
--- Performance Test: {num_requests} requests ---")
start_time = time.time()
successful_requests = 0
failed_requests = 0
for i in range(num_requests):
try:
result = client.call('ping')
if result == 'pong':
successful_requests += 1
else:
failed_requests += 1
except Exception:
failed_requests += 1
end_time = time.time()
duration = end_time - start_time
print(f"Total time: {duration:.2f} seconds")
print(f"Successful requests: {successful_requests}")
print(f"Failed requests: {failed_requests}")
print(f"Requests per second: {num_requests / duration:.2f}")
print(f"Average response time: {(duration / num_requests) * 1000:.2f} ms")
if __name__ == '__main__':
# Check server health
if health_check():
# Run the demo
demo = JsonRpcDemo()
demo.run_demo()
# Run performance test
run_performance_test(num_requests=50)
else:
print("Server is not available. Please start the server first.")
Testing Implementation
Let's create comprehensive tests for our JSON-RPC implementation using pytest.
test_jsonrpc.py
import pytest
import json
import tempfile
import os
from server import app, init_db
from client import JsonRpcClient, JsonRpcError
class TestJsonRpcServer:
@pytest.fixture
def client(self):
# Create a temporary database for testing
db_fd, app.config['DATABASE'] = tempfile.mkstemp()
app.config['TESTING'] = True
with app.test_client() as client:
with app.app_context():
init_db()
yield client
os.close(db_fd)
os.unlink(app.config['DATABASE'])
def test_health_endpoint(self, client):
"""Test the health check endpoint"""
response = client.get('/health')
assert response.status_code == 200
data = json.loads(response.data)
assert data['status'] == 'healthy'
def test_ping_method(self, client):
"""Test the ping method"""
payload = {
'jsonrpc': '2.0',
'method': 'ping',
'id': 1
}
response = client.post('/jsonrpc',
data=json.dumps(payload),
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['result'] == 'pong'
assert data['id'] == 1
def test_add_method(self, client):
"""Test the add method"""
payload = {
'jsonrpc': '2.0',
'method': 'add',
'params': {'a': 5, 'b': 3},
'id': 2
}
response = client.post('/jsonrpc',
data=json.dumps(payload),
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert data['result'] == 8
def test_method_not_found(self, client):
"""Test method not found error"""
payload = {
'jsonrpc': '2.0',
'method': 'nonexistent_method',
'id': 3
}
response = client.post('/jsonrpc',
data=json.dumps(payload),
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert 'error' in data
assert data['error']['code'] == -32601
def test_invalid_params(self, client):
"""Test invalid parameters error"""
payload = {
'jsonrpc': '2.0',
'method': 'add',
'params': {'a': 'not_a_number', 'b': 5},
'id': 4
}
response = client.post('/jsonrpc',
data=json.dumps(payload),
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert 'error' in data
assert data['error']['code'] == -32602
def test_notification(self, client):
"""Test notification (no response expected)"""
payload = {
'jsonrpc': '2.0',
'method': 'ping'
# No 'id' field makes this a notification
}
response = client.post('/jsonrpc',
data=json.dumps(payload),
content_type='application/json')
assert response.status_code == 204 # No Content
def test_batch_request(self, client):
"""Test batch requests"""
payload = [
{'jsonrpc': '2.0', 'method': 'ping', 'id': 1},
{'jsonrpc': '2.0', 'method': 'add', 'params': {'a': 2, 'b': 3}, 'id': 2},
{'jsonrpc': '2.0', 'method': 'ping'} # Notification
]
response = client.post('/jsonrpc',
data=json.dumps(payload),
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert len(data) == 2 # Only non-notification responses
assert data[0]['result'] == 'pong'
assert data[1]['result'] == 5
def test_authentication(self, client):
"""Test authentication flow"""
# First, authenticate
auth_payload = {
'jsonrpc': '2.0',
'method': 'authenticate',
'params': {'username': 'admin', 'password': 'admin123'},
'id': 1
}
response = client.post('/jsonrpc',
data=json.dumps(auth_payload),
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert 'result' in data
assert 'token' in data['result']
token = data['result']['token']
# Now test a protected method
protected_payload = {
'jsonrpc': '2.0',
'method': 'get_current_user',
'id': 2
}
response = client.post('/jsonrpc',
data=json.dumps(protected_payload),
content_type='application/json',
headers={'Authorization': f'Bearer {token}'})
assert response.status_code == 200
data = json.loads(response.data)
assert 'result' in data
assert data['result']['username'] == 'admin'
def test_protected_method_without_auth(self, client):
"""Test protected method without authentication"""
payload = {
'jsonrpc': '2.0',
'method': 'get_current_user',
'id': 1
}
response = client.post('/jsonrpc',
data=json.dumps(payload),
content_type='application/json')
assert response.status_code == 200
data = json.loads(response.data)
assert 'error' in data
assert data['error']['code'] == -32001 # Authentication error
class TestJsonRpcClient:
@pytest.fixture
def mock_server_url(self):
return 'http://localhost:5000/jsonrpc'
def test_client_initialization(self, mock_server_url):
"""Test client initialization"""
client = JsonRpcClient(mock_server_url)
assert client.url == mock_server_url
assert client.auth_token is None
def test_token_management(self, mock_server_url):
"""Test authentication token management"""
client = JsonRpcClient(mock_server_url)
# Set token
test_token = 'test_token_123'
client.set_auth_token(test_token)
assert client.auth_token == test_token
assert client.session.headers['Authorization'] == f'Bearer {test_token}'
# Clear token
client.clear_auth_token()
assert client.auth_token is None
assert 'Authorization' not in client.session.headers
# Integration tests (require running server)
class TestIntegration:
@pytest.fixture
def client(self):
return JsonRpcClient('http://localhost:5000/jsonrpc')
@pytest.mark.integration
def test_full_workflow(self, client):
"""Test complete workflow including authentication and user management"""
try:
# Test basic method
result = client.call('ping')
assert result == 'pong'
# Authenticate
auth_result = client.authenticate('admin', 'admin123')
assert 'token' in auth_result
assert 'user' in auth_result
# Test protected method
user = client.call('get_current_user')
assert user['username'] == 'admin'
# Test user creation
import time
unique_id = int(time.time())
new_user = client.call('create_user', {
'username': f'test_user_{unique_id}',
'email': f'test_{unique_id}@example.com',
'password': 'test123456'
})
assert new_user['username'] == f'test_user_{unique_id}'
# Clean up
client.call('delete_user', {'user_id': new_user['id']})
except JsonRpcError as e:
pytest.fail(f"Integration test failed: {e}")
@pytest.mark.integration
def test_error_handling(self, client):
"""Test error handling in integration environment"""
# Test method not found
with pytest.raises(JsonRpcError) as exc_info:
client.call('nonexistent_method')
assert exc_info.value.code == -32601
# Test invalid params
with pytest.raises(JsonRpcError) as exc_info:
client.call('add', {'a': 'not_a_number', 'b': 5})
assert exc_info.value.code == -32602
if __name__ == '__main__':
# Run tests
pytest.main([__file__, '-v'])
Running the Examples
Step 1: Setup Environment
# Create project directory
mkdir jsonrpc-python-example
cd jsonrpc-python-example
# Create virtual environment
python -m venv venv
# Activate virtual environment
# On Windows:
venvScriptsactivate
# On macOS/Linux:
source venv/bin/activate
# Install dependencies
pip install -r requirements.txt
Step 2: Start the Server
# Start the Flask server
python server.py
# The server will start on http://localhost:5000
# JSON-RPC endpoint: http://localhost:5000/jsonrpc
# Health check: http://localhost:5000/health
Step 3: Run the Client Demo
# In a new terminal, run the client demo
python client.py
# Or run specific tests
python -m pytest test_jsonrpc.py -v
# Run integration tests (requires running server)
python -m pytest test_jsonrpc.py::TestIntegration -v -m integration
Python JSON-RPC Best Practices
✅ Best Practices
- Use type hints for better code documentation and IDE support
- Implement proper error handling with custom exception classes
- Use authentication tokens with expiration for security
- Validate input parameters in all method handlers
- Use connection pooling for database operations
- Implement request logging for debugging and monitoring
- Use environment variables for configuration
- Write comprehensive tests including integration tests
❌ Common Pitfalls
- Don't expose sensitive information in error messages
- Don't use weak authentication mechanisms
- Don't ignore request validation
- Don't use synchronous operations for I/O-heavy tasks
- Don't hardcode secret keys in source code
- Don't skip logging important events
- Don't forget to handle database connection errors