Skip to main content

Python / Flask Integration

Integrate ConsentKeys authentication into your Python Flask application using standard OAuth 2.0 / OIDC protocols.

Prerequisites

  • A ConsentKeys Client ID and Secret from the Developer Portal
  • Python 3.8+
  • Flask 2.0+
Redirect URI Configuration

In the Developer Portal, configure your redirect URI to point to your Flask app (e.g., http://localhost:5000/auth/callback for local dev or https://yourapp.com/auth/callback for production).

Installation

pip install Flask requests PyJWT cryptography

Step 1: Flask App Setup

app.py
from flask import Flask, redirect, session, url_for, request, jsonify
from functools import wraps
import os
import secrets
import hashlib
import base64
import requests
from urllib.parse import urlencode
from jwt import PyJWKClient, decode as jwt_decode

app = Flask(__name__)
app.secret_key = os.environ.get('SESSION_SECRET', secrets.token_hex(32))

# ConsentKeys configuration
CONSENTKEYS_BASE_URL = 'https://pseudoidc.consentkeys.com'
CLIENT_ID = os.environ.get('CONSENTKEYS_CLIENT_ID')
CLIENT_SECRET = os.environ.get('CONSENTKEYS_CLIENT_SECRET')
REDIRECT_URI = 'http://localhost:5000/auth/callback'

# Endpoints
AUTH_URL = f'{CONSENTKEYS_BASE_URL}/auth'
TOKEN_URL = f'{CONSENTKEYS_BASE_URL}/token'
USERINFO_URL = f'{CONSENTKEYS_BASE_URL}/userinfo'
JWKS_URL = f'{CONSENTKEYS_BASE_URL}/.well-known/jwks.json'

# Initialize JWKS client for token verification
jwks_client = PyJWKClient(JWKS_URL)

Step 2: PKCE Implementation

utils/pkce.py
import secrets
import hashlib
import base64

def generate_code_verifier():
"""Generate a random code verifier for PKCE"""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')

def generate_code_challenge(verifier):
"""Generate code challenge from verifier"""
digest = hashlib.sha256(verifier.encode('utf-8')).digest()
return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')

def generate_state():
"""Generate random state for CSRF protection"""
return secrets.token_urlsafe(16)

Step 3: Authentication Routes

routes/auth.py
from flask import Blueprint, redirect, session, url_for, request, jsonify
from utils.pkce import generate_code_verifier, generate_code_challenge, generate_state
import requests
from urllib.parse import urlencode

auth_bp = Blueprint('auth', __name__)

@auth_bp.route('/login')
def login():
"""Initiate OAuth login flow"""
# Generate PKCE parameters
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
state = generate_state()

# Store in session
session['code_verifier'] = code_verifier
session['oauth_state'] = state

# Build authorization URL
params = {
'response_type': 'code',
'client_id': CLIENT_ID,
'redirect_uri': REDIRECT_URI,
'scope': 'openid profile email',
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
}

auth_url = f'{AUTH_URL}?{urlencode(params)}'
return redirect(auth_url)

@auth_bp.route('/callback')
def callback():
"""Handle OAuth callback"""
# Check for errors
error = request.args.get('error')
if error:
error_description = request.args.get('error_description', error)
return jsonify({'error': error_description}), 400

# Get authorization code and state
code = request.args.get('code')
state = request.args.get('state')

if not code or not state:
return jsonify({'error': 'Missing code or state'}), 400

# Verify state (CSRF protection)
stored_state = session.get('oauth_state')
if not stored_state or state != stored_state:
return jsonify({'error': 'Invalid state parameter'}), 400

# Get code verifier
code_verifier = session.get('code_verifier')
if not code_verifier:
return jsonify({'error': 'Missing code verifier'}), 400

try:
# Exchange code for tokens
token_data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': REDIRECT_URI,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'code_verifier': code_verifier,
}

token_response = requests.post(TOKEN_URL, data=token_data)
token_response.raise_for_status()
tokens = token_response.json()

# Verify ID token
signing_key = jwks_client.get_signing_key_from_jwt(tokens['id_token'])
payload = jwt_decode(
tokens['id_token'],
signing_key.key,
algorithms=['RS256'],
audience=CLIENT_ID,
issuer=CONSENTKEYS_BASE_URL,
)

# Get user info
userinfo_response = requests.get(
USERINFO_URL,
headers={'Authorization': f"Bearer {tokens['access_token']}"}
)
userinfo_response.raise_for_status()
user_info = userinfo_response.json()

# Store in session
session['access_token'] = tokens['access_token']
session['user'] = user_info

