From 89cc7bb4b336fb3abf7d8f65a3ddc237e83ca17a Mon Sep 17 00:00:00 2001 From: Andreas Gemsa Date: Sun, 17 Dec 2023 13:24:51 +0100 Subject: [PATCH] smaller improvements --- osm/read_osm.py | 45 +++++++++++++++++++++++++++---------------- osm/sanitize_input.py | 16 +++++++-------- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/osm/read_osm.py b/osm/read_osm.py index 102fdec..4ea95ab 100644 --- a/osm/read_osm.py +++ b/osm/read_osm.py @@ -1,12 +1,13 @@ import bz2 import xml.sax +from xml.sax.xmlreader import InputSource from osm.osm_types import OSMNode, OSMWay from osm.way_parser_helper import WayParserHelper from osm.xml_handler import NodeHandler, WayHandler, PercentageFile from utils import timer -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Set, Tuple, Union @timer.timer @@ -26,43 +27,53 @@ def read_file(osm_filename, configuration) -> Tuple[Dict[int, OSMNode], List[OSM def decompress_content(osm_filename): magic_bz2 = "\x42\x5a\x68" - with open(osm_filename, "r", encoding="utf-8", errors="replace") as f: - content_begin = f.read(10) + try: + with open(osm_filename, "r", encoding="utf-8", errors="replace") as f: + content_begin = f.read(10) + except Exception as e: + print(f"Error occurred while opening {osm_filename}: {e}") + return None if content_begin.startswith(magic_bz2): print("identified bz2 compressed file.. decompressing") - f = bz2.open(osm_filename, "rb") - content = f.read() - print("done!") - return content + try: + with bz2.open(osm_filename, "rb") as f: + content = f.read() + print("done!") + return content + except Exception as e: + print(f"Error occurred while decompressing {osm_filename}: {e}") + return None print("no compression recognized!") return None @timer.timer -def _read_ways(osm_file, configuration) -> Tuple[List[OSMWay], Set[int]]: +def _read_ways( + osm_file: Union[PercentageFile, InputSource, str], configuration +) -> Tuple[List[OSMWay], Set[int]]: parser = xml.sax.make_parser() w_handler = WayHandler(configuration) parser.setContentHandler(w_handler) - if isinstance(osm_file, PercentageFile): - parser.parse(osm_file) - else: - xml.sax.parseString(osm_file, w_handler) + if isinstance(osm_file, str): + osm_file = InputSource(osm_file) + parser.parse(osm_file) return w_handler.found_ways, w_handler.found_nodes @timer.timer -def _read_nodes(osm_file, found_nodes) -> Dict[int, OSMNode]: +def _read_nodes( + osm_file: Union[PercentageFile, InputSource, str], found_nodes +) -> Dict[int, OSMNode]: parser = xml.sax.make_parser() n_handler = NodeHandler(found_nodes) parser.setContentHandler(n_handler) - if isinstance(osm_file, PercentageFile): - parser.parse(osm_file) - else: - xml.sax.parseString(osm_file, n_handler) + if isinstance(osm_file, str): + osm_file = InputSource(osm_file) + parser.parse(osm_file) return n_handler.nodes diff --git a/osm/sanitize_input.py b/osm/sanitize_input.py index e88b8cd..c2eab64 100644 --- a/osm/sanitize_input.py +++ b/osm/sanitize_input.py @@ -1,20 +1,20 @@ +from osm.osm_types import OSMNode, OSMWay from utils import timer +from typing import Dict, List, Tuple + @timer.timer -def sanitize_input(ways, nodes): +def sanitize_input(ways: List[OSMWay], nodes: Dict[int, OSMNode]): """ This function removes all - - nodes not used in any of the Ways, and - - ways that contain one or more vertices not in nodes + - nodes not used in any of the OSMWays, and + - ways that contain one or more OSMNodes not in nodes :rtype : list of Ways, list of Vertices - :param ways: list of input Ways - :param nodes: list of input Vertices - :return: Filtered list of Ways and Nodes + :param ways: list of input OSMWays + :param nodes: list of input OSMNodes """ - assert isinstance(ways, list) - assert isinstance(nodes, dict) def remove_adjacent_duplicates(nodes): for i in range(len(nodes) - 1, 0, -1):