Команды с аргументами

Django команды могут принимать различные аргументы и опции для гибкой настройки. Правильное использование аргументов делает команды более мощными и удобными для автоматизации.

Основы создания команд

Каждая Django команда наследуется от BaseCommand и должна реализовывать метод handle(). Аргументы добавляются через add_arguments().

Позиционные аргументы

 1from django.core.management.base import BaseCommand, CommandError
 2from django.core.validators import validate_email
 3from django.contrib.auth.models import User
 4import os
 5
 6class Command(BaseCommand):
 7    help = 'Создание пользователя с указанным email и паролем'
 8
 9    def add_arguments(self, parser):
10        # Позиционный аргумент - обязательный
11        parser.add_argument('username', type=str, help='Имя пользователя')
12        parser.add_argument('email', type=str, help='Email пользователя')
13        parser.add_argument('password', type=str, help='Пароль пользователя')
14
15        # Опциональные аргументы
16        parser.add_argument(
17            '--first-name',
18            type=str,
19            default='',
20            help='Имя (по умолчанию пустое)'
21        )
22        parser.add_argument(
23            '--last-name',
24            type=str,
25            default='',
26            help='Фамилия (по умолчанию пустая)'
27        )
28        parser.add_argument(
29            '--is-staff',
30            action='store_true',
31            help='Сделать пользователя персоналом'
32        )
33        parser.add_argument(
34            '--is-superuser',
35            action='store_true',
36            help='Сделать пользователя суперпользователем'
37        )
38
39    def handle(self, *args, **options):
40        username = options['username']
41        email = options['email']
42        password = options['password']
43        first_name = options['first_name']
44        last_name = options['last_name']
45        is_staff = options['is_staff']
46        is_superuser = options['is_superuser']
47
48        # Валидация email
49        try:
50            validate_email(email)
51        except ValidationError:
52            raise CommandError(f'Неверный email: {email}')
53
54        # Проверка существования пользователя
55        if User.objects.filter(username=username).exists():
56            raise CommandError(f'Пользователь {username} уже существует')
57
58        if User.objects.filter(email=email).exists():
59            raise CommandError(f'Email {email} уже используется')
60
61        # Создание пользователя
62        try:
63            user = User.objects.create_user(
64                username=username,
65                email=email,
66                password=password,
67                first_name=first_name,
68                last_name=last_name,
69                is_staff=is_staff,
70                is_superuser=is_superuser
71            )
72
73            self.stdout.write(
74                self.style.SUCCESS(
75                    f'Пользователь {username} успешно создан'
76                )
77            )
78
79            if is_superuser:
80                self.stdout.write(
81                    self.style.WARNING(
82                        'Внимание: создан суперпользователь!'
83                    )
84                )
85
86        except Exception as e:
87            raise CommandError(f'Ошибка создания пользователя: {e}')

Опции с различными типами данных

  1from django.core.management.base import BaseCommand
  2from django.conf import settings
  3import json
  4import csv
  5
  6class Command(BaseCommand):
  7    help = 'Импорт данных из различных форматов файлов'
  8
  9    def add_arguments(self, parser):
 10        # Обязательный аргумент - путь к файлу
 11        parser.add_argument(
 12            'filepath',
 13            type=str,
 14            help='Путь к файлу для импорта'
 15        )
 16
 17        # Выбор формата файла
 18        parser.add_argument(
 19            '--format',
 20            choices=['csv', 'json', 'xml'],
 21            default='csv',
 22            help='Формат файла (по умолчанию: csv)'
 23        )
 24
 25        # Флаг для тестового режима
 26        parser.add_argument(
 27            '--dry-run',
 28            action='store_true',
 29            help='Тестовый режим без сохранения данных'
 30        )
 31
 32        # Числовые параметры
 33        parser.add_argument(
 34            '--batch-size',
 35            type=int,
 36            default=1000,
 37            help='Размер пакета для обработки (по умолчанию: 1000)'
 38        )
 39
 40        # Логические флаги
 41        parser.add_argument(
 42            '--skip-errors',
 43            action='store_true',
 44            help='Пропускать ошибки и продолжать импорт'
 45        )
 46
 47        parser.add_argument(
 48            '--verbose',
 49            action='store_true',
 50            help='Подробный вывод процесса импорта'
 51        )
 52
 53        # Множественные значения
 54        parser.add_argument(
 55            '--exclude-fields',
 56            nargs='+',
 57            default=[],
 58            help='Поля для исключения из импорта'
 59        )
 60
 61    def handle(self, *args, **options):
 62        filepath = options['filepath']
 63        format_type = options['format']
 64        dry_run = options['dry_run']
 65        batch_size = options['batch_size']
 66        skip_errors = options['skip_errors']
 67        verbose = options['verbose']
 68        exclude_fields = options['exclude_fields']
 69
 70        # Проверка существования файла
 71        if not os.path.exists(filepath):
 72            raise CommandError(f'Файл не найден: {filepath}')
 73
 74        if verbose:
 75            self.stdout.write(f'Начинаем импорт из файла: {filepath}')
 76            self.stdout.write(f'Формат: {format_type}')
 77            self.stdout.write(f'Размер пакета: {batch_size}')
 78            if exclude_fields:
 79                self.stdout.write(f'Исключаемые поля: {exclude_fields}')
 80
 81        # Обработка в зависимости от формата
 82        if format_type == 'csv':
 83            self.import_csv(filepath, batch_size, exclude_fields, dry_run, verbose)
 84        elif format_type == 'json':
 85            self.import_json(filepath, batch_size, exclude_fields, dry_run, verbose)
 86        else:
 87            raise CommandError(f'Неподдерживаемый формат: {format_type}')
 88
 89        if not dry_run:
 90            self.stdout.write(
 91                self.style.SUCCESS('Импорт завершен успешно')
 92            )
 93        else:
 94            self.stdout.write(
 95                self.style.WARNING('Тестовый режим - данные не сохранены')
 96            )
 97
 98    def import_csv(self, filepath, batch_size, exclude_fields, dry_run, verbose):
 99        """Импорт из CSV файла"""
100        processed = 0
101        errors = 0
102
103        with open(filepath, 'r', encoding='utf-8') as file:
104            reader = csv.DictReader(file)
105
106            batch = []
107            for row in reader:
108                # Исключаем указанные поля
109                for field in exclude_fields:
110                    row.pop(field, None)
111
112                batch.append(row)
113                processed += 1
114
115                if len(batch) >= batch_size:
116                    if not dry_run:
117                        self.process_batch(batch)
118                    batch = []
119
120                    if verbose:
121                        self.stdout.write(f'Обработано записей: {processed}')
122
123        # Обрабатываем оставшиеся записи
124        if batch and not dry_run:
125            self.process_batch(batch)
126
127        if verbose:
128            self.stdout.write(f'Всего обработано: {processed} записей')
129            if errors > 0:
130                self.stdout.write(f'Ошибок: {errors}')
131
132    def process_batch(self, batch):
133        """Обработка пакета данных"""
134        # Здесь должна быть логика сохранения данных
135        pass

Валидация и обработка ошибок

  1from django.core.management.base import BaseCommand, CommandError
  2from django.core.validators import URLValidator
  3from django.core.exceptions import ValidationError
  4import requests
  5import time
  6
  7class Command(BaseCommand):
  8    help = 'Проверка доступности URL адресов'
  9
 10    def add_arguments(self, parser):
 11        parser.add_argument(
 12            'urls',
 13            nargs='+',
 14            type=str,
 15            help='URL адреса для проверки'
 16        )
 17
 18        parser.add_argument(
 19            '--timeout',
 20            type=int,
 21            default=30,
 22            help='Таймаут запроса в секундах (по умолчанию: 30)'
 23        )
 24
 25        parser.add_argument(
 26            '--retries',
 27            type=int,
 28            default=3,
 29            help='Количество попыток (по умолчанию: 3)'
 30        )
 31
 32        parser.add_argument(
 33            '--output',
 34            type=str,
 35            choices=['console', 'json', 'csv'],
 36            default='console',
 37            help='Формат вывода результатов'
 38        )
 39
 40    def handle(self, *args, **options):
 41        urls = options['urls']
 42        timeout = options['timeout']
 43        retries = options['retries']
 44        output_format = options['output']
 45
 46        # Валидация URL
 47        url_validator = URLValidator()
 48        valid_urls = []
 49
 50        for url in urls:
 51            try:
 52                url_validator(url)
 53                valid_urls.append(url)
 54            except ValidationError:
 55                self.stdout.write(
 56                    self.style.ERROR(f'Неверный URL: {url}')
 57                )
 58
 59        if not valid_urls:
 60            raise CommandError('Нет валидных URL для проверки')
 61
 62        # Проверка URL
 63        results = []
 64        for url in valid_urls:
 65            result = self.check_url(url, timeout, retries)
 66            results.append(result)
 67
 68        # Вывод результатов
 69        self.output_results(results, output_format)
 70
 71    def check_url(self, url, timeout, retries):
 72        """Проверка доступности URL"""
 73        for attempt in range(retries):
 74            try:
 75                start_time = time.time()
 76                response = requests.get(url, timeout=timeout)
 77                response_time = time.time() - start_time
 78
 79                return {
 80                    'url': url,
 81                    'status_code': response.status_code,
 82                    'response_time': round(response_time, 2),
 83                    'available': response.status_code < 400,
 84                    'attempt': attempt + 1
 85                }
 86
 87            except requests.exceptions.Timeout:
 88                if attempt == retries - 1:
 89                    return {
 90                        'url': url,
 91                        'status_code': None,
 92                        'response_time': None,
 93                        'available': False,
 94                        'error': 'Timeout',
 95                        'attempt': attempt + 1
 96                    }
 97                time.sleep(1)  # Пауза перед повторной попыткой
 98
 99            except requests.exceptions.RequestException as e:
100                if attempt == retries - 1:
101                    return {
102                        'url': url,
103                        'status_code': None,
104                        'response_time': None,
105                        'available': False,
106                        'error': str(e),
107                        'attempt': attempt + 1
108                    }
109                time.sleep(1)
110
111        return {
112            'url': url,
113            'status_code': None,
114            'response_time': None,
115            'available': False,
116            'error': 'Unknown error',
117            'attempt': retries
118        }
119
120    def output_results(self, results, output_format):
121        """Вывод результатов в различных форматах"""
122        if output_format == 'console':
123            self.output_console(results)
124        elif output_format == 'json':
125            self.output_json(results)
126        elif output_format == 'csv':
127            self.output_csv(results)
128
129    def output_console(self, results):
130        """Вывод в консоль"""
131        available_count = sum(1 for r in results if r['available'])
132        total_count = len(results)
133
134        self.stdout.write(f'\nРезультаты проверки ({available_count}/{total_count} доступны):')
135        self.stdout.write('=' * 80)
136
137        for result in results:
138            if result['available']:
139                self.stdout.write(
140                    self.style.SUCCESS(
141                        f"✓ {result['url']} - {result['status_code']} "
142                        f"({result['response_time']}s)"
143                    )
144                )
145            else:
146                error_msg = result.get('error', 'Unknown error')
147                self.stdout.write(
148                    self.style.ERROR(
149                        f"✗ {result['url']} - {error_msg}"
150                    )
151                )

