# utils.py (5.15 KiB) — from repository containing pre-deploy-cleanup.bash
# Author: Miguel Prada Sarasola
import os
import json
import jsonschema
import logging
import time
from typing import Union, List
from flask import current_app as app
from app.netedit.network import Network
from app.netedit.events import VehicleEvents
# Base directory holding this city's data. Evaluated at import time, so this
# module must be imported while a Flask application context is active —
# NOTE(review): confirm all importers guarantee this.
data_dir = f"{app.dotenv['DATA_DIR']}/{app.dotenv['URBANITE_CITY']}"
# City identifier, passed to Network() when loading simulation networks.
city = app.dotenv["URBANITE_CITY"]
# Module-level logger following the standard `getLogger(__name__)` convention.
logger = logging.getLogger(__name__)
def get_sim_dir_names(sim_ids):
    """
    Get the directory name of the last run simulation for each of the simulations.

    :param sim_ids: a single simulation id (int) or a list of simulation ids
    :return: list with one results-directory name per simulation id
    :raises ValueError: if a simulation has no results directories
        (previously this surfaced as an opaque IndexError)
    """
    # Accept a bare int for convenience, as before.
    if isinstance(sim_ids, int):
        sim_ids = [sim_ids]
    retval = []
    for sim_id in sim_ids:
        results_path = f"{data_dir}/simulations/{sim_id}/results/"
        dir_names = [
            entry.name for entry in os.scandir(results_path) if entry.is_dir()
        ]
        if not dir_names:
            raise ValueError("No simulation results found", results_path)
        # The lexicographically greatest name is taken as the most recent run;
        # max() replaces the original sort-then-[-1] (O(n) instead of O(n log n)).
        # Assumes directory names are timestamp-like — TODO confirm.
        retval.append(max(dir_names))
    return retval
def validate_json(obj, schema_path):
    """
    Validates if an object corresponds to a JSON schema.

    :param obj: the object to validate
    :param schema_path: path to the JSON schema file
    :return: None
    :raises jsonschema.exceptions.ValidationError: if the object is invalid
    """
    # The original leaked the file handle (open() without close);
    # a context manager guarantees the file is closed.
    with open(schema_path, "r", encoding="utf-8") as schema_file:
        schema = json.load(schema_file)
    logger.debug(schema)
    jsonschema.validate(obj, schema)
def get_last_iteration(folder_path: str) -> int:
    """
    Finds the last iteration number among the entries of *folder_path*.

    Entries named like ``it.<N>`` contribute their trailing number; the
    largest one is returned, or 0 when no such entry exists. Directory
    entries come back from the OS in arbitrary order, so all of them
    are inspected rather than relying on sorting.

    :param folder_path: directory containing the per-iteration entries
    :return: highest iteration number found (0 if none)
    """
    iteration_numbers = (
        int(entry.name.split(".")[-1])
        for entry in os.scandir(folder_path)
        if "it." in entry.name
    )
    return max(iteration_numbers, default=0)
def get_max_traffic_time(sim_id):
    """
    Returns the hour of day (0-23) with the highest traffic for the simulation.

    Reads the legHistogram.txt of the last iteration of the most recent run
    and tracks the hour whose row has the largest value in column 5
    (presumably the en-route vehicle count — TODO confirm against the
    simulator's legHistogram format).

    :param sim_id: id of the simulation to inspect
    :return: hour with the highest traffic (0 if no data rows are found)
    """
    curr_max_traffic = 0
    curr_max_time = 0
    simulation_dir_name = get_sim_dir_names(sim_id)[0]
    folder_path = (
        f"{data_dir}/simulations/{sim_id}/results/{simulation_dir_name}/ITERS/"
    )
    last_iteration = get_last_iteration(folder_path)
    histogram_path = (
        f"{folder_path}/it.{last_iteration}/{last_iteration}.legHistogram.txt"
    )
    with open(histogram_path, "r", encoding="utf-8") as f:
        # The first line is a description/header; skip it. (The original
        # print()-ed part of it — a debug leftover now removed.)
        f.readline()
        for raw_line in f:
            fields = raw_line.split()
            if not fields:
                # Blank/short trailing lines crashed the original with an
                # IndexError; skip them instead.
                continue
            # `hour` replaces the original local named `time`, which
            # shadowed the imported `time` module.
            hour = int(fields[0].split(":")[0])
            if hour >= 24:
                # Rows past midnight were excluded by the original loop too.
                break
            traffic = int(fields[5])
            if traffic > curr_max_traffic:
                curr_max_traffic = traffic
                curr_max_time = hour
    return curr_max_time
