Source code for pepys_import.utils.data_store_utils

import csv
import json
import os
import sys
import uuid
from inspect import getfullargspec
from math import ceil

import sqlalchemy
from sqlalchemy import func, inspect, select
from sqlalchemy.sql.expression import text

from paths import MIGRATIONS_DIRECTORY, PEPYS_IMPORT_DIRECTORY
from pepys_import.resolvers.command_line_input import create_menu
from pepys_import.utils.sqlalchemy_utils import get_primary_key_for_table
from pepys_import.utils.table_name_utils import table_name_to_class_name
from pepys_import.utils.text_formatting_utils import (
    custom_print_formatted_text,
    format_error_message,
)

# Directory holding the .sql files that create_stored_procedures_for_postgres() reads and executes
STORED_PROC_PATH = os.path.join(PEPYS_IMPORT_DIRECTORY, "database", "postgres_stored_procedures")


def import_from_csv(data_store, path, files, change_id):
    """Import CSV files by calling the matching ``add_to_*`` method of ``data_store``.

    The file name (without extension, lower-cased, spaces replaced by underscores)
    selects the method: ``Platform Types.csv`` maps to ``data_store.add_to_platform_types``.
    A file whose name is ``synonyms`` (in any case) is passed to :func:`import_synonyms`.

    :param data_store: Object exposing the ``add_to_<table>`` methods
    :param path: Directory that contains the CSV files
    :param files: File names inside ``path``; they are processed in sorted order
    :param change_id: Value passed to every ``add_to_*`` call as ``change_id``
    :return: None. When a header contains a column that is not an argument of the
        target method, an error is printed and the whole import stops.
    """
    for file in sorted(files):
        # split file into filename and extension
        table_name, _ = os.path.splitext(file)
        if table_name.lower() == "synonyms":
            import_synonyms(data_store, os.path.join(path, file), change_id)
            continue

        possible_method = "add_to_" + table_name.lower().replace(" ", "_")
        method_to_call = getattr(data_store, possible_method, None)
        if not method_to_call:
            custom_print_formatted_text(
                format_error_message(f"Method({possible_method}) not found!")
            )
            continue

        # Get all arguments of the method, except the first argument which is 'self'
        arguments = getfullargspec(method_to_call).args[1:]
        possible_arguments = ",".join(arguments)
        # newline="" lets the csv module handle line endings itself, so quoted
        # fields containing embedded newlines are parsed correctly
        with open(os.path.join(path, file), "r", newline="") as file_object:
            reader = csv.reader(file_object)
            # extract header; a completely empty file has none
            header = next(reader, None)
            if header is None:
                custom_print_formatted_text(
                    format_error_message(f"File {file} is empty, skipping it.")
                )
                continue
            if not set(header).issubset(set(arguments)):
                custom_print_formatted_text(
                    format_error_message(
                        f"Headers and the arguments of DataStore.{possible_method}() don't match!"
                        f"\nPossible arguments: {possible_arguments}"
                        f"\nPlease check your CSV file."
                    )
                )
                return
            for row in reader:
                # Skip rows that contain nothing but whitespace
                if "".join(row).strip() == "":
                    continue
                keyword_arguments = dict(zip(header, row))
                try:
                    method_to_call(**keyword_arguments, change_id=change_id)
                except Exception as e:
                    # Best effort: report the failing row and carry on with the next one
                    custom_print_formatted_text(
                        format_error_message(f"Error importing row {row} from {file}")
                    )
                    custom_print_formatted_text(format_error_message(f" Error was '{str(e)}'"))
