#!/usr/bin/env python3
'''
plan_gen main functions

Helpers to build random home -> work -> home plans on top of a network:
the network nodes are binned into a square grid of clusters, each cluster
gets a probability weight (a sum of gaussian kernels), and persons are
drawn by picking a random node inside a randomly chosen cluster.
'''

import time

import lxml.etree as etree
import numpy as np
import pandas
import utm

# constants
# ------------------

MIN_DEPARTURE_TIME = '08:00:00'
MAX_DEPARTURE_TIME = '09:00:00'
WORK_DURATION = '04:00:00'


# utils
# ------------------

def parse_value(string):
    '''
    convert string to int, float or string

    Tries int first, then float; returns the input unchanged when neither
    conversion succeeds.
    '''
    for cast in (int, float):
        try:
            return cast(string)
        except ValueError:
            continue
    return string


def parse_params(param_str):
    '''
    parse a param string to a dict

    The string is a comma separated list of ``key=value`` items:

    * keys ``hc``, ``hw``, ``wc``, ``ww``: value is a ``|`` separated list
      of ``:`` separated integers, parsed to a list of int arrays;
    * keys ``hr``, ``wr``: value is a ``|`` separated list of integers,
      parsed to a single int array;
    * any other key: value goes through ``parse_value``.

    An empty or None string gives an empty dict. An item that does not
    contain exactly one ``=`` raises ValueError.
    '''
    dict_params = {}
    if not param_str:
        return dict_params

    for key_value_str in param_str.split(','):
        key, value = key_value_str.split('=')
        if key in ('hc', 'hw', 'wc', 'ww'):
            dict_params[key] = [np.fromstring(coord, dtype=int, sep=':')
                                for coord in value.split('|')]
        elif key in ('hr', 'wr'):
            dict_params[key] = np.fromstring(value, dtype=int, sep='|')
        else:
            dict_params[key] = parse_value(value)
    return dict_params


def parse_latlon(csv):
    '''
    parse lat and lon from a csv

    The file is read with ``;`` as separator and must have ``lat`` and
    ``lon`` columns. Returns two lists ``(lat, lon)``.
    '''
    df = pandas.read_csv(csv, sep=';')
    return df.lat.tolist(), df.lon.tolist()


def parse_weights(csv):
    '''
    parse weights from a csv

    The file is read with ``;`` as separator; the weights are the values
    of its ``nbpeople`` column, returned as a list.
    '''
    df = pandas.read_csv(csv, sep=';')
    return df.nbpeople.tolist()


def latlon_to_utm(lat, lon):
    '''
    convert lat lon to utm coordinates

    ``lat`` and ``lon`` are parallel sequences. Returns two lists
    ``(xutm, yutm)`` holding the first and second component of
    ``utm.from_latlon`` for each point.
    '''
    utms = [utm.from_latlon(a, b) for a, b in zip(lat, lon)]
    xutm = [utm_coords[0] for utm_coords in utms]
    yutm = [utm_coords[1] for utm_coords in utms]
    return xutm, yutm


def xy_to_ij(x, y, dx, dy, nb_clusters):
    '''
    convert coordinates relative to the grid origin to grid indices

    ``dx`` and ``dy`` are the cell sizes. An index that reaches
    ``nb_clusters`` or more is decremented by one, so that a point lying
    exactly on the upper bound falls in the last cell.
    '''
    # A zero step means every node shares that coordinate: there is only
    # one meaningful cell along that axis, and dividing would raise.
    i = int(x / dx) if dx else 0
    j = int(y / dy) if dy else 0
    if i >= nb_clusters:
        i -= 1
    if j >= nb_clusters:
        j -= 1
    return i, j


def get_seconds(time_str):
    ''' returns seconds from a 'HH:MM:SS' time string '''
    h, m, s = time_str.split(':')
    return int(h) * 3600 + int(m) * 60 + int(s)


def make_gaussian(size, center=None, radius=10):
    '''
    make a square gaussian kernel

    Returns a ``(size, size)`` float array whose value is 1 at the centre
    and 0.5 at a distance of ``radius / 2`` from it.

    ``center`` is ``(x0, y0)`` where ``x0`` is the COLUMN and ``y0`` the
    ROW of the peak, i.e. the peak is at ``result[y0, x0]``. When omitted
    the peak is at ``size // 2`` on both axes.
    '''
    x = np.arange(0, size, 1, float)
    y = x[:, np.newaxis]
    if center is None:
        x0 = y0 = size // 2
    else:
        x0 = center[0]
        y0 = center[1]
    return np.exp(-4 * np.log(2) * ((x - x0)**2 + (y - y0)**2) / radius**2)


