Source code for kestrel.exceptions

################################################################
#                     Generic Errors
################################################################


class KestrelException(Exception):
    """Generic Kestrel Exception

    Args:
        error (str): error message. A single trailing newline is removed.
        suggestion (str): suggestion to fix the issue. A terminating period
            is appended when the suggestion is non-empty and lacks one.

    Attributes:
        error (str): the normalized error message.
        suggestion (str): the normalized suggestion (may be empty).
    """

    def __init__(self, error, suggestion=""):
        # ``endswith`` is safe on an empty message, whereas indexing
        # ``error[-1]`` raises IndexError for "".
        self.error = error[:-1] if error.endswith("\n") else error

        # Leave an empty suggestion untouched; otherwise make sure it reads
        # as a full sentence.
        if suggestion and not suggestion.endswith("."):
            suggestion = suggestion + "."
        self.suggestion = suggestion

    def __str__(self):
        return f"[ERROR] {self.__class__.__name__}: {self.error}\n{self.suggestion}"
class KestrelInternalError(KestrelException):
    """Kestrel exception whose suggestion asks the user to open a GitHub issue.

    Args:
        error (str): error message.
    """

    def __init__(self, error):
        report_hint = "please open a github issue to report"
        super().__init__(error, report_hint)
################################################################
#                     Kestrel Session Errors
################################################################
class InvalidConfiguration(KestrelException):
    """Kestrel exception that requires both a message and a suggestion.

    Args:
        error (str): error message.
        suggestion (str): suggestion to fix the issue (no default here).
    """

    def __init__(self, error, suggestion):
        super().__init__(error=error, suggestion=suggestion)
################################################################
#                     Kestrel Syntax Errors
################################################################
class KestrelSyntaxError(KestrelException):
    """Syntax error carrying the position and the offending term.

    Args:
        line: line number reported in the message.
        column: column number reported in the message.
        invalid_term_type: kind of the invalid term, shown in the message.
        invalid_term_value: text of the invalid term, shown in the message.
        expected: iterable of expected terms; stored as a list.
    """

    def __init__(self, line, column, invalid_term_type, invalid_term_value, expected):
        self.line = line
        self.column = column
        self.invalid_term_type = invalid_term_type
        self.invalid_term_value = invalid_term_value
        self.expected = list(expected)

        # A single expectation is quoted on its own; anything else is shown
        # as the list itself.
        if len(self.expected) == 1:
            self._expects_str = f'expects "{self.expected[0]}"'
        else:
            self._expects_str = f"expects one of {self.expected}"

        message = f'invalid {self.invalid_term_type} "{self.invalid_term_value}" at line {self.line} column {self.column}, {self._expects_str}'
        super().__init__(message, "rewrite the failed statement")
class InvalidStixPattern(KestrelException):
    """Error for an invalid STIX pattern, optionally locating the bad term.

    Args:
        stix: the STIX pattern text quoted in the message.
        line: line number added to the message when a term value is given.
        column: column number added to the message when a term value is given.
        invalid_term_type: kind of the invalid term.
        invalid_term_value: text of the invalid term; position details are
            appended to the message only when this is truthy.
    """

    def __init__(
        self,
        stix,
        line=None,
        column=None,
        invalid_term_type=None,
        invalid_term_value=None,
    ):
        self.stix = stix
        self.line = line
        self.column = column
        self.invalid_term_type = invalid_term_type
        self.invalid_term_value = invalid_term_value

        parts = [f'invalid STIX pattern "{stix}"']
        if self.invalid_term_value:
            parts.append(
                f': invalid {self.invalid_term_type} "{self.invalid_term_value}" at line {self.line} column {self.column}'
            )
        super().__init__("".join(parts), "rewrite the STIX pattern")
class VariableNotExist(KestrelException):
    """Error stating that the named variable does not exist.

    Args:
        var_name: variable name quoted in the message.
    """

    def __init__(self, var_name):
        message = f'variable "{var_name}" does not exist'
        hint = "check the variable used"
        super().__init__(message, hint)
class UnsupportedRelation(KestrelException):
    """Error for an unsupported relation between two entities.

    Args:
        entity_x: left-hand entity shown in the message.
        relation: relation shown between the two entities.
        entity_y: right-hand entity shown in the message.
    """

    def __init__(self, entity_x, relation, entity_y):
        message = f'unsupported relation "{entity_x}--{relation}--{entity_y}"'
        hint = "check for supported relations and entity types in the documentation"
        super().__init__(message, hint)
