Source code for hypershell.submit

# SPDX-FileCopyrightText: 2025 Geoffrey Lentner
# SPDX-License-Identifier: Apache-2.0

Submit tasks to the database.

Any iterable of command lines can be submitted directly.
    >>> from hypershell.submit import submit_from
    >>> submit_from(['echo AA', 'echo BB', 'echo CC'])

A file stream is a valid iterable to pass to `submit_from`.
Use `submit_file` with the file path as shorthand.

    >>> from hypershell.submit import submit_file
    >>> submit_file('/path/to/commandlines.txt')

Embed a `SubmitThread` in your application directly as the `ServerThread` does.
Call `stop()` to stop early.

    >>> import sys
    >>> from hypershell.submit import SubmitThread
    >>> submit_thread = SubmitThread.new(sys.stdin, bundlesize=10)

    In order for the `SubmitThread` to actively monitor the state set by `stop` and
    halt execution (a requirement because of how CPython does threading), the implementation
    uses a finite state machine. *You should not instantiate this machine directly*.

    Because the `SubmitThread` checks state actively to decide whether to halt, if your
    `source` is blocking (e.g., `sys.stdin`) it will not be able to halt immediately. If
    your main program exits however, the thread will be stopped regardless because it
    runs as a `daemon`.

# type annotations
from __future__ import annotations
from typing import List, Iterable, Iterator, IO, Optional, Dict, Callable, Type, Final
from types import TracebackType

# standard libs
import io
import sys
import functools
from enum import Enum
from datetime import datetime
from queue import Queue, Empty as QueueEmpty, Full as QueueFull

# external libs
from cmdkit.config import ConfigurationError
from import Application
from cmdkit.cli import Interface, ArgumentError

# internal libs
from hypershell.core.logging import Logger
from hypershell.core.config import config, default
from hypershell.core.fsm import State, StateMachine
from hypershell.core.queue import QueueClient, QueueConfig
from hypershell.core.thread import Thread
from hypershell.core.template import Template, DEFAULT_TEMPLATE
from hypershell.core.exceptions import get_shared_exception_mapping
from hypershell.core.types import JSONValue
from import Task
from import initdb, checkdb
from hypershell.task import Tag

# public interface
# NOTE(review): the tail of this list was missing from the reviewed copy; the entries after
# 'LiveSubmitThread' are restored from public names defined in this module -- confirm the export set.
__all__ = ['submit_from', 'submit_file', 'SubmitThread', 'LiveSubmitThread',
           'SubmitApp', 'DEFAULT_BUNDLESIZE', 'DEFAULT_BUNDLEWAIT']

# initialize logger
log = Logger.with_name(__name__)

class LoaderState(State, Enum):
    """Finite states of loader machine."""
    START = 0  # initial state of `Loader`
    GET = 1  # handled by `Loader.get_task`
    PUT = 2  # handled by `Loader.put_task`
    FINAL = 3  # handled by `Loader.finalize`
    HALT = 4  # returned by `Loader.finalize`

class Loader(StateMachine):
    """
    Enqueue tasks from iterable source.

    Every item of `source` is converted with `str`, stripped of surrounding
    whitespace, and expanded through `template` before it is wrapped in a task
    and placed on `queue`. The `count` attribute tracks how many tasks were enqueued.
    """

    task: Task
    source: Iterator[str]
    queue: Queue[Optional[Task]]
    template: Template
    count: int
    tags: Optional[Dict[str, str]]

    state = LoaderState.START
    states = LoaderState

    def __init__(self: Loader,
                 source: Iterable[str],
                 queue: Queue[Optional[Task]],
                 template: str = DEFAULT_TEMPLATE,
                 tags: Optional[Dict[str, str]] = None) -> None:
        """
        Initialize source to read tasks and submit to database.

        Args:
            source: Any iterable of command-line strings.
            queue: Local queue that receives the loaded tasks.
            template: Template pattern applied to each command line.
            tags: Tag dictionary attached to every task (may be None).
        """
        self.template = Template(template)
        # Lazy pipeline: nothing is read from `source` until `get_task` calls `next`.
        self.source = map(self.template.expand, map(str.strip, map(str, source)))
        self.queue = queue
        self.tags = tags
        self.count = 0

    # Cached so the mapping is built once; mirrors `QueueCommitter.actions` in this module.
    @functools.cached_property
    def actions(self: Loader) -> Dict[LoaderState, Callable[[], LoaderState]]:
        """Map each state to the method that handles it."""
        return {
            LoaderState.START: self.start,
            LoaderState.GET: self.get_task,
            LoaderState.PUT: self.put_task,
            LoaderState.FINAL: self.finalize,
        }

    @staticmethod
    def start() -> LoaderState:
        """Jump to GET state."""
        log.debug('Started (loader)')
        return LoaderState.GET

    def get_task(self: Loader) -> LoaderState:
        """Get the next task from the source (FINAL once the source is exhausted)."""
        try:
            # NOTE(review): the constructor call was garbled in the reviewed copy
            # ("self.task =, tag=self.tags)"); restored as `Task.new(args=..., tag=...)` -- confirm.
            self.task = Task.new(args=next(self.source), tag=self.tags)
            log.trace(f'Loaded task ({self.task.args})')
            return LoaderState.PUT
        except StopIteration:
            return LoaderState.FINAL

    def put_task(self: Loader) -> LoaderState:
        """Enqueue loaded task."""
        try:
            self.queue.put(self.task, timeout=1)
            self.count += 1
            return LoaderState.GET
        except QueueFull:
            # Stay in PUT so the same task is retried instead of being dropped.
            return LoaderState.PUT

    @staticmethod
    def finalize() -> LoaderState:
        """Return HALT."""
        log.debug('Done (loader)')
        return LoaderState.HALT

class LoaderThread(Thread):
    """Run loader within dedicated thread."""

    def __init__(self: LoaderThread,
                 source: Iterable[str],
                 queue: Queue[Optional[Task]],
                 template: str = DEFAULT_TEMPLATE,
                 tags: Optional[Dict[str, str]] = None) -> None:
        """
        Initialize machine.

        Args:
            source: Any iterable of command-line strings.
            queue: Local queue that receives the loaded tasks.
            template: Template pattern applied to each command line.
            tags: Tag dictionary attached to every task (may be None).
        """
        # NOTE(review): the base initializer call was missing from the reviewed copy; the name
        # follows the 'hypershell-submit-committer' pattern used by `QueueCommitterThread` -- confirm.
        super().__init__(name='hypershell-submit-loader')
        self.machine = Loader(source=source, queue=queue, template=template, tags=tags)

    def run_with_exceptions(self: LoaderThread) -> None:
        """Run machine."""
        # NOTE(review): body was missing in the reviewed copy; assumes `StateMachine.run()` -- confirm.
        self.machine.run()

    def stop(self: LoaderThread, wait: bool = False, timeout: Optional[int] = None) -> None:
        """Stop machine."""
        log.warning('Stopping (loader)')
        # Halt the machine before stopping the thread, as `QueueCommitterThread.stop` does.
        self.machine.halt()
        super().stop(wait=wait, timeout=timeout)

class DatabaseCommitterState(State, Enum):
    """Finite states for database submitter."""
    START = 0  # initial state of `DatabaseCommitter`
    GET = 1  # handled by `DatabaseCommitter.get_task`
    COMMIT = 2  # handled by `DatabaseCommitter.commit`
    FINAL = 3  # handled by `DatabaseCommitter.finalize`
    HALT = 4  # returned by `DatabaseCommitter.finalize`

# Used as the `bundlesize` default throughout this module; read from `default.submit.bundlesize`.
DEFAULT_BUNDLESIZE: Final[int] = default.submit.bundlesize
"""Default size of task bundles."""

# Used as the `bundlewait` default throughout this module; read from `default.submit.bundlewait`.
# Compared against an elapsed time in seconds by the committers.
DEFAULT_BUNDLEWAIT: Final[int] = default.submit.bundlewait
"""Default waiting period before forcing task bundle push."""

class DatabaseCommitter(StateMachine):
    """
    Commit tasks from local queue to database.

    Tasks are buffered in `tasks` and written out once the buffer reaches
    `bundlesize` or `bundlewait` seconds have passed since the previous write.
    A `None` on the queue signals the end of input.
    """

    queue: Queue[Optional[Task]]
    tasks: List[Task]
    bundlesize: int
    bundlewait: int
    previous_submit: datetime

    state = DatabaseCommitterState.START
    states = DatabaseCommitterState

    def __init__(self: DatabaseCommitter,
                 queue: Queue[Optional[Task]],
                 bundlesize: int = DEFAULT_BUNDLESIZE,
                 bundlewait: int = DEFAULT_BUNDLEWAIT) -> None:
        """
        Initialize with task queue and buffering parameters.

        Args:
            queue: Local queue to read tasks from (`None` marks the end).
            bundlesize: Number of buffered tasks that triggers a commit.
            bundlewait: Seconds since the previous commit that triggers a commit.
        """
        self.queue = queue
        self.tasks = []
        self.bundlesize = bundlesize
        self.bundlewait = bundlewait

    # Cached so the mapping is built once; mirrors `QueueCommitter.actions` in this module.
    @functools.cached_property
    def actions(self: DatabaseCommitter) -> Dict[DatabaseCommitterState, Callable[[], DatabaseCommitterState]]:
        """Map each state to the method that handles it."""
        return {
            DatabaseCommitterState.START: self.start,
            DatabaseCommitterState.GET: self.get_task,
            DatabaseCommitterState.COMMIT: self.commit,
            DatabaseCommitterState.FINAL: self.finalize,
        }

    def start(self: DatabaseCommitter) -> DatabaseCommitterState:
        """Jump to GET state."""
        log.debug('Started (committer: database)')
        # NOTE(review): right-hand side was blank in the reviewed copy; `previous_submit` is a
        # `datetime` and is only ever subtracted from a current time, so `datetime.now()` is restored.
        self.previous_submit = datetime.now()
        return DatabaseCommitterState.GET

    def get_task(self: DatabaseCommitter) -> DatabaseCommitterState:
        """Get tasks from local queue and check buffer."""
        try:
            task = self.queue.get(timeout=1)
        except QueueEmpty:
            # Nothing arrived within the timeout; poll again.
            return DatabaseCommitterState.GET
        if task is not None:
            # Buffer the task (same flow as `QueueCommitter.get_task`).
            self.tasks.append(task)
            since_last = (datetime.now() - self.previous_submit).total_seconds()
            if len(self.tasks) >= self.bundlesize or since_last >= self.bundlewait:
                return DatabaseCommitterState.COMMIT
            else:
                return DatabaseCommitterState.GET
        else:
            # `None` is the end-of-input sentinel.
            return DatabaseCommitterState.FINAL

    def commit(self: DatabaseCommitter) -> DatabaseCommitterState:
        """Commit tasks to database."""
        if self.tasks:
            # NOTE(review): the database write and buffer reset were missing from the reviewed
            # copy; `Task.add_all` is assumed to be the bulk-insert API -- confirm.
            Task.add_all(self.tasks)
            log.debug(f'Submitted {len(self.tasks)} tasks')
            self.tasks.clear()
            self.previous_submit = datetime.now()
        return DatabaseCommitterState.GET

    def finalize(self: DatabaseCommitter) -> DatabaseCommitterState:
        """Force final commit of tasks and halt."""
        # Flush whatever is still buffered (no-op when the buffer is empty).
        self.commit()
        log.debug('Done (committer: database)')
        return DatabaseCommitterState.HALT

class DatabaseCommitterThread(Thread):
    """Run committer within dedicated thread."""

    def __init__(self: DatabaseCommitterThread,
                 queue: Queue[Optional[Task]],
                 bundlesize: int = DEFAULT_BUNDLESIZE,
                 bundlewait: int = DEFAULT_BUNDLEWAIT) -> None:
        """
        Initialize machine.

        Args:
            queue: Local queue to read tasks from.
            bundlesize: Number of buffered tasks that triggers a commit.
            bundlewait: Seconds since the previous commit that triggers a commit.
        """
        # NOTE(review): the base initializer call was missing from the reviewed copy; the name
        # matches the one `QueueCommitterThread` passes -- confirm.
        super().__init__(name='hypershell-submit-committer')
        self.machine = DatabaseCommitter(queue=queue, bundlesize=bundlesize, bundlewait=bundlewait)

    def run_with_exceptions(self: DatabaseCommitterThread) -> None:
        """Run machine."""
        # NOTE(review): body was missing in the reviewed copy; assumes `StateMachine.run()` -- confirm.
        self.machine.run()

    def stop(self: DatabaseCommitterThread, wait: bool = False, timeout: Optional[int] = None) -> None:
        """Stop machine."""
        log.warning('Stopping (committer: database)')
        # Halt the machine before stopping the thread, as `QueueCommitterThread.stop` does.
        self.machine.halt()
        super().stop(wait=wait, timeout=timeout)

