Source code for oauth2app.authorize

#-*- coding: utf-8 -*-


"""OAuth 2.0 Authorization"""


from urllib import urlencode
from django.http import absolute_http_url_re, HttpResponseRedirect
from .exceptions import OAuth2Exception
from .models import Client, AccessRange, Code, AccessToken, KeyGenerator
from .consts import ACCESS_TOKEN_EXPIRATION, REFRESHABLE
from .consts import CODE, TOKEN, CODE_AND_TOKEN
from .consts import AUTHENTICATION_METHOD, MAC, BEARER, MAC_KEY_LENGTH
from .lib.uri import add_parameters, add_fragments, normalize


[docs]class AuthorizationException(OAuth2Exception): """Authorization exception base class.""" pass
[docs]class MissingRedirectURI(OAuth2Exception): """Neither the request nor the client specify a redirect_url.""" pass
[docs]class UnauthenticatedUser(OAuth2Exception): """The provided user is not internally authenticated, via user.is_authenticated()""" pass
[docs]class UnvalidatedRequest(OAuth2Exception): """The method requested requires a validated request to continue.""" pass
[docs]class InvalidRequest(AuthorizationException): """The request is missing a required parameter, includes an unsupported parameter or parameter value, or is otherwise malformed.""" error = 'invalid_request'
[docs]class InvalidClient(AuthorizationException): """Client authentication failed (e.g. unknown client, no client credentials included, multiple client credentials included, or unsupported credentials type).""" error = 'invalid_client'
[docs]class UnauthorizedClient(AuthorizationException): """The client is not authorized to request an authorization code using this method.""" error = 'unauthorized_client'
[docs]class AccessDenied(AuthorizationException): """The resource owner or authorization server denied the request.""" error = 'access_denied'
[docs]class UnsupportedResponseType(AuthorizationException): """The authorization server does not support obtaining an authorization code using this method.""" error = 'unsupported_response_type'
[docs]class InvalidScope(AuthorizationException): """The requested scope is invalid, unknown, or malformed.""" error = 'invalid_scope'
RESPONSE_TYPES = { "code":CODE, "token":TOKEN}
[docs]class Authorizer(object): """Access authorizer. Validates access credentials and generates a response with an authorization code passed as a parameter to the redirect URI, an access token passed as a URI fragment to the redirect URI, or both. **Kwargs:** * *scope:* An iterable of oauth2app.models.AccessRange objects representing the scope the authorizer can grant. *Default None* * *authentication_method:* Type of token to generate. Possible values are: oauth2app.consts.MAC and oauth2app.consts.BEARER *Default oauth2app.consts.BEARER* * *refreshable:* Boolean value indicating whether issued tokens are refreshable. *Default True* """ client = None access_ranges = None valid = False error = None def __init__( self, scope=None, authentication_method=AUTHENTICATION_METHOD, refreshable=REFRESHABLE, response_type=CODE): if response_type not in [CODE, TOKEN, CODE_AND_TOKEN]: raise OAuth2Exception("Possible values for response_type" " are oauth2app.consts.CODE, oauth2app.consts.TOKEN, " "oauth2app.consts.CODE_AND_TOKEN") self.authorized_response_type = response_type self.refreshable = refreshable if authentication_method not in [BEARER, MAC]: raise OAuth2Exception("Possible values for authentication_method" " are oauth2app.consts.MAC and oauth2app.consts.BEARER") self.authentication_method = authentication_method if scope is None: self.authorized_scope = None elif isinstance(scope, AccessRange): self.authorized_scope = set([scope.key]) else: self.authorized_scope = set([x.key for x in scope]) def __call__(self, request): """Validate the request. Returns an error redirect if the request fails authorization, or a MissingRedirectURI if no redirect_uri is available. **Args:** * *request:* Django HttpRequest object. *Returns HTTP Response redirect*""" try: self.validate(request) except AuthorizationException, e: # The request is malformed or invalid. Automatically # redirects to the provided redirect URL. return self.error_redirect() return self.grant_redirect()
[docs] def validate(self, request): """Validate the request. Raises an AuthorizationException if the request fails authorization, or a MissingRedirectURI if no redirect_uri is available. **Args:** * *request:* Django HttpRequest object. *Returns None*""" self.response_type = request.REQUEST.get('response_type') self.client_id = request.REQUEST.get('client_id') self.redirect_uri = request.REQUEST.get('redirect_uri') self.scope = request.REQUEST.get('scope') if self.scope is not None: self.scope = set(self.scope.split()) self.state = request.REQUEST.get('state') self.user = request.user self.request = request try: self._validate() except AuthorizationException, e: self._check_redirect_uri() self.error = e raise e self.valid = True
def _validate(self): """Validate the request.""" if self.client_id is None: raise InvalidRequest('No client_id') try: self.client = Client.objects.get(key=self.client_id) except Client.DoesNotExist: raise InvalidClient("client_id %s doesn't exist" % self.client_id) # Redirect URI if self.redirect_uri is None: if self.client.redirect_uri is None: raise MissingRedirectURI("No redirect_uri" "provided or registered.") elif self.client.redirect_uri is not None: if normalize(self.redirect_uri) != normalize(self.client.redirect_uri): self.redirect_uri = self.client.redirect_uri raise InvalidRequest("Registered redirect_uri doesn't " "match provided redirect_uri.") self.redirect_uri = self.redirect_uri or self.client.redirect_uri # Check response type if self.response_type is None: raise InvalidRequest('response_type is a required parameter.') if self.response_type not in ["code", "token"]: raise InvalidRequest("No such response type %s" % self.response_type) # Response type if self.authorized_response_type & RESPONSE_TYPES[self.response_type] == 0: raise UnauthorizedClient("Response type %s not allowed." % self.response_type) if not absolute_http_url_re.match(self.redirect_uri): raise InvalidRequest('Absolute URI required for redirect_uri') # Scope if self.authorized_scope is not None and self.scope is None: self.scope = self.authorized_scope if self.scope is not None: self.access_ranges = AccessRange.objects.filter(key__in=self.scope) access_ranges = set(self.access_ranges.values_list('key', flat=True)) difference = access_ranges.symmetric_difference(self.scope) if len(difference) != 0: raise InvalidScope("Following access ranges do not " "exist: %s" % ', '.join(difference)) if self.authorized_scope is not None: new_scope = self.scope - self.authorized_scope if len(new_scope) > 0: raise InvalidScope("Invalid scope: %s" % ','.join(new_scope)) def _check_redirect_uri(self): """Raise MissingRedirectURI if no redirect_uri is available.""" if self.redirect_uri is None: raise MissingRedirectURI('No redirect_uri to send response.') if not absolute_http_url_re.match(self.redirect_uri): raise MissingRedirectURI('Absolute redirect_uri required.')
[docs] def error_redirect(self): """In the event of an error, return a Django HttpResponseRedirect with the appropriate error parameters. Raises MissingRedirectURI if no redirect_uri is available. *Returns HttpResponseRedirect*""" self._check_redirect_uri() if self.error is not None: e = self.error else: e = AccessDenied("Access Denied.") parameters = {'error': e.error, 'error_description': u'%s' % e.message} if self.state is not None: parameters['state'] = self.state redirect_uri = self.redirect_uri if self.authorized_response_type & CODE != 0: redirect_uri = add_parameters(redirect_uri, parameters) if self.authorized_response_type & TOKEN != 0: redirect_uri = add_fragments(redirect_uri, parameters) return HttpResponseRedirect(redirect_uri)
def _query_string(self): """Returns the a url encoded query string useful for resending request parameters when a user authorizes the request via a form POST. Raises UnvalidatedRequest if the request has not been validated. *Returns str*""" if not self.valid: raise UnvalidatedRequest("This request is invalid or has not" "been validated.") parameters = { "response_type":self.response_type, "client_id":self.client_id} if self.redirect_uri is not None: parameters["redirect_uri"] = self.redirect_uri if self.state is not None: parameters["state"] = self.state if self.scope is not None: parameters["scope"] = ' '.join(self.scope) return urlencode(parameters) query_string = property(_query_string)
[docs] def grant_redirect(self): """On successful authorization of the request, return a Django HttpResponseRedirect with the appropriate authorization code parameters or access token URI fragments.. Raises UnvalidatedRequest if the request has not been validated. *Returns HttpResponseRedirect*""" if not self.valid: raise UnvalidatedRequest("This request is invalid or has not " "been validated.") if self.user.is_authenticated(): parameters = {} fragments = {} if self.scope is not None: access_ranges = list(AccessRange.objects.filter(key__in=self.scope)) else: access_ranges = [] if RESPONSE_TYPES[self.response_type] & CODE != 0: code = Code.objects.create( user=self.user, client=self.client, redirect_uri=self.redirect_uri) code.scope = access_ranges code.save() parameters['code'] = code.key if RESPONSE_TYPES[self.response_type] & TOKEN != 0: access_token = AccessToken.objects.create( user=self.user, client=self.client) access_token.scope = access_ranges fragments['access_token'] = access_token.token if access_token.refreshable: fragments['refresh_token'] = access_token.refresh_token fragments['expires_in'] = ACCESS_TOKEN_EXPIRATION if self.scope is not None: fragments['scope'] = ' '.join(self.scope) if self.authentication_method == MAC: access_token.mac_key = KeyGenerator(MAC_KEY_LENGTH)() fragments["mac_key"] = access_token.mac_key fragments["mac_algorithm"] = "hmac-sha-256" fragments["token_type"] = "mac" elif self.authentication_method == BEARER: fragments["token_type"] = "bearer" access_token.save() if self.state is not None: parameters['state'] = self.state redirect_uri = add_parameters(self.redirect_uri, parameters) redirect_uri = add_fragments(redirect_uri, fragments) return HttpResponseRedirect(redirect_uri) else: raise UnauthenticatedUser("Django user object associated with the " "request is not authenticated.")