Source code for kestrel_analytics_docker.interface

"""Docker analytics interface executes Kestrel analytics via docker.

An analytics using this interface should follow these rules:

- The analytics is built into a docker container reachable by the Python ``docker`` package.

- The name of the container should start with ``kestrel-analytics-``.

- The container will be launched with a mounted volume ``/data/`` for exchanging input/output.

- The input Kestrel variables (all records) are put in ``/data/input/`` as
  ``0.parquet.gz``, ``1.parquet.gz``, ..., in the same order as they are passed
  to the ``APPLY`` command.

- The output (updated variable data) should be yielded by the analytics to
  ``/data/output/`` as ``0.parquet.gz``, ``1.parquet.gz``, ..., in the same
  order of the input variables. If a variable is unchanged, the output parquet
  file of it can be omitted.

- If a display object is yielded, the analytics should write it into
  ``/data/display/``.

"""

import docker
import logging
import pandas

from kestrel.analytics import AbstractAnalyticsInterface
from kestrel.exceptions import (
    InvalidAnalytics,
    AnalyticsManagerInternalError,
    AnalyticsError,
)
from kestrel.utils import mkdtemp
from kestrel_analytics_docker.config import (
    DOCKER_IMAGE_PREFIX,
    VAR_FILE_SUFFIX,
    INPUT_FOLDER,
    OUTPUT_FOLDER,
    DISP_FOLDER,
)
from kestrel.codegen.display import DisplayHtml

_logger = logging.getLogger(__name__)


class DockerInterface(AbstractAnalyticsInterface):
    """Analytics interface that executes Kestrel analytics in docker containers.

    Containers are discovered through the local docker daemon by the
    ``kestrel-analytics-`` image-name prefix and communicate with Kestrel
    through a temporary shared volume mounted at ``/data`` inside the
    container (``input/``, ``output/``, ``display/`` sub-folders).
    """

    @staticmethod
    def schemes():
        """Docker analytics interface only supports ``docker://`` scheme."""
        return ["docker"]

    @staticmethod
    def list_analytics(config=None):
        """Check docker for the list of Kestrel analytics.

        Returns:
            list(str): analytics names, i.e., docker image names with the
            ``DOCKER_IMAGE_PREFIX`` stripped.
        """
        docker_images = docker.from_env().images.list()
        # one image may carry multiple repo tags; flatten them all
        tags = [tag for img in docker_images for tag in img.attrs["RepoTags"]]
        image_names = [tag.split(":")[0] for tag in tags]
        analytics_names = [
            img[len(DOCKER_IMAGE_PREFIX) :]
            for img in image_names
            if img.startswith(DOCKER_IMAGE_PREFIX)
        ]
        _logger.debug(f"Analytics names obtained: {str(analytics_names)}")
        return analytics_names

    @staticmethod
    def execute(uri, argument_variables, config=None, session_id=None, parameters=None):
        """Execute an analytics.

        Args:
            uri (str): ``docker://<analytics_name>`` URI of the analytics.
            argument_variables: input Kestrel variables, dumped to the shared
                volume as ``0.parquet.gz``, ``1.parquet.gz``, ... in order.
            config: unused by this interface.
            session_id: unused by this interface.
            parameters (dict): passed to the container as environment variables.

        Returns:
            A display object (e.g. ``DisplayHtml``) if the analytics yielded
            one, otherwise ``None``. Updated variable data is written back to
            each variable's store as a side effect.

        Raises:
            AnalyticsManagerInternalError: if the URI scheme is not ``docker``.
            InvalidAnalytics: if no matching docker image exists.
            AnalyticsError: if the container fails or yields invalid output.
        """
        scheme, _, analytics_name = uri.rpartition("://")
        container_name = DOCKER_IMAGE_PREFIX + analytics_name

        if scheme != "docker":
            raise AnalyticsManagerInternalError(
                f"interface {__package__} should not process scheme {scheme}"
            )

        # temporary shared volume, mounted at /data inside the container
        shared_dir = mkdtemp().resolve()
        input_dir = shared_dir / INPUT_FOLDER
        input_dir.mkdir()
        output_dir = shared_dir / OUTPUT_FOLDER
        output_dir.mkdir()
        display_dir = shared_dir / DISP_FOLDER
        display_dir.mkdir()

        # dump input variables as 0.parquet.gz, 1.parquet.gz, ... (APPLY order)
        for index, var in enumerate(argument_variables):
            arg_var_file_name = str(index) + VAR_FILE_SUFFIX
            arg_var_file_path = str(input_dir / arg_var_file_name)
            var.dump_to_file(arg_var_file_path)

        dclient = docker.from_env()

        if not dclient.images.list(container_name):
            raise InvalidAnalytics(
                analytics_name,
                "docker",
                # fixed typo: "avaliable" -> "available"
                f"{container_name} is not an available docker container.",
            )

        # the execution of the container
        try:
            dclient.containers.run(
                container_name,
                volumes={str(shared_dir): {"bind": "/data", "mode": "rw"}},
                environment=parameters,
            )
        except docker.errors.ContainerError as e:
            error = e.stderr.decode("utf-8")
            _logger.error(error)
            # chain the cause so the container traceback is preserved
            raise AnalyticsError(f"{analytics_name} failed: {error}") from e

        # process returned/updated variables
        for index, var in enumerate(argument_variables):
            arg_var_file_name = str(index) + VAR_FILE_SUFFIX
            arg_var_file_path = str(output_dir / arg_var_file_name)
            var_data = None
            try:
                var_data = pandas.read_parquet(arg_var_file_path)
            except FileNotFoundError:
                # an omitted output file means the variable is unchanged
                _logger.info(
                    f'analytics "{analytics_name}" has no output file {arg_var_file_name}.'
                )
            except Exception as e:
                raise AnalyticsError(
                    f'{e.__class__.__name__}: analytics "{analytics_name}" yielded invalid return data {arg_var_file_path}.'
                ) from e

            if var_data is None:
                continue

            if "id" not in var_data:
                raise AnalyticsError(
                    f'analytics "{analytics_name}" yielded invalid return {arg_var_file_path} without "id".'
                )

            if "type" not in var_data:
                raise AnalyticsError(
                    f'analytics "{analytics_name}" yielded invalid return {arg_var_file_path} without "type".'
                )

            # all records must share one entity type; a len-based check avoids
            # the uninformative KeyError that set.pop() raised on empty output
            if len(set(var_data["type"])) > 1:
                raise AnalyticsError(
                    f'analytics "{analytics_name}" yielded invalid return {arg_var_file_path} with inconsistent types'
                )

            var_data_dict = var_data.to_dict(orient="records")
            var.store.reassign(var.entity_table, var_data_dict)

        # process returned display
        disp_files = list(display_dir.iterdir())
        if disp_files:
            disp_file = disp_files.pop()
            if disp_files:
                raise AnalyticsError(
                    f'analytics "{analytics_name}" yielded more than one display files'
                )
            if disp_file.suffix == ".html":
                with open(disp_file, "r") as h:
                    html = h.read()
                display = DisplayHtml(html)
            else:
                # only HTML display objects are supported so far
                raise NotImplementedError
        else:
            display = None

        return display