# NOTE(review): `SubmitThread.new(...)` in the example and `self.source.name` in `source_name`
# were garbled in the reviewed copy and have been restored -- confirm against `Thread`.
class SubmitThread(Thread):
    """
    Submit tasks to database within dedicated thread.

    Args:
        source (Iterable[str]):
            Any iterable of command-line tasks.
        bundlesize (int, optional):
            Size of task bundles. See :const:`DEFAULT_BUNDLESIZE`.
        bundlewait (int, optional):
            Waiting period before forcing task bundle push. See :const:`DEFAULT_BUNDLEWAIT`.
        template (str, optional):
            Task command-line template pattern. See :const:`DEFAULT_TEMPLATE`.
        tags (Dict[str, JSONValue], optional):
            Tag dictionary for all submitted tasks.

    Example:
        >>> from hypershell.submit import SubmitThread
        >>> submitter = SubmitThread.new(['AAA', 'BBB', 'CCC'],
        ...                              template='my-script {}',
        ...                              tags={'site': 'zzz', 'group': 37})
        >>> submitter.join()

    See Also:
        - :meth:`submit_from`
        - :meth:`submit_file`
    """

    source: Iterable[str]
    queue: Queue[Optional[Task]]
    loader: LoaderThread
    committer: DatabaseCommitterThread

    def __init__(self: SubmitThread,
                 source: Iterable[str],
                 bundlesize: int = DEFAULT_BUNDLESIZE,
                 bundlewait: int = DEFAULT_BUNDLEWAIT,
                 template: str = DEFAULT_TEMPLATE,
                 tags: Optional[Dict[str, JSONValue]] = None) -> None:
        """Initialize queue and child threads."""
        self.source = source
        # Bounded queue: the loader blocks (and retries) once `bundlesize` tasks are pending.
        self.queue = Queue(maxsize=bundlesize)
        self.loader = LoaderThread(source=source, queue=self.queue, template=template, tags=tags)
        self.committer = DatabaseCommitterThread(queue=self.queue, bundlesize=bundlesize, bundlewait=bundlewait)
        super().__init__(name='hypershell-submit')

    def run_with_exceptions(self: SubmitThread) -> None:
        """Start child threads, wait."""
        log.debug(f'Started ({self.source_name})')
        self.loader.start()
        self.committer.start()
        self.loader.join()
        # `None` tells the committer that no more tasks will arrive.
        self.queue.put(None)
        self.committer.join()
        log.debug('Done')

    @functools.cached_property
    def source_name(self: SubmitThread) -> str:
        """Log details of source."""
        if self.source is sys.stdin:
            return '<stdin>'
        elif isinstance(self.source, io.TextIOWrapper):
            return self.source.name
        else:
            return '<iterable>'

    def stop(self: SubmitThread, wait: bool = False, timeout: Optional[int] = None) -> None:
        """Stop child threads before main thread."""
        log.warning('Stopping')
        self.loader.stop(wait=wait, timeout=timeout)
        self.queue.put(None)
        self.committer.stop(wait=wait, timeout=timeout)
        super().stop(wait=wait, timeout=timeout)

    @property
    def task_count(self: SubmitThread) -> int:
        """Count of submitted tasks."""
        return self.loader.machine.count
class QueueCommitterState(State, Enum):
    """Finite states for queue submitter."""
    START = 0
    GET = 1
    PACK = 2
    COMMIT = 3
    FINAL = 4
    HALT = 5


