################################################################
# Generic Errors
################################################################
[docs]class KestrelException(Exception):
"""Generic Kestrel Exception
Args:
error (str): error message.
suggestion (str): suggestion to fix the issue.
"""
def __init__(self, error, suggestion=""):
self.error = error[:-1] if error[-1] == "\n" else error
self.suggestion = (
suggestion
if (not suggestion or suggestion[-1] == ".")
else suggestion + "."
)
def __str__(self):
return f"[ERROR] {self.__class__.__name__}: {self.error}\n{self.suggestion}"
[docs]class KestrelInternalError(KestrelException):
def __init__(self, error):
super().__init__(error, "please open a github issue to report")
################################################################
# Kestrel Session Errors
################################################################
[docs]class InvalidConfiguration(KestrelException):
def __init__(self, error, suggestion):
super().__init__(error, suggestion)
################################################################
# Kestrel Syntax Errors
################################################################
[docs]class KestrelSyntaxError(KestrelException):
def __init__(self, line, column, invalid_term_type, invalid_term_value, expected):
self.line = line
self.column = column
self.invalid_term_type = invalid_term_type
self.invalid_term_value = invalid_term_value
self.expected = list(expected)
self._expects_str = (
f'expects "{self.expected[0]}"'
if len(self.expected) == 1
else f"expects one of {self.expected}"
)
super().__init__(
f'invalid {self.invalid_term_type} "{self.invalid_term_value}" at line {self.line} column {self.column}, {self._expects_str}',
"rewrite the failed statement",
)
[docs]class InvalidStixPattern(KestrelException):
def __init__(
self,
stix,
line=None,
column=None,
invalid_term_type=None,
invalid_term_value=None,
):
self.stix = stix
self.line = line
self.column = column
self.invalid_term_type = invalid_term_type
self.invalid_term_value = invalid_term_value
msg = f'invalid STIX pattern "{stix}"'
if self.invalid_term_value:
details = f': invalid {self.invalid_term_type} "{self.invalid_term_value}" at line {self.line} column {self.column}'
msg = msg + details
super().__init__(msg, "rewrite the STIX pattern")
[docs]class VariableNotExist(KestrelException):
def __init__(self, var_name):
super().__init__(
f'variable "{var_name}" does not exist', "check the variable used"
)
[docs]class UnsupportedRelation(KestrelException):
def __init__(self, entity_x, relation, entity_y):
super().__init__(
f'unsupported relation "{entity_x}--{relation}--{entity_y}"',
"check for supported relations and entity types in the documentation",
)
[docs]class UnsupportedStixSyntax(KestrelException):
def __init__(self, msg):
super().__init__(msg, "rewrite the STIX pattern")
################################################################
# Kestrel Code Generation Errors
################################################################
[docs]class InvalidAttribute(KestrelException):
def __init__(self, attribute):
super().__init__(
f'invalid attribute "{attribute}"',
"rewrite the command with a valid attribute",
)
[docs]class MissingEntityType(KestrelException):
def __init__(self):
super().__init__(
'input data does not have "type" column',
'add "type" column to data or specify entity type in the Kestrel command',
)
################################################################
# Data Source Errors
################################################################
[docs]class DataSourceConnectionError(KestrelException):
def __init__(self, uri):
super().__init__(
f"cannot establish connection to {uri}",
"check URI for typos; please test network connection",
)
[docs]class DataSourceManagerInternalError(KestrelInternalError):
def __init__(self, error):
super().__init__(error)
[docs]class InvalidDataSource(KestrelException):
def __init__(self, uri, itf, msg=""):
super().__init__(
f'invalid data source "{uri}" at interface "{itf}". {msg}',
"please check data source configuration",
)
[docs]class DataSourceError(KestrelException):
def __init__(self, error, suggestion=""):
if not suggestion:
suggestion = "please check data source config or test the query manually"
super().__init__(
error,
suggestion,
)
[docs]class DataSourceInterfaceNotFound(KestrelException):
def __init__(self, scheme):
super().__init__(
f'interface handling "{scheme}://" does not exist',
"(re)install the missing/broken data source interface package",
)
[docs]class InvalidDataSourceInterfaceImplementation(Exception):
def __init__(self, error):
super().__init__(error, "report to data source interface developer")
[docs]class ConflictingDataSourceInterfaceScheme(KestrelException):
def __init__(self, itf_a, itf_b, scheme):
super().__init__(
f'conflicting data source interface scheme "{scheme}" between "{itf_a.__module__}" and "{itf_b.__module__}"',
"uninstall one of the data source interfaces",
)
################################################################
# Analytics Errors
################################################################
[docs]class AnalyticsManagerInternalError(KestrelInternalError):
def __init__(self, error):
super().__init__(error)
[docs]class InvalidAnalytics(KestrelException):
def __init__(self, name, itf, msg=""):
super().__init__(
f'invalid analytics "{name}" at interface "{itf}". {msg}',
"please check analytics availability",
)
[docs]class AnalyticsError(KestrelException):
def __init__(self, error, suggestion=""):
suggestion = "report to analytics developer" if not suggestion else suggestion
super().__init__(error, suggestion)
[docs]class AnalyticsInterfaceNotFound(KestrelException):
def __init__(self, scheme):
super().__init__(
f'interface handling "{scheme}://" does not exist',
"(re)install the missing/broken data source interface package",
)
[docs]class InvalidAnalyticsInterfaceImplementation(Exception):
def __init__(self, error):
super().__init__(error, "report to analytics interface developer")
[docs]class ConflictingAnalyticsInterfaceScheme(KestrelException):
def __init__(self, itf_a, itf_b, scheme):
super().__init__(
f'conflicting analytics interface scheme "{scheme}" between "{itf_a.__module__}" and "{itf_b.__module__}"',
"uninstall one of the analytics interfaces",
)
[docs]class InvalidAnalyticsArgumentCount(KestrelException):
def __init__(self, analytics_name, num_received, num_expected):
super().__init__(
f'the analytics "{analytics_name}" takes {num_expected} Kestrel variables, not {num_received} as given in APPLY.'
)
[docs]class InvalidAnalyticsOutput(KestrelException):
def __init__(self, analytcs_name, return_type):
super().__init__(
f"unsupported return type {return_type} from analytics: {analytics_name}"
)