# Clean up temporary session data
session.pop('code_verifier', None)
session.pop('oauth_state', None)

return redirect(url_for('dashboard'))

except requests.exceptions.RequestException as e:
return jsonify({'error': f'Token exchange failed: {str(e)}'}), 500
except Exception as e:
return jsonify({'error': f'Authentication failed: {str(e)}'}), 500

@auth_bp.route('/logout', methods=['POST'])
def logout():
"""Clear session and logout"""
session.clear()
return jsonify({'success': True})

@auth_bp.route('/me')
def me():
"""Get current user"""
user = session.get('user')
if not user:
return jsonify({'error': 'Not authenticated'}), 401
return jsonify({'user': user})

Step 4: Protected Route Decorator

utils/auth.py
from flask import session, redirect, url_for, jsonify
from functools import wraps

def login_required(f):
"""Decorator to require authentication"""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user' not in session:
# For API routes, return 401
if request.path.startswith('/api/'):
return jsonify({'error': 'Authentication required'}), 401
# For page routes, redirect to login
return redirect(url_for('auth.login'))
return f(*args, **kwargs)
return decorated_function

Step 5: Application Routes

app.py (continued)
from routes.auth import auth_bp
from utils.auth import login_required

# Register blueprints
app.register_blueprint(auth_bp, url_prefix='/auth')

@app.route('/')
def index():
"""Home page"""
user = session.get('user')
if user:
return redirect(url_for('dashboard'))

return '''
<html>
<head><title>Welcome</title></head>
<body>
<h1>Welcome to My App</h1>
<p>Sign in to get started</p>
<a href="/auth/login">
<button>Sign in with ConsentKeys</button>
</a>
</body>
</html>
'''

@app.route('/dashboard')
@login_required
def dashboard():
"""Protected dashboard"""
user = session.get('user')

return f'''
<html>
<head><title>Dashboard</title></head>
<body>
<h1>Dashboard</h1>
<div>
<p>Welcome, {user.get('name', user.get('email'))}!</p>
<p>Email: {user.get('email')}</p>
<p>ID: {user.get('sub')}</p>
</div>
<form action="/auth/logout" method="post">
<button type="submit">Logout</button>
</form>
</body>
</html>
'''

@app.route('/api/data')
@login_required
def api_data():
"""Protected API endpoint"""
return jsonify({
'message': 'This is protected data',
'user': session.get('user')
})

if __name__ == '__main__':
app.run(debug=True, port=5000)

Step 6: Token Refresh (Optional)

utils/tokens.py
import requests
from datetime import datetime, timedelta

def refresh_access_token(refresh_token):
"""Refresh an expired access token"""
token_data = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
}

response = requests.post(TOKEN_URL, data=token_data)
response.raise_for_status()
return response.json()

def ensure_valid_token():
"""Ensure the access token is valid, refresh if needed"""
expires_at = session.get('token_expires_at')

if not expires_at or datetime.now() >= expires_at:
refresh_token = session.get('refresh_token')
if not refresh_token:
# No refresh token, need to re-authenticate
return False

try:
tokens = refresh_access_token(refresh_token)
session['access_token'] = tokens['access_token']
session['token_expires_at'] = datetime.now() + timedelta(
seconds=tokens['expires_in']
)
return True
except:
return False

return True

Complete Working Example

app.py (full)
from flask import Flask, redirect, session, url_for, request, jsonify, render_template_string
from functools import wraps
import os
import secrets
import hashlib
import base64
import requests
from urllib.parse import urlencode
from jwt import PyJWKClient, decode as jwt_decode

app = Flask(__name__)
app.secret_key = os.environ.get('SESSION_SECRET', secrets.token_hex(32))

# Configuration
CONSENTKEYS_BASE_URL = 'https://pseudoidc.consentkeys.com'
CLIENT_ID = os.environ.get('CONSENTKEYS_CLIENT_ID')
CLIENT_SECRET = os.environ.get('CONSENTKEYS_CLIENT_SECRET')
REDIRECT_URI = 'http://localhost:5000/auth/callback'

AUTH_URL = f'{CONSENTKEYS_BASE_URL}/auth'
TOKEN_URL = f'{CONSENTKEYS_BASE_URL}/token'
USERINFO_URL = f'{CONSENTKEYS_BASE_URL}/userinfo'
JWKS_URL = f'{CONSENTKEYS_BASE_URL}/.well-known/jwks.json'

jwks_client = PyJWKClient(JWKS_URL)

# PKCE utilities
def generate_code_verifier():
return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode('utf-8').rstrip('=')