# main functions
# ------------------

def make_clusters(nb_clusters, nodes):
    '''
    make a grid of (nb_clusters*nb_clusters) from a nodes list

    Returns an object array where ``clusters[i][j]`` is the list of nodes
    whose x falls in column-bin ``i`` and y in bin ``j``, or None when no
    node falls in that cell.
    '''
    xmin, _, ymin, _, dx, dy = get_min_max_steps(nodes, nb_clusters)
    clusters = np.empty((nb_clusters, nb_clusters), dtype=object)
    for node in nodes:
        x = float(node.get('x')) - xmin
        y = float(node.get('y')) - ymin
        i, j = xy_to_ij(x, y, dx, dy, nb_clusters)
        if clusters[i][j] is None:
            clusters[i][j] = []
        clusters[i][j].append(node)
    return clusters


def make_centers(csv, nb_clusters, nodes):
    '''
    make centers from a csv file

    Reads lat/lon points from ``csv``, converts them to utm and maps each
    of them to ``[i, j]`` indices on the grid spanned by ``nodes``.
    '''
    lat, lon = parse_latlon(csv)
    xutm, yutm = latlon_to_utm(lat, lon)
    xmin, _, ymin, _, dx, dy = get_min_max_steps(nodes, nb_clusters)
    centers = []
    for x, y in zip(xutm, yutm):
        i, j = xy_to_ij(x - xmin, y - ymin, dx, dy, nb_clusters)
        centers.append([i, j])
    return centers


def make_densities(nb_clusters, centers=None, radius=None, weights=None):
    '''
    make a grid of gaussian probability densities

    Without ``centers`` a single gaussian centred on the grid, of radius
    ``nb_clusters / 2``, is returned. Otherwise the result is the weighted
    sum of one gaussian per centre.

    ``centers`` is a sequence of ``[i, j]`` grid indices, ``radius`` and
    ``weights`` are sequences parallel to it. When ``radius`` is None
    every centre uses ``nb_clusters / 2``; when ``weights`` is None every
    centre has a weight of 1.

    Returns a ``(nb_clusters, nb_clusters)`` array indexed like the
    clusters grid, i.e. the gaussian of centre ``[i, j]`` peaks at
    ``densities[i][j]``.
    '''
    if centers is None:
        return make_gaussian(nb_clusters, radius=nb_clusters/2)

    # None is the advertised default for both: fall back to neutral values
    # instead of failing on zip(centers, None) / None[n].
    if radius is None:
        radius = [nb_clusters / 2] * len(centers)
    if weights is None:
        weights = [1.0] * len(centers)

    densities = np.zeros((nb_clusters, nb_clusters))
    for n, (c, w) in enumerate(zip(centers, weights)):
        # make_gaussian takes (column, row) while centres are (row, column)
        # indices into the clusters grid: swap them, otherwise the density
        # grid is the transpose of the clusters grid.
        densities += w * make_gaussian(nb_clusters, center=(c[1], c[0]),
                                       radius=radius[n])
    return densities


def clean_densities(densities, clusters):
    '''
    clean density probability if a cluster is empty

    Returns a flattened copy of ``densities`` in which the entries whose
    cluster is None are set to 0.
    '''
    densities = densities.flatten()
    for n, c in enumerate(clusters.flatten()):
        if c is None:
            densities[n] = 0.0
    return densities


# random generators
# ------------------

def rand_time(low, high):
    '''
    returns a random time between low and high bounds

    Bounds are 'HH:MM:SS' strings; ``low`` is included and ``high`` is
    excluded. When both bounds are equal, ``low`` is returned. A ``high``
    earlier than ``low`` raises ValueError.
    '''
    low_s = get_seconds(low)
    span = get_seconds(high) - low_s
    # randint(0) raises: an empty interval has a single possible value.
    delta = np.random.randint(span) if span else 0
    return time.strftime('%H:%M:%S', time.gmtime(low_s + delta))


