Created
May 17, 2024 22:35
-
-
Save JacobFV/0114b11e207135aaeff028975ec4121b to your computer and use it in GitHub Desktop.
has_permissions
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class HasPermissions(SQLBaseModel, table=False): | |
""" | |
A base class to handle permission levels for viewing and updating fields in SQLModel entities. | |
This class allows setting permissions at the field level and automatically applies these permissions | |
as SQL policies on the database. | |
Attributes: | |
__owner_field_name__ (ClassVar[Optional[str]]): The name of the field that identifies the owner of the record. | |
Usage: | |
Subclasses should define SQLModelFields with `schema_extra` specifying `view_permission` and `update_permission`. | |
These permissions dictate who can view or update the fields: `ANYONE`, `OWNER`, or `NOBODY`. | |
Examples: | |
class User(HasPermissions): | |
display_name: str = SQLModelField( | |
schema_extra={ | |
"view_permission": PermissionLevel.ANYONE, | |
"update_permission": PermissionLevel.OWNER, | |
} | |
) | |
email: str = SQLModelField( | |
schema_extra={ | |
"view_permission": PermissionLevel.OWNER, | |
"update_permission": PermissionLevel.OWNER, | |
} | |
) | |
private_notes: str = SQLModelField( | |
schema_extra={ | |
"view_permission": PermissionLevel.NOBODY, | |
"update_permission": PermissionLevel.NOBODY, | |
} | |
) | |
# Set the owner field name to apply OWNER level permissions correctly | |
__owner_field_name__ = 'user_id' | |
This will configure the database to: | |
- Allow anyone to view `display_name`. | |
- Allow only the owner (identified by `user_id`) to view and update `email`. | |
- Restrict both viewing and updating of `private_notes` to nobody. | |
Database policies will be automatically generated and applied based on these settings when the `ddl_extras` class method is invoked, typically during database migration or setup. | |
""" | |
__owner_field_name__: ClassVar[Optional[str]] = None | |
__default_view_permission__: ClassVar[PermissionLevel] = PermissionLevel.NOBODY | |
__default_update_permission__: ClassVar[PermissionLevel] = PermissionLevel.NOBODY | |
@classmethod | |
def ddl_extras(cls, session): | |
# Determine fields with different view permissions | |
nobody_can_view_fields = set( | |
f | |
for f in cls.model_fields | |
if getattr( | |
cls.model_fields[f], "view_permission", cls.__default_view_permission__ | |
) | |
== PermissionLevel.NOBODY | |
) | |
owner_can_view_fields = set( | |
f | |
for f in cls.model_fields | |
if getattr( | |
cls.model_fields[f], "view_permission", cls.__default_view_permission__ | |
) | |
== PermissionLevel.OWNER | |
) | |
anyone_can_view_fields = set( | |
f | |
for f in cls.model_fields | |
if getattr( | |
cls.model_fields[f], "view_permission", cls.__default_view_permission__ | |
) | |
== PermissionLevel.ANYONE | |
) | |
# Check for conflicts in view permissions | |
if field_conflicts_nobody_and_somebody_can_read := nobody_can_view_fields.intersection( | |
owner_can_view_fields | anyone_can_view_fields | |
): | |
raise ValueError( | |
f"Permission levels are inconsistent. The following fields are both nobody_can_view and owner_can_view or anyone_can_view: {field_conflicts_nobody_and_somebody_can_read}" | |
) | |
if field_conflicts_distinct_owner_and_role_permissions := owner_can_view_fields.intersection( | |
anyone_can_view_fields | |
): | |
raise ValueError( | |
f"Postgres' security model does not support simultaneously granting distinct permissions to the owner of a record and arbitrary roles. Please make all your fields either nobody_can_view / owner_can_view or nobody_can_view / anyone_can_view, but not both. You will need to change the following fields: {field_conflicts_distinct_owner_and_role_permissions}" | |
) | |
if owner_can_view_fields is not None and cls.__owner_field_name__ is None: | |
raise ValueError( | |
f"__owner_field_name__ must be set if owner_can_view_fields is set. ({cls.__name__} does not have an `__owner_field_name__` set)" | |
) | |
# Apply database permissions/policies based on view permissions | |
if nobody_can_view_fields is not None: | |
session.exec( | |
f"REVOKE SELECT ({', '.join(nobody_can_view_fields)}) ON {cls.__tablename__} FROM anon, authenticated;" | |
) | |
if owner_can_view_fields is not None: | |
session.exec( | |
f"GRANT SELECT ({', '.join(owner_can_view_fields)}) ON {cls.__tablename__} TO authenticated;" | |
) | |
session.exec( | |
f""" | |
CREATE POLICY owner_view ON {cls.__tablename__} FOR SELECT USING (auth.uid() = {cls.__owner_field_name__}); | |
""" | |
) | |
if anyone_can_view_fields is not None: | |
session.exec( | |
f"GRANT SELECT ({', '.join(anyone_can_view_fields)}) ON {cls.__tablename__} TO anon, authenticated;" | |
) | |
# Determine fields with different update permissions | |
nobody_can_update_fields = [ | |
f | |
for f in cls.model_fields | |
if getattr( | |
cls.model_fields[f], | |
"update_permission", | |
cls.__default_update_permission__, | |
) | |
== PermissionLevel.NOBODY | |
] | |
owner_can_update_fields = [ | |
f | |
for f in cls.model_fields | |
if getattr( | |
cls.model_fields[f], | |
"update_permission", | |
cls.__default_update_permission__, | |
) | |
== PermissionLevel.OWNER | |
] | |
anyone_can_update_fields = [ | |
f | |
for f in cls.model_fields | |
if getattr( | |
cls.model_fields[f], | |
"update_permission", | |
cls.__default_update_permission__, | |
) | |
== PermissionLevel.ANYONE | |
] | |
# Check for conflicts in update permissions | |
field_conflicts_nobody_and_somebody_can_update = ( | |
nobody_can_update_fields.intersection( | |
owner_can_update_fields | anyone_can_update_fields | |
) | |
) | |
if field_conflicts_nobody_and_somebody_can_update: | |
raise ValueError( | |
f"Permission levels are inconsistent. The following fields are both nobody_can_update and owner_can_update or anyone_can_update: {field_conflicts_nobody_and_somebody_can_update}" | |
) | |
if field_conflicts_distinct_owner_and_role_permissions_update := owner_can_update_fields.intersection( | |
anyone_can_update_fields | |
): | |
raise ValueError( | |
f"Postgres' security model does not support simultaneously granting distinct permissions to the owner of a record and arbitrary roles for updates. Please make all your fields either nobody_can_update / owner_can_update or nobody_can_update / anyone_can_update, but not both. You will need to change the following fields: {field_conflicts_distinct_owner_and_role_permissions_update}" | |
) | |
if owner_can_update_fields is not None and cls.__owner_field_name__ is None: | |
raise ValueError( | |
f"__owner_field_name__ must be set if owner_can_update_fields is set. ({cls.__name__} does not have an `__owner_field_name__` set)" | |
) | |
# Apply database permissions/policies based on update permissions | |
if nobody_can_update_fields is not None: | |
session.exec( | |
f"REVOKE UPDATE ({', '.join(nobody_can_update_fields)}) ON {cls.__tablename__} FROM anon, authenticated;" | |
) | |
if owner_can_update_fields is not None: | |
session.exec( | |
f"GRANT UPDATE ({', '.join(owner_can_update_fields)}) ON {cls.__tablename__} TO authenticated;" | |
) | |
session.exec( | |
f""" | |
CREATE POLICY owner_update ON {cls.__tablename__} FOR UPDATE USING (auth.uid() = {cls.__owner_field_name__}); | |
""" | |
) | |
if anyone_can_update_fields is not None: | |
session.exec( | |
f"GRANT UPDATE ({', '.join(anyone_can_update_fields)}) ON {cls.__tablename__} TO anon, authenticated;" | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment