<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;"># -*- coding: utf-8 -*-
import json
from operator import itemgetter
from pathlib import Path

from django.core.management import call_command
from django.core.management.base import BaseCommand, CommandError
from django.db import DEFAULT_DB_ALIAS, connections
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.recorder import MigrationRecorder
from django.utils import timezone

from django_extensions.management.utils import signalcommand

DEFAULT_FILENAME = 'managestate.json'
DEFAULT_STATE = 'default'


class Command(BaseCommand):
    help = 'Manage database state in the convenient way.'
    conn = database = None
    _applied_migrations = None
    migrate_args: dict
    migrate_options: dict
    filename: str
    verbosity: int

    def add_arguments(self, parser):
        parser.add_argument(
            'action', choices=('dump', 'load'),
            help='An action to do. '
                 'Dump action saves applied migrations to a file. '
                 'Load action applies migrations specified in a file.',
        )
        parser.add_argument(
            'state', nargs='?', default=DEFAULT_STATE,
            help=f'A name of a state. Usually a name of a git branch. Defaults to "{DEFAULT_STATE}"',
        )
        parser.add_argument(
            '-d', '--database', default=DEFAULT_DB_ALIAS,
            help=f'Nominates a database to synchronize. Defaults to the "{DEFAULT_DB_ALIAS}" database.',
        )
        parser.add_argument(
            '-f', '--filename', default=DEFAULT_FILENAME,
            help=f'A file to write to. Defaults to "{DEFAULT_FILENAME}"',
        )

        # migrate command arguments
        parser.add_argument(
            '--noinput', '--no-input', action='store_false', dest='interactive',
            help='The argument for "migrate" command. '
                 'Tells Django to NOT prompt the user for input of any kind.',
        )
        parser.add_argument(
            '--fake', action='store_true',
            help='The argument for "migrate" command. '
                 'Mark migrations as run without actually running them.',
        )
        parser.add_argument(
            '--fake-initial', action='store_true',
            help='The argument for "migrate" command. '
                 'Detect if tables already exist and fake-apply initial migrations if so. Make sure '
                 'that the current database schema matches your initial migration before using this '
                 'flag. Django will only check for an existing table name.',
        )
        parser.add_argument(
            '--plan', action='store_true',
            help='The argument for "migrate" command. '
                 'Shows a list of the migration actions that will be performed.',
        )
        parser.add_argument(
            '--run-syncdb', action='store_true',
            help='The argument for "migrate" command. '
                 'Creates tables for apps without migrations.',
        )
        parser.add_argument(
            '--check', action='store_true', dest='check_unapplied',
            help='The argument for "migrate" command. '
                 'Exits with a non-zero status if unapplied migrations exist.',
        )

    @signalcommand
    def handle(self, action, database, filename, state, *args, **options):
        self.migrate_args = args
        self.migrate_options = options
        self.verbosity = options['verbosity']
        self.conn = connections[database]
        self.database = database
        self.filename = filename
        getattr(self, action)(state)

    def dump(self, state: str):
        """Save applied migrations to a file."""
        migrated_apps = self.get_migrated_apps()
        migrated_apps.update(self.get_applied_migrations())
        self.write({state: migrated_apps})
        self.stdout.write(self.style.SUCCESS(
            f'Migrations for state "{state}" have been successfully saved to {self.filename}.'
        ))

    def load(self, state: str):
        """Apply migrations from a file."""
        migrations = self.read().get(state)
        if migrations is None:
            raise CommandError(f'No such state saved: {state}')

        kwargs = {
            **self.migrate_options,
            'database': self.database,
            'verbosity': self.verbosity - 1 if self.verbosity &gt; 1 else 0
        }

        for app, migration in migrations.items():
            if self.is_applied(app, migration):
                continue

            if self.verbosity &gt; 1:
                self.stdout.write(self.style.WARNING(f'Applying migrations for "{app}"'))
            args = (app, migration, *self.migrate_args)
            call_command('migrate', *args, **kwargs)

        self.stdout.write(self.style.SUCCESS(
            f'Migrations for "{state}" have been successfully applied.'
        ))

    def get_migrated_apps(self) -&gt; dict:
        """Installed apps having migrations."""
        apps = MigrationLoader(self.conn).migrated_apps
        migrated_apps = dict.fromkeys(apps, 'zero')
        if self.verbosity &gt; 1:
            self.stdout.write('Apps having migrations: ' + ', '.join(sorted(migrated_apps)))
        return migrated_apps

    def get_applied_migrations(self) -&gt; dict:
        """Installed apps with last applied migrations."""
        if self._applied_migrations:
            return self._applied_migrations

        migrations = MigrationRecorder(self.conn).applied_migrations()
        last_applied = sorted(migrations.keys(), key=itemgetter(1))

        self._applied_migrations = dict(last_applied)
        return self._applied_migrations

    def is_applied(self, app: str, migration: str) -&gt; bool:
        """Check whether a migration for an app is applied or not."""
        applied = self.get_applied_migrations().get(app)
        if applied == migration:
            if self.verbosity &gt; 1:
                self.stdout.write(self.style.WARNING(
                    f'Migrations for "{app}" are already applied.'
                ))
            return True
        return False

    def read(self) -&gt; dict:
        """Get saved state from the file."""
        path = Path(self.filename)
        if not path.exists() or not path.is_file():
            raise CommandError(f'No such file: {self.filename}')

        with open(self.filename) as file:
            return json.load(file)

    def write(self, data: dict):
        """Write new data to the file using existent one."""
        try:
            saved = self.read()
        except CommandError:
            saved = {}

        saved.update(data, updated_at=str(timezone.now()))
        with open(self.filename, 'w') as file:
            json.dump(saved, file, indent=2, sort_keys=True)
</pre></body></html>