# Source code for evesrp.auth.testoauth

from __future__ import absolute_import
from flask import request, abort, current_app, json
import six
from sqlalchemy.orm.exc import NoResultFound

from .oauth import OAuthMethod, OAuthUser
from .. import db
from .models import Group, Pilot

class TestOAuth(OAuthMethod):

    def __init__(self, devtest=False, **kwargs):
        """:py:class:`~.AuthMethod` using TEST Auth's OAuth-based API for
        authentication and authorization.

        Every keyword argument is forwarded to the parent initializer. The
        defaults listed below are only applied when the caller does not
        supply the corresponding keyword.

        :param list admins: Two types of values are accepted as values in this
            list, either a string specifying a user's primary character's
            name, or their Auth ID as an integer.
        :param bool devtest: Testing parameter that selects the alternate
            default domain used to build the default URLs. Default:
            ``False``.
        :param str authorize_url: The URL to request OAuth authorization
            tokens. Default: ``domain + '/o2/authorize/'``.
        :param str access_token_url: The URL for OAuth token exchange.
            Default: ``domain + '/o2/token/'``.
        :param list scope: Forwarded to the parent initializer. Default:
            ``['read_profile']``.
        :param str method: Forwarded to the parent initializer (presumably
            the HTTP method used for the token exchange). Default:
            ``'POST'``.
        :param str app_key: Forwarded to the parent initializer. Default:
            ``'TEST_OAUTH'``.
        :param str name: The name for this authentication method. Default:
            ``u'Test OAuth'``.

        The base URL for API requests is not configurable through a keyword;
        it is always stored on ``self.base_url`` as ``domain + '/api/3.0/'``.
        """
        # NOTE(review): both domain literals are empty strings in this copy
        # of the file, so every default URL below is a bare path. The host
        # names appear to have been stripped; restore the production and
        # devtest hosts before relying on these defaults.
        if not devtest:
            domain = ''
        else:
            domain = ''
        self.base_url = domain + '/api/3.0/'
        kwargs.setdefault('authorize_url', domain + '/o2/authorize/')
        kwargs.setdefault('access_token_url', domain + '/o2/token/')
        kwargs.setdefault('scope', ['read_profile'])
        kwargs.setdefault('method', 'POST')
        kwargs.setdefault('app_key', 'TEST_OAUTH')
        kwargs.setdefault('name', u'Test OAuth')
        super(TestOAuth, self).__init__(**kwargs)

    def _get_user_data(self):
        """Return the decoded ``profile`` API response for this request.

        The response is fetched at most once per request: the decoded body
        is stored on the Flask ``request`` object as ``_auth_user_data`` and
        reused by later calls.

        Aborts with HTTP 500 when the response body cannot be decoded.
        """
        if not hasattr(request, '_auth_user_data'):
            resp = self.session.get(self.base_url + 'profile')
            try:
                current_app.logger.debug(
                    u"Test OAuth API response: {}".format(resp.text))
                request._auth_user_data = resp.json()
            except (TypeError, ValueError):
                # JSON decoders signal a malformed body with ValueError, so
                # it is handled the same way as the original TypeError case.
                abort(500, u"Error in receiving OAuth response: {}".format(
                    resp))
        return request._auth_user_data

    def get_user(self):
        """Return the :py:class:`TestOAuthUser` for the current profile.

        An existing row is looked up by Auth ID; when none exists a new user
        is created and added to the session. The session is committed in
        both cases.
        """
        data = self._get_user_data()
        primary_character = data[u'primary_character'][u'name']
        user_id = data[u'id']
        try:
            # NOTE(review): the second filter argument, the ``.one()`` call
            # and the assignment target below were missing from this copy
            # and have been reconstructed from the surrounding code
            # (``except NoResultFound`` and the TestOAuthUser constructor
            # signature). Confirm against the upstream file.
            user = TestOAuthUser.query.filter_by(
                auth_id=user_id, authmethod=self.name).one()
            # The primary character can change
            user.name = primary_character
        except NoResultFound:
            user = TestOAuthUser(primary_character, user_id, self.name)
            db.session.add(user)
        db.session.commit()
        return user

    def is_admin(self, user):
        """Return a truthy value when ``user`` should be treated as an admin.

        A user qualifies when the parent class says so, when their Auth ID
        is listed in ``self.admins``, or when the profile data flags them as
        ``is_staff`` or ``is_superuser``.
        """
        data = self._get_user_data()
        return (super(TestOAuth, self).is_admin(user) or
                user.auth_id in self.admins or
                data[u'is_staff'] or
                data[u'is_superuser'])

    def get_pilots(self):
        """Return a list of :py:class:`Pilot` objects for the profile's
        characters, with duplicate character IDs collapsed.
        """
        data = self._get_user_data()
        # The Auth API will duplicate characters when there's more than one
        # API key for them, so pilots are keyed by character ID.
        pilots = {}
        for character in data[u'characters']:
            pilot = Pilot.query.get(int(character[u'id']))
            if pilot is None:
                pilot = Pilot(None, character[u'name'], character[u'id'])
            pilots[character[u'id']] = pilot
        return list(pilots.values())

    def get_groups(self):
        """Return the :py:class:`TestOAuthGroup` objects for the profile's
        groups, creating missing ones and updating changed names.

        The session is committed once after all groups are processed.
        """
        data = self._get_user_data()
        groups = []
        for group_info in data[u'groups']:
            group_name = group_info[u'name']
            group_id = group_info[u'id']
            try:
                # NOTE(review): the second filter argument, the ``.one()``
                # call, the constructor's third argument and the
                # ``group.name`` references were missing from this copy and
                # have been reconstructed to mirror get_user(). Confirm
                # against the upstream file.
                group = TestOAuthGroup.query.filter_by(
                    auth_id=group_id, authmethod=self.name).one()
            except NoResultFound:
                group = TestOAuthGroup(group_name, group_id, self.name)
                db.session.add(group)
            # Group names can change on the Auth side; keep ours in sync.
            if group.name != group_name:
                group.name = group_name
            groups.append(group)
        db.session.commit()
        return groups