def import_synonyms(data_store, filepath, change_id):
    """Import a synonyms CSV file, linking each synonym to an existing database entry.

    The file must have exactly the columns ``synonym``, ``table`` and ``target_name``
    (in any order). For each row the entry called ``target_name`` is looked up in
    ``table`` and ``data_store.add_to_synonyms`` is called with its primary key.
    Rows that cannot be resolved are reported and skipped.

    :param data_store: Object providing ``db_classes``, ``session`` and ``add_to_synonyms``
    :param filepath: Path of the synonyms CSV file
    :param change_id: Passed through to ``data_store.add_to_synonyms``
    :return: None
    """
    required_headers = {"synonym", "table", "target_name"}
    # newline="" lets the csv module handle line endings itself
    with open(filepath, "r", newline="") as file_object:
        reader = csv.reader(file_object)
        # extract header; next() gets a default so an empty file is reported, not a crash
        header = next(reader, None)
        # All three columns are read unconditionally below, so a header that is
        # merely a subset of them must be rejected too
        if header is None or set(header) != required_headers:
            custom_print_formatted_text(
                format_error_message(
                    "Headers of the Synonyms.csv file are wrong or missing!"
                    "\nNecessary arguments: synonym,table,target_name"
                    "\nPlease check your CSV file."
                )
            )
            return

        # For every row in the CSV
        for row in reader:
            if "".join(row).strip() == "":
                continue
            values = dict(zip(header, row))

            # Search in the given table for the name
            class_name = table_name_to_class_name(values["table"])
            try:
                db_class = getattr(data_store.db_classes, class_name)
                pri_key_column_name = db_class.__table__.primary_key.columns.values()[0].name
            except AttributeError:
                custom_print_formatted_text(format_error_message(f"Error on row {row}"))
                custom_print_formatted_text(
                    format_error_message(f"  Invalid table name {values['table']}")
                )
                continue

            # Try and find a name column to use.
            # NOTE(review): the loop does not stop at the first hit, so a class that has
            # both attributes ends up using "reference" - confirm this is intended.
            name_col = None
            for poss in ("name", "reference"):
                try:
                    name_col = getattr(db_class, poss)
                except AttributeError:
                    continue

            if name_col is None:
                custom_print_formatted_text(format_error_message(f"Error on row {row}"))
                custom_print_formatted_text(
                    format_error_message(f"  Cannot find name column for table {values['table']}")
                )
                continue

            results = (
                data_store.session.query(db_class).filter(name_col == values["target_name"]).all()
            )

            if len(results) == 0:
                # Nothing to link synonym to so give error
                custom_print_formatted_text(format_error_message(f"Error on row {row}"))
                custom_print_formatted_text(
                    format_error_message(
                        f"  Name '{values['target_name']}' is not found in table {values['table']}"
                    )
                )
                continue

            if len(results) == 1:
                # Found one entry, so can create synonym
                guid = getattr(results[0], pri_key_column_name)
                data_store.add_to_synonyms(values["table"], values["synonym"], guid, change_id)
                continue

            # More than one match: only the Platforms table supports asking the user
            if values["table"] != "Platforms":
                custom_print_formatted_text(format_error_message(f"Error on row {row}"))
                custom_print_formatted_text(
                    format_error_message(
                        f"  Name '{values['target_name']}' occurs multiple times in table {values['table']}."
                        f" Asking user to resolve is only supported for Platforms table."
                    )
                )
                continue

            results = sorted(results, key=lambda x: x.identifier)
            chosen_item = ask_user_for_synonym_link(data_store, results, values)
            if chosen_item is None:
                print("Skipping row")
                continue
            guid = getattr(chosen_item, pri_key_column_name)
            data_store.add_to_synonyms(values["table"], values["synonym"], guid, change_id)
def is_schema_created(engine, db_type):
    """Returns True if Pepys Tables are created, False otherwise.

    The check is based purely on the number of tables found: 77 to 79 for SQLite,
    exactly 41 in the ``pepys`` schema for any other ``db_type``. When the count
    does not match, an explanatory error message is printed.

    :param engine: Engine handed to ``inspect`` to list the tables
    :param db_type: ``"sqlite"`` selects the SQLite check, anything else the ``pepys`` schema check
    :return: True when the table count matches, False otherwise
    """
    inspector = inspect(engine)
    if db_type == "sqlite":
        table_names = inspector.get_table_names()
        # SQLite table numbers vary by mod_spatialite. The installed version of
        # mod_spatialite can vary by platform - so the whole range is acceptable.
        count_matches = 77 <= len(table_names) <= 79
    else:
        table_names = inspector.get_table_names(schema="pepys")
        count_matches = len(table_names) == 41

    if count_matches:
        return True

    message = (
        "Database tables are not found! (Hint: Did you initialise the DataStore?)"
        if len(table_names) == 0
        else "Please run database migration to bring tables up to date."
    )
    custom_print_formatted_text(format_error_message(message))
    return False
def create_spatial_tables_for_sqlite(engine):
    """Create geometry_columns and spatial_ref_sys metadata table

    Nothing is executed when a ``spatial_ref_sys`` table already exists.

    :param engine: Engine used both for the table check and for running the statement
    """
    if inspect(engine).has_table("spatial_ref_sys"):
        return
    init_statement = select(func.InitSpatialMetaData(1))
    with engine.begin() as connection:
        connection.execute(init_statement)
def create_spatial_tables_for_postgres(engine):
    """Create schema pepys and extension for PostGIS

    :param engine: Engine on which the statements are executed inside one transaction
    """
    setup_statement = text(
        """ CREATE SCHEMA IF NOT EXISTS pepys; CREATE EXTENSION IF NOT EXISTS postgis; SET search_path = pepys,public; """
    )
    with engine.begin() as connection:
        connection.execute(setup_statement)