class QueueCommitter(StateMachine):
    """
    Commit tasks from local queue directly to remote server queue.

    Tasks are buffered in `tasks`, packed into `bundle`, and pushed to
    `client.scheduled` once the buffer reaches `bundlesize` or `bundlewait`
    seconds have passed since the previous push. A `None` on the local queue
    signals the end of input.
    """

    local: Queue[Optional[Task]]
    client: QueueClient
    tasks: List[Task]
    bundle: List[bytes]
    bundlesize: int
    bundlewait: int
    previous_submit: datetime

    state = QueueCommitterState.START
    states = QueueCommitterState

    def __init__(self: QueueCommitter,
                 local: Queue[Optional[Task]],
                 client: QueueClient,
                 bundlesize: int = DEFAULT_BUNDLESIZE,
                 bundlewait: int = DEFAULT_BUNDLEWAIT) -> None:
        """Initialize with queue handles and buffering parameters."""
        self.local = local
        self.client = client
        self.tasks = []
        self.bundle = []
        self.bundlesize = bundlesize
        self.bundlewait = bundlewait

    @functools.cached_property
    def actions(self: QueueCommitter) -> Dict[QueueCommitterState, Callable[[], QueueCommitterState]]:
        """Map each state to the method that handles it."""
        return {
            QueueCommitterState.START: self.start,
            QueueCommitterState.GET: self.get_task,
            QueueCommitterState.PACK: self.pack_bundle,
            QueueCommitterState.COMMIT: self.commit,
            QueueCommitterState.FINAL: self.finalize,
        }

    def start(self: QueueCommitter) -> QueueCommitterState:
        """Jump to GET state."""
        log.debug('Started (committer: no database)')
        # NOTE(review): right-hand side was blank in the reviewed copy; `previous_submit` is a
        # `datetime` and is only ever subtracted from a current time, so `datetime.now()` is restored.
        self.previous_submit = datetime.now()
        return QueueCommitterState.GET

    def get_task(self: QueueCommitter) -> QueueCommitterState:
        """Get tasks from local queue and check buffer."""
        try:
            task = self.local.get(timeout=1)
        except QueueEmpty:
            # Nothing arrived within the timeout; poll again.
            return QueueCommitterState.GET
        if task is not None:
            self.tasks.append(task)
            since_last = (datetime.now() - self.previous_submit).total_seconds()
            if len(self.tasks) >= self.bundlesize or since_last >= self.bundlewait:
                return QueueCommitterState.PACK
            else:
                return QueueCommitterState.GET
        else:
            # `None` is the end-of-input sentinel.
            return QueueCommitterState.FINAL

    def pack_bundle(self: QueueCommitter) -> QueueCommitterState:
        """Pack tasks into bundle for remote queue."""
        if self.tasks:
            self.bundle = [task.pack() for task in self.tasks]
            return QueueCommitterState.COMMIT
        else:
            return QueueCommitterState.GET

    def commit(self: QueueCommitter) -> QueueCommitterState:
        """Commit tasks to server scheduling queue (COMMIT is returned to retry when it is full)."""
        try:
            if self.tasks:
                self.client.scheduled.put(self.bundle, timeout=2)
                for task in self.tasks:
                    # NOTE(review): the interpolated field was garbled ("{}"); `task.id` assumed -- confirm.
                    log.trace(f'Scheduled task ({task.id})')
                self.tasks = []
                self.bundle = []
                self.previous_submit = datetime.now()
            return QueueCommitterState.GET
        except QueueFull:
            return QueueCommitterState.COMMIT

    def finalize(self) -> QueueCommitterState:
        """Force final commit of tasks and halt."""
        self.pack_bundle()
        if self.commit() is QueueCommitterState.COMMIT:
            # The remote queue was full and the bundle is still pending. Re-enter FINAL to
            # retry rather than halting and silently dropping the last tasks.
            return QueueCommitterState.FINAL
        log.debug('Done (committer: no database)')
        return QueueCommitterState.HALT


class QueueCommitterThread(Thread):
    """Run queue committer within dedicated thread."""

    def __init__(self: QueueCommitterThread,
                 local: Queue[Optional[Task]],
                 client: QueueClient,
                 bundlesize: int = DEFAULT_BUNDLESIZE,
                 bundlewait: int = DEFAULT_BUNDLEWAIT) -> None:
        """Initialize machine."""
        super().__init__(name='hypershell-submit-committer')
        self.machine = QueueCommitter(local=local, client=client, bundlesize=bundlesize, bundlewait=bundlewait)

    def run_with_exceptions(self: QueueCommitterThread) -> None:
        """Run machine."""
        # NOTE(review): body was missing in the reviewed copy; assumes `StateMachine.run()` -- confirm.
        self.machine.run()

    def stop(self: QueueCommitterThread, wait: bool = False, timeout: Optional[int] = None) -> None:
        """Stop machine."""
        log.warning('Stopping (committer: no database)')
        self.machine.halt()
        super().stop(wait=wait, timeout=timeout)
