123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- #!/usr/bin/env python3
- ''' plan_gen main functions '''
- import lxml.etree as etree
- import numpy as np
- import pandas
- import time
- import utm
# constants
# ------------------
# Bounds ('HH:MM:SS') between which rand_person draws the home departure time.
MIN_DEPARTURE_TIME = '08:00:00'
MAX_DEPARTURE_TIME = '09:00:00'
# Value ('HH:MM:SS') written to the 'dur' attribute of the work activity.
WORK_DURATION = '04:00:00'
# utils
# ------------------
def parse_value(string):
    ''' convert a string to an int, else a float, else leave it as is

    int() is tried first, then float(); when both raise ValueError the
    input is returned unchanged.
    '''
    for convert in (int, float):
        try:
            return convert(string)
        except ValueError:
            continue
    return string
def parse_params(param_str):
    ''' parse a 'key=value,key=value' param string to a dict

    - 'hc', 'hw', 'wc', 'ww': the value is a '|'-separated list of
      ':'-separated integers and becomes a list of int arrays;
    - 'hr', 'wr': the value is a '|'-separated list of integers and
      becomes a single int array;
    - any other key: the value goes through parse_value.

    An empty or None param string gives an empty dict.
    '''
    coord_list_keys = ['hc', 'hw', 'wc', 'ww']
    int_list_keys = ['hr', 'wr']
    parsed = {}
    if not param_str:
        return parsed
    for pair in param_str.split(','):
        # exactly one '=' per pair is expected, anything else raises ValueError
        key, value = pair.split('=')
        if key in coord_list_keys:
            parsed[key] = [
                np.fromstring(str(coord), dtype=int, sep=':')
                for coord in value.split('|')
            ]
        elif key in int_list_keys:
            parsed[key] = np.fromstring(str(value), dtype=int, sep='|')
        else:
            parsed[key] = parse_value(value)
    return parsed
def parse_latlon(csv):
    ''' read the 'lat' and 'lon' columns of a ';'-separated csv

    Returns a (lat, lon) pair of plain python lists.
    '''
    table = pandas.read_csv(csv, sep=';')
    return table.lat.tolist(), table.lon.tolist()
def parse_weights(csv):
    ''' read the 'nbpeople' column of a ';'-separated csv as a list '''
    return pandas.read_csv(csv, sep=';').nbpeople.tolist()
def latlon_to_utm(lat, lon):
    ''' convert paired lat / lon sequences to utm x and y lists

    Each (lat, lon) pair goes through utm.from_latlon; the first item of
    its result is collected as x and the second as y.
    '''
    # NOTE(review): no zone is forced here, so this presumably assumes all
    # points fall in the same UTM zone - confirm against the input data.
    xutm, yutm = [], []
    for latitude, longitude in zip(lat, lon):
        projected = utm.from_latlon(latitude, longitude)
        xutm.append(projected[0])
        yutm.append(projected[1])
    return xutm, yutm
def xy_to_ij(x, y, dx, dy, nb_clusters):
    ''' convert x, y offsets to (i, j) grid indices

    Each offset is divided by its step and truncated with int(). An index
    that reaches nb_clusters or more is decremented by one, which brings
    a point lying exactly on the upper bound back into the last cell.
    '''
    indices = []
    for offset, step in ((x, dx), (y, dy)):
        index = int(offset / step)
        if index >= nb_clusters:
            index -= 1
        indices.append(index)
    return tuple(indices)
def get_seconds(time_str):
    ''' convert an 'H:M:S' time string to a number of seconds '''
    hours, minutes, seconds = [int(part) for part in time_str.split(':')]
    return hours * 3600 + minutes * 60 + seconds
def make_gaussian(size, center=None, radius=10):
    ''' make a square (size x size) gaussian kernel

    center is an (x0, y0) pair: x0 is the peak position along the second
    array axis and y0 along the first one, i.e. the peak sits at
    result[y0, x0]. Without a center, size // 2 is used on both axes.
    radius is the full width at half maximum: the kernel is 1 at the peak
    and 0.5 at a distance of radius / 2 from it.
    '''
    cols = np.arange(0, size, 1, float)
    rows = cols[:, np.newaxis]
    if center is None:
        x0, y0 = size // 2, size // 2
    else:
        x0, y0 = center[0], center[1]
    # row vector + column vector broadcast to a (size, size) grid
    squared_distance = (cols - x0)**2 + (rows - y0)**2
    return np.exp(-4*np.log(2) * squared_distance / radius**2)
