from datetime import datetime
from pepys_import.core.formats import unit_registry
from pepys_import.core.formats.location import Location
from pepys_import.core.validators import constants
from pepys_import.file.highlighter.support.combine import combine_tokens
from pepys_import.file.importer import Importer
from pepys_import.utils.unit_utils import convert_absolute_angle, convert_distance, convert_speed
class NMEAImporter(Importer):
    """Importer for NMEA log files.

    NMEA sentences spread one fix across several lines (date/time, velocity,
    heading, position, depth), so the importer accumulates fields on ``self``
    until enough are present to build a complete State record.
    """

    def __init__(self):
        super().__init__(
            name="NMEA File Format Importer",
            validation_level=constants.BASIC_LEVEL,
            short_name="NMEA Importer",
            default_privacy="Private",
            datafile_type="NMEA",
        )
        # Raw field values accumulated across successive sentences
        self.latitude = None
        self.latitude_hem = None
        self.longitude = None
        self.longitude_hem = None
        self.date = None
        self.time = None
        self.heading = None
        self.speed = None
        self.depth = None
        # Highlighter tokens recording where each value was extracted from
        self.date_token = None
        self.time_token = None
        self.speed_token = None
        self.heading_token = None
        self.lat_token = None
        self.lon_token = None
        # Fix: depth_token was previously only ever created inside
        # _load_this_line (PDS branch); initialise it here for consistency
        # with the other token attributes
        self.depth_token = None
        self.location = None
        # Platform name is unknown until the first line has been processed
        # (also reset by _load_this_line when line_number == 1)
        self.platform_name = None
[docs] def can_load_this_type(self, suffix):
return suffix.upper() == ".LOG" or suffix.upper() == ".TXT"
[docs] def can_load_this_filename(self, filename):
return True
[docs] def can_load_this_file(self, file_contents):
# Need to check this isn't WECDIS (Which is derived from this format)
contents_string = " ".join(file_contents[0:100])
return ",VER," not in contents_string and ",CHART," not in contents_string
def _load_this_line(self, data_store, line_number, line, datafile, change_id):
if line_number == 1:
# We don't know the platform name when we start reading the file, so
# set it to None. Later we will set it to the returned value from the get_platform call
# so that we don't have to ask about the platform again for each line in the file
self.platform_name = None
tokens = line.tokens(line.CSV_TOKENISER, ",")
if len(tokens) > 1:
msg_type = tokens[1].text
if msg_type == "DZA":
self.date_token = tokens[2]
self.date = self.date_token.text
self.time_token = tokens[3]
self.time = self.time_token.text
elif msg_type == "VEL":
self.speed_token = tokens[6]
self.speed = self.speed_token.text
elif msg_type == "HDG":
self.heading_token = tokens[2]
self.heading = self.heading_token.text
elif msg_type == "POS":
self.latitude = tokens[3].text
self.latitude_hem = tokens[4].text
self.lat_token = combine_tokens(tokens[3], tokens[4])
self.longitude = tokens[5].text
self.longitude_hem = tokens[6].text
self.lon_token = combine_tokens(tokens[5], tokens[6])
elif msg_type == "PDS":
self.depth_token = tokens[2]
self.depth = self.depth_token.text
# do we have all we need?
if (
self.date
and self.time
and self.speed
and self.heading
and self.latitude
and self.latitude_hem
and self.longitude
and self.longitude_hem
):
# and finally store it
platform = self.get_cached_platform(
data_store, self.platform_name, change_id=change_id
)
# Keep track of the platform name, so we don't have to ask for each line
self.platform_name = platform.name
# Explicitly store this new platform in the cache under the new platform name
# It won't be stored by default as the first time we called get_cached_platform
# we passed None as a name, and so it wouldn't have cached
self.platform_cache[self.platform_name] = platform
sensor = self.get_cached_sensor(
data_store=data_store,
sensor_name=None,
sensor_type=None,
platform_id=platform.platform_id,
change_id=change_id,
)
timestamp = self.parse_timestamp(self.date, self.time)
if timestamp:
combine_tokens(self.date_token, self.time_token).record(
self.name, "timestamp", timestamp
)
else:
self.errors.append(
{
self.error_type: f"Line {line_number}. Error in timestamp value {self.date} {self.time}. "
f"Couldn't convert to a datetime"
}
)
state = datafile.create_state(
data_store, platform, sensor, timestamp, self.short_name
)
self.location = Location(
errors=self.errors,
error_type=self.error_type,
)
if not self.location.set_latitude_dms(
degrees=self.latitude[:2],
minutes=self.latitude[2:],
seconds=0,
hemisphere=self.latitude_hem,
):
return
if not self.location.set_longitude_dms(
degrees=self.longitude[:3],
minutes=self.longitude[3:],
seconds=0,
hemisphere=self.longitude_hem,
):
return
state.location = self.location
combine_tokens(self.lat_token, self.lon_token).record(
self.name, "location", state.location, "DMS"
)
heading_valid, heading = convert_absolute_angle(
self.heading, line_number, self.errors, self.error_type
)
if heading_valid:
state.heading = heading
self.heading_token.record(self.name, "heading", heading)
speed_valid, speed = convert_speed(
self.speed,
unit_registry.knots,
line_number,
self.errors,
self.error_type,
)
if speed_valid:
state.speed = speed
self.speed_token.record(self.name, "speed", speed)
if self.depth is not None:
depth_valid, depth = convert_distance(
self.depth, unit_registry.metre, line_number, self.errors, self.error_type
)
if depth_valid:
state.elevation = -1 * depth
self.depth_token.record(self.name, "depth", depth)
self.date = None
self.time = None
self.speed = None
self.heading = None
self.latitude = None
self.latitude_hem = None
self.longitude = None
self.longitude_hem = None
self.location = None
self.depth = None
datafile.flush_extracted_tokens()
[docs] @staticmethod
def parse_timestamp(date, time):
if len(date) == 6:
format_str = "%y%m%d"
else:
format_str = "%Y%m%d"
if len(time) == 6:
format_str += "%H%M%S"
else:
format_str += "%H%M%S.%f"
try:
parsed_timestamp = datetime.strptime(date + time, format_str)
except ValueError:
return False
return parsed_timestamp