Интерактивные команды

  1from django.core.management.base import BaseCommand
  2from django.contrib.auth.models import User
  3from django.core.validators import validate_email
  4import getpass
  5
  6class Command(BaseCommand):
  7    help = 'Интерактивное создание пользователя'
  8
  9    def add_arguments(self, parser):
 10        parser.add_argument(
 11            '--username',
 12            type=str,
 13            help='Имя пользователя (если не указано, будет запрошено)'
 14        )
 15
 16        parser.add_argument(
 17            '--email',
 18            type=str,
 19            help='Email (если не указан, будет запрошен)'
 20        )
 21
 22        parser.add_argument(
 23            '--non-interactive',
 24            action='store_true',
 25            help='Неинтерактивный режим (требует все аргументы)'
 26        )
 27
 28    def handle(self, *args, **options):
 29        username = options['username']
 30        email = options['email']
 31        non_interactive = options['non_interactive']
 32
 33        if non_interactive:
 34            if not username or not email:
 35                raise CommandError(
 36                    'В неинтерактивном режиме username и email обязательны'
 37                )
 38            self.create_user_non_interactive(username, email)
 39        else:
 40            self.create_user_interactive(username, email)
 41
 42    def create_user_interactive(self, username=None, email=None):
 43        """Интерактивное создание пользователя"""
 44        self.stdout.write('Создание нового пользователя')
 45        self.stdout.write('=' * 40)
 46
 47        # Запрашиваем username
 48        while not username:
 49            username = input('Имя пользователя: ').strip()
 50            if not username:
 51                self.stdout.write(
 52                    self.style.ERROR('Имя пользователя не может быть пустым')
 53                )
 54            elif User.objects.filter(username=username).exists():
 55                self.stdout.write(
 56                    self.style.ERROR('Пользователь с таким именем уже существует')
 57                )
 58                username = None
 59
 60        # Запрашиваем email
 61        while not email:
 62            email = input('Email: ').strip()
 63            if not email:
 64                self.stdout.write(
 65                    self.style.ERROR('Email не может быть пустым')
 66                )
 67            else:
 68                try:
 69                    validate_email(email)
 70                    if User.objects.filter(email=email).exists():
 71                        self.stdout.write(
 72                            self.style.ERROR('Email уже используется')
 73                        )
 74                        email = None
 75                except ValidationError:
 76                    self.stdout.write(
 77                        self.style.ERROR('Неверный формат email')
 78                    )
 79                    email = None
 80
 81        # Запрашиваем пароль
 82        while True:
 83            password = getpass.getpass('Пароль: ')
 84            if len(password) < 8:
 85                self.stdout.write(
 86                    self.style.ERROR('Пароль должен содержать минимум 8 символов')
 87                )
 88                continue
 89
 90            password_confirm = getpass.getpass('Подтвердите пароль: ')
 91            if password != password_confirm:
 92                self.stdout.write(
 93                    self.style.ERROR('Пароли не совпадают')
 94                )
 95                continue
 96            break
 97
 98        # Дополнительные поля
 99        first_name = input('Имя (необязательно): ').strip()
100        last_name = input('Фамилия (необязательно): ').strip()
101
102        # Создание пользователя
103        try:
104            user = User.objects.create_user(
105                username=username,
106                email=email,
107                password=password,
108                first_name=first_name,
109                last_name=last_name
110            )
111
112            self.stdout.write(
113                self.style.SUCCESS(
114                    f'\nПользователь {username} успешно создан!'
115                )
116            )
117
118        except Exception as e:
119            raise CommandError(f'Ошибка создания пользователя: {e}')
120
121    def create_user_non_interactive(self, username, email):
122        """Неинтерактивное создание пользователя"""
123        # Генерируем случайный пароль
124        import secrets
125        import string
126
127        alphabet = string.ascii_letters + string.digits
128        password = ''.join(secrets.choice(alphabet) for i in range(12))
129
130        try:
131            user = User.objects.create_user(
132                username=username,
133                email=email,
134                password=password
135            )
136
137            self.stdout.write(
138                self.style.SUCCESS(
139                    f'Пользователь {username} создан с паролем: {password}'
140                )
141            )
142
143        except Exception as e:
144            raise CommandError(f'Ошибка создания пользователя: {e}')

