# Source code for kestrel_datasource_stixbundle.interface

"""The STIX bundle data source package provides access to canned data in STIX
bundles locally or remotely.

"""

import json
import logging
import os
import re
import uuid
import pathlib
import shutil
from datetime import datetime, timedelta, timezone
import requests

from stix2matcher.matcher import Pattern

from firepit.woodchipper import convert_to_stix

from firepit.timestamp import to_datetime
from kestrel.datasource import AbstractDataSourceInterface
from kestrel.datasource import ReturnFromFile
from kestrel.exceptions import DataSourceManagerInternalError, DataSourceConnectionError

_logger = logging.getLogger(__name__)


def _make_query_id(uri, pattern):
    """Return a deterministic identifier for a (URI, pattern) pair.

    The identifier is the string form of a version-5 UUID computed in the URL
    namespace over the URI immediately followed by the pattern, so the same
    inputs always map to the same identifier.
    """
    seed = str(uri) + pattern
    query_uuid = uuid.uuid5(uuid.NAMESPACE_URL, seed)
    return str(query_uuid)


def _make_query_dir(query_id):
    """Create a new directory named after ``query_id`` and return its path.

    Missing parent directories are created as needed.

    Raises:
        FileExistsError: if the directory is already present.
    """
    query_dir = pathlib.Path(query_id)
    query_dir.mkdir(exist_ok=False, parents=True)
    return query_dir


def _make_download_dir():
    """Ensure the ``downloads`` directory exists and return its path.

    The path is relative, so the directory lives under the current working
    directory.  An already existing directory is reused.
    """
    download_dir = pathlib.Path("downloads")
    download_dir.mkdir(exist_ok=True, parents=True)
    return download_dir


def _clean_ingestdir_and_raise_error(ingestdir, uri):
    """Delete the ingest directory, then signal a connection failure for ``uri``.

    The removal must happen before the error propagates: a later execution
    that finds the directory would assume it already holds good data.

    Raises:
        DataSourceConnectionError: always, constructed from ``uri``.
    """
    shutil.rmtree(ingestdir)
    failure = DataSourceConnectionError(uri)
    raise failure


def fixup_pattern(pattern):
    """Rewrite ``START t'...'`` / ``STOP t'...'`` qualifiers as plain strings.

    The matcher doesn't accept TimestampLiterals in START/STOP, so the ``t``
    prefix is dropped and the whitespace before the quote is collapsed to a
    single space.
    See https://github.com/oasis-open/cti-pattern-validator/issues/52
    """
    qualifier = re.compile(r"(START|STOP)\s+t'")

    def _drop_timestamp_prefix(found):
        # Keep the keyword, emit exactly one space and the opening quote.
        return found.group(1) + " '"

    return qualifier.sub(_drop_timestamp_prefix, pattern)


# Seconds to wait on the remote server before an HTTP(S) request is abandoned.
# Without a timeout ``requests`` may block indefinitely on a stalled server.
_HTTP_TIMEOUT = 60


def _remote_cache_path(downloaddir, data_path):
    """Return the cache file inside ``downloaddir`` used for a remote path.

    The file name is ``data_path`` without its extension and with every
    non-alphanumeric character removed; the original extension, if any, is
    appended again.
    """
    stem, extension = os.path.splitext(data_path)
    rawfile = downloaddir / "".join(filter(str.isalnum, stem))
    if extension:
        rawfile = rawfile.with_suffix(extension)
    return rawfile


