Source code for pepys_admin.export_cli
import os
from datetime import datetime
from iterfzf import iterfzf
from prompt_toolkit import prompt as ptk_prompt
from prompt_toolkit.completion import PathCompleter
from pepys_admin.base_cli import BaseShell
from pepys_admin.utils import get_default_export_folder
from pepys_import.utils.data_store_utils import is_schema_created
from pepys_import.utils.text_formatting_utils import (
custom_print_formatted_text,
format_command,
format_error_message,
)
[docs]class ExportShell(BaseShell):
"""Offers to export datafiles by name, by platform and sensor"""
choices = """(1) Export by name
(2) Export by Platform and sensor
(.) Back
"""
prompt = "(pepys-admin) (export) "
[docs] @staticmethod
def do_cancel():
"""Returns to the previous menu"""
print("Returning to the previous menu...")
def __init__(self, data_store):
super(ExportShell, self).__init__()
self.data_store = data_store
self.aliases = {
".": self.do_cancel,
"1": self.do_export,
"2": self.do_export_by_platform_name,
"9": self.do_export_all,
}
[docs] def do_export(self):
"""Exports datafiles by name."""
if is_schema_created(self.data_store.engine, self.data_store.db_type) is False:
return
with self.data_store.session_scope():
datafiles = self.data_store.get_all_datafiles()
if not datafiles:
custom_print_formatted_text(
format_error_message("There is no datafile found in the database!")
)
return
datafiles_dict = {d.reference: d.datafile_id for d in datafiles}
message = "Select a datafile to export > "
selected_datafile = iterfzf(datafiles_dict.keys(), prompt=message)
if selected_datafile is None or selected_datafile not in datafiles_dict.keys():
custom_print_formatted_text(
format_error_message("You haven't selected a valid option!")
)
return
export_flag = ptk_prompt(
format_command(f"Do you want to export {selected_datafile}? (Y/n)\n")
)
if export_flag in ["", "Y", "y"]:
folder_completer = PathCompleter(only_directories=True, expanduser=True)
folder_path = ptk_prompt(
format_command("Please provide a folder path for the exported file: "),
default=get_default_export_folder(),
completer=folder_completer,
complete_while_typing=True,
)
datafile_name = f"exported_{selected_datafile.replace('.', '_')}.rep"
print(f"'{selected_datafile}' is going to be exported.")
selected_datafile_id = datafiles_dict[selected_datafile]
export_file_full_path = os.path.expanduser(os.path.join(folder_path, datafile_name))
with self.data_store.session_scope():
self.data_store.export_datafile(selected_datafile_id, export_file_full_path)
print(f"Datafile successfully exported to {export_file_full_path}.")
elif export_flag in ["N", "n"]:
print("You selected not to export!")
else:
custom_print_formatted_text(format_error_message("Please enter a valid input."))
[docs] def do_export_by_platform_name(self):
"""Exports datafiles by platform and sensor names. It asks user to select an existing
:code:`Platform` first. Then, it finds all datafile objects which include the selected
:code:`Platform`. Creates a dynamic intro (menu) from the found datafile objects, runs
:code:`ExportByPlatformNameShell`
"""
if is_schema_created(self.data_store.engine, self.data_store.db_type) is False:
return
Sensor = self.data_store.db_classes.Sensor
with self.data_store.session_scope():
platforms = self.data_store.session.query(self.data_store.db_classes.Platform).all()
if not platforms:
custom_print_formatted_text(
format_error_message("There is no platform found in the database!")
)
return
platforms_dict = {p.name: p.platform_id for p in platforms}
message = "Select a platform name to export datafiles that include it > "
selected_platform = iterfzf(platforms_dict.keys(), prompt=message)
if selected_platform is None or selected_platform not in platforms_dict.keys():
custom_print_formatted_text(
format_error_message("You haven't selected a valid option!")
)
return
# Find related sensors to the selected platform
platform_id = platforms_dict[selected_platform]
sensors = self.data_store.session.query(Sensor).filter(Sensor.host == platform_id).all()
sensors_dict = {s.name: s.sensor_id for s in sorted(sensors, key=lambda x: x.name)}
with self.data_store.session_scope():
objects = self.data_store.find_related_datafile_objects(platform_id, sensors_dict)
# Create a dynamic menu for the found datafile objects
title = "Select from the found datafile objects.\n--- Menu ---\n"
choices_idx = [
".",
]
choices_full_text = ""
for index, obj in enumerate(objects, 1):
choices_full_text += (
f"({index}) {obj['name']} {obj['filename']} {obj['min']}-{obj['max']}\n"
)
choices_idx.append(str(index))
choices_full_text += "(.) Cancel\n"
# Initialise a new menu
export_platform = ExportByPlatformNameShell(
self.data_store, title, choices_full_text, choices_idx, objects
)
export_platform.cmdloop()
[docs] def do_export_all(self):
"""Exports all datafiles."""
if is_schema_created(self.data_store.engine, self.data_store.db_type) is False:
return
export_flag = ptk_prompt(format_command("Do you want to export all Datafiles. (Y/n)\n"))
if export_flag in ["", "Y", "y"]:
while True:
folder_name = ptk_prompt(
format_command(
"Please provide folder name (Press Enter for auto generated folder):"
)
)
if folder_name:
if os.path.isdir(folder_name):
custom_print_formatted_text(
format_error_message(f"{folder_name} already exists.")
)
else:
os.mkdir(folder_name)
break
else:
folder_name = datetime.utcnow().strftime("exported_datafiles_%Y%m%d_%H%M%S")
os.mkdir(folder_name)
break
print(f"Datafiles are going to be exported to '{folder_name}' folder.")
with self.data_store.session_scope():
datafiles = self.data_store.get_all_datafiles()
if not datafiles:
custom_print_formatted_text(
format_error_message("There is no datafile found in the database!")
)
return
for datafile in datafiles:
datafile_name = f"exported_{datafile.reference.replace('.', '_')}.rep"
print(f"'{datafile_name}' is going to be exported.")
datafile_filename = os.path.join(folder_name, datafile_name)
datafile_id = datafile.datafile_id
self.data_store.export_datafile(datafile_id, datafile_filename)
print(f"Datafile successfully exported to {datafile_name}.")
print("All datafiles are successfully exported!")
elif export_flag in ["N", "n"]:
print("You selected not to export!")
else:
custom_print_formatted_text(format_error_message("Please enter a valid input."))
[docs]class ExportByPlatformNameShell(BaseShell):
"""Offers to export datafiles by platform and sensor"""
prompt = "(pepys-admin) (export by platform) "
def __init__(self, data_store, title, choices_full_text, choices_idx, objects):
super(ExportByPlatformNameShell, self).__init__()
self.data_store = data_store
self.title = title
self.choices = choices_full_text
self.choices_idx = choices_idx
self.objects = objects
[docs] @staticmethod
def do_cancel():
"""Returns to the previous menu"""
print("Returning to the previous menu...")
[docs] def do_export(self, option):
"""Asks user for a file name, then calls :code:`DataStore.export_datafile` to export Datafiles."""
datafile_id = option["datafile_id"]
sensor_id = option.get("sensor_id") # May be missing if it's a Comment object
platform_id = option.get("platform_id") # May be missing if it's a State or Contact object
default_export_name = f"exported_{option['name']}.rep"
file_name = ptk_prompt(
format_command(
f"Please provide a name (Press Enter for default value "
f"({default_export_name})):"
)
)
if file_name:
if not file_name.endswith(".rep"):
file_name += ".rep"
export_file_name = file_name or default_export_name
folder_completer = PathCompleter(only_directories=True, expanduser=True)
folder_path = ptk_prompt(
format_command(
"Please provide a folder path for the exported file: ",
),
default=get_default_export_folder(),
completer=folder_completer,
complete_while_typing=True,
)
export_file_full_path = os.path.expanduser(os.path.join(folder_path, export_file_name))
print(f"Objects are going to be exported to '{export_file_full_path}'.")
with self.data_store.session_scope():
self.data_store.export_datafile(
datafile_id, export_file_full_path, sensor_id, platform_id
)
print(f"Objects successfully exported to {export_file_full_path}.")
return True
[docs] def default(self, line):
cmd_, arg, line = self.parseline(line)
# Python accepts letters, digits, and "_" character as a command.
# Therefore, "." is interpreted as an argument.
if arg == "." and line == ".":
return True
elif cmd_ in self.choices_idx:
selected_option = self.objects[int(cmd_) - 1]
return self.do_export(selected_option)
else:
custom_print_formatted_text(format_error_message(f"*** Unknown syntax: {line}"))