Команды с прогресс-баром

  1from django.core.management.base import BaseCommand
  2from django.db import transaction
  3from tqdm import tqdm
  4import time
  5
  6class Command(BaseCommand):
  7    help = 'Массовое обновление данных с прогресс-баром'
  8
  9    def add_arguments(self, parser):
 10        parser.add_argument(
 11            '--model',
 12            type=str,
 13            required=True,
 14            help='Название модели для обновления'
 15        )
 16
 17        parser.add_argument(
 18            '--field',
 19            type=str,
 20            required=True,
 21            help='Поле для обновления'
 22        )
 23
 24        parser.add_argument(
 25            '--value',
 26            type=str,
 27            required=True,
 28            help='Новое значение'
 29        )
 30
 31        parser.add_argument(
 32            '--batch-size',
 33            type=int,
 34            default=1000,
 35            help='Размер пакета (по умолчанию: 1000)'
 36        )
 37
 38        parser.add_argument(
 39            '--dry-run',
 40            action='store_true',
 41            help='Тестовый режим без сохранения'
 42        )
 43
 44    def handle(self, *args, **options):
 45        model_name = options['model']
 46        field_name = options['field']
 47        new_value = options['value']
 48        batch_size = options['batch_size']
 49        dry_run = options['dry_run']
 50
 51        # Получаем модель
 52        try:
 53            from django.apps import apps
 54            model = apps.get_model(model_name)
 55        except Exception as e:
 56            raise CommandError(f'Не удалось получить модель {model_name}: {e}')
 57
 58        # Проверяем существование поля
 59        if not hasattr(model, field_name):
 60            raise CommandError(f'Поле {field_name} не существует в модели {model_name}')
 61
 62        # Получаем общее количество записей
 63        total_count = model.objects.count()
 64        if total_count == 0:
 65            self.stdout.write('Нет записей для обновления')
 66            return
 67
 68        self.stdout.write(f'Начинаем обновление {total_count} записей')
 69        self.stdout.write(f'Модель: {model_name}')
 70        self.stdout.write(f'Поле: {field_name}')
 71        self.stdout.write(f'Новое значение: {new_value}')
 72        self.stdout.write(f'Размер пакета: {batch_size}')
 73
 74        if dry_run:
 75            self.stdout.write(self.style.WARNING('ТЕСТОВЫЙ РЕЖИМ - данные не изменятся'))
 76
 77        # Обновляем данные пакетами
 78        updated_count = 0
 79        start_time = time.time()
 80
 81        with tqdm(total=total_count, desc="Обновление") as pbar:
 82            for offset in range(0, total_count, batch_size):
 83                batch = model.objects.all()[offset:offset + batch_size]
 84
 85                if not dry_run:
 86                    with transaction.atomic():
 87                        for obj in batch:
 88                            setattr(obj, field_name, new_value)
 89                            obj.save(update_fields=[field_name])
 90                            updated_count += 1
 91                else:
 92                    updated_count += len(batch)
 93
 94                pbar.update(len(batch))
 95
 96                # Небольшая пауза для снижения нагрузки на БД
 97                time.sleep(0.01)
 98
 99        execution_time = time.time() - start_time
100
101        if dry_run:
102            self.stdout.write(
103                self.style.WARNING(
104                    f'Тестовый режим завершен. Обновлено бы {updated_count} записей'
105                )
106            )
107        else:
108            self.stdout.write(
109                self.style.SUCCESS(
110                    f'Обновление завершено! Обновлено {updated_count} записей за {execution_time:.2f} секунд'
111                )
112            )

Лучшие практики

  • Всегда добавляй help текст - объясняй назначение команды и аргументов
  • Используй правильные типы аргументов - type=str, type=int, action='store_true'
  • Валидируй входные данные - проверяй корректность аргументов
  • Обрабатывай ошибки gracefully - используй CommandError для понятных сообщений
  • Добавляй прогресс-бары - для длительных операций
  • Используй batch обработку - для больших объемов данных
  • Добавляй dry-run режим - для тестирования без изменений
  • Логируй важные операции - для отладки и мониторинга
  • Используй цветной вывод - self.style.SUCCESS, self.style.ERROR
  • Добавляй verbose режим - для детального вывода

FAQ

Q: Как добавить валидацию аргументов?
A: Используй type функции для валидации, choices для ограничения значений, и кастомную логику в handle() с raise CommandError.

Q: В чем разница между позиционными и именованными аргументами?
A: Позиционные аргументы обязательны и идут в определенном порядке, именованные опциональны и могут идти в любом порядке.

Q: Как создать интерактивную команду?
A: Используй input() для запроса данных у пользователя, getpass.getpass() для паролей и циклы для валидации.

Q: Можно ли использовать команды в скриптах?
A: Да, используй call_command() из django.core.management для вызова команд из Python кода.

Q: Как обрабатывать большие объемы данных?
A: Используй batch обработку, iterator() для queryset, и прогресс-бары для отображения процесса.

Q: Что такое dry-run режим?
A: Режим тестирования, когда команда показывает, что было бы сделано, но не изменяет данные.

Q: Как добавить цветной вывод в командах?
A: Используй self.style.SUCCESS, self.style.ERROR, self.style.WARNING для цветного вывода в терминале.