Source code for pepys_import.file.file_processor

import inspect
import json
import os
import shutil
from datetime import datetime
from getpass import getuser
from stat import S_IREAD

from halo import Halo
from prompt_toolkit import prompt
from prompt_toolkit.validation import Validator

from paths import IMPORTERS_DIRECTORY
from pepys_import import __build_timestamp__, __version__
from pepys_import.core.store import constants
from pepys_import.core.store.data_store import DataStore
from pepys_import.core.store.db_status import TableTypes
from pepys_import.file.highlighter.highlighter import HighlightedFile
from pepys_import.file.importer import Importer
from pepys_import.resolvers.command_line_resolver import CommandLineResolver
from pepys_import.utils.datafile_utils import hash_file
from pepys_import.utils.import_utils import import_module_, sort_files
from pepys_import.utils.sqlalchemy_utils import get_primary_key_for_table
from pepys_import.utils.table_name_utils import table_name_to_class_name
from pepys_import.utils.text_formatting_utils import (
    custom_print_formatted_text,
    format_error_message,
    format_table,
    print_new_section_title,
)

USER = getuser()


class FileProcessor:
    def __init__(
        self,
        filename=None,
        archive=False,
        skip_validation=False,
        archive_path=None,
        local_parsers=None,
    ):
        self.importers = []
        # Register local importers, if any exist
        if local_parsers:
            if not os.path.exists(local_parsers):
                custom_print_formatted_text(
                    format_error_message(
                        f"No such file or directory: {local_parsers}. Only core "
                        "parsers are going to work."
                    )
                )
            else:
                self.load_importers_dynamically(local_parsers)
        if filename is None:
            self.filename = ":memory:"
        else:
            self.filename = filename
        self.output_path = None
        self.input_files_path = None
        self.directory_path = None
        self.archive = archive
        if self.archive:
            # Only create the archive folder if we are actually going
            # to be doing archiving
            if archive_path:
                # Create the path if it doesn't exist
                try:
                    if not os.path.exists(archive_path):
                        os.makedirs(archive_path)
                    self.output_path = archive_path
                except Exception as e:
                    raise ValueError(
                        f"Could not create archive folder at {archive_path}. Original error: {str(e)}"
                    )
        self.skip_validation = skip_validation

    def process(self, path: str, data_store: DataStore = None, descend_tree: bool = True):
        """Process the data in the given path

        :param path: File/folder path
        :type path: String
        :param data_store: Database
        :type data_store: DataStore
        :param descend_tree: Whether to recursively descend through the folder tree
        :type descend_tree: bool
        """
        dir_path = os.path.dirname(path)
        # Create the output folder if it doesn't exist
        if not self.output_path:
            self.output_path = os.path.join(dir_path, "output")
        if not os.path.exists(self.output_path):
            os.makedirs(self.output_path)

        # Create a dict to store successes/failures/skips in
        import_summary = {}
        import_summary["succeeded"] = []
        import_summary["failed"] = []
        import_summary["skipped"] = []

        # Take the current timestamp
        now = datetime.utcnow()
        # Create any missing directories in the following format:
        # output_folder/YYYY/MM/DD/HH/mm/ss(_sss)
        self.output_path = os.path.join(
            self.output_path,
            str(now.year),
            str(now.month).zfill(2),
            str(now.day).zfill(2),
            str(now.hour).zfill(2),
            str(now.minute).zfill(2),
            str(now.second).zfill(2),
        )
        if not os.path.isdir(self.output_path):
            os.makedirs(self.output_path)
        else:
            self.output_path = os.path.join(
                self.output_path + "_" + str(now.microsecond).zfill(3)[:3]
            )
            os.makedirs(self.output_path)

        # Create the input_files folder if it doesn't exist
        self.input_files_path = os.path.join(self.output_path, "sources")
        if not os.path.exists(self.input_files_path):
            os.makedirs(self.input_files_path)
        self.directory_path = os.path.join(self.output_path, "reports")
        if not os.path.isdir(self.directory_path):
            os.makedirs(self.directory_path)

        processed_ctr = 0

        # Get the data_store
        if data_store is None:
            data_store = DataStore("", "", "", 0, self.filename, db_type="sqlite")
            data_store.initialise()

        # If the given path is a single file, then just process that file
        if os.path.isfile(path):
            with data_store.session_scope():
                filename = os.path.abspath(path)
                current_path = os.path.dirname(path)
                processed_ctr = self.process_file(
                    filename,
                    current_path,
                    data_store,
                    processed_ctr,
                    import_summary,
                    file_number=1,
                    total_files=1,
                )
            self.display_import_summary(import_summary)
            print(f"Files got processed: {processed_ctr} times")
            abs_path = os.path.abspath(self.output_path)
            print(f"Archive/report folders can be found at {abs_path}")
            return

        # If we've got to here then we're dealing with a folder.
        # Check the folder exists
        if not os.path.isdir(path):
            raise FileNotFoundError(f"Folder not found in the given path: {path}")

        # Decide whether to descend the tree, or just work on this folder
        with data_store.session_scope():
            # Capture the path in absolute form
            abs_path = os.path.abspath(path)
            if descend_tree:
                # Loop through this folder and its children and store the files in a list
                # (so we know how many we have)
                files_and_paths = []
                for current_path, folders, files in os.walk(abs_path):
                    for file in sort_files(files):
                        files_and_paths.append((file, current_path))

                for i, (file, current_path) in enumerate(files_and_paths, start=1):
                    processed_ctr = self.process_file(
                        file,
                        current_path,
                        data_store,
                        processed_ctr,
                        import_summary,
                        file_number=i,
                        total_files=len(files_and_paths),
                    )
            else:
                # Loop through this path only
                files_in_folder = [
                    file for file in sort_files(os.scandir(abs_path)) if file.is_file()
                ]
                for i, file in enumerate(files_in_folder, start=1):
                    processed_ctr = self.process_file(
                        file,
                        abs_path,
                        data_store,
                        processed_ctr,
                        import_summary,
                        file_number=i,
                        total_files=len(files_in_folder),
                    )

        self.display_import_summary(import_summary)
        print(f"Files got processed: {processed_ctr} times")

    def process_file(
        self,
        file_object,
        current_path,
        data_store,
        processed_ctr,
        import_summary,
        file_number,
        total_files,
    ):
        # The file may have a full path, so extract the basename and split it
        basename = os.path.basename(file_object)
        filename, file_extension = os.path.splitext(basename)
        if basename == ".DS_Store":
            return processed_ctr

        # Make a copy of the list of importers
        good_importers = self.importers.copy()

        full_path = os.path.join(current_path, basename)
        # print("Checking:" + str(full_path))

        # Start with the file suffixes
        tmp_importers = good_importers.copy()
        for importer in tmp_importers:
            # print("Checking suffix:" + str(importer))
            if not importer.can_load_this_type(file_extension):
                good_importers.remove(importer)

        # Now the filename
        tmp_importers = good_importers.copy()
        for importer in tmp_importers:
            # print("Checking filename:" + str(importer))
            if not importer.can_load_this_filename(filename):
                good_importers.remove(importer)

        # The tests are starting to get expensive, so check
        # that we have some file importers left
        if len(good_importers) > 0:
            # Now the first line
            tmp_importers = good_importers.copy()
            first_line = self.get_first_line(full_path)
            for importer in tmp_importers:
                # print("Checking first_line:" + str(importer))
                if not importer.can_load_this_header(first_line):
                    good_importers.remove(importer)

        # Create a HighlightedFile instance for the file
        highlighted_file = HighlightedFile(full_path)

        # Get the file contents, for the final check
        try:
            file_contents = self.get_file_contents(full_path)
        except Exception:
            # Can't get the file contents - e.g. because it's not a proper
            # unicode text file (this can occur for binary files in the same folders),
            # so skip the file
            return processed_ctr

        # Lastly, the contents
        tmp_importers = good_importers.copy()
        for importer in tmp_importers:
            if not importer.can_load_this_file(file_contents):
                good_importers.remove(importer)

        # If the list of good importers is empty, return processed_ctr,
        # which means the file is not processed
        if not good_importers:
            return processed_ctr

        # If the file has been loaded before, return processed_ctr,
        # which means the file is not processed again
        file_size = os.path.getsize(full_path)
        file_hash = hash_file(full_path)
        if data_store.is_datafile_loaded_before(file_size, file_hash):
            return processed_ctr

        # OK, let these importers handle the file
        reason = f"Importing '{basename}' using Pepys {__version__}"
        if __build_timestamp__ is not None:
            reason += f", built on {__build_timestamp__}"
        change = data_store.add_to_changes(user=USER, modified=datetime.utcnow(), reason=reason)

        privacy = None
        for importer in good_importers:
            if importer.default_privacy:
                privacy = importer.default_privacy
                break

        exclude = [
            constants.CHANGE,
            constants.DATAFILE,
            constants.EXTRACTION,
            constants.LOG,
        ]
        metadata_summaries_before = data_store.get_status(TableTypes.METADATA, exclude=exclude)
        measurement_summaries_before = data_store.get_status(
            TableTypes.MEASUREMENT, exclude=exclude
        )

        # This will produce a header with a progress counter.
        # Be aware that the denominator is the count of all files in the path to be imported,
        # and Pepys may ignore some (or many) of these files if they aren't types that Pepys
        # recognises. So it could say "Importing file 1 of 500" and then only import 3 files,
        # if the other 497 are files that no Pepys importers recognise.
        print_new_section_title(f"Processing file {file_number} of {total_files}:\n{basename}")

        # We assume that all good importers have the same datafile-type value at the moment;
        # that's why we can create a datafile using the first importer's datafile_type.
        # If they ever differ, we might iterate over the good importers and build a
        # composite datafile-type.
        datafile = data_store.get_datafile(
            basename,
            good_importers[0].datafile_type,
            file_size,
            file_hash,
            change.change_id,
            privacy=privacy,
        )
        highlighted_file.datafile = datafile
        datafile.highlighted_file = highlighted_file

        # Update the change object
        change.datafile_id = datafile.datafile_id
        data_store.session.flush()

        # Run all parsers
        for importer in good_importers:
            processed_ctr += 1
            importer.load_this_file(
                data_store, full_path, highlighted_file, datafile, change.change_id
            )

        # Write the highlighted output to file
        highlighted_output_path = os.path.join(
            self.directory_path, f"{filename}_highlighted.html"
        )
        print(f"Writing highlighted file for {basename}")
        highlighted_file.export(highlighted_output_path, include_key=True)

        # Run all validation tests
        errors = []
        importers_with_errors = []
        validators_with_errors = []
        for importer in good_importers:
            # If the importer has errors then note this, so we can inform the user
            if len(importer.errors) > 0:
                importers_with_errors.append(importer.short_name)

            # Call the related validation tests, and extend the global error lists
            # if the importer has errors
            validation_errors = []
            validated, failed_validators = datafile.validate(
                validation_level=importer.validation_level,
                errors=validation_errors,
                parser=importer.short_name,
                skip_validation=self.skip_validation,
            )

            # Add the list of failed validators from that importer to
            # the overall list of validators with errors for this file
            validators_with_errors.extend(failed_validators)

            # Add the importer errors and the validation errors to the list
            # of errors for this file
            errors.extend(importer.errors)
            errors.extend(validation_errors)

        data_store.missing_data_resolver.reset_per_file_settings()

        # If all tests pass for all parsers, commit the datafile
        if not errors:
            # Keep track of some details for the import summary
            summary_details = {}
            summary_details["filename"] = basename

            log = datafile.commit(data_store, change.change_id)

            metadata_summaries_after = data_store.get_status(
                TableTypes.METADATA, exclude=exclude
            )
            measurement_summaries_after = data_store.get_status(
                TableTypes.MEASUREMENT, exclude=exclude
            )
            metadata_report = metadata_summaries_after.show_delta_of_rows_added_metadata(
                metadata_summaries_before
            )
            formatted_text = format_table("METADATA REPORT", table_string=metadata_report)
            custom_print_formatted_text(formatted_text)

            measurement_report = measurement_summaries_after.show_delta_of_rows_added(
                measurement_summaries_before
            )
            formatted_text = format_table("MEASUREMENT REPORT", table_string=measurement_report)
            custom_print_formatted_text(formatted_text)

            if isinstance(data_store.missing_data_resolver, CommandLineResolver):
                choices = (
                    "Import metadata",
                    "Import metadata and measurements",
                    "Don't import data from this file.",
                )
                choice = self._ask_user_for_finalizing_import(choices)
            else:
                # The default is to import metadata and measurements
                choice = "2"

            if choice == "1":  # Import metadata
                self._remove_measurements(data_store, datafile, change.change_id)
                data_store.session.commit()
                # Set log to an empty list because the measurements are deleted
                log = []
            elif choice == "2":  # Import metadata and measurements
                pass
            else:  # Don't import data from this file
                # Remove metadata and measurements
                self._remove_measurement_and_metadata(data_store, datafile, change.change_id)
                data_store.session.commit()
                import_summary["skipped"].append(summary_details)
                return processed_ctr

            # Write the extraction log to the output folder
            with open(
                os.path.join(self.directory_path, f"{filename}_output.log"),
                "w",
            ) as file:
                file.write("\n".join(log))

            if self.archive is True:
                # Move the original file to the output folder
                new_path = os.path.join(self.input_files_path, basename)
                shutil.move(full_path, new_path)
                # Make it read-only
                os.chmod(new_path, S_IREAD)
                summary_details["archived_location"] = new_path

            import_summary["succeeded"].append(summary_details)
        else:
            metadata_summaries_after = data_store.get_status(
                TableTypes.METADATA, exclude=exclude
            )
            metadata_report = metadata_summaries_after.show_delta_of_rows_added_metadata(
                metadata_summaries_before
            )
            formatted_text = format_table("METADATA REPORT", table_string=metadata_report)
            custom_print_formatted_text(formatted_text)

            if isinstance(data_store.missing_data_resolver, CommandLineResolver):
                choices = (
                    "Import metadata",
                    "Don't import data from this file.",
                )
                choice = self._ask_user_for_finalizing_import(choices)
            else:
                # The default is to import metadata
                choice = "1"

            if choice == "1":  # Import metadata
                self._remove_measurements(data_store, datafile, change.change_id)
            elif choice == "2":  # Don't import data from this file
                self._remove_measurement_and_metadata(data_store, datafile, change.change_id)
            data_store.session.commit()

            failure_report_filename = os.path.join(
                self.directory_path, f"{filename}_errors.log"
            )
            # Write the error log to the output folder
            with open(failure_report_filename, "w") as file:
                json.dump(errors, file, ensure_ascii=False, indent=4)

            import_summary["failed"].append(
                {
                    "filename": basename,
                    "importers_with_errors": importers_with_errors,
                    "validators_with_errors": validators_with_errors,
                    "report_location": failure_report_filename,
                }
            )

        return processed_ctr

    def _remove_measurement_and_metadata(self, data_store, datafile, change_id):
        # Remove measurement entities
        self._remove_measurements(data_store, datafile, change_id)
        # Remove metadata entities
        self._remove_metadata(data_store, change_id)

    @staticmethod
    def _remove_measurements(data_store, datafile, change_id):
        # Delete the datafile entry, as we won't be importing any entries linked to it
        # because we had errors. Cascading will handle the deletion of all measurement
        # objects belonging to the datafile
        spinner = Halo(text="@ Clearing measurements", spinner="dots")
        spinner.start()
        data_store.session.delete(datafile)
        # Remove the Log objects
        objects_from_logs = data_store.get_logs_by_change_id(change_id)
        for obj in objects_from_logs:
            table_cls = getattr(data_store.db_classes, table_name_to_class_name(obj.table))
            if table_cls.table_type == TableTypes.MEASUREMENT:
                data_store.session.delete(obj)
        spinner.succeed("Measurements cleared")

    @staticmethod
    def _remove_metadata(data_store, change_id):
        spinner = Halo(text="@ Clearing metadata", spinner="dots")
        spinner.start()
        objects_from_logs = data_store.get_logs_by_change_id(change_id)
        for obj in objects_from_logs:
            table_cls = getattr(data_store.db_classes, table_name_to_class_name(obj.table))
            if table_cls.table_type == TableTypes.METADATA:
                primary_key_field = getattr(table_cls, get_primary_key_for_table(table_cls))
                data_store.session.query(table_cls).filter(primary_key_field == obj.id).delete()
                # Remove the Logs entry
                data_store.session.delete(obj)
        spinner.succeed("Metadata cleared")

    @staticmethod
    def display_import_summary(import_summary):
        if len(import_summary["succeeded"]) > 0:
            print("Import succeeded for:")
            for details in import_summary["succeeded"]:
                if "archived_location" in details:
                    print(f" - {details['filename']}")
                    print(f"   - Archived to {details['archived_location']}")
                else:
                    print(f" - {details['filename']}")
            print()
        if len(import_summary["failed"]) > 0:
            print("Import failed for:")
            for details in import_summary["failed"]:
                failed_importers_list = "\n".join(
                    [f"     - {name}" for name in details["importers_with_errors"]]
                )
                failed_validators_list = "\n".join(
                    [f"     - {name}" for name in set(details["validators_with_errors"])]
                )
                print(f" - {details['filename']}")
                if len(failed_importers_list) > 0:
                    print("   - Importers failing:")
                    print(failed_importers_list)
                if len(failed_validators_list) > 0:
                    print("   - Validators failing:")
                    print(failed_validators_list)
                print(f"   - Failure report at {details['report_location']}")
            print()
        if len(import_summary["skipped"]) > 0:
            print("Import skipped for:")
            for details in import_summary["skipped"]:
                print(f" - {details['filename']}")
            print()

    def register_importer(self, importer):
        """Adds the supplied importer to the list of import modules

        :param importer: An importer module that must define the functions defined
            in the Importer base class
        :type importer: Importer
        """
        self.importers.append(importer)

    def load_importers_dynamically(self, path=IMPORTERS_DIRECTORY):
        """Dynamically adds all the importers in the given path. It loads the core
        importers by default.

        :param path: Path of a folder that contains importers
        :type path: String
        """
        if os.path.exists(path):
            for file in sort_files(os.scandir(path)):
                # Import the file using its name and full path
                if file.is_file():
                    classes = import_module_(file)
                    for name, class_ in classes:
                        # Continue only if it's a concrete class that inherits from Importer
                        if issubclass(class_, Importer) and not inspect.isabstract(class_):
                            # Create an object of the class and add it to the importers
                            obj = class_()
                            self.importers.append(obj)

    @staticmethod
    def get_first_line(file_path: str):
        """Retrieve the first line from the file

        :param file_path: Full file path
        :type file_path: String
        :return: First line of text
        :rtype: String
        """
        try:
            with open(file_path, "r", encoding="windows-1252") as file:
                first_line = file.readline()
                return first_line
        except UnicodeDecodeError:
            return None

    @staticmethod
    def get_file_contents(full_path: str):
        with open(full_path, "r", encoding="windows-1252") as file:
            lines = file.read().split("\n")
        return lines

    @staticmethod
    def _input_validator(options):
        def is_valid(option):
            return option in [str(o) for o, _ in enumerate(options, 1)]

        validator = Validator.from_callable(
            is_valid,
            error_message="You didn't select a valid option",
            move_cursor_to_end=True,
        )
        return validator

    def _ask_user_for_finalizing_import(self, choices):
        validator = self._input_validator(choices)
        input_text = "\n\nWould you like to\n"
        for index, choice in enumerate(choices, 1):
            input_text += f" {str(index)}) {choice}\n"
        choice = prompt(input_text, validator=validator)
        return choice
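

# A minimal usage sketch, not part of the module above: the "imports.sqlite" filename and
# the "./data" folder are illustrative assumptions. As shown in process(), FileProcessor
# builds its own SQLite DataStore when process() is called with data_store=None.
if __name__ == "__main__":
    processor = FileProcessor(filename="imports.sqlite", archive=False)
    # Load the core importers shipped with Pepys; a folder of extra local parsers could
    # instead be supplied via the local_parsers argument of the constructor.
    processor.load_importers_dynamically()
    # Walk ./data recursively and run every matching importer against each file
    processor.process("./data", data_store=None, descend_tree=True)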