Source code for pepys_import.resolvers.command_line_resolver

import sys
import uuid

from prompt_toolkit import prompt
from prompt_toolkit.validation import ValidationError, Validator
from tabulate import tabulate

from pepys_import.core.store.constants import NATIONALITY, PLATFORM, PRIVACY
from pepys_import.resolvers import constants
from pepys_import.resolvers.command_line_input import create_menu, get_fuzzy_completer, is_valid
from pepys_import.resolvers.data_resolver import DataResolver
from pepys_import.utils.sqlalchemy_utils import get_lowest_privacy
from pepys_import.utils.text_formatting_utils import (
    custom_print_formatted_text,
    format_command,
    format_error_message,
    print_help_text,
    print_new_section_title,
)


[docs]def is_number(text): return text.isdigit()
numeric_validator = Validator.from_callable( is_number, error_message="This input contains non-numeric characters", move_cursor_to_end=True )
[docs]class CommandLineResolver(DataResolver): def __init__(self): self.store_all_platforms_as_unknown = False
[docs] def resolve_datafile(self, data_store, datafile_name, datafile_type, privacy, change_id): """ This method resolves datafile type and privacy. It asks user whether to create a datafile with resolved values or not. If user enters Yes, it returns all necessary data to create a datafile. If user enters No, it resolves values again :param data_store: A :class:`DataStore` object :type data_store: DataStore :param datafile_name: Name of :class`Datafile` :type datafile_name: String :param datafile_type: Type of :class`Datafile` :type datafile_type: String :param privacy: Name of :class:`Privacy` :type privacy: String :param change_id: ID of the :class:`Change` object :type change_id: UUID :return: """ if datafile_name is None: raise ValueError("You must specify a datafile name when calling resolve_datafile") print_new_section_title("Resolve Datafile") print(f"Ok, adding new datafile {datafile_name}.") # Choose Datafile Type if datafile_type: chosen_datafile_type = data_store.add_to_datafile_types(datafile_type, change_id) else: chosen_datafile_type = self.resolve_reference( data_store, change_id, data_type="Datafile", db_class=data_store.db_classes.DatafileType, field_name="datafile_type", help_id=constants.RESOLVE_DATAFILE_TYPE, search_help_id=constants.FUZZY_SEARCH_DATAFILE_TYPE, ) if chosen_datafile_type is None: print("Quitting") sys.exit(1) # Choose Privacy if privacy: chosen_privacy = data_store.search_privacy(privacy) if chosen_privacy is None: level = prompt( format_command(f"Please type level of new classification ({privacy}): "), validator=numeric_validator, ) chosen_privacy = data_store.add_to_privacies(privacy, level, change_id) else: chosen_privacy = self.resolve_reference( data_store, change_id, data_type=f"Datafile named '{datafile_name}'", db_class=data_store.db_classes.Privacy, field_name="privacy", text_name="classification", help_id=constants.RESOLVE_PRIVACY, search_help_id=constants.FUZZY_SEARCH_PRIVACY, ) if chosen_privacy is None: print("Quitting") sys.exit(1) while True: print("-" * 60) print("Input complete. About to create this datafile:") print(f"Name: {datafile_name}") print(f"Type: {chosen_datafile_type.name}") print(f"Classification: {chosen_privacy.name}") choice = create_menu( "Create this datafile?: ", ["Yes", "No, make further edits"], validate_method=is_valid, ) if choice in ["?", "HELP"]: print_help_text(data_store, constants.RESOLVE_DATAFILE) else: break if choice == str(1): return datafile_name, chosen_datafile_type, chosen_privacy elif choice == str(2): return self.resolve_datafile(data_store, datafile_name, None, None, change_id) elif choice == ".": print("Quitting") sys.exit(1)
[docs] def resolve_platform( self, data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, quadgraph=None, ): if self.store_all_platforms_as_unknown: return self.add_unknown_platform(data_store, platform_name, change_id) print_new_section_title("Resolve Platform") platform_details = [] final_options = [ "Add a new platform", "Search for existing platform", "Store as Unknown platform", "Store remaining platforms in this datafile as Unknown", ] if platform_name: # If we've got a platform_name, then we can search for all platforms # with this name, and present a list to the user to choose from, # alongside the options to search for an existing platform and add # a new platform # The order-by clause is important, to get the same ordering of # options on different platforms/db backends, so that our tests work platforms = ( data_store.session.query(data_store.db_classes.Platform) .join(data_store.db_classes.Nationality) .filter(data_store.db_classes.Platform.name == platform_name) .order_by( data_store.db_classes.Platform.identifier.asc(), data_store.db_classes.Nationality.priority.asc(), data_store.db_classes.Nationality.name.asc(), ) .all() ) for platform in platforms: platform_details.append( f"Select: {platform.name} / {platform.identifier} / {platform.nationality_name}" ) choices = final_options + platform_details def is_valid_dynamic(option): # pragma: no cover return option in [str(i) for i in range(1, len(choices) + 1)] + [".", "?", "HELP"] choice = create_menu( f"Select a platform entry for {platform_name}:", choices, validate_method=is_valid_dynamic, ) if choice == ".": print("Quitting") sys.exit(1) elif choice in ["?", "HELP"]: print_help_text(data_store, constants.RESOLVE_PLATFORM) return self.resolve_platform( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) elif choice == str(1): return self.add_to_platforms( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) elif choice == str(2): return self.fuzzy_search_platform( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) elif choice == str(3): return self.add_unknown_platform(data_store, platform_name, change_id) elif choice == str(4): self.store_all_platforms_as_unknown = True return self.add_unknown_platform(data_store, platform_name, change_id) elif 5 <= int(choice) <= len(choices): # One of the pre-existing platforms was chosen platform_index = int(choice) - 5 return platforms[platform_index]
[docs] def add_unknown_platform(self, data_store, platform_name, change_id): if platform_name is None: platform_name = str(uuid.uuid4()) identifier = platform_name trigraph = platform_name[:3] quadgraph = platform_name[:4] chosen_platform_type = data_store.add_to_platform_types( "Unknown", change_id, default_data_interval_secs=60 ) chosen_nationality = data_store.add_to_nationalities("Unknown", change_id) chosen_privacy = data_store.search_privacy(get_lowest_privacy(data_store)) return ( platform_name, trigraph, quadgraph, identifier, chosen_platform_type, chosen_nationality, chosen_privacy, )
[docs] def reset_per_file_settings(self): self.store_all_platforms_as_unknown = False
[docs] def resolve_sensor(self, data_store, sensor_name, sensor_type, host_id, privacy, change_id): print_new_section_title("Resolve Sensor") Platform = data_store.db_classes.Platform host_platform = ( data_store.session.query(Platform).filter(Platform.platform_id == host_id).first() ) if not host_platform: custom_print_formatted_text(format_error_message("Invalid platform id specified")) print("Quitting") sys.exit(1) objects = ( data_store.session.query(data_store.db_classes.Sensor) .filter(data_store.db_classes.Sensor.host == host_id) .all() ) if len(objects) == 0: # No sensors for this platform, so no point asking to search options = ["Add a new sensor"] search_option = 0 # Invalid option number, as search not available in this situation add_option = 1 other_options_base = 2 else: add_option = 1 search_option = 2 other_options_base = 3 options = [ "Add a new sensor", f"Search for existing sensor on platform '{host_platform.name}'", ] objects_dict = {f"Select: {obj.name}": obj for obj in objects} if len(objects_dict) <= 7: options.extend(objects_dict) if sensor_name: prompt = f"Sensor '{sensor_name}' on platform '{host_platform.name}' not found. Do you wish to: " else: prompt = f"Sensor on platform '{host_platform.name}' not found. Do you wish to: " def is_valid_dynamic(option): # pragma: no cover return option in [str(i) for i in range(1, len(options) + 1)] + [".", "?", "HELP"] choice = create_menu(prompt, options, validate_method=is_valid_dynamic) if choice == ".": print("Quitting") sys.exit(1) elif choice in ["?", "HELP"]: print_help_text(data_store, constants.RESOLVE_SENSOR) return self.resolve_sensor( data_store, sensor_name, sensor_type, host_id, privacy, change_id ) elif choice == str(search_option): return self.fuzzy_search_sensor( data_store, sensor_name, sensor_type, host_platform.platform_id, privacy, change_id ) elif choice == str(add_option): return self.add_to_sensors( data_store, sensor_name, sensor_type, host_id, privacy, change_id ) elif other_options_base <= int(choice) <= len(options): selected_object = objects_dict[options[int(choice) - 1]] if selected_object: # add sensor name and the selected sensor to sensor cache data_store._sensor_cache[(sensor_name, host_platform.platform_id)] = selected_object return selected_object
[docs] def resolve_reference( self, data_store, change_id, data_type, db_class, field_name, help_id, search_help_id, text_name=None, ): """ This method resolves any reference data according to the given parameters. :param data_store: A :class:`DataStore` object :type data_store: DataStore :param change_id: ID of the :class:`Change` object :type change_id: UUID :param data_type: For which data type the reference is resolved(Platform, Sensor or Datafile) :type data_type: String :param db_class: Class of a Reference Table :type db_class: SQLAlchemy Declarative Base Class :param field_name: Name of the resolved data :type field_name: String :param text_name: Printed name of the resolved data :type text_name: String :param help_id: Integer ID of the help text for resolve reference :type help_id: Integer :param search_help_id: Integer ID of the help text for fuzzy search reference :type search_help_id: Integer :return: """ if text_name is None: text_name = field_name.replace("_", "-") options = [f"Search an existing {text_name}"] title = f"Ok, please provide {text_name} for new {data_type}: " current_values = "" if db_class.__tablename__ == NATIONALITY: objects = ( data_store.session.query(db_class) .filter(db_class.priority.in_([1, 2])) .order_by(db_class.priority, db_class.name) .all() ) elif db_class.__tablename__ == PRIVACY: all_values = data_store.session.query(db_class).order_by(db_class.level).all() objects = all_values[:8] current_values = "\nCurrent Privacies in the Database\n" headers = ["name", "level"] current_values += tabulate( [[str(getattr(row, column)) for column in headers] for row in all_values], headers=headers, tablefmt="grid", floatfmt=".3f", ) current_values += "\n" else: objects = data_store.session.query(db_class).limit(8).all() objects_dict = {f"Select: {obj.name}": obj for obj in objects} options.extend(objects_dict) def is_valid_dynamic(option): # pragma: no cover return option in [str(i) for i in range(1, len(options) + 1)] + [".", "?", "HELP"] choice = create_menu( title, options, validate_method=is_valid_dynamic, cancel=f"import (Please contact an expert user if you need a new {text_name} to be added)", ) if choice == ".": print("-" * 60, "\nReturning to the previous menu\n") return None elif choice in ["?", "HELP"]: print_help_text(data_store, help_id) return self.resolve_reference( data_store, change_id, data_type, db_class, field_name, help_id, search_help_id, text_name, ) elif choice == str(1): result = self.fuzzy_search_reference( data_store=data_store, change_id=change_id, data_type=data_type, db_class=db_class, field_name=field_name, help_id=help_id, search_help_id=search_help_id, text_name=text_name, ) if result is None: return self.resolve_reference( data_store, change_id, data_type, db_class, field_name, help_id, search_help_id, text_name, ) else: return result elif 2 <= int(choice) <= len(options): selected_object = objects_dict[options[int(choice) - 1]] if selected_object: return selected_object
[docs] def resolve_missing_info( self, question, default_value, min_value=None, max_value=None, allow_empty=False ): question_to_ask = question if allow_empty is True: question_to_ask += f" (Default: {default_value})" if isinstance(default_value, int): # Apply some validation if int expected info = prompt( format_command(question_to_ask), validator=MinMaxValidator(min_value, max_value, allow_empty), ) else: # Assume caller to validate if not number info = prompt(format_command(question_to_ask)) if info is None or info == "": print(f"Using default value: {default_value}") info = default_value return info
[docs] def fuzzy_search_reference( self, data_store, change_id, data_type, db_class, field_name, help_id, search_help_id, text_name=None, ): """ This method parses any reference data according to the given parameters, and uses fuzzy search when user is typing. If user enters a new value, it adds to the related reference table or searches for an existing entity again. If user selects an existing value, it returns the selected entity. :param data_store: A :class:`DataStore` object :type data_store: DataStore :param change_id: ID of the :class:`Change` object :type change_id: UUID :param data_type: For which data type the reference is resolved(Platform, Sensor or Datafile) :type data_type: String :param db_class: Class of a Reference Table :type db_class: SQLAlchemy Declarative Base Class :param field_name: Name of the resolved data :type field_name: String :param text_name: Printed name of the resolved data :type text_name: String :param help_id: Integer ID of the help text for resolve reference :type help_id: Integer :param search_help_id: Integer ID of the help text for fuzzy search reference :type search_help_id: Integer :return: """ objects = data_store.session.query(db_class).all() completer = [p.name for p in objects] def is_valid_reference(option): # pragma: no cover return option in completer + ["."] choice = create_menu( "Please start typing to show suggested values", cancel=f"{text_name} search", choices=[], completer=get_fuzzy_completer(completer), validate_method=is_valid_reference, ) if choice == ".": print("-" * 60, "\nReturning to the previous menu\n") return None elif choice in ["?", "HELP"]: print_help_text(data_store, search_help_id) return self.fuzzy_search_reference( data_store=data_store, change_id=change_id, data_type=data_type, db_class=db_class, field_name=field_name, help_id=help_id, search_help_id=search_help_id, text_name=text_name, ) else: return data_store.session.query(db_class).filter(db_class.name == choice).first()
# Helper methods
[docs] def fuzzy_search_platform( self, data_store, platform_name, identifier, platform_type, nationality, privacy, change_id ): """ This method parses all platforms in the DB, and uses fuzzy search when user is typing. If user enters a new value, it adds to Synonym or Platforms according to user's choice. If user selects an existing value, it returns the selected Platform entity. :param data_store: A :class:`DataStore` object :type data_store: DataStore :param platform_name: Name of :class:`Platform` :type platform_name: String :param nationality: Name of :class:`Nationality` :type nationality: Nationality :param platform_type: Name of :class:`PlatformType` :type platform_type: PlatformType :param privacy: Name of :class:`Privacy` :type privacy: Privacy :param change_id: ID of the :class:`Change` object :type change_id: UUID :return: """ completer = list() platforms = data_store.session.query(data_store.db_classes.Platform).all() for platform in platforms: values = list() values.append(platform.name) if platform.trigraph: values.append(platform.trigraph) if platform.quadgraph: values.append(platform.quadgraph) synonym = ( data_store.session.query(data_store.db_classes.Synonym) .filter( data_store.db_classes.Synonym.table == PLATFORM, data_store.db_classes.Synonym.entity == platform.platform_id, ) .all() ) if synonym: synonym_names = [s.synonym for s in synonym] values.append(",".join(synonym_names)) values.append(platform.identifier) values.append(platform.nationality_name) line = " / ".join(values) completer.append(line) choice = create_menu( "Please start typing to show suggested values", cancel="platform search", choices=[], completer=get_fuzzy_completer(completer), ) if choice in ["?", "HELP"]: print_help_text(data_store, constants.FUZZY_SEARCH_PLATFORM) return self.fuzzy_search_platform( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) elif choice in completer: # Extract the platform details from the string platform_details = choice.split(" / ") name, identifier, nationality = ( platform_details[0], platform_details[-2], platform_details[-1], ) # Get the platform from the database platform = ( data_store.session.query(data_store.db_classes.Platform) .filter(data_store.db_classes.Platform.name == name) .filter(data_store.db_classes.Platform.identifier == identifier) .filter(data_store.db_classes.Platform.nationality_name == nationality) .first() ) # If we've been given a platform name, then we might want to link # that platform name to the one we've picked, as a synonym if platform_name: while True: new_choice = create_menu( f"Do you wish to keep {platform_name} as synonym for {choice}?\n", ["Yes", "No"], validate_method=is_valid, ) if new_choice in ["?", "HELP"]: print_help_text(data_store, constants.KEEP_PLATFORM_AS_SYNONYM) else: break if new_choice == str(1): # Add it to synonyms and return existing platform data_store.add_to_synonyms( PLATFORM, platform_name, platform.platform_id, change_id ) print(f"'{platform_name}' added to Synonyms!") return platform elif new_choice == str(2): return platform elif new_choice == ".": print("-" * 60, "\nReturning to the previous menu\n") return self.fuzzy_search_platform( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) return platform elif choice == ".": print("-" * 60, "\nReturning to the previous menu\n") return self.resolve_platform( data_store, platform_name, identifier, nationality, platform_type, privacy, change_id, ) elif choice not in completer: custom_print_formatted_text( format_error_message( f"'{choice}' could not be found! Redirecting to adding a new platform.." ) ) return self.add_to_platforms( data_store, choice, identifier, platform_type, nationality, privacy, change_id )
[docs] def fuzzy_search_sensor( self, data_store, sensor_name, sensor_type, host_id, privacy, change_id ): """ This method parses all sensors in the DB, and uses fuzzy search when user is typing. If user enters a new value, it adds to Sensor table or searches for an existing sensor again. If user selects an existing value, it returns the selected Sensor entity. :param data_store: A :class:`DataStore` object :type data_store: DataStore :param sensor_name: Name of :class:`Sensor` :type sensor_name: Sensor :param sensor_type: Type of :class:`Sensor` :type sensor_type: SensorType :param privacy: Name of :class:`Privacy` :type privacy: Privacy :param change_id: ID of the :class:`Change` object :type change_id: UUID :return: """ sensors = ( data_store.session.query(data_store.db_classes.Sensor) .filter(data_store.db_classes.Sensor.host == host_id) .all() ) completer = [sensor.name for sensor in sensors] choice = create_menu( "Please start typing to show suggested values", cancel="sensor search", choices=[], completer=get_fuzzy_completer(completer), ) if choice in ["?", "HELP"]: print_help_text(data_store, constants.FUZZY_SEARCH_SENSOR) return self.fuzzy_search_sensor( data_store, sensor_name, sensor_type, host_id, privacy, change_id ) elif choice == ".": print("-" * 60, "\nReturning to the previous menu\n") return self.resolve_sensor( data_store, sensor_name, sensor_type, host_id, privacy, change_id ) elif choice not in completer: custom_print_formatted_text( format_error_message( f"'{choice}' could not be found! Redirecting to adding a new sensor.." ) ) return self.add_to_sensors( data_store, sensor_name, sensor_type, host_id, privacy, change_id ) else: return ( data_store.session.query(data_store.db_classes.Sensor) .filter(data_store.db_classes.Sensor.name == choice) .first() )
[docs] def add_to_platforms( self, data_store, platform_name, identifier, platform_type, nationality, privacy, change_id ): """ This method resolves platform type, nationality and privacy. It asks user whether to create a platform with resolved values or not. If user enters Yes, it returns all necessary data to create a platform. If user enters No, it resolves values again. :param data_store: A :class:`DataStore` object :type data_store: DataStore :param platform_name: Name of :class:`Platform` :type platform_name: String :param nationality: Name of :class:`Nationality` :type nationality: Nationality :param platform_type: Name of :class:`PlatformType` :type platform_type: PlatformType :param privacy: Name of :class:`Privacy` :type privacy: Privacy :param change_id: ID of the :class:`Change` object :type change_id: UUID :return: """ print("Ok, adding new platform.") if platform_name is None: platform_name = "" platform_name = prompt( format_command("Please enter a name: "), default=platform_name ).strip() if len(platform_name) > 150: custom_print_formatted_text( format_error_message( "Platform name too long, maximum length 150 characters. Restarting platform data entry." ) ) return self.add_to_platforms( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) while True: if identifier is None: identifier = "" identifier = prompt( format_command("Please enter identifier (pennant or tail number): "), default=identifier, ).strip() if len(identifier) > 30: custom_print_formatted_text( format_error_message( "Identifier too long, maximum length 10 characters. Restarting platform data entry." ) ) return self.add_to_platforms( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) elif identifier in ["?", "HELP"]: print_help_text(data_store, constants.PLATFORM_IDENTIFIER) else: break while True: trigraph = prompt( format_command("Please enter trigraph (optional): "), default=platform_name[:3] ).strip() if len(trigraph) > 3: custom_print_formatted_text( format_error_message( "Trigraph too long, maximum length 3 characters. Restarting platform data entry." ) ) return self.add_to_platforms( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) elif trigraph in ["?", "HELP"]: print_help_text(data_store, constants.PLATFORM_TRIGRAPH) else: break while True: quadgraph = prompt( format_command("Please enter quadgraph (optional): "), default=platform_name[:4] ).strip() if len(quadgraph) > 4: custom_print_formatted_text( format_error_message( "Quadgraph too long, maximum length 4 characters. Restarting platform data entry." ) ) return self.add_to_platforms( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) elif quadgraph in ["?", "HELP"]: print_help_text(data_store, constants.PLATFORM_QUADGRAPH) else: break if platform_name == "" or identifier == "": custom_print_formatted_text( format_error_message( "You must provide a platform name and identifier! Restarting platform data entry." ) ) return self.add_to_platforms( data_store, platform_name, identifier, platform_type, nationality, privacy, change_id, ) # Choose Nationality if nationality: chosen_nationality = data_store.add_to_nationalities(nationality, change_id) else: chosen_nationality = self.resolve_reference( data_store, change_id, "Platform", data_store.db_classes.Nationality, field_name="nationality", help_id=constants.RESOLVE_NATIONALITY, search_help_id=constants.FUZZY_SEARCH_NATIONALITY, ) if chosen_nationality is None: custom_print_formatted_text( format_error_message( "Nationality couldn't resolved. Returning to the previous menu!" ) ) return self.resolve_platform( data_store, platform_name, None, None, None, None, change_id ) # Choose Platform Type if platform_type: chosen_platform_type = data_store.add_to_platform_types(platform_type, change_id) else: chosen_platform_type = self.resolve_reference( data_store, change_id, data_type="Platform", db_class=data_store.db_classes.PlatformType, field_name="platform_type", help_id=constants.RESOLVE_PLATFORM_TYPE, search_help_id=constants.FUZZY_SEARCH_PLATFORM_TYPE, ) if chosen_platform_type is None: custom_print_formatted_text( format_error_message( "Platform Type couldn't resolved. Returning to the previous menu!" ) ) return self.resolve_platform( data_store, platform_name, None, None, None, None, change_id ) # Choose Privacy if privacy: chosen_privacy = data_store.search_privacy(privacy) if chosen_privacy is None: level = prompt( format_command(f"Please type level of new classification ({privacy}): "), validator=numeric_validator, ) chosen_privacy = data_store.add_to_privacies(privacy, level, change_id) else: chosen_privacy = self.resolve_reference( data_store, change_id, data_type="Platform", text_name="classification", db_class=data_store.db_classes.Privacy, field_name="privacy", help_id=constants.RESOLVE_PRIVACY, search_help_id=constants.FUZZY_SEARCH_PRIVACY, ) if chosen_privacy is None: custom_print_formatted_text( format_error_message( "Classification couldn't resolved. Returning to the previous menu!" ) ) return self.resolve_platform( data_store, platform_name, None, None, None, None, change_id ) def is_valid_choice(option): # pragma: no cover return option in [str(1), str(2), str(3), ".", "?", "HELP"] while True: print("-" * 60) print("Input complete. About to create this platform:") print(f"Name: {platform_name}") print(f"Trigraph: {trigraph}") print(f"Quadgraph: {quadgraph}") print(f"Identifier: {identifier}") print(f"Nationality: {chosen_nationality.name}") print(f"Platform type: {chosen_platform_type.name}") print(f"Classification: {chosen_privacy.name}") choice = create_menu( "Create this platform?: ", ["Yes", "No, make further edits", "No, restart platform selection process"], validate_method=is_valid_choice, ) if choice in ["?", "HELP"]: print_help_text(data_store, constants.ADD_TO_PLATFORMS) else: break if choice == str(1): return ( platform_name, trigraph, quadgraph, identifier, chosen_platform_type, chosen_nationality, chosen_privacy, ) elif choice == str(2): return self.add_to_platforms( data_store, platform_name, None, None, None, None, change_id ) elif choice == str(3): return self.resolve_platform( data_store, platform_name, None, None, None, None, change_id ) elif choice == ".": print("-" * 60, "\nReturning to the previous menu\n") return self.resolve_platform( data_store, platform_name, None, None, None, None, change_id )
[docs] def add_to_sensors(self, data_store, sensor_name, sensor_type, host_id, privacy, change_id): """ This method resolves sensor type and privacy. It returns existing or resolved sensor type and privacy entities. :param data_store: A :class:`DataStore` object :type data_store: DataStore :param sensor_name: Name of :class:`Sensor` :type sensor_name: Sensor :param sensor_type: Type of :class:`Sensor` :type sensor_type: SensorType :param privacy: Name of :class:`Privacy` :type privacy: Privacy :param change_id: ID of the :class:`Change` object :type change_id: UUID :return: """ # Choose Sensor Type print("Ok, adding new sensor.") # Can't pass None to prompt function below, so pass empty string instead if sensor_name is None: sensor_name = "" sensor_name = prompt(format_command("Please enter a name: "), default=sensor_name).strip() if sensor_name == "": custom_print_formatted_text( format_error_message("You must provide a sensor name. Restarting sensor data entry") ) return self.add_to_sensors( data_store, sensor_name, sensor_type, host_id, privacy, change_id ) if len(sensor_name) > 150: custom_print_formatted_text( format_error_message( "Sensor name too long, maximum length 150 characters. Restarting sensor data entry." ) ) return self.add_to_sensors( data_store, sensor_name, sensor_type, host_id, privacy, change_id ) if sensor_type: sensor_type = data_store.add_to_sensor_types(sensor_type, change_id) else: sensor_type = self.resolve_reference( data_store, change_id, data_type="Sensor", db_class=data_store.db_classes.SensorType, field_name="sensor_type", help_id=constants.RESOLVE_SENSOR_TYPE, search_help_id=constants.FUZZY_SEARCH_SENSOR_TYPE, ) if sensor_type is None: custom_print_formatted_text( format_error_message( "Sensor Type couldn't resolved. Returning to the previous menu!" ) ) return self.resolve_sensor(data_store, sensor_name, None, host_id, None, change_id) if privacy: chosen_privacy = data_store.search_privacy(privacy) if chosen_privacy is None: level = prompt( format_command(f"Please type level of new classification ({privacy}): "), validator=numeric_validator, ) chosen_privacy = data_store.add_to_privacies(privacy, level, change_id) else: chosen_privacy = self.resolve_reference( data_store, change_id, data_type="Sensor", text_name="classification", db_class=data_store.db_classes.Privacy, field_name="privacy", help_id=constants.RESOLVE_PRIVACY, search_help_id=constants.FUZZY_SEARCH_PRIVACY, ) if chosen_privacy is None: custom_print_formatted_text( format_error_message( "Classification couldn't resolved. Returning to the previous menu!" ) ) return self.resolve_sensor(data_store, sensor_name, None, host_id, None, change_id) def is_valid_choice(option): # pragma: no cover return option in [str(1), str(2), str(3), ".", "?", "HELP"] while True: print("-" * 60) print("Input complete. About to create this sensor:") print(f"Name: {sensor_name}") print(f"Type: {sensor_type.name}") print(f"Classification: {chosen_privacy.name}") choice = create_menu( "Create this sensor?: ", ["Yes", "No, make further edits", "No, restart sensor selection process"], validate_method=is_valid_choice, ) if choice in ["?", "HELP"]: print_help_text(data_store, constants.ADD_TO_SENSORS) else: break if choice == str(1): return sensor_name, sensor_type, chosen_privacy elif choice == str(2): return self.add_to_sensors(data_store, sensor_name, None, host_id, None, change_id) elif choice == str(3): return self.resolve_sensor(data_store, sensor_name, None, host_id, None, change_id) elif choice == ".": print("-" * 60, "\nReturning to the previous menu\n") return self.resolve_sensor(data_store, sensor_name, None, host_id, None, change_id)
[docs]class MinMaxValidator(Validator): """A validator to check numerical values are between a given minimum/maximum value""" def __init__(self, minimum=None, maximum=None, allow_empty=False): """Creates a new validator to check numerical values between two values :param minimum: The minimum value (inclusive) allowed by the validator :param maximum: The maximum value (inclusive) allowed by the validator :param allow_empty: Whether we allow an empty string to be parsed """ self.min = minimum self.max = maximum self.allow_empty = allow_empty
[docs] def validate(self, document): to_validate = document.text if is_number(to_validate): number = int(to_validate) if self.min and number < self.min: raise ValidationError(len(document.text), f"Number must be {self.min} or higher") if self.max and number > self.max: raise ValidationError(len(document.text), f"Number must be {self.max} or lower") elif self.allow_empty is True and to_validate == "": pass else: raise ValidationError(len(document.text), "This input contains non-numeric characters")