Skip to main content

Advanced Usage

Advanced patterns and use cases for SVA OAuth integration.

Custom OAuth Flow

Implement a custom OAuth flow without using the built-in views:

from sva_oauth_client.client import SVAOAuthClient, SVATokenError
from django.shortcuts import redirect
from django.conf import settings

def custom_oauth_login(request):
"""Custom OAuth login initiation"""
client = SVAOAuthClient(
base_url=settings.SVA_OAUTH_BASE_URL,
client_id=settings.SVA_OAUTH_CLIENT_ID,
client_secret=settings.SVA_OAUTH_CLIENT_SECRET,
redirect_uri=settings.SVA_OAUTH_REDIRECT_URI,
data_token_secret=settings.SVA_DATA_TOKEN_SECRET,
scopes='openid email profile username',
)

# Generate authorization URL
auth_url, code_verifier = client.get_authorization_url()

# Store code verifier in session
request.session['code_verifier'] = code_verifier
request.session['oauth_state'] = 'custom_state'

return redirect(auth_url)

def custom_oauth_callback(request):
"""Custom OAuth callback handler"""
code = request.GET.get('code')
state = request.GET.get('state')
code_verifier = request.session.get('code_verifier')

if not code or not code_verifier:
return redirect('/error/')

client = SVAOAuthClient(
base_url=settings.SVA_OAUTH_BASE_URL,
client_id=settings.SVA_OAUTH_CLIENT_ID,
client_secret=settings.SVA_OAUTH_CLIENT_SECRET,
redirect_uri=settings.SVA_OAUTH_REDIRECT_URI,
data_token_secret=settings.SVA_DATA_TOKEN_SECRET,
)

try:
# Exchange code for tokens
tokens = client.exchange_code_for_tokens(code, code_verifier)

# Store tokens in session
request.session['sva_oauth_access_token'] = tokens['access_token']
request.session['sva_oauth_refresh_token'] = tokens['refresh_token']
request.session['sva_oauth_data_token'] = tokens['data_token']

# Get blocks data
blocks_data = client.get_blocks_data(tokens['data_token'])

# Clear temporary data
del request.session['code_verifier']

return redirect('/dashboard/')
except SVATokenError as e:
return redirect('/error/')

Token Refresh Management

Manual token refresh with custom logic:

from sva_oauth_client.client import get_client_from_settings, SVATokenError
from datetime import datetime, timezone

def refresh_token_if_needed(request):
"""Manually refresh token if needed"""
access_token_expiry = request.session.get('sva_access_token_expiry')

if not access_token_expiry:
return False

expiry_datetime = datetime.fromtimestamp(access_token_expiry, tz=timezone.utc)
now = datetime.now(timezone.utc)
time_until_expiry = (expiry_datetime - now).total_seconds()

if time_until_expiry <= 300: # 5 minutes
refresh_token = request.session.get('sva_oauth_refresh_token')

if refresh_token:
try:
client = get_client_from_settings()
new_tokens = client.refresh_access_token(refresh_token)

# Update session
request.session['sva_oauth_access_token'] = new_tokens['access_token']
if 'refresh_token' in new_tokens:
request.session['sva_oauth_refresh_token'] = new_tokens['refresh_token']
if 'data_token' in new_tokens:
request.session['sva_oauth_data_token'] = new_tokens['data_token']

# Update expiry
new_expires_in = new_tokens.get('expires_in', 3600)
new_expiry = datetime.now(timezone.utc).timestamp() + new_expires_in
request.session['sva_access_token_expiry'] = new_expiry

return True
except SVATokenError:
# Refresh failed
return False

return False

API Integration

Create REST API endpoints with OAuth:

from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from sva_oauth_client.decorators import sva_oauth_required
from sva_oauth_client.utils import get_sva_claims

@api_view(['GET'])
@sva_oauth_required
def api_user_profile(request):
"""API endpoint for user profile"""
claims = get_sva_claims(request)

return Response({
'email': claims.get('email'),
'name': claims.get('name'),
'username': claims.get('username'),
})

@api_view(['GET'])
@sva_oauth_required
def api_user_blocks(request):
"""API endpoint returning all user blocks"""
claims = get_sva_claims(request)

return Response({
'blocks': claims,
'available_blocks': list(claims.keys()),
})

Conditional Block Access

Handle views that work with or without specific blocks:

from sva_oauth_client.decorators import sva_oauth_required
from sva_oauth_client.utils import get_sva_claims

@sva_oauth_required
def flexible_profile_view(request):
"""View that adapts based on available blocks"""
claims = get_sva_claims(request)

context = {
'has_email': 'email' in claims,
'has_phone': 'phone' in claims,
'has_address': 'address' in claims,
}

# Add available data
if 'email' in claims:
context['email'] = claims['email']

if 'phone' in claims:
context['phone'] = claims['phone']

if 'address' in claims:
context['address'] = claims['address']

return render(request, 'flexible_profile.html', context)

Multi-Step Forms

Use OAuth data to pre-fill forms:

@sva_oauth_required
def registration_form(request):
"""Registration form pre-filled with OAuth data"""
claims = get_sva_claims(request)

initial_data = {}

if 'email' in claims:
initial_data['email'] = claims['email']

if 'name' in claims:
# Split name into first/last
name_parts = claims['name'].split(' ', 1)
initial_data['first_name'] = name_parts[0]
if len(name_parts) > 1:
initial_data['last_name'] = name_parts[1]

if 'phone' in claims:
initial_data['phone'] = claims['phone']

form = RegistrationForm(initial=initial_data)
return render(request, 'registration.html', {'form': form})

Session Management

Custom session handling:

from sva_oauth_client.utils import (
is_authenticated,
get_sva_claims,
clear_oauth_session,
)
from datetime import timedelta

@sva_oauth_required
def extend_session_view(request):
"""Extend session expiry"""
if is_authenticated(request.session):
# Extend session by 30 days
request.session.set_expiry(timedelta(days=30))
request.session.modified = True
return JsonResponse({'status': 'session_extended'})
return JsonResponse({'status': 'not_authenticated'}, status=401)

Error Recovery

Implement custom error recovery:

from sva_oauth_client.utils import get_sva_claims
from sva_oauth_client.client import SVATokenError
from django.contrib import messages

@sva_oauth_required
def my_view(request):
try:
claims = get_sva_claims(request)
# Process claims...
except SVATokenError as e:
# Custom error handling
messages.warning(request, 'Your session expired. Please sign in again.')

# Option 1: Redirect to login
return redirect('sva_oauth_client:login')

# Option 2: Try to refresh token
# if refresh_token_if_needed(request):
# return redirect(request.path)
# else:
# return redirect('sva_oauth_client:login')

Webhook Integration

Handle OAuth events via webhooks:

from django.views.decorators.csrf import csrf_exempt
from django.http import JsonResponse
import hmac
import hashlib

@csrf_exempt
def oauth_webhook(request):
"""Handle OAuth webhook events"""
signature = request.headers.get('X-Webhook-Signature')
payload = request.body

# Verify signature
expected_signature = hmac.new(
settings.WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256
).hexdigest()

if not hmac.compare_digest(signature, expected_signature):
return JsonResponse({'error': 'Invalid signature'}, status=401)

event = json.loads(payload)

if event['type'] == 'user.revoked_access':
# Handle access revocation
user_id = event['data']['user_id']
# Clear user sessions, etc.

return JsonResponse({'status': 'ok'})

Next Steps