""" Implements the Events object and related methods. Since the creation of the Events object is rather slow due to building search trees etc, do no recreate the object too often. """ from __future__ import annotations from collections import namedtuple import math import json from time import time from typing import Union, List, Dict from xml.etree import ElementTree as ET import logging from flask import current_app as app import pandas as pd import numpy as np TimeSlot = namedtuple("TimeSlot", "delta slot_index") ts_url = app.dotenv["TS_URL"] city_id = f"{app.dotenv['URBANITE_CITY']}" data_dir = f"{app.dotenv['DATA_DIR']}/{city_id}" dexi_dir = app.dotenv["DEXI_DIR"] assets_dir = app.dotenv["ASSETS_DIR"] logger = logging.getLogger(__name__) class Events: """ Base class representing events. Subclasses VehicleEvents and EmissionEvents should be used instead of this one. """ def __init__(self, path, network): """ Constructs the events by reading from the path. A network object should be provided that matches the network, used to produce the simulation results - events. """ self._element_tree: Union[ET, None] = None self.events: List[dict] = [] self.network = network self.read_events(path) def read_events(self, path): """ Reads the events and stores to list. """ self._element_tree = ET.parse(path) events: list = [] for event in self._element_tree.getroot(): events.append(event.attrib) self.events = list(sorted(events, key=lambda event: float(event["time"]))) class TimeSlotGenerator: """ Class implements iterator/generator that return events in custom sized windows. """ def __init__(self, events_object, time_slot: int): """ :param events_object: initialized Events object. :param time_slot: length of time slot in seconds. """ self._events = events_object self.current_index = 0 self.time_slot = time_slot self._last_min_index = 0 def __len__(self): t_last = float(self._events.events[-1]["time"]) return math.ceil((t_last - 0) / self.time_slot) def __iter__(self): return self def __next__(self) -> List[dict]: return self.send(None) def send(self, value: Union[int, None]) -> List[dict]: """ Get custom window from the iterator, for random access instead of sequential access. """ if value is not None: self._last_min_index = 0 self.current_index = value if self.current_index > len(self): raise StopIteration min_time = self.current_index * self.time_slot max_time = (self.current_index + 1) * self.time_slot time_slot = [] for i in range(self._last_min_index, len(self._events.events)): event = self._events.events[i] if float(event["time"]) > max_time: # it is done self.current_index += 1 return time_slot elif float(event["time"]) < min_time: continue else: time_slot.append(event) self.current_index += 1 return [] # no override needed def throw(self, typ, val=None, tb=None): """ Raise an exception in the generator. Return next yielded value or raise StopIteration. """ if val is None: if tb is None: raise typ val = typ() if tb is not None: val = val.with_traceback(tb) raise val def get_time_slot_iterator(self, time_slot: int) -> TimeSlotGenerator: """ Returns the TimeSlotGenerator that allows access to events in custom sized time slots. Both sequential access (iterator) and random access (via send()) are supported. """ return Events.TimeSlotGenerator(self, time_slot) def timeslot_filter_events(self, timeslot): """ Filters the events based on the given `timeslot` object. If timeslot=None returns `self.events`. """ if timeslot: return self.get_time_slot_iterator(timeslot.delta).send(timeslot.slot_index) else: return self.events class VehicleEvents(Events): """ VehicleEvents class represents the events produced by the traffic simulation. """ def __init__(self, path, network=None): Events.__init__(self, path, network) self.trips_number = self.trips_amount() # TODO this doesn't work for Amsterdam - is always 10 or 0 def bike_safety_index(self, local_links=None, timeslot=None) -> Dict: """ Calculates the bike safety KPI, based on numbers of cyclists sharing roads with motorized vehicles. """ filter_expr = lambda event: event["type"] == "left link" if local_links is not None: filter_expr = ( lambda event: event["type"] == "left link" and event["link"] in local_links ) links = {} filtered_events = self.timeslot_filter_events(timeslot) for event in filter(filter_expr, filtered_events): link = self.network.get_link(event["link"]) capacity = float(link["capacity"]) if not event["link"] in links: links[event["link"]] = { "car_count": 0, "bike_count": 0, "bus_count": 0, "bikeability_safety": 0, "capacity": capacity, } ev_vehicle = event["vehicle"] if ev_vehicle[1] == "bicycle": links[event["link"]]["bike_count"] += 1 elif ev_vehicle[1] == "bus": links[event["link"]]["bus_count"] += 1 else: links[event["link"]]["car_count"] += 1 for link_id, link_data in links.items(): if links.get(link_id, False) and link_data["bike_count"] > 0: links[link_id]["bikeability_safety"] = 10 * ( 1 - (link_data["car_count"] / link_data["capacity"]) ) # links[link_id].pop("bike_count", None) # links[link_id].pop("bus_count", None) # links[link_id].pop("car_count", None) # links[link_id].pop("capacity", None) return links # TODO fix after fixing bike_safety_index above def bike_safety_aggregate(self, safety_score: List[Dict]): safety_avg = 0 n_scores = 0 for link_id, counts in safety_score.items(): link = self.network.get_link(link_id) capacity = float(link["capacity"]) safety_score = 10 * (1 - (counts["car_count"] / capacity)) safety_avg += safety_score n_scores += 1 try: return safety_avg / n_scores except ZeroDivisionError: return 0 def bikeability_index(self, local_links=None) -> Dict: """ Returns infra_score, speed_score """ filter_expr = lambda link: True if local_links is not None: filter_expr = lambda link: link["id"] in local_links score = {} n_links_with_osm_way = 0 for link in filter(filter_expr, self.network.links): if "osm:way:highway" in link: n_links_with_osm_way += 1 # logger.info("link iwth osm:way:h %s", link["osm:way:highway"]) if link["osm:way:highway"] == "steps": score[link["id"]] = {"infra": 0, "speed": 0} elif link["osm:way:highway"] == "motorway": score[link["id"]] = {"infra": 0, "speed": 0} elif link["osm:way:highway"] == "motorway_link": score[link["id"]] = {"infra": 0, "speed": 0} elif link["osm:way:highway"] == "primary": score[link["id"]] = {"infra": 1, "speed": 0} elif link["osm:way:highway"] == "secondary": score[link["id"]] = {"infra": 2, "speed": 0} elif link["osm:way:highway"] == "secondary_link": score[link["id"]] = {"infra": 2, "speed": 0} elif link["osm:way:highway"] == "tertiary": score[link["id"]] = {"infra": 3, "speed": 0} elif link["osm:way:highway"] == "residential": score[link["id"]] = {"infra": 4, "speed": 0} elif link["osm:way:highway"] == "service": score[link["id"]] = {"infra": 4, "speed": 0} elif link["osm:way:highway"] == "track": score[link["id"]] = {"infra": 5, "speed": 0} elif link["osm:way:highway"] == "unclassified": score[link["id"]] = {"infra": 5, "speed": 0} elif link["osm:way:highway"] == "footway": score[link["id"]] = {"infra": 5, "speed": 0} elif link["osm:way:highway"] == "pedestrian": score[link["id"]] = {"infra": 5, "speed": 0} elif link["osm:way:highway"] == "living_street": score[link["id"]] = {"infra": 7, "speed": 0} elif link["osm:way:highway"] == "path": score[link["id"]] = {"infra": 7, "speed": 0} elif link["osm:way:highway"] == "cycleway": score[link["id"]] = {"infra": 10, "speed": 0} # speed limit 30 km/h - 8.7 m/s - 10 points if float(link["freespeed"]) < 9: if not score.get(link["id"], False): score[link["id"]] = {"infra": 0, "speed": 10} else: score[link["id"]]["speed"] = 10 # speed limit 50 km/h - 13.88 m/s - 7 points elif float(link["freespeed"]) < 14: if not score.get(link["id"], False): score[link["id"]] = {"infra": 0, "speed": 7} else: score[link["id"]]["speed"] = 7 # higher speeds - 0 points else: if not score.get(link["id"], False): score[link["id"]] = {"infra": 0, "speed": 0} else: score[link["id"]]["speed"] = 0 # logger.info("bikeability index found %s links with osm:way: property", n_links_with_osm_way) return score @staticmethod def bikeability_aggregate(scores: Dict): infra_avg = 0 speed_avg = 0 for link_id, link_scores in scores.items(): infra_avg += link_scores["infra"] speed_avg += link_scores["speed"] try: infra_avg /= len(scores) speed_avg /= len(scores) except ZeroDivisionError: logger.warn("No infra or speed scores - missing data in network?") return (len(scores) / (len(scores) + len(scores))) * infra_avg + ( len(scores) / (len(scores) + len(scores)) ) * speed_avg def vehicles_count_per_link(self, local_links=None, only=None, timeslot=None) -> Dict: """ Calculates the daily vehicle counts for each link. """ if local_links is None: def filter_exp(event): return event["type"] == "entered link" else: def filter_exp(event): return event["type"] == "entered link" and event["link"] in local_links check_only = lambda mode: only is None or mode == only links: Dict[str, Dict] = {} filtered_events = self.timeslot_filter_events(timeslot) for event in filter(filter_exp, filtered_events): if not event["link"] in links: links[event["link"]] = { "bus_count": 0, "car_count": 0, "bike_count": 0, } ev_vehicle = event["vehicle"] if "bicycle" in ev_vehicle: links[event["link"]]["bike_count"] += 1 elif "bus" in ev_vehicle and not check_only == "bike": links[event["link"]]["bus_count"] += 1 elif not check_only == "bike": links[event["link"]]["car_count"] += 1 return links def vehicles_count(self, local_links: List[str] = None, timeslot=None) -> Dict: """ Calculates daily vehicle counts in the network. """ if local_links is None: def filter_exp(event): return event["type"] == "entered link" else: def filter_exp(event): return event["type"] == "entered link" and event["link"] in local_links bicycles = set() cars = set() busses = set() filtered_events = self.timeslot_filter_events(timeslot) for event in filter(filter_exp, filtered_events): ev_vehicle = event["vehicle"] if "bicycle" in ev_vehicle: bicycles.add(event["vehicle"]) elif "bus" in ev_vehicle: busses.add(event["vehicle"]) else: cars.add(event["vehicle"]) logger.error(f"BUSSES: {len(busses)}") logger.error(f"BIKES: {len(bicycles)}") logger.error(f"CARS: {len(cars)}") results = {} if len(busses) > 0: results["bus_count"] = len(busses) if len(cars) > 0: results["car_count"] = len(cars) if len(bicycles) > 0: results["bike_count"] = len(bicycles) return results def capacity_to_moyua(self, moyua_square_links, timeslot=None): links = {} free_capacity = {} filtered_events = self.timeslot_filter_events(timeslot) events = pd.DataFrame(filtered_events, columns=["type", "link", "vehicle"]) combined_moyua_square_links = "|".join(moyua_square_links) events_filtered = events[ (events["type"] == "entered link") & (events["link"].str.contains(combined_moyua_square_links)) ] for _, event in events_filtered.iterrows(): link = self.network.get_link(event["link"]) if event["link"] in moyua_square_links: if not event["link"] in links: links[event["link"]] = { "bus_count": 0, "car_count": 0, "bike_count": 0, "capacity": link["capacity"], } if event["link"].isnumeric(): ev_vehicle = event["vehicle"].split("_") if len(ev_vehicle) > 1: if ev_vehicle[1] == "bicycle": links[event["link"]]["bike_count"] += 1 elif ev_vehicle[1] == "bus": links[event["link"]]["bus_count"] += 1 elif ( event["vehicle"].isnumeric() or event["vehicle"].find(".1") >= 0 ): links[event["link"]]["car_count"] += 1 # links is a dictionary of link ids and {many data within curly braces as in a json format} for link, data in links.items(): free_capacity[link] = float(data["capacity"]) - ( float(data["bus_count"]) + float(data["car_count"]) + float(data["bike_count"]) ) total_capacity = 0 for link in free_capacity: total_capacity += free_capacity[link] return total_capacity def average_bus_speed(self, local_links=None, timeslot=None) -> float: vehicles = {} if local_links is None: def filter_exp(event): return ( event["type"] == "entered link" or event["type"] == "VehicleArrivesAtFacility" ) else: def filter_exp(event): if event["type"] == "entered link": return event["link"] in local_links if event["type"] == "VehicleArrivesAtFacility": id_start_position = event["facility"].find(":") if id_start_position != -1: return event["facility"][id_start_position + 1 :] in local_links return False filtered_events = self.timeslot_filter_events(timeslot) for event in filter(filter_exp, filtered_events): veh_id = event["vehicle"] # only busses if veh_id.find("_bus") >= 0: if event["type"] == "VehicleArrivesAtFacility": # arrives event link_time = {"link": False, "time": event["time"]} else: # link enter event link = self.network.get_link(event["link"]) link_time = {"link": link, "time": event["time"]} if not event["vehicle"] in vehicles: vehicles[veh_id] = [] vehicles[veh_id].append(link_time) total_length = 0 total_time = 0 for veh_id, link_times in vehicles.items(): length = 0 for link_time in link_times: if link_time["link"]: length += self.network.get_link_length( link_time["link"]["from"], link_time["link"]["to"] ) time = float(link_times[-1]["time"]) - float(link_times[0]["time"]) if length != 0 and length != 1.0 and time > 0: total_length += float(length) total_time += time if total_time > 0: average_speed = total_length / total_time else: average_speed = 0 return average_speed def average_bus_speed_geojson(self): data_by_link = {} filter_exp = lambda event: ( (event["type"] == "entered link" or event["type"] == "left link") and "bus" in event["type"] ) open_links = {} open_id = lambda link_id, veh_id: link_id + "-" + veh_id for event in filter(filter_exp, self.events): link_id = event["link"] vehicle_id = event["vehicle"] if event["type"] == "entered link": open_links[open_id(link_id, vehicle_id)] = event["time"] data_by_link[link_id] = [] elif event["type"] == "left link": link_time = event["time"] - open_links[open_id(link_id, vehicle_id)] link_length = self.network.get_link_length_link(link_id) if link_time == 0: continue data_by_link[link_id].append(link_length / link_time) for link, data in data_by_link.copy().items(): data_by_link[link] = sum(data) / len(data) return data_by_link def pedestrian_travel_time(self, local_links: List[str] = None, timeslot=None): if local_links is None: def filter_exp(_event): return ( (_event["type"] == "departure" or _event["type"] == "arrival") and _event["legMode"] == "walk" ) else: def filter_exp(_event): return ( (_event["type"] == "departure" or _event["type"] == "arrival") and _event["legMode"] == "walk" and _event["link"] in local_links) filtered_events = self.timeslot_filter_events(timeslot) trips_per_person = {} for event in filter(filter_exp, filtered_events): if event["type"] == "departure": if not event["person"] in trips_per_person: trips_per_person[event["person"]] = [] trips_per_person[event["person"]].append( { "start_link_id": event["link"], "begin_time": float(event["time"]), "end_time": 0, "end_link_id": -1 } ) elif event["type"] == "arrival": if ( event["person"] in trips_per_person.keys() and trips_per_person.get(event["person"])[-1]["end_time"] == 0 ): trips_per_person[event["person"]][-1]["end_time"] = float(event["time"]) trips_per_person[event["person"]][-1]["end_link_id"] = event["link"] trips = [] for _, p_trips in trips_per_person.items(): for trip in p_trips: t_len = float(trip["end_time"]) - float(trip["begin_time"]) # if t_len < 3600: continue # TODO: Ignore trips under one hour trip_data = { "trip_duration": t_len, "start_link": trip["start_link_id"], "end_link": trip["end_link_id"] } trips.append(trip_data) trips_start_end = {} for trip in trips: if trip["start_link"] not in trips_start_end.keys(): trips_start_end[trip["start_link"]] = {} if trip["end_link"] not in trips_start_end[trip["start_link"]].keys(): trips_start_end[trip["start_link"]][trip["end_link"]] = [] trips_start_end[trip["start_link"]][trip["end_link"]].append(trip["trip_duration"]) links = set() for inner_dict in trips_start_end.values(): for inner_key in inner_dict.keys(): links.add(inner_key) links.update(trips_start_end.keys()) links = list(links) matrix = pd.DataFrame(index=links, columns=links) matrix.fillna(0, inplace=True) m_counts = pd.DataFrame(index=links, columns=links) m_counts.fillna(1, inplace=True) for s_link in trips_start_end.keys(): for e_link in trips_start_end[s_link].keys(): trip_duration_sum = sum(trips_start_end[s_link][e_link]) if trip_duration_sum < 3600: continue trip_duration_count = len(trips_start_end[s_link][e_link]) if trip_duration_count < 5: continue trip_duration_sum = sum(trips_start_end[s_link][e_link]) matrix.loc[s_link, e_link] = matrix.loc[s_link, e_link] + trip_duration_sum m_counts.loc[s_link, e_link] = m_counts.loc[s_link, e_link] + trip_duration_count upper_triangle = np.triu(matrix.values, k=1) lower_triangle = np.tril(matrix.values) sum_matrix = lower_triangle + upper_triangle.T matrix_unidirect = pd.DataFrame(sum_matrix, index=matrix.index, columns=matrix.columns) matrix_unidirect.fillna(0, inplace=True) upper_triangle = np.triu(m_counts.values, k=1) lower_triangle = np.tril(m_counts.values) sum_matrix = lower_triangle + upper_triangle.T m_counts_unidirect = pd.DataFrame(sum_matrix, index=m_counts.index, columns=m_counts.columns) m_counts_unidirect.fillna(1, inplace=True) result = matrix_unidirect.div(m_counts_unidirect) result = pd.DataFrame(result).fillna(0) final_result = [] for col_key in result.columns: for index_key in result.index: dat = { "start_link": col_key, "end_link": index_key, "average_time": result.loc[col_key, index_key] } final_result.append(dat) average = matrix.sum().sum() / m_counts.sum().sum() if pd.isna(average): average = 0 return {"average_trip_duration": average, "detailed_long_trips": final_result} def users_per_mode(self, local_links=None, timeslot=None): agents = {} count = {"car": 0, "bike": 0, "PT": 0, "pedestrian": 0} if local_links is None: def filter_exp(event): return event["type"] == "departure" else: def filter_exp(event): return event["type"] == "departure" and event["link"] in local_links filtered_events = self.timeslot_filter_events(timeslot) for event in filter(filter_exp, filtered_events): agent = event["person"] if not agent in agents: agents[agent] = {"car": 0, "bike": 0, "PT": 0, "pedestrian": 0} if agent.find("_bus") >= 0: continue elif event["legMode"] == "pt": if agents[agent]["PT"] == 0: agents[agent]["PT"] = 1 count["PT"] += 1 elif event["legMode"] == "walk": if agents[agent]["pedestrian"] == 0: agents[agent]["pedestrian"] = 1 count["pedestrian"] += 1 elif event["legMode"] == "car": if agents[agent]["car"] == 0: agents[agent]["car"] = 1 count["car"] += 1 elif event["legMode"] == "bicycle": if agents[agent]["bike"] == 0: agents[agent]["bike"] = 1 count["bike"] += 1 return count def get_link_average_speed(self, link, hour_searched): timeslots = self.get_time_slot_iterator(time_slot=3600) link = self.network.get_link(link) length = self.network.get_link_length(link["from"], link["to"]) enter_left_vehicles: Dict = {} hours = timeslots.send(hour_searched) events = pd.DataFrame(hours, columns=["type", "link", "vehicle", "time"]) events_filtered = events[ (events["link"] == link["id"]) & ((events["type"] == "entered link") | (events["type"] == "left link")) ] for _, event in events_filtered.iterrows(): veh_id = str(event["vehicle"]) if veh_id not in enter_left_vehicles: enter_left_vehicles[veh_id] = { "enter": 0, "left": 0, } if event["type"] == "entered link": enter_left_vehicles[veh_id]["enter"] = event["time"] elif event["type"] == "left link": enter_left_vehicles[veh_id]["left"] = event["time"] total_time = 0 vehicle_count = 0 for veh_id, times in enter_left_vehicles.items(): if float(times["left"]) != 0: vehicle_count += 1 total_time += float(times["left"]) - float(times["enter"]) # if more than 50% of vehicles dont come through the link in one hour, # than the link is congested, and we return -1 if vehicle_count < len(enter_left_vehicles) * 0.5: return -1 return length / (total_time / vehicle_count) def get_congested_links(self, rush_hour, vehicle_mode=None, local_links=None): if local_links is None: if vehicle_mode is None: filter_expr = lambda evt: evt["type"] in ["entered link", "left link"] else: filter_expr = lambda evt: (evt["type"] in ["entered link", "left link"] and vehicle_mode in evt["vehicle"]) else: if vehicle_mode is None: filter_expr = lambda evt: (evt["type"] in ["entered link", "left link"] and evt["link"] in local_links) else: filter_expr = lambda evt: (evt["type"] in ["entered link", "left link"] and evt["link"] in local_links and vehicle_mode in evt["vehicle"]) events = filter(filter_expr, self.get_time_slot_iterator(3600).send(rush_hour)) # find travel times for link - collect entered and left times link_event_times = {} for event in events: link_id = event["link"] if "pt" in link_id: continue vehicle_id = event["vehicle"] time = event["time"] entered = event["type"] == "entered link" if link_id in link_event_times: if vehicle_id in link_event_times[link_id]: pass else: link_event_times[link_id][vehicle_id] = {} else: link_event_times[link_id] = {vehicle_id: {}} link_event_times[link_id][vehicle_id]["entered" if entered else "left"] = time # calculate times on each link link_travel_times = {} # {link_id: [dT, dT, ...], } for link_id in link_event_times.keys(): if "pt" in link_id: continue for veh_id in link_event_times[link_id]: if link_event_times[link_id][veh_id].get("entered", None) is None \ or link_event_times[link_id][veh_id].get("left", None) is None: continue if link_id in link_travel_times: pass else: link_travel_times[link_id] = [] link_travel_times[link_id].append( float(link_event_times[link_id][veh_id]["left"]) - float(link_event_times[link_id][veh_id]["entered"])) # calculate ideal link times using speed and length attributes ideal_travel_times = {} for link in self.network.links: if "pt" in link["id"]: continue ideal_time = float(link["length"]) / float(link["freespeed"]) ideal_travel_times[link["id"]] = ideal_time # compare average times with ideal and report congestion congestion_length_sum = 0 congested_links = {} for link_id in link_travel_times: if ideal_travel_times[link_id] < 2.0: # causes errors due to time being measured with second resolution continue avg_time = sum(link_travel_times[link_id])/len(link_travel_times[link_id]) if avg_time > (ideal_travel_times[link_id] * 1.1): # allow some slowness if link_id not in congested_links: congestion_length_sum += self.network.get_link_length_link(link_id) congested_links[link_id] = {"average time": avg_time} # logger.debug("congested length %s", congestion_length_sum) return {"congestion_length": congestion_length_sum, "congested_links": congested_links} def public_transport_use_geojson(self, local_links=None): filter_expression = lambda event: event["type"] in [ "PersonEntersVehicle", "PersonLeavesVehicle", "left link", ] if local_links is not None: filter_expression = lambda event: event["link"] in local_links and event[ "type" ] in ["PersonEntersVehicle", "entered link"] evt_tmp = { "link": None, "vehicle": None, "d_occ": 0, } # d_occ -> delta occupancy data = [] for event in filter(filter_expression, self.events): evt = evt_tmp.copy() if event["type"] == "left link" and "bus" in event["vehicle"]: veh = event["vehicle"] lnk = event["link"] evt["vehicle"] = veh evt["link"] = lnk elif ( event["type"] in ["PersonEntersVehicle", "PersonLeavesVehicle"] and "bus" in event["vehicle"] ): # if event["person"].startswith("pt"): # continue # driver, ignore veh = event["vehicle"] evt["vehicle"] = veh evt["d_occ"] = 1 if event["type"] == "PersonEntersVehicle" else -1 else: continue data.append(evt) # group by vehicle data_by_veh = {} for item in data: if item["vehicle"] not in data_by_veh: data_by_veh[item["vehicle"]] = [item] else: data_by_veh[item["vehicle"]].append(item) # aggregate paths path_by_veh = {} for veh_id, data in data_by_veh.items(): path = [] old_item = None for i, item in enumerate(data): if i == 0: old_item = {"link": data[0]["link"], "occ": 0} if old_item["link"] != item["link"]: path.append(old_item) # make new item old_item = {"link": item["link"], "occ": old_item["occ"]} else: old_item["occ"] = old_item["occ"] + item["d_occ"] path_by_veh[veh_id] = path # aggregate by link data_by_link = {} for veh_id, path in path_by_veh.items(): for item in path: if item["link"] not in data_by_link: if item["link"] is None or item["link"].startswith("pt"): continue data_by_link[item["link"]] = { "pt_users": item["occ"], # "pt_vehicles": [veh_id], } else: data_by_link[item["link"]]["pt_users"] += item["occ"] # if veh_id not in data_by_link[item["link"]]["pt_vehicles"]: # data_by_link[item["link"]]["pt_vehicles"].append(veh_id) return data_by_link # aggregated values only def public_transport_use(self, local_links=None, timeslot=None): if local_links is None: def filter_exp(event): return event["type"] == "departure" and event["legMode"] == "pt" else: def filter_exp(event): return ( event["type"] == "departure" and event["legMode"] == "pt" and event["link"] in local_links ) pt_counter = 0 filtered_events = self.timeslot_filter_events(timeslot) for _ in filter(filter_exp, filtered_events): pt_counter += 1 return pt_counter def bicycle_use(self, local_links=None, timeslot=None): if local_links is None: def filter_exp(event): return event["type"] == "departure" and event["legMode"] == "bicycle" else: def filter_exp(event): return ( event["type"] == "departure" and event["legMode"] == "bicycle" and event["link"] in local_links ) bicycle_counter = 0 filtered_events = self.timeslot_filter_events(timeslot) for _ in filter(filter_exp, filtered_events): bicycle_counter += 1 return bicycle_counter def cars_use(self, local_links=None, timeslot=None): if local_links is None: def filter_exp(event): return event["type"] == "departure" and event["legMode"] == "car" else: def filter_exp(event): return ( event["type"] == "departure" and event["legMode"] == "car" and event["link"] in local_links ) cars_counter = 0 filtered_events = self.timeslot_filter_events(timeslot) for _ in filter(filter_exp, filtered_events): cars_counter += 1 return cars_counter def trips_amount(self, timeslot=None): trips_number = 0 filtered_events = self.timeslot_filter_events(timeslot) for event in filtered_events: if event["type"] == "departure" and not event["legMode"] == "walk": trips_number += 1 return trips_number def share_public_transport(self, local_links=None, timeslot=None): try: return self.public_transport_use(local_links, timeslot) / self.trips_number except: app.logger.warn("division by zero in share_public_transport") return 0 def share_cars(self, local_links=None, timeslot=None): try: return self.cars_use(local_links, timeslot) / self.trips_number except: app.logger.warn("division by zero in share_cars") return 0 def share_bicycles(self, local_links=None, timeslot=None): logger.debug("share of bicycles") try: results = self.vehicles_count(local_links=local_links, timeslot=timeslot) # logger.debug(results) divisor = ( (results["car_count"] + results["bike_count"] + results["bus_count"]) if local_links is not None else self.trips_number ) result = self.bicycle_use(local_links, timeslot) / divisor # logger.debug(result) return result except ZeroDivisionError as e: app.logger.warn("division by zero in share_bicycles") return 0 # for messina PT fleet def count_pt_vehicles_drivers(self, timeslot=None): drv_pool_driving = [] # [ person_id ] drv_pool_waiting = [] # [ person_id ] drv_start_times = {} # { person_id: start_time} veh_pool_driving = [] # [ veh_id ] veh_pool_waiting = [] # [ veh_id ] filter_exp = lambda event: event["type"] in [ "vehicle enters traffic", "vehicle leaves traffic", "PersonEntersVehicle", "PersonLeavesVehicle", ] max_vehs = 0 max_drivers = 0 filtered_events = self.timeslot_filter_events(timeslot) for event in filter(filter_exp, filtered_events): # vehicles if event["type"] == "vehicle enters traffic" and event["vehicle"].endswith( "bus" ): # check if any vehicles waiting if len(veh_pool_waiting) > 0: # add to driving, remove from waiting veh_pool_waiting.pop(0) veh_pool_driving.append(event["vehicle"]) elif event["type"] == "vehicle leaves traffic" and event[ "vehicle" ].endswith("bus"): # add to waiting, remove from driving veh_pool_driving.remove(event["vehicle"]) veh_pool_waiting.append(event["vehicle"]) # drivers elif event["type"] == "PersonEntersVehicle" and event["person"].startswith( "pt" ): # record start time if event["person"] not in drv_start_times: drv_start_times[event["person"]] = event["time"] # check if a driver is waiting if len(drv_pool_waiting) > 0: # add to driving, remove from waiting drv_pool_waiting.pop(0) drv_pool_driving.append(event["person"]) elif event["type"] == "PersonLeavesVehicle" and event["person"].startswith( "pt" ): # remove from driving drv_pool_driving.remove(event["person"]) # add to waiting if 8 hour shift not over if ( float(event["time"]) - float(drv_start_times[event["person"]]) < 8 * 60 * 60 ): drv_pool_waiting.append(event["person"]) if len(drv_pool_driving) > max_drivers: max_drivers = len(drv_pool_driving) if len(veh_pool_driving) > max_vehs: max_vehs = len(veh_pool_driving) return {"vehicles": max_vehs, "drivers": max_drivers} class EmissionEvents(Events): """ EmissionEvents represents emissions calculated from the simulation results. """ def __init__(self, path, network): Events.__init__(self, path, network) def emissions_total(self, timeslot=None): filtered_events = self.timeslot_filter_events(timeslot) emissions = { # "CO": 0.0, "CO2_TOTAL": 0.0, # "HC": 0.0, "NOx": 0.0, "PM": 0.0, # "CO2_rep": 0.0, } for event in filter( lambda event: event["type"].find("missionEvent") >= 0, filtered_events ): # emissions["CO"] += float(event["CO"]) emissions["CO2_TOTAL"] += float(event["CO2_TOTAL"]) # emissions["HC"] += float(event["HC"]) emissions["NOx"] += float(event["NOx"]) emissions["PM"] += float(event["PM"]) # emissions["CO2_rep"] += float(event["CO2_rep"]) return emissions def total_emissions_by_link(self, input_links=None, timeslot=None): if input_links is None: def filter_exp(_event): return _event else: def filter_exp(_event): return ( _event["type"] == "departure" or _event["type"] == "arrival" ) and _event["link"] in input_links filtered_events = self.timeslot_filter_events(timeslot) links = {} for event in filter(filter_exp, filtered_events): if not event["linkId"] in links: links[event["linkId"]] = { # "CO": 0.0, "CO2_TOTAL": 0.0, # "HC": 0.0, "NOx": 0.0, "PM": 0.0, # "CO2_rep": 0.0, } # links[event["linkId"]]["CO"] += float(event["CO"]) links[event["linkId"]]["CO2_TOTAL"] += float(event["CO2_TOTAL"]) # links[event["linkId"]]["HC"] += float(event["HC"]) links[event["linkId"]]["NOx"] += float(event["NOx"]) links[event["linkId"]]["PM"] += float(event["PM"]) # links[event["linkId"]]["CO2_rep"] += float(event["CO2_rep"]) return links def emissions_total_links_sum(self, local_links=None, timeslot=None): if local_links is None: def filter_exp(_event): return True else: def filter_exp(_event): return _event["linkId"] in local_links emissions_sum = { # "CO": 0.0, "CO2_TOTAL": 0.0, # "HC": 0.0, "NOx": 0.0, "PM": 0.0, # "CO2_rep": 0.0, } filtered_events = self.timeslot_filter_events(timeslot) for event in filter(filter_exp, filtered_events): # emissions_sum["CO"] += float(event["CO"]) emissions_sum["CO2_TOTAL"] += float(event["CO2_TOTAL"]) # emissions_sum["HC"] += float(event["HC"]) emissions_sum["NOx"] += float(event["NOx"]) emissions_sum["PM"] += float(event["PM"]) # emissions_sum["CO2_rep"] += float(event["CO2_rep"]) return emissions_sum