class TestOAuthUser(OAuthUser):
    """User record that additionally stores the TEST Auth ID."""

    # NOTE(review): the foreign-key target was missing from this copy of the
    # file; the parent class's ``id`` column is assumed. Confirm against the
    # upstream file before generating migrations.
    id = db.Column(db.Integer, db.ForeignKey(OAuthUser.id), primary_key=True)

    # Identifier of the user on the Auth side; unique and indexed.
    auth_id = db.Column(db.Integer, nullable=False, unique=True, index=True)

    def __init__(self, username, auth_id, authmethod, groups=None, **kwargs):
        """Create a user.

        :param username: Passed to the parent initializer.
        :param auth_id: Stored on :py:attr:`auth_id`.
        :param authmethod: Passed to the parent initializer.
        :param groups: Accepted for signature compatibility; not used here.
        :param kwargs: Forwarded to the parent initializer.
        """
        self.auth_id = auth_id
        super(TestOAuthUser, self).__init__(username, authmethod, **kwargs)


class TestOAuthGroup(Group):
    """Group record that additionally stores the TEST Auth ID."""

    # NOTE(review): the foreign-key target was missing from this copy of the
    # file; the parent class's ``id`` column is assumed. Confirm against the
    # upstream file before generating migrations.
    id = db.Column(db.Integer, db.ForeignKey(Group.id), primary_key=True)

    # Identifier of the group on the Auth side; unique and indexed.
    auth_id = db.Column(db.Integer, nullable=False, unique=True, index=True)

    def __init__(self, name, auth_id, authmethod, **kwargs):
        """Create a group.

        :param name: Passed to the parent initializer.
        :param auth_id: Stored on :py:attr:`auth_id`.
        :param authmethod: Passed to the parent initializer.
        :param kwargs: Forwarded to the parent initializer.
        """
        self.auth_id = auth_id
        super(TestOAuthGroup, self).__init__(name, authmethod, **kwargs)