from __future__ import absolute_import
from base64 import urlsafe_b64encode
from itertools import groupby
import os
import pickle
from flask import url_for
import six
from six.moves import filter
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm.collections import attribute_mapped_collection, collection
from .. import db
from . import AuthMethod, PermissionType
from ..util import AutoID, Timestamped, AutoName, unistr, ensure_unicode
from ..models import Action, Modifier, Request
if six.PY3:
unicode = str
users_groups = db.Table('users_groups', db.Model.metadata,
db.Column('user_id', db.Integer, db.ForeignKey('user.id')),
db.Column('group_id', db.Integer, db.ForeignKey('group.id')))
@unistr
[docs]class Entity(db.Model, AutoID, AutoName):
"""Private class for shared functionality between :py:class:`User` and
:py:class:`Group`.
This class defines a number of helper methods used indirectly by User and
Group subclasses such as automatically defining the table name and mapper
arguments.
This class should `not` be inherited from directly, instead either
:py:class:`User` or :py:class:`Group` should be used.
"""
#: The name of the entity. Usually a nickname.
name = db.Column(db.String(100, convert_unicode=True), nullable=False)
#: Polymorphic discriminator column.
type_ = db.Column(db.String(50, convert_unicode=True))
#: :py:class:`Permission`\s associated specifically with this entity.
entity_permissions = db.relationship('Permission', back_populates='entity',
collection_class=set,
cascade='save-update,merge,refresh-expire,expunge',
lazy='dynamic')
#: The name of the :py:class:`AuthMethod` for this entity.
authmethod = db.Column(db.String(50, convert_unicode=True), nullable=False)
@declared_attr
def __mapper_args__(cls):
"""SQLAlchemy late-binding attribute to set mapper arguments.
Obviates subclasses from having to specify polymorphic identities.
"""
cls_name = unicode(cls.__name__)
args = {'polymorphic_identity': cls_name}
if cls_name == u'Entity':
args['polymorphic_on'] = cls.type_
return args
def __init__(self, name, authmethod, **kwargs):
self.name = ensure_unicode(name)
self.authmethod = ensure_unicode(authmethod)
super(Entity, self).__init__(**kwargs)
def __repr__(self):
return "{x.__class__.__name__}('{x.name}')".format(x=self)
def __unicode__(self):
return u"{x.name}".format(x=self)
[docs] def has_permission(self, permissions, division_or_request=None):
"""Returns if this entity has been granted a permission in a division.
If ``division_or_request`` is ``None``, this method checks if this
group has the given permission in `any` division.
:param permissions: The series of permissions to check
:type permissions: iterable
:param division_or_request: The division to check. May also be ``None``
or an SRP request.
:type division: :py:class:`Division` or :py:class:`~.models.Request`
:rtype: bool
"""
if permissions in PermissionType.all:
permissions = (permissions,)
# admin permission includes the reviewer and payer privileges
if PermissionType.admin not in permissions and \
PermissionType.elevated.issuperset(permissions):
if self.has_permission(PermissionType.admin, division_or_request):
return True
perms = self.permissions.filter(Permission.permission.in_(permissions))
if division_or_request is not None:
# requests have a 'division' attribute, so we check for that
if hasattr(division_or_request, 'division'):
division = division_or_request.division
else:
division = division_or_request
perms = perms.filter_by(division=division)
return db.session.query(perms.exists()).all()[0][0]
def _json(self, extended=False):
try:
parent = super(Entity, self)._json(extended)
except AttributeError:
parent = {}
parent[u'name'] = self.name
parent[u'source'] = self.authmethod
return parent
[docs]class APIKey(db.Model, AutoID, AutoName, Timestamped):
"""Represents an API key for use with the :ref:`external-api`."""
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
#: The :py:class:`User` this key belongs to.
user = db.relationship('User', back_populates='api_keys',
cascade='save-update,merge,refresh-expire,expunge')
#: The raw key data.
key = db.Column(db.LargeBinary(32), nullable=False)
def __init__(self, user):
self.user = user
self.key = os.urandom(32)
@property
[docs] def hex_key(self):
"""The key data in a modified base-64 format safe for use in URLs."""
return urlsafe_b64encode(self.key).decode('utf-8').replace(u'=', u',')
def _json(self, extended=False):
try:
parent = super(APIKey, self)._json(extended)
except AttributeError:
parent = {}
parent[u'key'] = self.hex_key
parent[u'timestamp'] = self.timestamp
return parent
[docs]class User(Entity):
"""User base class.
Represents users who can submit, review and/or pay out requests. It also
supplies a number of convenience methods for subclasses.
"""
id = db.Column(db.Integer, db.ForeignKey('entity.id'), primary_key=True)
#: If the user is an administrator. This allows the user to create and
#: administer divisions.
admin = db.Column(db.Boolean(name='admin_bool'), nullable=False,
default=False)
#: :py:class:`~.Request`\s this user has submitted.
requests = db.relationship(Request, back_populates='submitter')
#: :py:class:`~.Action`\s this user has performed on requests.
actions = db.relationship(Action, back_populates='user')
#: :py:class:`~.Pilot`\s associated with this user.
pilots = db.relationship('Pilot', back_populates='user',
collection_class=set)
#: :py:class:`Group`\s this user is a member of
groups = db.relationship('Group', secondary=users_groups,
back_populates='users', collection_class=set)
notes = db.relationship('Note', back_populates='user',
order_by='desc(Note.timestamp)', foreign_keys='Note.user_id')
notes_made = db.relationship('Note', back_populates='noter',
order_by='desc(Note.timestamp)', foreign_keys='Note.noter_id')
api_keys = db.relationship(APIKey, back_populates='user')
@hybrid_property
def permissions(self):
"""All :py:class:`Permission` objects associated with this user."""
groups = db.session.query(users_groups.c.group_id.label('group_id'))\
.filter(users_groups.c.user_id==self.id).subquery()
group_perms = db.session.query(Permission)\
.join(groups, groups.c.group_id==Permission.entity_id)
user_perms = db.session.query(Permission)\
.join(User)\
.filter(User.id==self.id)
perms = user_perms.union(group_perms)
return perms
@permissions.expression
def permissions(cls):
groups = db.select([users_groups.c.group_id])\
.where(users_groups.c.user_id==cls.id).alias()
group_permissions = db.select([Permission])\
.where(Permission.entity_id.in_(groups)).alias()
user_permissions = db.select([Permission])\
.where(Permission.entity_id==cls.id)
return user_permissions.union(group_permissions)
@property
[docs] def is_authenticated(self):
"""Part of the interface for Flask-Login."""
return True
@property
[docs] def is_active(self):
"""Part of the interface for Flask-Login."""
return True
@property
[docs] def is_anonymous(self):
"""Part of the interface for Flask-Login."""
return False
[docs] def get_id(self):
"""Part of the interface for Flask-Login."""
return str(self.id)
[docs] def submit_divisions(self):
"""Get a list of the divisions this user is able to submit requests to.
:returns: A list of tuples. The tuples are in the form (division.id,
division.name)
:rtype: list
"""
submit_perms = self.permissions\
.filter_by(permission=PermissionType.submit)\
.subquery()
divisions = db.session.query(Division).join(submit_perms)\
.order_by(Division.name)
# Remove duplicates and sort divisions by name
choices = []
for name, group in groupby(divisions, lambda d: d.name):
choices.append((six.next(group).id, name))
return choices
def _json(self, extended=False):
try:
parent = super(User, self)._json(extended)
except AttributeError:
parent = {}
parent[u'href'] = url_for('api.user_detail', user_id=self.id)
return parent
[docs]class Note(db.Model, AutoID, Timestamped, AutoName):
"""A note about a particular :py:class:`User`."""
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
#: The :py:class:`User` this note refers to.
user = db.relationship(User, back_populates='notes',
cascade='save-update,merge,refresh-expire,expunge',
foreign_keys=[user_id])
#: The actual contents of this note.
content = db.Column(db.Text(convert_unicode=True), nullable=False)
noter_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
#: The author of this note.
noter = db.relationship(User, back_populates='notes_made',
cascade='save-update,merge,refresh-expire,expunge',
foreign_keys=[noter_id])
def __init__(self, user, noter, note):
self.user = user
self.noter = noter
self.content = ensure_unicode(note)
@unistr
[docs]class Pilot(db.Model, AutoID, AutoName):
"""Represents an in-game character."""
# Character names in Eve are resticted to ASCII, but use unicode for
# consistency with the rest of the database columns (and if they ever
# decide to lift this restriction).
#: The name of the character
name = db.Column(db.String(150, convert_unicode=True), nullable=False)
#: The id of the User this character belongs to.
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=True)
#: The User this character belongs to.
user = db.relationship(User, back_populates='pilots')
#: The Requests filed with lossmails from this character.
requests = db.relationship(Request, back_populates='pilot',
collection_class=list, order_by=Request.timestamp.desc())
[docs] def __init__(self, user, name, id_):
"""Create a new Pilot instance.
:param user: The user this character belpongs to.
:type user: :py:class:`~.User`
:param str name: The name of this character.
:param int id_: The CCP-given characterID number.
"""
self.user = user
self.name = ensure_unicode(name)
self.id = id_
def __repr__(self):
return "{x.__class__.__name__}({x.user}, '{x.name}', {x.id})".format(
x=self)
def __unicode__(self):
return self.name
def _json(self, extended=False):
try:
parent = super(Pilot, self)._json(extended)
except AttributeError:
parent = {}
parent[u'name'] = self.name
if extended:
parent[u'user'] = self.user
parent[u'requests'] = self.requests
return parent
[docs]class Group(Entity):
"""Base class for a group of users.
Represents a group of users. Usable for granting permissions to submit,
evaluate and pay.
"""
id = db.Column(db.Integer, db.ForeignKey('entity.id'), primary_key=True)
#: :py:class:`User` s that belong to this group.
users = db.relationship(User, secondary=users_groups,
back_populates='groups', collection_class=set)
#: Synonym for :py:attr:`entity_permissions`
permissions = db.synonym('entity_permissions')
def _json(self, extended=False):
try:
parent = super(Group, self)._json(extended)
except AttributeError:
parent = {}
parent[u'href'] = url_for('api.group_detail', group_id=self.id)
if extended:
parent[u'count'] = len(self.users)
return parent
[docs]class Permission(db.Model, AutoID, AutoName):
__table_args__ = (
db.UniqueConstraint('division_id', 'entity_id', 'permission',
name='division_entity_permission'),
)
division_id = db.Column(db.Integer, db.ForeignKey('division.id'),
nullable=False)
#: The division this permission is granting access to
division = db.relationship('Division',
back_populates='division_permissions',
cascade='save-update,merge,refresh-expire,expunge')
entity_id = db.Column(db.Integer, db.ForeignKey('entity.id'),
nullable=False)
#: The :py:class:`Entity` being granted access
entity = db.relationship(Entity, back_populates='entity_permissions',
cascade='save-update,merge,refresh-expire,expunge')
#: The permission being granted.
permission = db.Column(PermissionType.db_type(), nullable=False)
[docs] def __init__(self, division, permission, entity):
"""Create a Permission object granting an entity access to a division.
"""
self.division = division
self.entity = entity
self.permission = permission
def __repr__(self):
return ("{x.__class__.__name__}('{x.permission}', {x.entity}, "
"{x.division})").format(x=self)
@unistr
[docs]class Division(db.Model, AutoID, AutoName):
"""A reimbursement division.
A division has (possibly non-intersecting) groups of people that can submit
requests, review requests, and pay out requests.
"""
#: The name of this division.
name = db.Column(db.String(128, convert_unicode=True), nullable=False)
#: All :py:class:`Permission`\s associated with this division.
division_permissions = db.relationship(Permission,
back_populates='division', cascade='all,delete-orphan',
collection_class=set)
#: :py:class:`Request` s filed under this division.
requests = db.relationship(Request, back_populates='division',
cascade='all,delete-orphan')
division_transformers = db.relationship(TransformerRef,
collection_class=attribute_mapped_collection('attribute_name'),
back_populates='division', cascade='all,delete-orphan')
#: A mapping of attribute names to :py:class:`~.transformer.Transformer`
#: instances.
transformers = association_proxy(
'division_transformers',
'transformer',
creator=lambda attr, trans:
TransformerRef(attribute_name=attr, transformer=trans))
@property
[docs] def permissions(self):
"""The permissions objects for this division, mapped via their
permission names.
"""
class _PermProxy(object):
def __init__(self, perms):
self.perms = perms
def __getitem__(self, key):
return set(filter(lambda x: x.permission == key, self.perms))
return _PermProxy(self.division_permissions)
def __init__(self, name):
self.name = ensure_unicode(name)
def __repr__(self):
return "{x.__class__.__name__}('{x.name}')".format(x=self)
def __unicode__(self):
return u"{}".format(self.name)
def _json(self, extended=False):
try:
parent = super(Division, self)._json(extended)
except AttributeError:
parent = {}
parent[u'href'] = url_for('divisions.get_division_details',
division_id=self.id)
parent[u'name'] = self.name
if extended:
entities = {}
for perm in PermissionType.all:
members = []
for member in [p.entity for p in self.permissions[perm]]:
members.append(member._json(extended))
entities[perm.name] = members
parent[u'entities'] = entities
return parent