class UnsupportedStixSyntax(KestrelException):
    """Error for STIX syntax that is not supported.

    Args:
        msg (str): error message, passed through unchanged.
    """

    def __init__(self, msg):
        hint = "rewrite the STIX pattern"
        super().__init__(msg, hint)
################################################################
#                     Kestrel Code Generation Errors
################################################################
class EmptyInputVariable(KestrelException):
    """Error stating that an input variable is empty.

    Args:
        var_name: variable name quoted in the message.
    """

    def __init__(self, var_name):
        message = f'empty input variable "{var_name}"'
        hint = "rewrite the command to use a non-empty variable as input"
        super().__init__(message, hint)
class InvalidAttribute(KestrelException):
    """Error stating that an attribute is invalid.

    Args:
        attribute: attribute name quoted in the message.
    """

    def __init__(self, attribute):
        message = f'invalid attribute "{attribute}"'
        hint = "rewrite the command with a valid attribute"
        super().__init__(message, hint)
class NonUniformEntityType(KestrelException):
    """Error stating that the input data holds more than one entity type.

    Args:
        etypes: the entity types, formatted into the message as-is.
    """

    def __init__(self, etypes):
        message = f"there are more than one entity types in input data: {etypes}"
        hint = "provide homogeneous entities to construct a Kestrel variable"
        super().__init__(message, hint)
class MissingEntityType(KestrelException):
    """Error stating that the input data has no "type" column."""

    def __init__(self):
        message = 'input data does not have "type" column'
        hint = 'add "type" column to data or specify entity type in the Kestrel command'
        super().__init__(message, hint)
################################################################
#                     Data Source Errors
################################################################
class DataSourceConnectionError(KestrelException):
    """Error stating that a connection to a URI cannot be established.

    Args:
        uri: the URI shown in the message.
    """

    def __init__(self, uri):
        message = f"cannot establish connection to {uri}"
        hint = "check URI for typos; please test network connection"
        super().__init__(message, hint)
class DataSourceManagerInternalError(KestrelInternalError):
    """Internal error type for the data source manager.

    Args:
        error (str): error message, forwarded to the parent class.
    """

    def __init__(self, error):
        super().__init__(error=error)
class InvalidDataSource(KestrelException):
    """Error for an invalid data source at a given interface.

    Args:
        uri: data source identifier quoted in the message.
        itf: interface name quoted in the message.
        msg (str): optional extra detail appended after the main sentence.
    """

    def __init__(self, uri, itf, msg=""):
        message = f'invalid data source "{uri}" at interface "{itf}". {msg}'
        hint = "please check data source configuration"
        super().__init__(message, hint)
class DataSourceError(KestrelException):
    """Data source error with a default suggestion.

    Args:
        error (str): error message.
        suggestion (str): suggestion to fix the issue; a default is used
            when this is empty.
    """

    def __init__(self, error, suggestion=""):
        default_hint = "please check data source config or test the query manually"
        # Fall back to the default hint when no suggestion was supplied.
        super().__init__(error, suggestion or default_hint)
class DataSourceInterfaceNotFound(KestrelException):
    """Error stating that no interface handles the given scheme.

    Args:
        scheme: scheme shown in the message as ``<scheme>://``.
    """

    def __init__(self, scheme):
        message = f'interface handling "{scheme}://" does not exist'
        hint = "(re)install the missing/broken data source interface package"
        super().__init__(message, hint)
class InvalidDataSourceInterfaceImplementation(KestrelException):
    """Error for a faulty data source interface implementation.

    Derives from ``KestrelException`` so the ``(error, suggestion)`` pair
    passed to the parent initializer is stored as ``.error``/``.suggestion``
    and rendered by ``KestrelException.__str__``. With a bare ``Exception``
    base the two values would only end up as a tuple in ``args``.

    Args:
        error (str): error message.
    """

    def __init__(self, error):
        super().__init__(error, "report to data source interface developer")
class ConflictingDataSourceInterfaceScheme(KestrelException):
    """Error for two data source interfaces claiming the same scheme.

    Args:
        itf_a: first interface; its ``__module__`` is shown in the message.
        itf_b: second interface; its ``__module__`` is shown in the message.
        scheme: the conflicting scheme quoted in the message.
    """

    def __init__(self, itf_a, itf_b, scheme):
        message = f'conflicting data source interface scheme "{scheme}" between "{itf_a.__module__}" and "{itf_b.__module__}"'
        hint = "uninstall one of the data source interfaces"
        super().__init__(message, hint)