def getEventsFromKeywords(inputPath, outputPath, keywords: list):
    """
    Creates the file with the events that contain words inside 'keywords' list.

    :param inputPath: path to the source events file, read line by line
    :param outputPath: path to the filtered output; opened in append mode
        (kept from the original — callers may rely on accumulation)
    :param keywords: substrings; a line is written if it contains any of them
    """
    # Context managers replace the manual open/close pair: the original
    # leaked both handles if any write raised.
    with open(inputPath, "r", encoding="utf-8") as input_file, open(
        outputPath, "a", encoding="utf-8"
    ) as output_file:
        output_file.write('<?xml version="1.0" encoding="utf-8"?>')
        output_file.write('<events version="1.0">')
        for line in input_file:
            # any() writes each matching line exactly once; the original
            # wrote the line once PER matching keyword, duplicating events.
            if any(keyword in line for keyword in keywords):
                output_file.write(line)
        output_file.write("</events>")
def get_network_and_events(simulation_id):
    """
    Returns the network and the events objects for a simulation.

    :param simulation_id: id of the simulation to load
    :return: tuple of (Network, VehicleEvents)
    :raises ValueError: if the network or events file does not exist
    """
    start = time.time()
    date = get_sim_dir_names([simulation_id])[0]
    network_path = f"{data_dir}/simulations/{simulation_id}/network.xml"
    events_path = (
        f"{data_dir}/simulations/{simulation_id}/results/{date}/output_events.xml"
    )
    # Fail early with the offending path rather than letting the loaders
    # raise a less specific error.
    if not os.path.exists(network_path):
        raise ValueError("Path does not exist", network_path)
    if not os.path.exists(events_path):
        raise ValueError("Path does not exist", events_path)
    network = Network(city=city, path=network_path)
    events = VehicleEvents(path=events_path, network=network)
    # Logger.warn() is deprecated (since Python 3.3); use warning().
    app.logger.warning("loaded vehicle events in %s s", time.time() - start)
    return network, events
def deep_merge_dicts(orig_dict, new_dict):
    """
    Recursively merges two dicts. Common keys are stored into the list.

    Mutates *orig_dict* in place and also returns it.

    Example:
        dic1 = {"a": "a1", "b": {"c": "c1", "d": {"e": "e1"}}}
        dic2 = {"a": "a2", "b": {"c": "c2", "d": {"e": "e2"}}}
        dic3 = {"a": "a3", "b": {"c": "c3", "d": {"e": "e3"}}}
        deep_merge_dicts(dic1, dic2)
        deep_merge_dicts(dic1, dic3)
    Output:
        {
            'a': ['a1', 'a2', 'a3'],
            'b':
            {
                'c': ['c1', 'c2', 'c3'],
                'd': {'e': ['e1', 'e2', 'e3']}
            }
        }
    """
    for key, incoming in new_dict.items():
        if isinstance(incoming, dict):
            # Recurse into nested dicts; a missing key merges into a fresh {}.
            orig_dict[key] = deep_merge_dicts(orig_dict.get(key, {}), incoming)
        elif isinstance(incoming, list):
            # Lists are concatenated onto whatever is already there.
            orig_dict[key] = orig_dict.get(key, []) + incoming
        elif key not in orig_dict:
            # First occurrence of a scalar: store it as-is.
            orig_dict[key] = incoming
        elif isinstance(orig_dict[key], list):
            # Already accumulating for this key: append the new scalar.
            orig_dict[key].append(incoming)
        else:
            # Second occurrence: promote the pair of scalars to a list.
            orig_dict[key] = [orig_dict[key], incoming]
    return orig_dict