def rand_node_xy(nodes, clusters, densities):
    '''
    returns a random node coordinates from a random cluster

    A cluster is drawn with a probability proportional to its density,
    then a node is drawn uniformly inside it. ``clusters`` and
    ``densities`` must describe the same cells in the same order once
    flattened. ``nodes`` is not used; it is kept for interface
    compatibility.

    Returns the ``(x, y)`` attributes of the node, as stored in the xml.
    Fails when the drawn cluster is None, so empty clusters must carry a
    zero density (see ``clean_densities``).
    '''
    clusters = clusters.flatten()
    weights = densities.flatten()
    picked = np.random.choice(len(clusters), p=weights / weights.sum())
    cluster = clusters[picked]
    node = cluster[np.random.randint(len(cluster))]
    return (node.get('x'), node.get('y'))


def rand_person(nodes, clusters, h_dens, w_dens):
    '''
    returns a person as a dictionary of random parameters

    Keys are ``home`` and ``work`` (node coordinates drawn with the home
    and work densities) and ``home_departure`` (random time between
    MIN_DEPARTURE_TIME and MAX_DEPARTURE_TIME).
    '''
    home_xy = rand_node_xy(nodes, clusters, h_dens)
    work_xy = rand_node_xy(nodes, clusters, w_dens)
    home_departure = rand_time(MIN_DEPARTURE_TIME, MAX_DEPARTURE_TIME)
    return {'home': home_xy,
            'work': work_xy,
            'home_departure': home_departure}


# xml builders
# ------------------

def make_child(parent_node, child_name, child_attrs=None):
    '''
    creates an xml child element and set its attributes

    ``child_attrs`` is an optional dict of attribute name -> value.
    Returns the new element.
    '''
    child = etree.SubElement(parent_node, child_name)
    if child_attrs is not None:
        for attr, value in child_attrs.items():
            child.set(attr, value)
    return child


def make_plans(persons):
    '''
    makes xml tree of plans based on persons list

    Each person (see ``rand_person``) gets an id starting at 1 and a plan
    made of: home activity ending at the departure time, car leg, work
    activity lasting WORK_DURATION, car leg, home activity.
    '''
    plans = etree.Element('plans')
    for n, p in enumerate(persons):
        home_x, home_y = p['home'][0], p['home'][1]
        work_x, work_y = p['work'][0], p['work'][1]

        person = make_child(plans, 'person', {'id': str(n+1)})
        plan = make_child(person, 'plan')
        # plan
        make_child(plan, 'act', {'type': 'h', 'x': home_x, 'y': home_y,
                                 'end_time': p['home_departure']})
        make_child(plan, 'leg', {'mode': 'car'})
        make_child(plan, 'act', {'type': 'w', 'x': work_x, 'y': work_y,
                                 'dur': WORK_DURATION})
        make_child(plan, 'leg', {'mode': 'car'})
        make_child(plan, 'act', {'type': 'h', 'x': home_x, 'y': home_y})
    return plans


# xml readers
# ------------------

def get_nodes(input_network):
    '''
    returns all network nodes as a list

    Nodes are the elements matching ``/network/nodes/node`` in the xml
    file. Returns None when ``input_network`` is empty or None.
    '''
    if not input_network:
        return None
    tree = etree.parse(input_network)
    return list(tree.xpath("/network/nodes/node"))


def get_extrem_nodes(nodes):
    '''
    returns extremum coordinates of a nodes list

    Returns ``(xmin, xmax, ymin, ymax)`` computed on the ``x`` and ``y``
    attributes of the nodes.
    '''
    x = [float(node.get('x')) for node in nodes]
    y = [float(node.get('y')) for node in nodes]
    return min(x), max(x), min(y), max(y)


def get_min_max_steps(nodes, nb_clusters):
    '''
    returns min max steps

    Returns ``(xmin, xmax, ymin, ymax, dx, dy)`` where ``dx`` and ``dy``
    are the sizes of a grid cell when the bounding box of the nodes is
    split in ``nb_clusters`` cells along each axis.
    '''
    xmin, xmax, ymin, ymax = get_extrem_nodes(nodes)
    dx = (xmax - xmin) / nb_clusters
    dy = (ymax - ymin) / nb_clusters
    return xmin, xmax, ymin, ymax, dx, dy