################################################################
#                     Analytics Errors
################################################################
class AnalyticsManagerInternalError(KestrelInternalError):
    """Internal error type for the analytics manager.

    Args:
        error (str): error message, forwarded to the parent class.
    """

    def __init__(self, error):
        super().__init__(error=error)
class InvalidAnalytics(KestrelException):
    """Error for an invalid analytics at a given interface.

    Args:
        name: analytics name quoted in the message.
        itf: interface name quoted in the message.
        msg (str): optional extra detail appended after the main sentence.
    """

    def __init__(self, name, itf, msg=""):
        message = f'invalid analytics "{name}" at interface "{itf}". {msg}'
        hint = "please check analytics availability"
        super().__init__(message, hint)
class AnalyticsError(KestrelException):
    """Analytics error with a default suggestion.

    Args:
        error (str): error message.
        suggestion (str): suggestion to fix the issue; a default is used
            when this is empty.
    """

    def __init__(self, error, suggestion=""):
        # Substitute the default hint only when no suggestion was supplied.
        if not suggestion:
            suggestion = "report to analytics developer"
        super().__init__(error, suggestion)
class AnalyticsInterfaceNotFound(KestrelException):
    """Error stating that no analytics interface handles the given scheme.

    Args:
        scheme: scheme shown in the message as ``<scheme>://``.
    """

    def __init__(self, scheme):
        super().__init__(
            f'interface handling "{scheme}://" does not exist',
            # The suggestion names the analytics interface package; pointing
            # at the data source interface package would misdirect the user.
            "(re)install the missing/broken analytics interface package",
        )
class InvalidAnalyticsInterfaceImplementation(KestrelException):
    """Error for a faulty analytics interface implementation.

    Derives from ``KestrelException`` so the ``(error, suggestion)`` pair
    passed to the parent initializer is stored as ``.error``/``.suggestion``
    and rendered by ``KestrelException.__str__``. With a bare ``Exception``
    base the two values would only end up as a tuple in ``args``.

    Args:
        error (str): error message.
    """

    def __init__(self, error):
        super().__init__(error, "report to analytics interface developer")
class ConflictingAnalyticsInterfaceScheme(KestrelException):
    """Error for two analytics interfaces claiming the same scheme.

    Args:
        itf_a: first interface; its ``__module__`` is shown in the message.
        itf_b: second interface; its ``__module__`` is shown in the message.
        scheme: the conflicting scheme quoted in the message.
    """

    def __init__(self, itf_a, itf_b, scheme):
        message = f'conflicting analytics interface scheme "{scheme}" between "{itf_a.__module__}" and "{itf_b.__module__}"'
        hint = "uninstall one of the analytics interfaces"
        super().__init__(message, hint)
class InvalidAnalyticsArgumentCount(KestrelException):
    """Error for an analytics given the wrong number of Kestrel variables.

    No suggestion is passed to the parent class.

    Args:
        analytics_name: analytics name quoted in the message.
        num_received: number of variables given.
        num_expected: number of variables the analytics takes.
    """

    def __init__(self, analytics_name, num_received, num_expected):
        message = f'the analytics "{analytics_name}" takes {num_expected} Kestrel variables, not {num_received} as given in APPLY.'
        super().__init__(message)
class InvalidAnalyticsInput(KestrelException):
    """Error for an analytics receiving an unsupported input type.

    No suggestion is passed to the parent class.

    Args:
        type_received: the type that was received, quoted in the message.
        types_expected: iterable of accepted types; each is double-quoted
            and the results are joined with ", ".
    """

    def __init__(self, type_received, types_expected):
        quoted_types = (f'"{t}"' for t in types_expected)
        typelist = ", ".join(quoted_types)
        message = f'received unsupported type "{type_received}"; expected one of {typelist}'
        super().__init__(message)
class InvalidAnalyticsOutput(KestrelException):
    """Error for an analytics returning an unsupported type.

    No suggestion is passed to the parent class.

    Args:
        analytics_name: analytics name shown in the message.
        return_type: the unsupported return type shown in the message.
    """

    # The first parameter must be spelled ``analytics_name`` to match the name
    # used in the message below; a mismatched spelling makes every
    # instantiation fail with NameError.
    def __init__(self, analytics_name, return_type):
        super().__init__(
            f"unsupported return type {return_type} from analytics: {analytics_name}"
        )