# NOTE(review): `LiveSubmitThread.new(...)` in the example and `self.source.name` in `source_name`
# were garbled in the reviewed copy and have been restored -- confirm against `Thread`.
class LiveSubmitThread(Thread):
    """
    Submit tasks directly to queue within dedicated thread.

    Args:
        source (Iterable[str]):
            Any iterable of command-line tasks.
        queue_config (:class:`~hypershell.core.queue.QueueConfig`):
            QueueConfig instance with `host`, `port`, and `auth`.
        bundlesize (int, optional):
            Size of task bundles. See :const:`DEFAULT_BUNDLESIZE`.
        bundlewait (int, optional):
            Waiting period before forcing task bundle push. See :const:`DEFAULT_BUNDLEWAIT`.
        template (str, optional):
            Task command-line template pattern. See :const:`DEFAULT_TEMPLATE`.
        tags (Dict[str, JSONValue], optional):
            Tag dictionary for all submitted tasks.

    Example:
        >>> from hypershell.submit import LiveSubmitThread
        >>> from hypershell.core.queue import QueueConfig
        >>> queue_config = QueueConfig(host='localhost', port=54321, auth='my-secret-key')
        >>> submitter = LiveSubmitThread.new(['AAA', 'BBB', 'CCC'],
        ...                                  queue_config=queue_config,
        ...                                  template='my-script {}',
        ...                                  tags={'site': 'zzz', 'group': 37})
        >>> submitter.join()

    See Also:
        - :meth:`submit_from`
        - :meth:`submit_file`
    """

    source: Iterable[str]
    local: Queue[Optional[Task]]
    client: QueueClient
    loader: LoaderThread
    committer: QueueCommitterThread

    def __init__(self: LiveSubmitThread,
                 source: Iterable[str],
                 queue_config: QueueConfig,
                 template: str = DEFAULT_TEMPLATE,
                 bundlesize: int = DEFAULT_BUNDLESIZE,
                 bundlewait: int = DEFAULT_BUNDLEWAIT,
                 tags: Optional[Dict[str, JSONValue]] = None) -> None:
        """Initialize queue and child threads."""
        self.source = source
        # Bounded queue: the loader blocks (and retries) once `bundlesize` tasks are pending.
        self.local = Queue(maxsize=bundlesize)
        self.loader = LoaderThread(source=source, queue=self.local, template=template, tags=tags)
        self.client = QueueClient(config=queue_config)
        self.committer = QueueCommitterThread(local=self.local, client=self.client,
                                              bundlesize=bundlesize, bundlewait=bundlewait)
        super().__init__(name='hypershell-submit')

    def run_with_exceptions(self: LiveSubmitThread) -> None:
        """Start child threads, wait."""
        log.debug(f'Started ({self.source_name})')
        # The client is used as a context manager for as long as the child threads run.
        with self.client:
            self.loader.start()
            self.committer.start()
            log.trace('Waiting (loader)')
            self.loader.join()
            # `None` tells the committer that no more tasks will arrive.
            self.local.put(None)
            log.trace('Waiting (committer)')
            self.committer.join()
        log.debug('Done')

    @functools.cached_property
    def source_name(self: LiveSubmitThread) -> str:
        """Log details of source."""
        if self.source is sys.stdin:
            return '<stdin>'
        elif isinstance(self.source, io.TextIOWrapper):
            return self.source.name
        else:
            return '<iterable>'

    def stop(self: LiveSubmitThread, wait: bool = False, timeout: Optional[int] = None) -> None:
        """Stop child threads before main thread."""
        log.warning('Stopping')
        self.loader.stop(wait=wait, timeout=timeout)
        self.local.put(None)
        self.committer.stop(wait=wait, timeout=timeout)
        super().stop(wait=wait, timeout=timeout)

    @property
    def task_count(self: LiveSubmitThread) -> int:
        """Count of submitted tasks."""
        return self.loader.machine.count
