|
from django.core.management.base import BaseCommand |
|
from django.contrib.auth.models import User, Permission |
|
from django.contrib.contenttypes.models import ContentType |
|
from django.db import transaction |
|
from django.apps import apps |
|
|
|
class Command(BaseCommand): |
|
help = 'Create a user with access to specified models' |
|
|
|
def add_arguments(self, parser): |
|
parser.add_argument('username', type=str, help='Username for the new user') |
|
parser.add_argument('email', type=str, help='Email for the new user') |
|
parser.add_argument('password', type=str, help='Password for the new user') |
|
parser.add_argument( |
|
'--models', |
|
nargs='+', |
|
type=str, |
|
required=True, |
|
help='List of models in format: app_name.ModelName (e.g., public.LiveVideo public.OtherModel)' |
|
) |
|
parser.add_argument( |
|
'--permissions', |
|
nargs='+', |
|
type=str, |
|
default=['view', 'add', 'change', 'delete'], |
|
help='List of permissions to grant (view, add, change, delete)' |
|
) |
|
|
|
def handle(self, *args, **options): |
|
username = options['username'] |
|
email = options['email'] |
|
password = options['password'] |
|
model_paths = options['models'] |
|
permissions = options['permissions'] |
|
|
|
try: |
|
with transaction.atomic(): |
|
# Create or update user |
|
user, created = User.objects.get_or_create( |
|
username=username, |
|
defaults={ |
|
'email': email, |
|
'is_staff': True, |
|
} |
|
) |
|
|
|
if not created: |
|
user.email = email |
|
self.stdout.write( |
|
self.style.WARNING(f'User {username} already exists. Updating permissions...') |
|
) |
|
|
|
user.set_password(password) |
|
user.is_active = True |
|
user.save() |
|
|
|
# Clear existing permissions |
|
user.user_permissions.clear() |
|
|
|
# Process each model |
|
for model_path in model_paths: |
|
try: |
|
app_label, model_name = model_path.split('.') |
|
model = apps.get_model(app_label, model_name) |
|
content_type = ContentType.objects.get_for_model(model) |
|
model_name_lower = model_name.lower() |
|
|
|
# Create and assign permissions for this model |
|
for permission in permissions: |
|
perm_codename = f'{permission}_{model_name_lower}' |
|
perm_name = f'Can {permission} {model_name_lower}' |
|
|
|
perm, created = Permission.objects.get_or_create( |
|
codename=perm_codename, |
|
content_type=content_type, |
|
defaults={'name': perm_name} |
|
) |
|
|
|
user.user_permissions.add(perm) |
|
|
|
if created: |
|
self.stdout.write( |
|
self.style.SUCCESS( |
|
f'Created and granted {app_label}.{perm_codename} ' |
|
f'permission to {username}' |
|
) |
|
) |
|
else: |
|
self.stdout.write( |
|
self.style.SUCCESS( |
|
f'Granted existing {app_label}.{perm_codename} ' |
|
f'permission to {username}' |
|
) |
|
) |
|
|
|
except ValueError: |
|
self.stdout.write( |
|
self.style.ERROR( |
|
f'Invalid model format: {model_path}. ' |
|
f'Use format: app_name.ModelName' |
|
) |
|
) |
|
continue |
|
except LookupError: |
|
self.stdout.write( |
|
self.style.ERROR(f'Model not found: {model_path}') |
|
) |
|
continue |
|
|
|
self.stdout.write( |
|
self.style.SUCCESS( |
|
f'Successfully created/updated user {username} ' |
|
f'with permissions for specified models' |
|
) |
|
) |
|
|
|
except Exception as e: |
|
self.stdout.write( |
|
self.style.ERROR(f'Error creating/updating user: {str(e)}') |
|
) |