def create_stored_procedures_for_postgres(engine):
    """Execute every stored-procedure SQL file from ``STORED_PROC_PATH``.

    All files are run, in the order listed below, on a single connection obtained
    from ``engine.begin()``.

    :param engine: Engine on which the SQL files are executed
    """
    sql_file_names = (
        "dashboard_metadata.sql",
        "dashboard_stats.sql",
        "Comments_for.sql",
        "Contacts_for.sql",
        "Datafiles_for.sql",
        "States_for.sql",
    )
    stored_procedure_files = [os.path.join(STORED_PROC_PATH, name) for name in sql_file_names]

    with engine.begin() as connection:
        for sql_path in stored_procedure_files:
            with open(sql_path) as sql_file:
                procedure_definition = sql_file.read()
            connection.execute(text(procedure_definition))
def _stamp_alembic_version(engine, table_name, latest_version, missing_table_errors):
    """Verify the revision stored in ``table_name``, stamping ``latest_version`` when none is stored.

    :param engine: Engine used to open the transactions
    :param table_name: Name of the alembic version table (an internal constant, not user input)
    :param latest_version: Revision ID that the database is expected to be at
    :param missing_table_errors: Exception class (or tuple of classes) raised by the
        SELECT when the table does not exist
    :raises ValueError: If the stored revision differs from ``latest_version`` or the
        table holds more than one row
    """
    insert_sql = f"INSERT INTO {table_name} (version_num) VALUES (:id)"
    try:
        # Try and get all entries from the alembic version table
        with engine.begin() as connection:
            table_contents = connection.execute(text(f"SELECT * from {table_name};")).fetchall()

            if len(table_contents) == 0:
                # Table exists but no version number row, so stamp it
                connection.execute(text(insert_sql), {"id": latest_version})
            elif len(table_contents) == 1:
                if table_contents[0][0] == latest_version:
                    # Current version already stamped in table - so just continue
                    print(
                        "Initialising database - alembic version in database matches latest version."
                    )
                else:
                    # The version in the database doesn't match the current version
                    raise ValueError(
                        f"Database revision in alembic_version table ({table_contents[0][0]}) "
                        f"does not match latest revision ({latest_version}). "
                        "Please run database migration."
                    )
            else:
                raise ValueError(
                    "Multiple rows detected in alembic_version table. "
                    "Database potentially in inconsistent state. "
                    "Migration functionality will not work. Please contact support."
                )
    except missing_table_errors:
        # Error running select, so table doesn't exist - create it and stamp the current version
        with engine.begin() as connection:
            connection.execute(
                text(
                    f"""
                    CREATE TABLE IF NOT EXISTS {table_name}
                    (
                        version_num VARCHAR(32) NOT NULL,
                        CONSTRAINT alembic_version_pkc PRIMARY KEY (version_num)
                    );
                    """
                )
            )
            connection.execute(text(insert_sql), {"id": latest_version})


def create_alembic_version_table(engine, db_type):
    """Make sure the alembic version table exists and holds the latest revision ID.

    The latest revision IDs are read from ``latest_revisions.json`` in
    ``MIGRATIONS_DIRECTORY``. For ``db_type == "sqlite"`` the table ``alembic_version``
    is used, otherwise ``pepys.alembic_version``. A missing table is created and
    stamped, an empty table is stamped, and a table already holding the latest
    revision is left alone.

    :param engine: Engine used to query and modify the database
    :param db_type: ``"sqlite"`` for SQLite; any other value is treated as Postgres
    :return: None. If the JSON file lacks either revision key an error is printed
        and nothing is changed.
    :raises ValueError: If the stored revision differs from the latest revision or
        the table contains more than one row
    """
    with open(os.path.join(MIGRATIONS_DIRECTORY, "latest_revisions.json"), "r") as file:
        versions = json.load(file)

    # Both keys are required, whichever database type is in use
    if "LATEST_POSTGRES_VERSION" not in versions or "LATEST_SQLITE_VERSION" not in versions:
        custom_print_formatted_text(
            format_error_message("Latest revision IDs couldn't be found!")
        )
        return

    if db_type == "sqlite":
        _stamp_alembic_version(
            engine,
            "alembic_version",
            versions["LATEST_SQLITE_VERSION"],
            sqlalchemy.exc.OperationalError,
        )
    else:
        _stamp_alembic_version(
            engine,
            "pepys.alembic_version",
            versions["LATEST_POSTGRES_VERSION"],
            (sqlalchemy.exc.OperationalError, sqlalchemy.exc.ProgrammingError),
        )