def _download(data_uri, rawfile):
    """Stream the body of ``data_uri`` into ``rawfile``.

    The body is first written to a sibling ``.part`` file that replaces
    ``rawfile`` only after the whole response has been received, so a failed
    or interrupted transfer never leaves a truncated file in the cache.

    Raises:
        requests.exceptions.RequestException: on connection failure, timeout,
            a broken stream, or a 4xx/5xx response status.
    """
    partfile = rawfile.with_name(rawfile.name + ".part")
    try:
        with requests.get(data_uri, stream=True, timeout=_HTTP_TIMEOUT) as resp:
            # Do not store an error page as if it were the requested bundle.
            resp.raise_for_status()
            with partfile.open("wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)
        partfile.replace(rawfile)
    except BaseException:
        try:
            partfile.unlink()
        except OSError:
            pass
        raise


def _refresh_remote_file(data_uri, rawfile):
    """Make sure ``rawfile`` holds a current copy of ``data_uri``.

    A missing file is downloaded.  An existing file is downloaded again when
    the server's ``Last-Modified`` time is newer than the file's modification
    time; if the server sends no ``Last-Modified`` header, the file is
    refreshed once it is more than five minutes old.

    Raises:
        requests.exceptions.RequestException: if the server cannot be reached
            or the download fails.
    """
    last_modified = None
    file_time = None
    if rawfile.exists():
        _logger.debug("File exists: %s", rawfile)
        resp = requests.head(data_uri, timeout=_HTTP_TIMEOUT)
        file_time = datetime.fromtimestamp(rawfile.stat().st_mtime, tz=timezone.utc)
        last_modified = resp.headers.get("Last-Modified")
        if last_modified:
            last_modified = to_datetime(last_modified)
        else:
            _logger.debug(
                "HTTP/HTTPS response header does not have 'Last-Modified' field"
            )
            last_modified = datetime.now(timezone.utc) - timedelta(minutes=5)
    else:
        _logger.debug("File not on disk: %s", rawfile)

    if not last_modified or last_modified > file_time:
        _logger.info("Downloading %s to %s", data_uri, rawfile)
        _download(data_uri, rawfile)
    else:
        # We already have this file
        _logger.debug("Using cached file: %s", rawfile)


class StixBundleInterface(AbstractDataSourceInterface):
    """Data source interface that reads STIX bundles from files or HTTP(S) URLs."""

    @staticmethod
    def schemes():
        """STIX bundle data source interface supporting ``file:///``, ``http://``, ``https://`` scheme."""
        return ["file", "http", "https"]

    @staticmethod
    def list_data_sources(config=None):
        """This interface does not list data sources."""
        return []

    @staticmethod
    def query(uri, pattern, session_id=None, config=None, store=None, limit=None):
        """Query a STIX bundle locally or remotely.

        Every bundle named in ``uri`` is loaded, filtered with ``pattern`` and
        written as ``<index>.json`` into a directory named after the query id.
        If that directory already exists the query is treated as cached and no
        bundle is read again.

        Args:
            uri: ``scheme://path[,path...]``; the part after the last ``://``
                is split on commas into one path per bundle.
            pattern: STIX pattern applied to each ``observed-data`` object.
                Objects of any other type are always kept.
            session_id: not used by this interface.
            config: not used by this interface.
            store: not used by this interface.
            limit: maximum number of matching ``observed-data`` objects kept
                across all bundles of the query; a falsy value means no limit.

        Returns:
            ``ReturnFromFile`` built from the query id and the list of absolute
            paths of the filtered bundle files (empty for a cached query).

        Raises:
            DataSourceConnectionError: if a bundle cannot be fetched or parsed.
            DataSourceManagerInternalError: if the URI scheme is not one of
                ``file``, ``http`` or ``https``.
        """
        _logger.debug("query URI received at interface_stixbundle: %s", uri)

        scheme, _, data_paths = uri.rpartition("://")
        data_paths = data_paths.split(",")
        pattern = fixup_pattern(pattern)
        compiled_pattern = Pattern(pattern)

        query_id = _make_query_id(uri, pattern)
        downloaddir = _make_download_dir()

        try:
            ingestdir = _make_query_dir(query_id)
        except FileExistsError:
            # We already cached this bundle
            return ReturnFromFile(query_id, [])

        bundles = []
        num_records = 0
        try:
            for i, data_path in enumerate(data_paths):
                _logger.debug("requesting data from path: %s", data_path)

                if scheme == "file":
                    rawfile = data_path
                    try:
                        # JSON is UTF-8; do not depend on the locale's encoding.
                        with open(data_path, "r", encoding="utf-8") as f:
                            bundle_in = json.load(f)
                    except Exception:
                        _clean_ingestdir_and_raise_error(ingestdir, uri)
                elif scheme in ("http", "https"):
                    data_uri = f"{scheme}://{data_path}"
                    rawfile = _remote_cache_path(downloaddir, data_path)
                    try:
                        _refresh_remote_file(data_uri, rawfile)
                    except requests.exceptions.RequestException:
                        _clean_ingestdir_and_raise_error(ingestdir, uri)
                    try:
                        bundle_in = _get_bundle(rawfile)
                    except Exception:
                        _clean_ingestdir_and_raise_error(ingestdir, uri)
                else:
                    raise DataSourceManagerInternalError(
                        f"interface {__package__} should not process scheme {scheme}"
                    )

                bundle_out = {}
                _logger.debug("Filtering: %s", rawfile)
                count = 0
                matched = 0
                for prop, val in bundle_in.items():
                    if prop != "objects":
                        bundle_out[prop] = val
                        continue
                    bundle_out[prop] = []
                    for obj in val:
                        count += 1
                        is_observation = obj["type"] == "observed-data"
                        if is_observation and not compiled_pattern.match(
                            [obj], False
                        ):
                            continue
                        matched += 1
                        if is_observation:
                            # The limit applies to observations only and is
                            # counted across all bundles of this query.
                            num_records += 1
                            if limit and num_records > limit:
                                continue
                        bundle_out[prop].append(obj)
                _logger.debug(
                    "Matched %d of %d observations: %s", matched, count, rawfile
                )

                ingestfile = ingestdir / f"{i}.json"
                with ingestfile.open("w") as f:
                    json.dump(bundle_out, f)
                bundles.append(str(ingestfile.expanduser().resolve()))
        except BaseException:
            # Whatever went wrong, do not leave the query directory behind:
            # its mere existence makes the next identical query assume the
            # data was already ingested.
            shutil.rmtree(ingestdir, ignore_errors=True)
            raise

        return ReturnFromFile(query_id, bundles)
def _get_bundle(rawfile):
    """Load ``rawfile`` as a bundle.

    The file is parsed as UTF-8 encoded JSON.  If it is not valid JSON, or not
    valid UTF-8, it is handed to firepit's ``convert_to_stix`` instead.

    Args:
        rawfile: path of the file to load (string or path-like).

    Returns:
        The parsed JSON document, or the result of ``convert_to_stix``.
    """
    try:
        # JSON is UTF-8; relying on the locale's default encoding would
        # mis-decode bundles on platforms whose default is something else.
        with open(rawfile, "r", encoding="utf-8") as fp:
            bundle = json.load(fp)
    except (json.decoder.JSONDecodeError, UnicodeDecodeError):
        # It's not JSON.  Maybe firepit can convert it to STIX?
        bundle = convert_to_stix(str(rawfile))
    return bundle