Skip to content
Snippets Groups Projects
Select Git revision
  • master
1 result

utils.py

Blame
  • utils.py 5.15 KiB
    import os
    import json
    import jsonschema
    import logging
    import time
    from typing import Union, List
    from flask import current_app as app
    from app.netedit.network import Network
    from app.netedit.events import VehicleEvents
    
    # Root directory with the data of the configured city. Both path parts are read
    # from the application's ``dotenv`` mapping at the moment this module is imported.
    # NOTE(review): ``app`` is Flask's ``current_app`` proxy, so importing this module
    # presumably requires an active application context -- confirm.
    data_dir = f"{app.dotenv['DATA_DIR']}/{app.dotenv['URBANITE_CITY']}"
    # Name of the configured city (the ``URBANITE_CITY`` setting).
    city = app.dotenv["URBANITE_CITY"]
    
    # Module-level logger named after this module.
    logger = logging.getLogger(__name__)
    
    
    def get_sim_dir_names(
        sim_ids: Union[int, str, List[Union[int, str]]]
    ) -> List[str]:
        """
        Get the directory name of the last run simulation for each of the simulations.

        For every id the directory ``{data_dir}/simulations/{sim_id}/results/`` is
        scanned and the sub-directory whose name sorts last (plain string
        comparison) is selected.
        NOTE(review): this presumably relies on the run directories being named
        by a sortable timestamp -- confirm against the code that creates them.

        Args:
            sim_ids: A single simulation id (``int`` or ``str``) or an iterable
                of simulation ids.

        Returns:
            One directory name per simulation id, in the order of ``sim_ids``.

        Raises:
            OSError: If a results directory cannot be scanned (for example
                ``FileNotFoundError`` when it does not exist).
            IndexError: If a results directory contains no sub-directories.
        """
        # A lone id is wrapped into a list. A str counts as one id as well,
        # otherwise it would be iterated character by character.
        if isinstance(sim_ids, (int, str)):
            sim_ids = [sim_ids]

        retval = []
        for sim_id in sim_ids:
            results_dir = f"{data_dir}/simulations/{sim_id}/results/"
            # The context manager releases the scandir handle even on error.
            with os.scandir(results_dir) as entries:
                run_dirs = [entry.name for entry in entries if entry.is_dir()]
            if not run_dirs:
                raise IndexError(f"no simulation results found in {results_dir}")
            # The greatest name is the one an ascending sort would put last.
            retval.append(max(run_dirs))
        return retval
    
    
    def validate_json(obj, schema_path):
        """
        Validates if an object corresponds to a schema.
        Returns nothing, raises an exception if invalid.

        Args:
            obj: The object to validate.
            schema_path: Path of the UTF-8 encoded JSON file holding the schema.

        Raises:
            OSError: If the schema file cannot be opened.
            json.JSONDecodeError: If the schema file is not valid JSON.
            Exception: Whatever ``jsonschema.validate`` raises when ``obj`` does
                not conform to the schema.
        """
        # Open the schema with a context manager so the handle is always closed.
        with open(schema_path, "r", encoding="utf-8") as schema_file:
            schema = json.load(schema_file)
        logger.debug(schema)
        jsonschema.validate(obj, schema)
    
    
    def get_last_iteration(folder_path: str) -> int:
        """
        Finds the last iteration number.

        Scans ``folder_path`` for entries whose name contains ``"it."`` and reads
        the part after the last dot as the iteration number. Entries whose
        suffix is not an integer (e.g. ``it.10.bak``) are ignored.

        Args:
            folder_path: Directory to scan.

        Returns:
            The highest iteration number found, or 0 when there is none.

        Raises:
            OSError: If ``folder_path`` cannot be scanned.
        """
        last_iteration = 0
        # Directory entries are often not sorted, so every entry is inspected.
        with os.scandir(folder_path) as entries:
            for dir_entry in entries:
                if "it." not in dir_entry.name:
                    continue
                try:
                    num = int(dir_entry.name.rsplit(".", 1)[-1])
                except ValueError:
                    # Name only looks like an iteration entry; skip it
                    # instead of aborting the whole scan.
                    continue
                last_iteration = max(last_iteration, num)
        return last_iteration
    
    
    def get_max_traffic_time(sim_id):
        """
        Returns the time with highest traffic for the simulation.

        Reads ``<n>.legHistogram.txt`` from the last iteration folder
        (``ITERS/it.<n>``) of the latest result directory of the simulation.
        Rows are processed until the first one whose hour is 24 or later, or
        until the end of the file.
        NOTE(review): the traffic figure is taken from the sixth
        whitespace-separated column; presumably that is the number of legs en
        route -- confirm against the histogram file format.

        Args:
            sim_id: Id of the simulation.

        Returns:
            The hour (the part before the first ``:`` of the row's first column)
            of the row with the highest traffic value; 0 if no row exceeds 0.
            On ties the earliest row wins.
        """
        simulation_dir_name = get_sim_dir_names(sim_id)[0]
        folder_path = (
            f"{data_dir}/simulations/{sim_id}/results/{simulation_dir_name}/ITERS/"
        )
        last_iteration = get_last_iteration(folder_path)

        curr_max_traffic = 0.0
        curr_max_time = 0
        with open(
            f"{folder_path}/it.{last_iteration}/{last_iteration}.legHistogram.txt",
            "r",
            encoding="utf-8",
        ) as f:
            # The initial line is just a description of the columns; skip it.
            next(f, None)

            for line in f:
                fields = line.split()
                if not fields:
                    # Tolerate blank lines instead of failing on fields[0].
                    continue
                hour = int(fields[0].split(":")[0])
                if hour >= 24:
                    # Only the first 24 hours are taken into account.
                    break
                traffic = int(fields[5])
                if traffic > curr_max_traffic:
                    curr_max_traffic = traffic
                    curr_max_time = hour

        return curr_max_time
    
    
    def getEventsFromKeywords(inputPath, outputPath, keywords: list):
        """
        Creates the file with the events that contain words inside 'keywords' list.

        The output consists of an XML declaration, an opening ``<events>`` tag,
        every input line that contains at least one keyword (copied verbatim,
        once, in input order) and the closing ``</events>`` tag.

        Args:
            inputPath: Path of the events file to filter.
            outputPath: Path of the file to write the filtered events to.
            keywords: Substrings to look for in each input line.

        Raises:
            OSError: If one of the files cannot be opened.
        """
        # NOTE(review): the output is opened in append mode, so calling this twice
        # with the same outputPath yields two XML documents in one file. Kept
        # as-is because callers may rely on it -- confirm whether "w" is intended.
        # Both files are handled as UTF-8 to match the declaration written below.
        with open(inputPath, "r", encoding="utf-8") as source, open(
            outputPath, "a", encoding="utf-8"
        ) as target:
            target.write('<?xml version="1.0" encoding="utf-8"?>')
            target.write('<events version="1.0">')

            for line in source:
                # any() writes a line once even when several keywords match it.
                if any(keyword in line for keyword in keywords):
                    target.write(line)

            target.write("</events>")
    
    
    def get_network_and_events(simulation_id):
        """
        Returns the network and the events objects.

        The network is built from ``network.xml`` of the simulation and the
        events from ``output_events.xml`` inside the latest result directory of
        the simulation. The elapsed time is logged on the application logger.

        Args:
            simulation_id: Id of the simulation to load.

        Returns:
            A ``(network, events)`` tuple with the ``Network`` and the
            ``VehicleEvents`` instances.

        Raises:
            ValueError: If the network file or the events file does not exist.
        """
        start = time.time()

        run_dir_name = get_sim_dir_names([simulation_id])[0]
        network_path = f"{data_dir}/simulations/{simulation_id}/network.xml"
        events_path = (
            f"{data_dir}/simulations/{simulation_id}/results/{run_dir_name}/output_events.xml"
        )
        # Fail early, before the objects are constructed, if an input is missing.
        for required_path in (network_path, events_path):
            if not os.path.exists(required_path):
                raise ValueError("Path does not exist", required_path)

        network = Network(city=city, path=network_path)
        events = VehicleEvents(path=events_path, network=network)

        # Logger.warn is a deprecated alias; Logger.warning is the supported name.
        app.logger.warning("loaded vehicle events in %s s", time.time() - start)
        return network, events
    
    
    def deep_merge_dicts(orig_dict, new_dict):
        """
        Recursively merges two dicts. Common keys are stored into the list.

        ``orig_dict`` is modified in place and also returned. For every key of
        ``new_dict``:

        * a dict value is merged recursively into the value stored in
          ``orig_dict`` (an empty dict is used when the key is missing);
        * a list value is concatenated to the stored value; a stored non-list
          value is wrapped into a one-element list first;
        * any other value is stored as-is when the key is new, appended when
          the stored value is a list, and otherwise combined with the stored
          value into a two-element list.

        Merging a dict onto an existing non-dict value is not supported.

        Example:
            dic1 = {"a": "a1", "b": {"c": "c1", "d": {"e": "e1"}}}
            dic2 = {"a": "a2", "b": {"c": "c2", "d": {"e": "e2"}}}
            dic3 = {"a": "a3", "b": {"c": "c3", "d": {"e": "e3"}}}

            deep_merge_dicts(dic1, dic2)
            deep_merge_dicts(dic1, dic3)

        Output:
            {
             'a': ['a1', 'a2', 'a3'],
             'b':
                {
                    'c': ['c1', 'c2', 'c3'],
                    'd': {'e': ['e1', 'e2', 'e3']}
                }
            }

        Args:
            orig_dict: Dict that receives the merged values (mutated in place).
            new_dict: Dict whose values are merged into ``orig_dict``.

        Returns:
            ``orig_dict`` after the merge.
        """
        for key, val in new_dict.items():
            if isinstance(val, dict):
                orig_dict[key] = deep_merge_dicts(orig_dict.get(key, {}), val)
            elif isinstance(val, list):
                existing = orig_dict.get(key, [])
                if not isinstance(existing, list):
                    # Promote a stored scalar so that concatenation cannot fail.
                    existing = [existing]
                orig_dict[key] = existing + val
            elif key not in orig_dict:
                orig_dict[key] = val
            elif isinstance(orig_dict[key], list):
                orig_dict[key].append(val)
            else:
                orig_dict[key] = [orig_dict[key], val]

        return orig_dict