# main functions
# ------------------
def make_clusters(nb_clusters, nodes):
    ''' spread nodes over a (nb_clusters x nb_clusters) grid

    Returns an object array in which cell [i][j] holds the list of nodes
    whose 'x' / 'y' attributes fall in that cell, or None when no node
    does. i is derived from x and j from y.
    '''
    xmin, _, ymin, _, dx, dy = get_min_max_steps(nodes, nb_clusters)
    grid = np.empty((nb_clusters, nb_clusters), dtype=object)
    for node in nodes:
        # offsets from the lower-left corner of the bounding box
        offset_x = float(node.get('x')) - xmin
        offset_y = float(node.get('y')) - ymin
        i, j = xy_to_ij(offset_x, offset_y, dx, dy, nb_clusters)
        cell = grid[i][j]
        if cell is None:
            cell = grid[i][j] = []
        cell.append(node)
    return grid
def make_centers(csv, nb_clusters, nodes):
    ''' make grid centers from the lat / lon columns of a csv file

    The csv points are converted to utm, shifted by the minimum x / y of
    nodes and mapped to grid indices. Returns a list of [i, j] lists, one
    per csv row.
    '''
    lat, lon = parse_latlon(csv)
    xutm, yutm = latlon_to_utm(lat, lon)
    xmin, _, ymin, _, dx, dy = get_min_max_steps(nodes, nb_clusters)
    return [
        list(xy_to_ij(x - xmin, y - ymin, dx, dy, nb_clusters))
        for x, y in zip(xutm, yutm)
    ]
def make_densities(nb_clusters, centers=None, radius=None, weights=None):
    ''' make a (nb_clusters x nb_clusters) grid of gaussian densities

    Without centers a single gaussian of radius nb_clusters / 2 centered
    on the grid is returned. Otherwise the result is the weighted sum of
    one gaussian per center; it is not normalised.

    centers: sequence of (i, j) grid indices, using the same [i][j]
        convention as the clusters grid.
    radius: one radius per center; defaults to nb_clusters / 2 for each.
    weights: one weight per center; defaults to 1 for each.
    '''
    if centers is None:
        return make_gaussian(nb_clusters, radius=nb_clusters/2)
    # None is the declared default of both arguments: give it a meaning
    # instead of failing on zip(centers, None) / None[n].
    if weights is None:
        weights = [1.0] * len(centers)
    if radius is None:
        radius = [nb_clusters / 2] * len(centers)
    densities = np.zeros((nb_clusters, nb_clusters))
    for n, (c, w) in enumerate(zip(centers, weights)):
        # make_gaussian puts its peak at result[center[1], center[0]] while
        # clusters are addressed as [i][j] and matched with the densities
        # element by element: swap the pair so the peak lands on [i][j].
        densities += w * make_gaussian(
            nb_clusters, center=(c[1], c[0]), radius=radius[n])
    return densities
def clean_densities(densities, clusters):
    ''' zero the density of every empty cluster

    Both grids are flattened and matched position by position; a cluster
    that is None gets a density of 0.0. Returns the flattened copy, the
    densities passed in are left untouched.
    '''
    flat_densities = densities.flatten()
    empty_positions = [
        position
        for position, cell in enumerate(clusters.flatten())
        if cell is None
    ]
    flat_densities[np.array(empty_positions, dtype=int)] = 0.0
    return flat_densities
# random generators
# ------------------
def rand_time(low, high):
    ''' returns a random 'HH:MM:SS' time between low and high bounds

    low is inclusive and high exclusive, both given as 'H:M:S' strings.
    When both bounds are equal, low is returned. A high bound below low
    raises ValueError (from np.random.randint).
    '''
    low_s = get_seconds(low)
    span = get_seconds(high) - low_s
    # np.random.randint(0) raises, yet equal bounds have one valid answer
    delta = np.random.randint(span) if span else 0
    return time.strftime('%H:%M:%S', time.gmtime(low_s + delta))
