Source code for kestrel_datasource_stixshifter.interface

"""STIX Shifter data source package provides access to data sources via
stix-shifter.

Before use, install the stix-shifter connector package for each target data
source, e.g., ``stix-shifter-modules-carbonblack``.

The STIX Shifter interface can reach multiple data sources. The user needs to
set up one *profile* per data source. The profile name is used in the
``FROM`` clause of the Kestrel ``GET`` command, e.g., ``newvar = GET entity-type
FROM stixshifter://profilename WHERE ...``. The Kestrel runtime loads the
settings of the referenced profile from the following environment variables:

- ``STIXSHIFTER_PROFILENAME_CONNECTOR``: the STIX Shifter connector name, e.g., ``elastic_ecs``.
- ``STIXSHIFTER_PROFILENAME_CONNECTION``: the STIX Shifter `connection <https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#connection>`_ object in JSON string.
- ``STIXSHIFTER_PROFILENAME_CONFIG``: the STIX Shifter `configuration <https://github.com/opencybersecurityalliance/stix-shifter/blob/master/OVERVIEW.md#configuration>`_ object in JSON string.

Properties of profile name:

- Not case sensitive, e.g., ``profileX`` in the Kestrel command will match
  ``STIXSHIFTER_PROFILEX_...`` in environment variables.
- Cannot contain ``_``.

"""

import os
import json
import time

from stix_shifter.stix_translation import stix_translation
from stix_shifter.stix_transmission import stix_transmission

from kestrel.utils import mkdtemp
from kestrel.datasource import AbstractDataSourceInterface
from kestrel.datasource import ReturnFromFile
from kestrel.exceptions import (
    InvalidDataSource,
    DataSourceError,
    DataSourceManagerInternalError,
)

from kestrel_datasource_stixshifter.config import ENV_VAR_PREFIX, RETRIEVAL_BATCH_SIZE