def submit_from(source: Iterable[str],
                queue_config: Optional[QueueConfig] = None,
                bundlesize: int = DEFAULT_BUNDLESIZE,
                bundlewait: int = DEFAULT_BUNDLEWAIT,
                template: str = DEFAULT_TEMPLATE,
                tags: Optional[Dict[str, JSONValue]] = None) -> int:
    """
    Submit all task arguments from `source`, return count of submitted tasks.

    If `queue_config` is provided, the :class:`LiveSubmitThread` is used to submit task bundles
    directly to the shared queue hosted by the server. Otherwise, the :class:`SubmitThread`
    submits tasks to the database.

    Args:
        source (Iterable[str]):
            Any iterable of command-line tasks.
        queue_config (:class:`~hypershell.core.queue.QueueConfig`):
            QueueConfig instance with `host`, `port`, and `auth`.
        bundlesize (int, optional):
            Size of task bundles. See :const:`DEFAULT_BUNDLESIZE`.
        bundlewait (int, optional):
            Waiting period before forcing task bundle push. See :const:`DEFAULT_BUNDLEWAIT`.
        template (str, optional):
            Task command-line template pattern. See :const:`DEFAULT_TEMPLATE`.
        tags (Dict[str, JSONValue], optional):
            Tag dictionary for all submitted tasks.

    Example:
        >>> from hypershell.submit import submit_from
        >>> submit_from(['AAA', 'BBB', 'CCC'],
        ...             template='my-script {}',
        ...             tags={'site': 'zzz', 'group': 37})
        3

    See Also:
        - :meth:`submit_file`
        - :class:`SubmitThread`
        - :class:`LiveSubmitThread`

    Returns:
        task_count (int):
            Count of submitted tasks.
    """
    # NOTE(review): the constructor calls were garbled in the reviewed copy ("thread =, bundlesize=...").
    # `.new(...)` is assumed to create AND start the thread, since `join()` follows with no
    # explicit `start()` -- confirm against `hypershell.core.thread.Thread`.
    if not queue_config:
        thread = SubmitThread.new(source=source, bundlesize=bundlesize, bundlewait=bundlewait,
                                  template=template, tags=tags)
    else:
        thread = LiveSubmitThread.new(source=source, queue_config=queue_config, template=template,
                                      bundlesize=bundlesize, bundlewait=bundlewait, tags=tags)
    try:
        thread.join()
    except Exception:
        # Make sure the child threads are asked to stop before the error propagates.
        thread.stop()
        raise
    else:
        return thread.task_count
def submit_file(path: str,
                queue_config: Optional[QueueConfig] = None,
                bundlesize: int = DEFAULT_BUNDLESIZE,
                bundlewait: int = DEFAULT_BUNDLEWAIT,
                template: str = DEFAULT_TEMPLATE,
                tags: Optional[Dict[str, JSONValue]] = None,
                **file_options) -> int:
    """
    Submit all task arguments by reading them from file `path`.

    Arguments are forwarded to :func:`submit_from` with the opened file stream
    from `path` as the task `source`. Extra keyword arguments (`file_options`)
    are passed through to the builtin `open`.

    Args:
        path (str):
            Path to file containing command-line tasks.
        queue_config (:class:`~hypershell.core.queue.QueueConfig`):
            QueueConfig instance with `host`, `port`, and `auth`.
        bundlesize (int, optional):
            Size of task bundles. See :const:`DEFAULT_BUNDLESIZE`.
        bundlewait (int, optional):
            Waiting period before forcing task bundle push. See :const:`DEFAULT_BUNDLEWAIT`.
        template (str, optional):
            Task command-line template pattern. See :const:`DEFAULT_TEMPLATE`.
        tags (Dict[str, JSONValue], optional):
            Tag dictionary for all submitted tasks.

    Example:
        >>> from hypershell.submit import submit_file
        >>> from hypershell.core.queue import QueueConfig
        >>> queue_config = QueueConfig(host='localhost', port=54321, auth='my-secret-key')
        >>> submit_file('/path/to/commandlines.txt',
        ...             queue_config=queue_config,
        ...             template='my-script {}',
        ...             tags={'site': 'zzz', 'group': 37})
        3

    See Also:
        - :meth:`submit_from`
        - :class:`SubmitThread`
        - :class:`LiveSubmitThread`

    Returns:
        task_count (int):
            Count of submitted tasks.
    """
    # The stream stays open for the whole submission and is closed on return or error.
    with open(path, mode='r', **file_options) as stream:
        return submit_from(stream, queue_config=queue_config, bundlesize=bundlesize,
                           bundlewait=bundlewait, template=template, tags=tags)
