Python / Flask Integration
Integrate ConsentKeys authentication into your Python Flask application using standard OAuth 2.0 / OIDC protocols.
Prerequisites
- A ConsentKeys Client ID and Secret from the Developer Portal
- Python 3.8+
- Flask 2.0+
Redirect URI Configuration
In the Developer Portal, configure your redirect URI to point to your Flask app (e.g., http://localhost:5000/auth/callback for local dev or https://yourapp.com/auth/callback for production).
Installation
pip install Flask requests PyJWT cryptography
Step 1: Flask App Setup
app.py
from flask import Flask, redirect, session, url_for, request, jsonify
from functools import wraps
import os
import secrets
import hashlib
import base64
import requests
from urllib.parse import urlencode
from jwt import PyJWKClient, decode as jwt_decode
app = Flask(__name__)
app.secret_key = os.environ.get('SESSION_SECRET', secrets.token_hex(32))
# ConsentKeys configuration
CONSENTKEYS_BASE_URL = 'https://pseudoidc.consentkeys.com'
CLIENT_ID = os.environ.get('CONSENTKEYS_CLIENT_ID')
CLIENT_SECRET = os.environ.get('CONSENTKEYS_CLIENT_SECRET')
REDIRECT_URI = 'http://localhost:5000/auth/callback'
# Endpoints
AUTH_URL = f'{CONSENTKEYS_BASE_URL}/auth'
TOKEN_URL = f'{CONSENTKEYS_BASE_URL}/token'
USERINFO_URL = f'{CONSENTKEYS_BASE_URL}/userinfo'
JWKS_URL = f'{CONSENTKEYS_BASE_URL}/.well-known/jwks.json'
# Initialize JWKS client for token verification
jwks_client = PyJWKClient(JWKS_URL)
Step 2: PKCE Implementation
utils/pkce.py
import secrets
import hashlib
import base64
def generate_code_verifier():
"""Generate a random code verifier for PKCE"""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')
def generate_code_challenge(verifier):
"""Generate code challenge from verifier"""
digest = hashlib.sha256(verifier.encode('utf-8')).digest()
return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')
def generate_state():
"""Generate random state for CSRF protection"""
return secrets.token_urlsafe(16)
Step 3: Authentication Routes
routes/auth.py
from flask import Blueprint, redirect, session, url_for, request, jsonify
from utils.pkce import generate_code_verifier, generate_code_challenge, generate_state
import requests
from urllib.parse import urlencode
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login')
def login():
"""Initiate OAuth login flow"""
# Generate PKCE parameters
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
state = generate_state()
# Store in session
session['code_verifier'] = code_verifier
session['oauth_state'] = state
# Build authorization URL
params = {
'response_type': 'code',
'client_id': CLIENT_ID,
'redirect_uri': REDIRECT_URI,
'scope': 'openid profile email',
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
}
auth_url = f'{AUTH_URL}?{urlencode(params)}'
return redirect(auth_url)
@auth_bp.route('/callback')
def callback():
"""Handle OAuth callback"""
# Check for errors
error = request.args.get('error')
if error:
error_description = request.args.get('error_description', error)
return jsonify({'error': error_description}), 400
# Get authorization code and state
code = request.args.get('code')
state = request.args.get('state')
if not code or not state:
return jsonify({'error': 'Missing code or state'}), 400
# Verify state (CSRF protection)
stored_state = session.get('oauth_state')
if not stored_state or state != stored_state:
return jsonify({'error': 'Invalid state parameter'}), 400
# Get code verifier
code_verifier = session.get('code_verifier')
if not code_verifier:
return jsonify({'error': 'Missing code verifier'}), 400
try:
# Exchange code for tokens
token_data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': REDIRECT_URI,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'code_verifier': code_verifier,
}
token_response = requests.post(TOKEN_URL, data=token_data)
token_response.raise_for_status()
tokens = token_response.json()
# Verify ID token
signing_key = jwks_client.get_signing_key_from_jwt(tokens['id_token'])
payload = jwt_decode(
tokens['id_token'],
signing_key.key,
algorithms=['RS256'],
audience=CLIENT_ID,
issuer=CONSENTKEYS_BASE_URL,
)
# Get user info
userinfo_response = requests.get(
USERINFO_URL,
headers={'Authorization': f"Bearer {tokens['access_token']}"}
)
userinfo_response.raise_for_status()
user_info = userinfo_response.json()
# Store in session
session['access_token'] = tokens['access_token']
session['user'] = user_info
# Clean up temporary session data
session.pop('code_verifier', None)
session.pop('oauth_state', None)
return redirect(url_for('dashboard'))
except requests.exceptions.RequestException as e:
return jsonify({'error': f'Token exchange failed: {str(e)}'}), 500
except Exception as e:
return jsonify({'error': f'Authentication failed: {str(e)}'}), 500
@auth_bp.route('/logout', methods=['POST'])
def logout():
"""Clear session and logout"""
session.clear()
return jsonify({'success': True})
@auth_bp.route('/me')
def me():
"""Get current user"""
user = session.get('user')
if not user:
return jsonify({'error': 'Not authenticated'}), 401
return jsonify({'user': user})
Step 4: Protected Route Decorator
utils/auth.py
from flask import session, redirect, url_for, jsonify
from functools import wraps
def login_required(f):
"""Decorator to require authentication"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user' not in session:
# For API routes, return 401
if request.path.startswith('/api/'):
return jsonify({'error': 'Authentication required'}), 401
# For page routes, redirect to login
return redirect(url_for('auth.login'))
return f(*args, **kwargs)
return decorated_function
Step 5: Application Routes
app.py (continued)
from routes.auth import auth_bp
from utils.auth import login_required
# Register blueprints
app.register_blueprint(auth_bp, url_prefix='/auth')
@app.route('/')
def index():
"""Home page"""
user = session.get('user')
if user:
return redirect(url_for('dashboard'))
return '''
<html>
<head><title>Welcome</title></head>
<body>
<h1>Welcome to My App</h1>
<p>Sign in to get started</p>
<a href="/auth/login">
<button>Sign in with ConsentKeys</button>
</a>
</body>
</html>
'''
@app.route('/dashboard')
@login_required
def dashboard():
"""Protected dashboard"""
user = session.get('user')
return f'''
<html>
<head><title>Dashboard</title></head>
<body>
<h1>Dashboard</h1>
<div>
<p>Welcome, {user.get('name', user.get('email'))}!</p>
<p>Email: {user.get('email')}</p>
<p>ID: {user.get('sub')}</p>
</div>
<form action="/auth/logout" method="post">
<button type="submit">Logout</button>
</form>
</body>
</html>
'''
@app.route('/api/data')
@login_required
def api_data():
"""Protected API endpoint"""
return jsonify({
'message': 'This is protected data',
'user': session.get('user')
})
if __name__ == '__main__':
app.run(debug=True, port=5000)
Step 6: Token Refresh (Optional)
utils/tokens.py
import requests
from datetime import datetime, timedelta
def refresh_access_token(refresh_token):
"""Refresh an expired access token"""
token_data = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
}
response = requests.post(TOKEN_URL, data=token_data)
response.raise_for_status()
return response.json()
def ensure_valid_token():
"""Ensure the access token is valid, refresh if needed"""
expires_at = session.get('token_expires_at')
if not expires_at or datetime.now() >= expires_at:
refresh_token = session.get('refresh_token')
if not refresh_token:
# No refresh token, need to re-authenticate
return False
try:
tokens = refresh_access_token(refresh_token)
session['access_token'] = tokens['access_token']
session['token_expires_at'] = datetime.now() + timedelta(
seconds=tokens['expires_in']
)
return True
except:
return False
return True
Complete Working Example
app.py (full)
from flask import Flask, redirect, session, url_for, request, jsonify, render_template_string
from functools import wraps
import os
import secrets
import hashlib
import base64
import requests
from urllib.parse import urlencode
from jwt import PyJWKClient, decode as jwt_decode
app = Flask(__name__)
app.secret_key = os.environ.get('SESSION_SECRET', secrets.token_hex(32))
# Configuration
CONSENTKEYS_BASE_URL = 'https://pseudoidc.consentkeys.com'
CLIENT_ID = os.environ.get('CONSENTKEYS_CLIENT_ID')
CLIENT_SECRET = os.environ.get('CONSENTKEYS_CLIENT_SECRET')
REDIRECT_URI = 'http://localhost:5000/auth/callback'
AUTH_URL = f'{CONSENTKEYS_BASE_URL}/auth'
TOKEN_URL = f'{CONSENTKEYS_BASE_URL}/token'
USERINFO_URL = f'{CONSENTKEYS_BASE_URL}/userinfo'
JWKS_URL = f'{CONSENTKEYS_BASE_URL}/.well-known/jwks.json'
jwks_client = PyJWKClient(JWKS_URL)
# PKCE utilities
def generate_code_verifier():
return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')
def generate_code_challenge(verifier):
digest = hashlib.sha256(verifier.encode('utf-8')).digest()
return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')
def generate_state():
return secrets.token_urlsafe(16)
# Auth decorator
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user' not in session:
if request.path.startswith('/api/'):
return jsonify({'error': 'Authentication required'}), 401
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
# Routes
@app.route('/')
def index():
if 'user' in session:
return redirect(url_for('dashboard'))
return '<h1>Welcome</h1><a href="/login"><button>Sign In</button></a>'
@app.route('/login')
def login():
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
state = generate_state()
session['code_verifier'] = code_verifier
session['oauth_state'] = state
params = {
'response_type': 'code',
'client_id': CLIENT_ID,
'redirect_uri': REDIRECT_URI,
'scope': 'openid profile email',
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
}
return redirect(f'{AUTH_URL}?{urlencode(params)}')
@app.route('/auth/callback')
def callback():
error = request.args.get('error')
if error:
return jsonify({'error': request.args.get('error_description', error)}), 400
code = request.args.get('code')
state = request.args.get('state')
if state != session.get('oauth_state'):
return jsonify({'error': 'Invalid state'}), 400
code_verifier = session.get('code_verifier')
try:
token_response = requests.post(TOKEN_URL, data={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': REDIRECT_URI,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'code_verifier': code_verifier,
})
token_response.raise_for_status()
tokens = token_response.json()
signing_key = jwks_client.get_signing_key_from_jwt(tokens['id_token'])
payload = jwt_decode(
tokens['id_token'],
signing_key.key,
algorithms=['RS256'],
audience=CLIENT_ID,
issuer=CONSENTKEYS_BASE_URL,
)
userinfo_response = requests.get(
USERINFO_URL,
headers={'Authorization': f"Bearer {tokens['access_token']}"}
)
user_info = userinfo_response.json()
session['access_token'] = tokens['access_token']
session['user'] = user_info
session.pop('code_verifier', None)
session.pop('oauth_state', None)
return redirect(url_for('dashboard'))
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/dashboard')
@login_required
def dashboard():
user = session['user']
return f'''
<h1>Dashboard</h1>
<p>Welcome, {user.get('name', user.get('email'))}!</p>
<p>Email: {user.get('email')}</p>
<form action="/logout" method="post">
<button>Logout</button>
</form>
'''
@app.route('/logout', methods=['POST'])
def logout():
session.clear()
return redirect(url_for('index'))
if __name__ == '__main__':
app.run(debug=True, port=5000)
Environment Variables
.env
CONSENTKEYS_CLIENT_ID=ck_your_client_id
CONSENTKEYS_CLIENT_SECRET=your_client_secret
SESSION_SECRET=your-secret-key-at-least-32-characters
FLASK_ENV=development
Load them with python-dotenv:
from dotenv import load_dotenv
load_dotenv()
Testing
-
Start ConsentKeys backend:
cd backend
npm run dev -
Start Flask app:
export FLASK_APP=app.py
flask run -
Visit
http://localhost:5000
Django Integration
For Django, use similar principles:
views.py
from django.shortcuts import redirect
from django.http import JsonResponse
import requests
def login(request):
# Generate PKCE and state
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
state = generate_state()
# Store in session
request.session['code_verifier'] = code_verifier
request.session['oauth_state'] = state
# Build auth URL
params = {
'response_type': 'code',
'client_id': settings.CONSENTKEYS_CLIENT_ID,
'redirect_uri': settings.REDIRECT_URI,
'scope': 'openid profile email',
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
}
auth_url = f"{settings.AUTH_URL}?{urlencode(params)}"
return redirect(auth_url)
def callback(request):
# Similar token exchange logic
code = request.GET.get('code')
state = request.GET.get('state')
# Verify state and exchange code
# ... (same as Flask example)
return redirect('dashboard')
Security Checklist
- ✅ Client secret stored in environment variables
- ✅ PKCE implemented for authorization code flow
- ✅ State parameter verified for CSRF protection
- ✅ ID token signature verified with JWKS
- ✅ Session cookies are httpOnly
- ✅ HTTPS in production
Troubleshooting
"Invalid signature"
- Ensure JWKS URL is correct and accessible
- Verify token hasn't expired
"State mismatch"
- Clear session and try again
- Ensure session persistence is configured
"Token exchange failed"
- Verify client credentials
- Check redirect URI matches exactly
Next Steps
- Understand the authentication flow
- Learn about magic links
- Check the API reference