import os import json import jsonschema import logging import time from typing import Union, List from flask import current_app as app from app.netedit.network import Network from app.netedit.events import VehicleEvents data_dir = f"{app.dotenv['DATA_DIR']}/{app.dotenv['URBANITE_CITY']}" city = app.dotenv["URBANITE_CITY"] logger = logging.getLogger(__name__) def get_sim_dir_names(sim_ids): """ Get the directory name of the last run simulation for each of the simulations """ # logger.error("SIM DIR NAMES LOOKING IN:\n%s", data_dir) retval = [] if isinstance(sim_ids, int): sim_ids = [sim_ids] for sim_id in sim_ids: direntries = [] for dir_entry in os.scandir(f"{data_dir}/simulations/{sim_id}/results/"): if dir_entry.is_dir(): direntries.append(dir_entry.name) # return the last simulation performed direntries.sort(reverse=False) retval.append(direntries[-1]) # logger.error("SIM DIR NAMES FOUND:\n%s", retval) return retval def validate_json(obj, schema_path): """ Validates if an object corresponds to a schema. Returns nothing, riases Exception if invalid. """ schema = json.load(open(schema_path, "r", encoding="utf-8")) logger.debug(schema) jsonschema.validate(obj, schema) def get_last_iteration(folder_path: str) -> int: """ Finds the last iteration number. """ big_number = 0 for dir_entry in os.scandir(folder_path): # direntries are often not sorted if "it." in dir_entry.name: num = int(dir_entry.name.split(".")[-1]) if num > big_number: big_number = num return big_number def get_max_traffic_time(sim_id): """ Returns the time with highest traffic for the simulation. """ curr_max_traffic = 0.0 curr_max_time = 0 simulation_dir_name = get_sim_dir_names(sim_id)[0] folder_path = ( f"{data_dir}/simulations/{sim_id}/results/{simulation_dir_name}/ITERS/" ) last_iteration = get_last_iteration(folder_path) with open( f"{folder_path}/it.{last_iteration}/{last_iteration}.legHistogram.txt", "r", encoding="utf-8", ) as f: # initial line has to be read because it is just description print(f.readline().split(" ")[0].split(":")[0]) full_line = f.readline().split() time = int(full_line[0].split(":")[0]) while time < 24: if int(full_line[5]) > curr_max_traffic: curr_max_traffic = int(full_line[5]) curr_max_time = time full_line = f.readline().split() time = int(full_line[0].split(":")[0]) return curr_max_time def getEventsFromKeywords(inputPath, outputPath, keywords: list): """ Creates the file with the events that contain words inside 'keywords' list """ input = open(inputPath, "r") output = open(outputPath, "a") output.write('<?xml version="1.0" encoding="utf-8"?>') output.write('<events version="1.0">') for index, line in enumerate(input): for keyword in keywords: if keyword in line: output.write(line) output.write("</events>") input.close() output.close() def get_network_and_events(simulation_id): """ Returns the network and the events objects. """ start = time.time() date = get_sim_dir_names([simulation_id])[0] network_path = f"{data_dir}/simulations/{simulation_id}/network.xml" events_path = ( f"{data_dir}/simulations/{simulation_id}/results/{date}/output_events.xml" ) if not os.path.exists(network_path): raise ValueError("Path does not exist", network_path) if not os.path.exists(events_path): raise ValueError("Path does not exist", events_path) network = Network(city=city, path=network_path) events = VehicleEvents(path=events_path, network=network) app.logger.warn( "loaded vehicle events in %s s", time.time() - start ) return network, events def deep_merge_dicts(orig_dict, new_dict): """ Recursively merges two dicts. Common keys are stored into the list. Example: dic1 = {"a": "a1", "b": {"c": "c1", "d": {"e": "e1"}}} dic2 = {"a": "a2", "b": {"c": "c2", "d": {"e": "e2"}}} dic3 = {"a": "a3", "b": {"c": "c3", "d": {"e": "e3"}}} deep_merge_dicts(dic1, dic2) deep_merge_dicts(dic1, dic3) Output: { 'a': ['a1', 'a2', 'a3'], 'b': { 'c': ['c1', 'c2', 'c3'], 'd': {'e': ['e1', 'e2', 'e3']} } } """ for key, val in new_dict.items(): if isinstance(val, dict): tmp = deep_merge_dicts(orig_dict.get(key, {}), val) orig_dict[key] = tmp elif isinstance(val, list): orig_dict[key] = orig_dict.get(key, []) + val else: if key not in orig_dict: orig_dict[key] = new_dict[key] elif isinstance(orig_dict[key], list): orig_dict[key].append(new_dict[key]) else: orig_dict[key] = [orig_dict[key], new_dict[key]] return orig_dict