APP_NAME = 'hs submit'

# NOTE(review): line breaks and column alignment of the usage/help text were lost in the
# reviewed copy; the layout below is a reconstruction -- the wording itself is unchanged.
APP_USAGE = f"""\
Usage:
  {APP_NAME} [-h] [FILE] [-b NUM] [-w SEC] [-t CMD] [--initdb] [--tag TAG [TAG...]]

  Submit tasks from a file.\
"""

APP_HELP = f"""\
{APP_USAGE}

Arguments:
  FILE                      Path to task file ("-" for <stdin>).

Options:
  -t, --template    CMD     Submit-time template expansion (default: "{DEFAULT_TEMPLATE}").
  -b, --bundlesize  NUM     Number of lines to buffer (default: {DEFAULT_BUNDLESIZE}).
  -w, --bundlewait  SEC     Seconds to wait before flushing tasks (default: {DEFAULT_BUNDLEWAIT}).
      --initdb              Auto-initialize database.
      --tag         TAG...  Assign tags as `key:value`.
  -h, --help                Show this message and exit.\
"""


class SubmitApp(Application):
    """Submit tasks to the database."""

    name = APP_NAME
    interface = Interface(APP_NAME, APP_USAGE, APP_HELP)

    source: IO
    filepath: str
    interface.add_argument('filepath', nargs='?', default='-')

    bundlesize: int = config.submit.bundlesize
    interface.add_argument('-b', '--bundlesize', type=int, default=bundlesize)

    bundlewait: int = config.submit.bundlewait
    interface.add_argument('-w', '--bundlewait', type=int, default=bundlewait)

    template: str = DEFAULT_TEMPLATE
    interface.add_argument('-t', '--template', default=template)

    auto_initdb: bool = False
    interface.add_argument('--initdb', action='store_true', dest='auto_initdb')

    # `tags` is rebuilt from `taglist` by `check_tags`; the class-level dict is never mutated here.
    tags: Dict[str, JSONValue] = {}
    taglist: List[str] = None
    interface.add_argument('--tag', nargs='*', default=[], dest='taglist')

    count: int = 0

    exceptions = {
        **get_shared_exception_mapping(__name__)
    }

    def run(self: SubmitApp) -> None:
        """Run submit thread."""
        self.submit_all()
        # NOTE(review): the logging call was garbled in the reviewed copy; `log.info` assumed -- confirm level.
        log.info(f'Submitted {self.count} tasks')

    def submit_all(self: SubmitApp) -> None:
        """Submit all tasks from source."""
        self.count = submit_from(self.source, template=self.template, bundlesize=self.bundlesize,
                                 bundlewait=self.bundlewait, tags=self.tags)

    @staticmethod
    def check_config():
        """Halt if we are not connected to database."""
        db = config.database.get('file', None) or config.database.get('database', None)
        if config.database.provider == 'sqlite' and db in ('', ':memory:', None):
            raise ConfigurationError('Submitting tasks to in-memory database has no effect')

    def check_tags(self: SubmitApp) -> None:
        """Ensure valid tags (raises `ArgumentError` when a tag is rejected)."""
        self.tags = {} if not self.taglist else Tag.parse_cmdline_list(self.taglist)
        try:
            Task.ensure_valid_tag(self.tags)
        except (ValueError, TypeError) as error:
            raise ArgumentError(str(error)) from error

    def __enter__(self: SubmitApp) -> SubmitApp:
        """Open file if not stdin, then validate configuration, tags, and database."""
        self.source = sys.stdin if self.filepath == '-' else open(self.filepath, mode='r')
        try:
            self.check_config()
            self.check_tags()
            if config.database.provider == 'sqlite' or self.auto_initdb:
                initdb()  # Auto-initialize if local sqlite provider
            else:
                checkdb()
        except BaseException:
            # `__exit__` is not called when `__enter__` raises, so close the file here
            # to avoid leaking the handle.
            if self.source is not sys.stdin:
                self.source.close()
            raise
        return self

    def __exit__(self: SubmitApp,
                 exc_type: Optional[Type[Exception]],
                 exc_val: Optional[Exception],
                 exc_tb: Optional[TracebackType]) -> None:
        """Close file if not stdin."""
        if self.source is not sys.stdin:
            self.source.close()