def cache_results_if_not_none(cache_attribute):
    """Build a decorator that caches truthy results of a ``method(self, name)`` lookup.

    The cache is the dict-like object found at ``self.<cache_attribute>``. On a cache
    miss the wrapped method is called; a truthy result is expunged from
    ``self.session`` and stored under ``name``. Falsy results (e.g. ``None``) are
    returned but never cached, so the lookup is retried on the next call.

    :param cache_attribute: Name of the attribute on ``self`` that holds the cache
    :return: Decorator for methods with the signature ``(self, name)``
    """
    # Imported locally so this block does not depend on changes to the module imports
    from functools import wraps
    from operator import attrgetter

    # attrgetter replaces eval("self." + cache_attribute): same result for attribute
    # names (including dotted paths) without evaluating arbitrary code
    get_cache = attrgetter(cache_attribute)

    def real_decorator(f):
        @wraps(f)  # keep the wrapped method's name and docstring
        def helper(self, name):
            cache = get_cache(self)
            if name in cache:
                return cache[name]
            result = f(self, name)
            if result:
                self.session.expunge(result)
                cache[name] = result
            return result

        return helper

    return real_decorator
def shorten_uuid(id):  # pragma: no cover
    """Return the last six characters of ``str(id)``.

    :param id: Any object; it is converted with ``str`` first
    :return: String of at most six characters
    """
    as_text = str(id)
    return as_text[-6:]
class MissingDataException(Exception):
    """Plain ``Exception`` subclass that adds no behaviour of its own.

    NOTE(review): presumably raised when required data is absent - confirm against callers.
    """
def lowercase_or_none(obj):
    """Return ``obj.lower()``, or ``None`` when ``obj`` is ``None``.

    :param obj: ``None`` or an object with a ``lower()`` method
    :return: The lower-cased value, or ``None``
    """
    return None if obj is None else obj.lower()
def chunked_list(lst, size):
    """Split a list into multiple chunks of length size.

    Returns a list containing sublists of length size. If the list doesn't divide
    by size exactly, then the last sublist will have a length < size. A list
    shorter than size is returned as the single chunk, without being copied.
    """
    # Quick 'short-circuit' for a list less than size
    if len(lst) < size:
        return [lst]

    n_chunks = ceil(len(lst) / size)

    # A fully built list is returned instead of a generator: the caller wraps this
    # function in a tqdm progress bar, which needs the number of sublists up front
    return [lst[index * size : (index + 1) * size] for index in range(n_chunks)]
def convert_edit_dict_columns(edit_dict, table_object):
    """Convert an edit dict from the GUI into a dict suitable for the update function.

    Keys that name a relationship on ``table_object`` are replaced by the key of the
    relationship's first local column; all other keys are kept as they are. If
    inspecting an attribute fails for any reason, the original key is kept.

    :param edit_dict: Mapping of column or relationship name to new value
    :param table_object: Object whose attributes are looked up by those names
    :return: New dict with relationship names swapped for their local column keys
    """
    converted = {}
    for column_name, value in edit_dict.items():
        db_attribute = getattr(table_object, column_name)
        target_key = column_name
        try:
            prop = db_attribute.prop
            if isinstance(prop, sqlalchemy.orm.relationships.RelationshipProperty):
                target_key = list(prop.local_columns)[0].key
        except Exception:
            # Attribute could not be inspected: fall back to the name as given
            target_key = column_name
        converted[target_key] = value
    return converted
def convert_objects_to_ids(items, table_obj):
    """Replace objects by their primary-key value, leaving ``uuid.UUID`` values untouched.

    :param items: A single value or a list of values; each is either a ``uuid.UUID``
        or an object carrying the primary-key attribute of ``table_obj``
    :param table_obj: Passed to ``get_primary_key_for_table`` to find the attribute name
    :return: A list of IDs when ``items`` is a list, otherwise a single ID
    """

    def _as_id(candidate):
        # UUIDs already are IDs; anything else is asked for its primary-key attribute
        if isinstance(candidate, uuid.UUID):
            return candidate
        return getattr(candidate, get_primary_key_for_table(table_obj))

    if isinstance(items, list):
        return [_as_id(entry) for entry in items]
    return _as_id(items)
def read_version_from_pepys_install(path):
    """Read the ``__version__`` value from ``pepys_import/__init__.py`` under ``path``.

    The first line containing ``__version__`` is split on ``=`` and the second part
    is returned without surrounding whitespace and quotes.

    :param path: Root directory of the Pepys install to inspect
    :return: The version string, or ``None`` when no such line exists or the file
        cannot be read (a warning is printed in the latter case)
    """
    init_path = os.path.join(path, "pepys_import", "__init__.py")
    try:
        with open(init_path, "r") as init_file:
            for text_line in init_file:
                if "__version__" not in text_line:
                    continue
                raw_value = text_line.split("=")[1]
                # Remove whitespace, double-quotes and single-quotes from either end
                return raw_value.strip().strip('"').strip("'")
            return None
    except Exception:
        print(f"WARNING: Cannot read Pepys version from network master install at {path}")
        return None