def generate_code_challenge(verifier):
digest = hashlib.sha256(verifier.encode('utf-8')).digest()
return base64.urlsafe_b64encode(digest).decode('utf-8').rstrip('=')

def generate_state():
return secrets.token_urlsafe(16)

# Auth decorator
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user' not in session:
if request.path.startswith('/api/'):
return jsonify({'error': 'Authentication required'}), 401
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function

# Routes
@app.route('/')
def index():
if 'user' in session:
return redirect(url_for('dashboard'))
return '<h1>Welcome</h1><a href="/login"><button>Sign In</button></a>'

@app.route('/login')
def login():
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
state = generate_state()

session['code_verifier'] = code_verifier
session['oauth_state'] = state

params = {
'response_type': 'code',
'client_id': CLIENT_ID,
'redirect_uri': REDIRECT_URI,
'scope': 'openid profile email',
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
}

return redirect(f'{AUTH_URL}?{urlencode(params)}')

@app.route('/auth/callback')
def callback():
error = request.args.get('error')
if error:
return jsonify({'error': request.args.get('error_description', error)}), 400

code = request.args.get('code')
state = request.args.get('state')

if state != session.get('oauth_state'):
return jsonify({'error': 'Invalid state'}), 400

code_verifier = session.get('code_verifier')

try:
token_response = requests.post(TOKEN_URL, data={
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': REDIRECT_URI,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'code_verifier': code_verifier,
})
token_response.raise_for_status()
tokens = token_response.json()

signing_key = jwks_client.get_signing_key_from_jwt(tokens['id_token'])
payload = jwt_decode(
tokens['id_token'],
signing_key.key,
algorithms=['RS256'],
audience=CLIENT_ID,
issuer=CONSENTKEYS_BASE_URL,
)

userinfo_response = requests.get(
USERINFO_URL,
headers={'Authorization': f"Bearer {tokens['access_token']}"}
)
user_info = userinfo_response.json()

session['access_token'] = tokens['access_token']
session['user'] = user_info
session.pop('code_verifier', None)
session.pop('oauth_state', None)

return redirect(url_for('dashboard'))

except Exception as e:
return jsonify({'error': str(e)}), 500

@app.route('/dashboard')
@login_required
def dashboard():
user = session['user']
return f'''
<h1>Dashboard</h1>
<p>Welcome, {user.get('name', user.get('email'))}!</p>
<p>Email: {user.get('email')}</p>
<form action="/logout" method="post">
<button>Logout</button>
</form>
'''

@app.route('/logout', methods=['POST'])
def logout():
session.clear()
return redirect(url_for('index'))

if __name__ == '__main__':
app.run(debug=True, port=5000)

Environment Variables

.env
CONSENTKEYS_CLIENT_ID=ck_your_client_id
CONSENTKEYS_CLIENT_SECRET=your_client_secret
SESSION_SECRET=your-secret-key-at-least-32-characters
FLASK_ENV=development

Load them with python-dotenv:

from dotenv import load_dotenv
load_dotenv()

Testing

  1. Start ConsentKeys backend:

    cd backend
    npm run dev
  2. Start Flask app:

    export FLASK_APP=app.py
    flask run
  3. Visit http://localhost:5000

Django Integration

For Django, use similar principles:

views.py
from django.shortcuts import redirect
from django.http import JsonResponse
import requests

def login(request):
# Generate PKCE and state
code_verifier = generate_code_verifier()
code_challenge = generate_code_challenge(code_verifier)
state = generate_state()

# Store in session
request.session['code_verifier'] = code_verifier
request.session['oauth_state'] = state

# Build auth URL
params = {
'response_type': 'code',
'client_id': settings.CONSENTKEYS_CLIENT_ID,
'redirect_uri': settings.REDIRECT_URI,
'scope': 'openid profile email',
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
}

auth_url = f"{settings.AUTH_URL}?{urlencode(params)}"
return redirect(auth_url)

def callback(request):
# Similar token exchange logic
code = request.GET.get('code')
state = request.GET.get('state')

# Verify state and exchange code
# ... (same as Flask example)

return redirect('dashboard')

Security Checklist

  • ✅ Client secret stored in environment variables
  • ✅ PKCE implemented for authorization code flow
  • ✅ State parameter verified for CSRF protection
  • ✅ ID token signature verified with JWKS
  • ✅ Session cookies are httpOnly
  • ✅ HTTPS in production

Troubleshooting

"Invalid signature"

  • Ensure JWKS URL is correct and accessible
  • Verify token hasn't expired

"State mismatch"

  • Clear session and try again
  • Ensure session persistence is configured

"Token exchange failed"

  • Verify client credentials
  • Check redirect URI matches exactly

Next Steps