Source code for hypershell.data

# SPDX-FileCopyrightText: 2025 Geoffrey Lentner
# SPDX-License-Identifier: Apache-2.0

"""Database interface, models, and methods."""


# Type annotations
from __future__ import annotations
from typing import Tuple, Final

# Standard libs
import os
import sys
import functools

# External libs
from cmdkit.app import Application, exit_status
from cmdkit.cli import Interface, ArgumentError
from cmdkit.config import ConfigurationError
from sqlalchemy import inspect, text, type_coerce
from sqlalchemy.orm import close_all_sessions, sessionmaker, Session as SessionType
from sqlalchemy.exc import OperationalError
from sqlalchemy.engine import create_engine

# Internal libs
from hypershell.core.logging import Logger
from hypershell.core.config import config
from hypershell.core.exceptions import handle_exception, DatabaseUninitialized, get_shared_exception_mapping
from hypershell.core.pretty_print import format_bytes
from hypershell.data.core import Session, engine, in_memory, schema
from hypershell.data.model import Entity, Task, JSON

# Public interface
__all__ = [
    'initdb', 'truncatedb', 'checkdb', 'ensuredb', 'vacuumdb', 'rotatedb',
    'DATABASE_ENABLED', 'DATABASE_PROVIDER',
    'InitDBApp',
]

# Initialize logger
log = Logger.with_name(__name__)


DATABASE_ENABLED: Final[bool] = not in_memory
"""Set if database has been configured."""

DATABASE_PROVIDER: Final[str] = config.database.provider
"""Either sqlite or postgres."""

DATABASE_HOST: Final[str] = config.database.get('host', 'localhost')
"""Database server hostname (default: localhost)."""

DATABASE_SITE: Final[str] = DATABASE_HOST if DATABASE_PROVIDER != 'sqlite' else config.database.get('file', ':memory:')
"""Database server hostname (Postgres) or file path (SQLite)."""

DATABASE_INFO: Final[str] = f'{config.database.provider} ({DATABASE_SITE})'
"""Concise connection info for database."""