def rand_node_xy(nodes, clusters, densities):
    ''' returns the ('x', 'y') attributes of a random node

    A cluster is drawn with a probability proportional to its density,
    then a node is drawn uniformly inside that cluster. The nodes argument
    is not used. The cluster drawn must not be empty (None).
    '''
    flat_clusters = clusters.flatten()
    flat_densities = densities.flatten()
    probabilities = flat_densities/sum(flat_densities)
    chosen = np.random.choice(flat_clusters, p=probabilities)
    picked = chosen[np.random.randint(len(chosen))]
    return (picked.get('x'), picked.get('y'))
def rand_person(nodes, clusters, h_dens, w_dens):
    ''' returns a person as a dictionary of random parameters

    Keys: 'home' and 'work' are node coordinates drawn with the home and
    work densities, 'home_departure' is a time drawn between
    MIN_DEPARTURE_TIME and MAX_DEPARTURE_TIME.
    '''
    person = {}
    person['home'] = rand_node_xy(nodes, clusters, h_dens)
    person['work'] = rand_node_xy(nodes, clusters, w_dens)
    person['home_departure'] = rand_time(MIN_DEPARTURE_TIME, MAX_DEPARTURE_TIME)
    return person
# xml builders
# ------------------
def make_child(parent_node, child_name, child_attrs=None):
    ''' append a child element to parent_node and set its attributes

    child_attrs is an optional dict of attribute name -> value. Returns
    the new child element.
    '''
    child = etree.SubElement(parent_node, child_name)
    if child_attrs is not None:
        for attr, value in child_attrs.items():
            child.set(attr, value)
    return child
def make_plans(persons):
    ''' makes the xml tree of plans for a list of persons

    Every person gets an id counted from 1 and a single plan:
    home (until 'home_departure') -> car -> work (for WORK_DURATION)
    -> car -> home. Returns the root 'plans' element.
    '''
    root = etree.Element('plans')
    for number, p in enumerate(persons, start=1):
        person_el = make_child(root, 'person', {'id': str(number)})
        plan_el = make_child(person_el, 'plan')
        home_x, home_y = p['home'][0], p['home'][1]
        work_x, work_y = p['work'][0], p['work'][1]
        # plan steps, in travel order
        steps = (
            ('act', {'type': 'h', 'x': home_x, 'y': home_y, 'end_time': p['home_departure']}),
            ('leg', {'mode': 'car'}),
            ('act', {'type': 'w', 'x': work_x, 'y': work_y, 'dur': WORK_DURATION}),
            ('leg', {'mode': 'car'}),
            ('act', {'type': 'h', 'x': home_x, 'y': home_y}),
        )
        for tag, attrs in steps:
            make_child(plan_el, tag, attrs)
    return root
# xml readers
# ------------------
def get_nodes(input_network):
    ''' returns all nodes of a network file as a list

    The nodes are the elements matching /network/nodes/node. A falsy
    input_network (None, empty string) gives None.
    '''
    if input_network:
        network = etree.parse(input_network)
        return list(network.xpath("/network/nodes/node"))
    return None
def get_extrem_nodes(nodes):
    ''' returns the extreme coordinates of a nodes list

    The 'x' and 'y' attributes of every node are read as floats; the
    result is (xmin, xmax, ymin, ymax).
    '''
    def coordinates(axis):
        return [float(node.get(axis)) for node in nodes]

    xs = coordinates('x')
    ys = coordinates('y')
    return min(xs), max(xs), min(ys), max(ys)
def get_min_max_steps(nodes, nb_clusters):
    ''' returns the bounding box of nodes and the grid steps

    The result is (xmin, xmax, ymin, ymax, dx, dy) where dx and dy are the
    x and y extents divided by nb_clusters.
    '''
    bounds = get_extrem_nodes(nodes)
    xmin, xmax, ymin, ymax = bounds
    return (*bounds, (xmax - xmin) / nb_clusters, (ymax - ymin) / nb_clusters)
|