"""A Kestrel session provides an isolated stateful runtime space for a huntflow.
A huntflow is the source code or script of a cyber threat hunt, which can be
developed offline in a text editor or interactively as the hunt goes. A Kestrel
session provides the runtime space for a huntflow that allows execution and
inspection of hunt statements in the huntflow. The :class:`Session` class in this
module supports both non-interactive and interactive execution of huntflows as
well as comprehensive APIs besides execution.
.. highlight:: python
Examples:
A non-interactive execution of a huntflow::
from kestrel.session import Session
with Session() as session:
open(huntflow_file) as hff:
huntflow = hff.read()
session.execute(huntflow)
An interactive composition and execution of a huntflow::
from kestrel.session import Session
with Session() as session:
try:
hunt_statement = input(">>> ")
except EOFError:
print()
break
else:
output = session.execute(hunt_statement)
print(output)
Export Kestrel variable to Python::
from kestrel.session import Session
huntflow = ""\"newvar = GET process
FROM stixshifter://workstationX
WHERE [process:name = 'cmd.exe']""\"
with Session() as session:
session.execute(huntflow)
cmds = session.get_variable("newvar")
for process in cmds:
print(process["name"])
"""
import tempfile
import os
import getpass
import pathlib
import shutil
import uuid
import logging
import time
import math
import lark
import atexit
from contextlib import AbstractContextManager
from kestrel.exceptions import (
KestrelSyntaxError,
InvalidStixPattern,
DebugCacheLinkOccupied,
)
from kestrel.syntax.parser import parse_kestrel
from kestrel.semantics.processor import semantics_processing
from kestrel.semantics.completor import do_complete
from kestrel.codegen import commands
from kestrel.codegen.display import DisplayBlockSummary
from kestrel.codegen.summary import gen_variable_summary
from kestrel.symboltable.symtable import SymbolTable
from kestrel.utils import (
set_current_working_directory,
resolve_path_in_kestrel_env_var,
add_logging_handler,
)
from kestrel.config import load_config
from kestrel.datasource import DataSourceManager
from kestrel.analytics import AnalyticsManager
from firepit import get_storage
from firepit.exceptions import StixPatternError
_logger = logging.getLogger(__name__)
[docs]class Session(AbstractContextManager):
"""Kestrel Session class
A session object needs to be instantiated to create a Kestrel runtime space.
This is the foundation of multi-user dynamic composition and execution of
huntflows. A Kestrel session has two important properties:
- Stateful: a session keeps track of states/effects of statements that have
been previously executed in this session, e.g., the values of previous
established Kestrel variables. A session can invoke more than one
:meth:`execute`, and each :meth:`execute` can process a block of Kestrel code,
i.e., multiple Kestrel statements.
- Isolated: each session is established in an isolated space (memory and
file system):
- Memory isolation is accomplished by OS process and memory space
management automatically -- different Kestrel session instances will not
overlap in memory.
- File system isolation is accomplished with the setup and management of
a temporary runtime directory for each session.
Args:
runtime_dir (str): to be used for :attr:`runtime_directory`.
store_path (str): the file path or URL to initialize :attr:`store`.
debug_mode (bool): to be assign to :attr:`debug_mode`.
Attributes:
session_id (str): The Kestrel session ID, which will be created as a random
UUID if not given in the constructor.
runtime_directory (str): The runtime directory stores session related
data in the file system such as local cache of queried results,
session log, and may be the internal store. The session will use
a temporary directory derived from :attr:`session_id` if the path is
not specified in constructor parameters.
store (firepit.SqlStorage): The internal store used
by the session to normalize queried results, implement cache, and
realize the low level code generation. The store from the
``firepit`` package provides an operation abstraction
over the raw internal database: either a local store, e.g., SQLite,
or a remote one, e.g., PostgreSQL. If not specified from the
constructor parameter, the session will use the default SQLite
store in the :attr:`runtime_directory`.
debug_mode (bool): The debug flag set by the session constructor. If
True, a fixed debug link ``/tmp/kestrel`` of :attr:`runtime_directory`
will be created, and :attr:`runtime_directory` will not be removed by
the session when terminating.
runtime_directory_is_owned_by_upper_layer (bool): The flag to specify
who owns and manages :attr:`runtime_directory`. False by default,
where the Kestrel session will manage session file system isolation --
create and destory :attr:`runtime_directory`. If True, the runtime
directory is created, passed in to the session constructor, and will
be destroyed by the calling site.
symtable (dict): The continuously updated *symbol table* of the running
session, which is a dictionary mapping from Kestrel variable names
``str`` to their associated Kestrel internal data structure
``VarStruct``.
data_source_manager (kestrel.datasource.DataSourceManager): The
data source manager handles queries to all data source interfaces such as
local file stix bundle and stix-shifter. It also stores previous
queried data sources for the session, which is used for a syntax
sugar when there is no data source in a Kestrel ``GET`` statement -- the
last data source is implicitly used.
analytics_manager (kestrel.analytics.AnalyticsManager): The analytics
manager handles all analytics related operations such as executing an
analytics or getting the list of analytics for code auto-completion.
"""
def __init__(
self, session_id=None, runtime_dir=None, store_path=None, debug_mode=False
):
_logger.debug(
f"Establish session with session_id: {session_id}, runtime_dir: {runtime_dir}, store_path:{store_path}, debug_mode:{debug_mode}"
)
resolve_path_in_kestrel_env_var()
self.config = load_config()
if session_id:
self.session_id = session_id
else:
self.session_id = str(uuid.uuid4())
self.debug_mode = (
True
if debug_mode or os.getenv(self.config["debug"]["env_var"], False)
else False
)
# default value of runtime_directory ownership
self.runtime_directory_is_owned_by_upper_layer = False
# runtime (temporary) directory to store session-related data
sys_tmp_dir = pathlib.Path(tempfile.gettempdir())
if runtime_dir:
if os.path.exists(runtime_dir):
self.runtime_directory_is_owned_by_upper_layer = True
else:
pathlib.Path(runtime_dir).mkdir(parents=True, exist_ok=True)
self.runtime_directory = runtime_dir
else:
tmp_dir = sys_tmp_dir / (
self.config["session"]["cache_directory_prefix"]
+ str(os.getuid())
+ "-"
+ self.session_id
)
self.runtime_directory = tmp_dir.expanduser().resolve()
if tmp_dir.exists():
if tmp_dir.is_dir():
_logger.debug(
"Kestrel session with runtime_directory exists, reuse it."
)
else:
_logger.debug(
"strange tmp file that uses kestrel session dir name, remove it."
)
os.remove(self.runtime_directory)
else:
_logger.debug(
f"create new session runtime_directory: {self.runtime_directory}."
)
tmp_dir.mkdir(parents=True, exist_ok=True)
if self.debug_mode:
self._setup_runtime_directory_master()
self._logging_setup()
# local database of SQLite or PostgreSQL
if not store_path:
# use the default local database in config.py
local_database_path = self.config["session"]["local_database_path"]
if "://" in local_database_path:
store_path = local_database_path
else:
store_path = os.path.join(self.runtime_directory, local_database_path)
self.store = get_storage(store_path, self.session_id)
# Symbol Table
# linking variables in syntax with internal data structure
# handling fallback_var for the most recently accessed var
# {"var": VarStruct}
self.symtable = SymbolTable()
self.data_source_manager = DataSourceManager(self.config)
self.analytics_manager = AnalyticsManager(self.config)
atexit.register(self.close)
[docs] def execute(self, codeblock):
"""Execute a Kestrel code block.
A Kestrel statement or multiple consecutive statements constitute a code
block, which can be executed by this method. New Kestrel variables can be
created in a code block such as ``newvar = GET ...``. Two types of Kestrel
variables can be legally referred in a Kestrel statement in the code block:
* A Kestrel variable created in the same code block prior to the reference.
* A Kestrel variable created in code blocks previously executed by the
session. The session maintains the :attr:`symtable` to keep the state
of all previously executed Kestrel statements and their established Kestrel
variables.
Args:
codeblock (str): the code block to be executed.
Returns:
A list of outputs that each of them is the output for each
statement in the inputted code block.
"""
ast = self.parse(codeblock)
return self._execute_ast(ast)
[docs] def parse(self, codeblock):
"""Parse a Kestrel code block.
Parse one or multiple consecutive Kestrel statements (a Kestrel code block)
into the abstract syntax tree. This could be useful for frontends that
need to parse a statement *without* executing it in order to render
some type of interface.
Args:
codeblock (str): the code block to be parsed.
Returns:
A list of dictionaries that each of them is an *abstract syntax
tree* for one Kestrel statement in the inputted code block.
"""
try:
ast = parse_kestrel(
codeblock,
self.config["language"]["default_variable"],
self.config["language"]["default_sort_order"],
)
except lark.UnexpectedEOF as err:
# use `err.expected` as used in lark.exceptions.UnexpectedEOF
raise KestrelSyntaxError(
err.line, err.column, "end of line", "", err.expected
)
except lark.UnexpectedCharacters as err:
# use `err.allowed` as used in lark.exceptions.UnexpectedCharacters
raise KestrelSyntaxError(
err.line, err.column, "character", err.char, err.allowed
)
except lark.UnexpectedToken as err:
# use `err.accepts or err.expected` as used in lark.exceptions.UnexpectedToken
raise KestrelSyntaxError(
err.line, err.column, "token", err.token, err.accepts or err.expected
)
return ast
[docs] def get_variable_names(self):
"""Get the list of Kestrel variable names created in this session."""
return list(self.symtable.keys())
[docs] def get_variable(self, var_name, deref=True):
"""Get the data of Kestrel variable ``var_name``, which is list of homogeneous entities (STIX SCOs)."""
# In the future, consider returning a generator here?
return self.symtable[var_name].get_entities(deref)
[docs] def create_variable(self, var_name, objects, object_type=None):
"""Create a new Kestrel variable ``var_name`` with data in ``objects``.
This is the API equivalent to Kestrel command ``NEW``, while allowing more
flexible objects types (Python objects) than the objects serialized
into text/JSON in the command ``NEW``.
Args:
var_name (str): The Kestrel variable to be created.
objects (list): List of Python objects, currently support either a
list of ``str`` or a list of ``dict``.
object_type (str): The Kestrel entity type for the created Kestrel
variable. It overrides the ``type`` field in ``objects``. If
there is no ``type`` field in ``objects``, e.g., ``objects`` is a
list of ``str``, this parameter is required.
"""
virtual_stmt_ast = [
{"command": "new", "output": var_name, "data": objects, "type": object_type}
]
self._execute_ast(virtual_stmt_ast)
[docs] def do_complete(self, code, cursor_pos):
"""Kestrel code auto-completion.
Args:
code (str): Kestrel code.
cursor_pos (int): the position to start completion (index in ``code``).
Returns:
A list of suggested strings to complete the code.
"""
return do_complete(
code,
cursor_pos,
self.data_source_manager,
self.analytics_manager,
self.symtable,
)
[docs] def close(self):
"""Explicitly close the session.
This may be executed by a context manager or when the program exits.
"""
# Note there are two conditions that trigger this function, so it is probably executed twice
# Be careful to write the logic in this function to avoid deleting nonexist files/dirs
# remove logging handler
if self.logging_handler:
logging.getLogger().removeHandler(self.logging_handler)
self.logging_handler.close()
# ensure this does not executed twice
self.logging_handler = None
if self.original_logging_level:
logging.getLogger().setLevel(self.original_logging_level)
# close store/database
if self.store:
# release resources
self.store.close()
# ensure this does not executed twice
self.store = None
# manage temp folder for debug
# ensure this does not execute if already executed
if os.path.exists(self.runtime_directory):
if not self.runtime_directory_is_owned_by_upper_layer:
if self.debug_mode:
self._leave_exit_marker()
self._remove_obsolete_debug_folders()
else:
shutil.rmtree(self.runtime_directory)
def __exit__(self, exception_type, exception_value, traceback):
self.close()
def _execute_ast(self, ast):
displays = []
new_vars = []
start_exec_ts = time.time()
for stmt in ast:
try:
# semantic checking and unfolding
semantics_processing(
stmt,
self.symtable,
self.store,
self.data_source_manager,
self.config,
)
# code generation and execution
execute_cmd = getattr(commands, stmt["command"])
# set current working directory for each command execution
# use this to implicitly pass runtime_dir as an argument to each command
# the context manager switch back cwd when the command execution completes
with set_current_working_directory(self.runtime_directory):
output_var_struct, display = execute_cmd(stmt, self)
# exception completion
except StixPatternError as e:
raise InvalidStixPattern(e.stix) from e
# post-processing: symbol table update
if output_var_struct is not None:
output_var_name = stmt["output"]
self._update_symbol_table(output_var_name, output_var_struct)
if output_var_name != self.config["language"]["default_variable"]:
if output_var_name in new_vars:
new_vars.remove(output_var_name)
new_vars.append(output_var_name)
if display is not None:
displays.append(display)
end_exec_ts = time.time()
execution_time_sec = math.ceil(end_exec_ts - start_exec_ts)
if self.config["session"]["show_execution_summary"] and new_vars:
vars_summary = [
gen_variable_summary(vname, self.symtable[vname]) for vname in new_vars
]
displays.append(DisplayBlockSummary(vars_summary, execution_time_sec))
return displays
def _update_symbol_table(self, output_var_name, output_var_struct):
self.symtable[output_var_name] = output_var_struct
self.symtable[self.config["language"]["default_variable"]] = output_var_struct
def _leave_exit_marker(self):
runtime_dir = pathlib.Path(self.runtime_directory)
exit_marker = runtime_dir / self.config["debug"]["session_exit_marker"]
exit_marker.touch()
def _remove_obsolete_debug_folders(self):
# will only clean debug cache directories under system temp directory
# [(cache_dir, timestamp)]
exited_sessions = []
for x in pathlib.Path(tempfile.gettempdir()).iterdir():
if x.is_dir() and x.parts[-1].startswith(
self.config["session"]["cache_directory_prefix"]
+ str(os.getuid())
+ "-"
):
marker = x / self.config["debug"]["session_exit_marker"]
if marker.exists():
exited_sessions.append((x, marker.stat().st_mtime))
# preserve the newest self.config["debug"]["maximum_exited_session"] debug sessions
exited_sessions.sort(key=lambda x: x[1])
for x, _ in exited_sessions[: -self.config["debug"]["maximum_exited_session"]]:
shutil.rmtree(x)
def _get_runtime_directory_master(self):
sys_tmp_dir = pathlib.Path(tempfile.gettempdir())
user_suffix = None
for f in [getpass.getuser, os.getuid]:
if not user_suffix:
try:
user_suffix = f()
except:
pass
if not user_suffix:
user_suffix = "noUID"
return sys_tmp_dir / (
self.config["debug"]["cache_directory_prefix"] + user_suffix
)
def _setup_runtime_directory_master(self):
master_dir = self._get_runtime_directory_master()
# master_dir.exists() should not be used
# it will return False for broken link
try:
master_dir.unlink()
except FileNotFoundError:
pass
except PermissionError:
raise DebugCacheLinkOccupied(master_dir.resolve())
master_dir.symlink_to(self.runtime_directory)
def _logging_setup(self):
log_file_name = self.config["session"]["log_path"]
log_file_path = os.path.join(self.runtime_directory, log_file_name)
handler = logging.FileHandler(log_file_path)
self.logging_handler, self.original_logging_level = add_logging_handler(
handler, self.debug_mode
)