[docs]def initdb(optimize: bool = False) -> None: """Initialize database tables.""" Entity.metadata.create_all(engine) if optimize and DATABASE_PROVIDER == 'sqlite': log.info(f'Optimizing database {DATABASE_SITE}') Session.execute(text('PRAGMA optimize'))
[docs]def truncatedb() -> None: """Truncate database tables.""" # NOTE: We still might hang here if other sessions exist outside this app instance close_all_sessions() log.trace('Dropping all tables') Entity.metadata.drop_all(engine) log.trace('Creating all tables') Entity.metadata.create_all(engine) log.warning(f'Truncated database')
[docs]def checkdb() -> None: """Ensure database connection and tables exist.""" if not inspect(engine).has_table('task', schema=schema): raise DatabaseUninitialized('Use \'initdb\' to initialize the database')
[docs]def ensuredb(auto_init: bool = False) -> None: """ Ensure database configuration before applying any operations. If SQLite and `auto_init` we run :meth:`initdb`, else :meth:`checkdb`. """ db = config.database.get('file', None) or config.database.get('database', None) if DATABASE_PROVIDER == 'sqlite' and db in ('', ':memory:', None): raise ConfigurationError('Missing database configuration') if DATABASE_PROVIDER == 'sqlite' or auto_init is True: initdb() else: checkdb()
[docs]def vacuumdb(path: str = None) -> None: """Apply database vacuum (optionally into backup location for SQLite).""" if not path: log.info(f'Vacuuming database {DATABASE_SITE}') if DATABASE_PROVIDER == 'sqlite': size_before = os.path.getsize(DATABASE_SITE) Session.execute(text('VACUUM')) Session.execute(text('PRAGMA optimize')) size_after = os.path.getsize(DATABASE_SITE) log.info(f'Cleaned {format_bytes(size_before - size_after)} from {DATABASE_SITE}') else: # VACUUM cannot run inside a transaction block for PostgreSQL autocommit_engine = engine.execution_options(isolation_level='AUTOCOMMIT') with SessionType(autocommit_engine) as session: session.execute(text('VACUUM')) else: if DATABASE_PROVIDER != 'sqlite': raise RuntimeError(f'{DATABASE_PROVIDER} cannot backup database into file ({path})') log.info(f'Backing up {DATABASE_SITE} into {path}') Session.execute(text(f'VACUUM INTO :path'), params={'path': path})
[docs]def rotatedb() -> None: """Split main database into next partition (SQLite only).""" if DATABASE_PROVIDER != 'sqlite': raise RuntimeError(f'Cannot rotate database with {DATABASE_PROVIDER} provider') part_id, part_path = next_rotate_path() log.info(f'Rotating database {DATABASE_SITE} into {part_path}') # Mark completed tasks as having part:N # We cannot simply drop completed tasks naively as more tasks may be updated in the # time between vacuuming to the new partition and the drop step ( Session.query(Task) .filter(Task.exit_status.isnot(None)) .update({Task.tag: text('json_set(task.tag, :k, :v)').params({'k': '$.part', 'v': part_id})}) ) Session.commit() # Clone entire database to new file # Previously marked tasks as part:N can then be dropped from main database log.debug(f'Vacuuming into {part_path}') Session.execute(text(f'VACUUM INTO :path'), params={'path': part_path}) count_deleted = Session.query(Task).filter(Task.tag['part'] == type_coerce(part_id, JSON)).delete() Session.commit() log.debug(f'Dropped {count_deleted} completed tasks from main ({DATABASE_SITE})') # Now vacuum main database to reclaim space and optimize log.debug(f'Vacuuming main database ({DATABASE_SITE})') Session.execute(text('VACUUM')) # Now we can drop anything in the new partition not belonging to part:N with sessionmaker(bind=create_engine(f'sqlite:///{part_path}'))() as external_session: count_deleted = external_session.query(Task).filter(Task.tag['part'] != type_coerce(part_id, JSON)).delete() external_session.commit() external_session.execute(text('VACUUM')) log.debug(f'Dropped {count_deleted} remaining tasks from partition ({part_path})')
def next_rotate_path() -> Tuple[int, str]: """ Choose next file path (increment suffix by one). We enumerate files in the containing directory and pick the next integer. Suffixes are dropped (e.g., main.db -> main.1, main.2, ...). """ dirname = os.path.dirname(config.database.file) filename, _ = os.path.splitext(os.path.basename(config.database.file)) n = 1 while os.path.isfile(rotated_filename := os.path.join(dirname, f'{filename}.{n}')): n += 1 return n, rotated_filename INITDB_PROGRAM = 'hs initdb' INITDB_USAGE = f"""\ Usage: {INITDB_PROGRAM} [-h] [--truncate | --vacuum | --rotate | --backup PATH] [--yes] Initialize database.\ """ INITDB_HELP = f"""\ {INITDB_USAGE} For SQLite this happens automatically. See also --initdb for the `hs cluster` command. The available special actions are mutually exclusive. The --rotate operation migrates completed tasks to the next database partition, and applies a special purpose `part:N` tag to the new partition and remaining tasks. Actions: --vacuum Vacuum an existing database. --backup PATH Vacuum into backup file (SQLite only). --rotate Rotate completed tasks to new database (SQLite only). -t, --truncate Truncate database (task metadata will be lost). Options: -y, --yes Auto-confirm action (default will prompt). -h, --help Show this message and exit.\ """ class InitDBApp(Application): """Initialize database (not needed for SQLite).""" interface = Interface(INITDB_PROGRAM, INITDB_USAGE, INITDB_HELP) ALLOW_NOARGS = True truncate_mode: bool = False vacuum_mode: bool = False rotate_mode: bool = False backup_path: str | None = None action_interface = interface.add_mutually_exclusive_group() action_interface.add_argument('-t', '--truncate', action='store_true', dest='truncate_mode') action_interface.add_argument('-v', '--vacuum', action='store_true', dest='vacuum_mode') action_interface.add_argument('-r', '--rotate', action='store_true', dest='rotate_mode') action_interface.add_argument('-b', '--backup', dest='backup_path') auto_confirm: bool = False interface.add_argument('-y', '--yes', action='store_true', dest='auto_confirm') exceptions = { OperationalError: functools.partial(handle_exception, logger=log, status=exit_status.runtime_error), **get_shared_exception_mapping(__name__), } def run(self: InitDBApp) -> None: """Run database operations.""" self.check_arguments() if self.vacuum_mode: if self.confirm_action(f'Vacuum {DATABASE_INFO}'): vacuumdb() elif self.backup_path: if self.confirm_action(f'Backup {DATABASE_INFO} into \'{self.backup_path}\''): vacuumdb(self.backup_path) elif self.rotate_mode: part_id, part_path = next_rotate_path() if self.confirm_action(f'Rotate {DATABASE_INFO} to \'{part_path}\''): rotatedb() elif self.truncate_mode: if self.confirm_action(f'Truncate {DATABASE_INFO}: {Task.count()} tasks'): truncatedb() else: if config.database.provider == 'sqlite': log.info('SQLite database initialized automatically') initdb(optimize=True) return if self.confirm_action(f'Initialize {DATABASE_INFO}'): initdb() def check_arguments(self: InitDBApp) -> None: """Check configuration and given command-line arguments.""" if not DATABASE_ENABLED: raise ConfigurationError('No database configured') if config.database.provider != 'sqlite' and self.backup_path: raise ArgumentError('Can only backup SQLite') if config.database.provider != 'sqlite' and self.rotate_mode: raise ArgumentError('Can only rotate SQLite') if self.backup_path and os.path.exists(self.backup_path): raise RuntimeError(f'Backup path already exists ({self.backup_path})') if not sys.stdout.isatty() and not self.auto_confirm: raise RuntimeError('Non-interactive prompt cannot confirm (see --yes).') def confirm_action(self: InitDBApp, message: str) -> bool: """True if okay to proceed, else False.""" if self.auto_confirm: return True response = input(f'{message}? [Y]es/no: ').strip() if response.lower() in ['', 'y', 'yes']: print('Ok') return True if response.lower() in ['n', 'no']: print('Stopping') return False else: raise RuntimeError(f'Stopping (invalid response: "{response}")')