class StixShifterInterface(AbstractDataSourceInterface):
    """Kestrel data source interface that queries data sources via stix-shifter.

    Each data source is described by a *profile* whose connector name,
    connection object and configuration object are read from the environment
    variables ``{ENV_VAR_PREFIX}{PROFILE}_CONNECTOR``,
    ``{ENV_VAR_PREFIX}{PROFILE}_CONNECTION`` and
    ``{ENV_VAR_PREFIX}{PROFILE}_CONFIG``.
    """

    @staticmethod
    def schemes():
        """STIX Shifter data source interface only supports ``stixshifter://`` scheme."""
        return ["stixshifter"]

    @staticmethod
    def list_data_sources():
        """Get configured data sources from environment variable profiles.

        Returns:
            list: sorted, de-duplicated, lower-cased profile names found in
            environment variables starting with ``ENV_VAR_PREFIX``.
        """
        profiles = set()
        for evar in os.environ:
            if not evar.startswith(ENV_VAR_PREFIX):
                continue
            # Strip the prefix by length instead of splitting on "_" so the
            # result stays consistent with the names built in
            # `_get_stixshifter_config()` whatever the prefix looks like.
            profile = evar[len(ENV_VAR_PREFIX):].partition("_")[0].lower()
            if profile:
                profiles.add(profile)
        return sorted(profiles)

    @staticmethod
    def query(uri, pattern, session_id=None):
        """Query a stixshifter data source.

        Args:
            uri (str): ``stixshifter://profile`` or a comma-separated list of
                profiles, e.g., ``stixshifter://profile1,profile2``.
            pattern (str): the STIX pattern handed to stix-shifter for
                translation into native queries.
            session_id: accepted for interface compatibility; not used here.

        Returns:
            ReturnFromFile: built from the query id (name of the temporary
            ingest directory) and the paths of the STIX bundle files written,
            one file per profile.

        Raises:
            DataSourceManagerInternalError: the URI scheme is not
                ``stixshifter``.
            InvalidDataSource: a profile is missing or has an invalid
                environment variable.
            DataSourceError: stix-shifter reported an error during
                translation or transmission.
        """
        scheme, _, profile_spec = uri.rpartition("://")
        profiles = profile_spec.split(",")

        if scheme != "stixshifter":
            raise DataSourceManagerInternalError(
                f"interface {__package__} should not process scheme {scheme}"
            )

        ingestdir = mkdtemp()
        query_id = ingestdir.name
        bundles = []

        for i, profile in enumerate(profiles):
            (
                connector_name,
                connection_dict,
                configuration_dict,
            ) = StixShifterInterface._get_stixshifter_config(profile)

            # keep only alphanumeric characters so the profile is safe in a file name
            data_path_striped = "".join(filter(str.isalnum, profile))
            ingestfile = ingestdir / f"{i}_{data_path_striped}.json"

            query_metadata = json.dumps(
                {"id": "identity--" + query_id, "name": connector_name}
            )

            translation = stix_translation.StixTranslation()
            transmission = stix_transmission.StixTransmission(
                connector_name, connection_dict, configuration_dict
            )

            dsl = translation.translate(
                connector_name, "query", query_metadata, pattern, {}
            )
            if "error" in dsl:
                raise DataSourceError(
                    f"STIX-shifter translation failed with message: {dsl['error']}"
                )

            # query results should be put together; when translated to STIX,
            # the relation between them will remain
            connector_results = []
            for native_query in dsl["queries"]:
                connector_results += StixShifterInterface._run_native_query(
                    transmission, native_query
                )

            stixbundle = translation.translate(
                connector_name,
                "results",
                query_metadata,
                json.dumps(connector_results),
                {},
            )
            # Mirror the check done for the query translation: do not write a
            # stix-shifter error object to disk as if it were a STIX bundle.
            if isinstance(stixbundle, dict) and "error" in stixbundle:
                raise DataSourceError(
                    f"STIX-shifter results translation failed with message: {stixbundle['error']}"
                )

            with ingestfile.open("w") as ingest:
                json.dump(stixbundle, ingest, indent=4)
            bundles.append(str(ingestfile.resolve()))

        return ReturnFromFile(query_id, bundles)

    @staticmethod
    def _error_detail(response):
        """Return the ``error`` field of a stix-shifter response, or a placeholder."""
        return response.get("error", "details not avaliable")

    @staticmethod
    def _run_native_query(transmission, native_query):
        """Run one native query through ``transmission`` and return all its result entries."""
        search_meta_result = transmission.query(native_query)
        if not search_meta_result["success"]:
            detail = StixShifterInterface._error_detail(search_meta_result)
            raise DataSourceError(
                f"STIX-shifter transmission.query() failed with message: {detail}"
            )

        search_id = search_meta_result["search_id"]
        if transmission.is_async():
            StixShifterInterface._wait_for_search(transmission, search_id)
        return StixShifterInterface._fetch_results(transmission, search_id)

    @staticmethod
    def _wait_for_search(transmission, search_id, poll_interval=1):
        """Poll ``transmission.status()`` until the search is no longer running.

        Every poll is preceded by a ``poll_interval`` second pause and every
        response is checked for success, so a failing status call raises
        ``DataSourceError`` instead of a ``KeyError`` on a missing field.
        """
        while True:
            time.sleep(poll_interval)
            status = transmission.status(search_id)
            if not status["success"]:
                detail = StixShifterInterface._error_detail(status)
                raise DataSourceError(
                    f"STIX-shifter transmission.status() failed with message: {detail}"
                )
            if status["progress"] >= 100 or status["status"] != "RUNNING":
                return

    @staticmethod
    def _fetch_results(transmission, search_id):
        """Retrieve all results of a search in batches of ``RETRIEVAL_BATCH_SIZE``."""
        entries = []
        offset = 0
        while True:
            result_batch = transmission.results(
                search_id, offset, RETRIEVAL_BATCH_SIZE
            )
            if not result_batch["success"]:
                detail = StixShifterInterface._error_detail(result_batch)
                raise DataSourceError(
                    f"STIX-shifter transmission.results() failed with message: {detail}"
                )

            new_entries = result_batch["data"]
            if not new_entries:
                break
            entries += new_entries
            offset += RETRIEVAL_BATCH_SIZE
            # a short batch means the data source has nothing more to return
            if len(new_entries) < RETRIEVAL_BATCH_SIZE:
                break
        return entries

    @staticmethod
    def _get_stixshifter_config(profile_name):
        """Load and validate the stix-shifter settings of a profile.

        Args:
            profile_name (str): profile name; matched case-insensitively
                against the environment variable names.

        Returns:
            tuple: ``(connector_name, connection, configuration)`` where
            ``connector_name`` is lower-cased and the other two are the
            decoded JSON objects.

        Raises:
            InvalidDataSource: an environment variable is missing or empty,
                is not valid JSON, is not a JSON object, or lacks a required
                field (``host``; ``port`` unless the connector is
                ``stix_bundle``; ``auth``).
        """
        profile_name = profile_name.upper()

        def require_env(suffix):
            # Return (variable name, value); a missing or empty variable is an error.
            env_name = f"{ENV_VAR_PREFIX}{profile_name}_{suffix}"
            value = os.getenv(env_name)
            if not value:
                raise InvalidDataSource(
                    profile_name,
                    "stixshifter",
                    f"no {env_name} environment variable found",
                )
            return env_name, value

        def parse_json_object(env_name, raw):
            # Decode `raw` and make sure it is a JSON object before fields are checked.
            try:
                parsed = json.loads(raw)
            except json.JSONDecodeError as err:
                raise InvalidDataSource(
                    profile_name,
                    "stixshifter",
                    f"invalid JSON in {env_name} environment variable",
                ) from err
            if not isinstance(parsed, dict):
                raise InvalidDataSource(
                    profile_name,
                    "stixshifter",
                    f"invalid {env_name} environment variable: not a JSON object",
                )
            return parsed

        # All three variables must exist before any of them is parsed.
        _, connector_name = require_env("CONNECTOR")
        connector_name = connector_name.lower()
        env_conn_name, connection = require_env("CONNECTION")
        env_conf_name, configuration = require_env("CONFIG")

        connection = parse_json_object(env_conn_name, connection)
        if "host" not in connection:
            raise InvalidDataSource(
                profile_name,
                "stixshifter",
                f'invalid {env_conn_name} environment variable: no "host" field',
            )
        if "port" not in connection and connector_name != "stix_bundle":
            raise InvalidDataSource(
                profile_name,
                "stixshifter",
                f'invalid {env_conn_name} environment variable: no "port" field',
            )

        configuration = parse_json_object(env_conf_name, configuration)
        if "auth" not in configuration:
            raise InvalidDataSource(
                profile_name,
                "stixshifter",
                f'invalid {env_conf_name} environment variable: no "auth" field',
            )

        return connector_name, connection, configuration