signals_gen.py 5.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. #!/usr/bin/env python3
  2. ''' generates signals xml files for MATSim from OSM data '''
  3. import lxml.etree as etree
# Input files, parsed when the module is run as a script.
# The network file is searched for /network/links/link elements.
INPUT_NETWORK = 'input/network.xml'
# The signals file is an OSM document; its /osm/node elements are the signal nodes.
INPUT_SIGNALS = 'input/signals.xml'
# Output files, one per generated document (systems, groups, control).
OUT_SIGNAL_SYSTEMS = 'out_signals_systems.xml'
OUT_SIGNAL_GROUPS = 'out_signals_groups.xml'
OUT_SIGNAL_CONTROL = 'out_signals_control.xml'
# Length of one signal cycle in seconds: written as the plan's cycleTime and
# divided evenly among the signal groups of each signal system.
CYCLE_TIME = 120
  10. def make_subelement(parent_node, sub_name, sub_attrs=None):
  11. ''' creates an xml sub element and set its attributes '''
  12. sub = etree.SubElement(parent_node, sub_name)
  13. if sub_attrs is None:
  14. return sub
  15. for attr, value in sub_attrs.items():
  16. sub.set(attr, value)
  17. return sub
  18. def make_signal_systems(network_root, signals):
  19. ''' make signal systems xml root '''
  20. signal_systems = etree.Element('signalSystems')
  21. signal_systems.set('xmlns', 'http://www.matsim.org/files/dtd')
  22. signal_systems.set('{http://www.w3.org/2001/XMLSchema-instance}schemaLocation', 'http://www.matsim.org/files/dtd http://www.matsim.org/files/dtd/signalSystems_v2.0.xsd')
  23. for n_sig, sig in enumerate(signals):
  24. sig_node_id = sig.get('id')
  25. links = network_root.xpath("/network/links/link[@to='{}']".format(sig_node_id))
  26. if links:
  27. signal_system = make_subelement(signal_systems, 'signalSystem', {'id': str(n_sig+1)})
  28. signals = make_subelement(signal_system, 'signals')
  29. for n_link, link in enumerate(links):
  30. make_subelement(signals, 'signal', {'linkIdRef': link.get('id'), 'id': str(n_link+1)})
  31. return signal_systems
  32. def make_signal_groups(signal_systems):
  33. ''' make signal groups xml root '''
  34. signal_groups = etree.Element('signalGroups')
  35. signal_groups.set('xmlns', 'http://www.matsim.org/files/dtd')
  36. signal_groups.set('{http://www.w3.org/2001/XMLSchema-instance}schemaLocation', 'http://www.matsim.org/files/dtd http://www.matsim.org/files/dtd/signalGroups_v2.0.xsd')
  37. for n_sig_sys, sig_sys in enumerate(signal_systems.xpath('/signalSystems/signalSystem')):
  38. signal_system = make_subelement(signal_groups, 'signalSystem', {'refId': sig_sys.get('id')})
  39. for n_sig, sig in enumerate(sig_sys.xpath('signals/signal')):
  40. signal_group = make_subelement(signal_system, 'signalGroup', {'id': str(n_sig+1)})
  41. make_subelement(signal_group, 'signal', {'refId': sig.get('id')})
  42. return signal_groups
  43. def make_signal_control(signal_systems, signal_groups):
  44. ''' make signal control xml root '''
  45. signal_control = etree.Element('signalControl')
  46. signal_control.set('xmlns', 'http://www.matsim.org/files/dtd')
  47. signal_control.set('{http://www.w3.org/2001/XMLSchema-instance}schemaLocation', 'http://www.matsim.org/files/dtd http://www.matsim.org/files/dtd/signalControl_v2.0.xsd')
  48. for sig_sys in signal_systems.xpath('/signalSystems/signalSystem'):
  49. sig_sys_id = sig_sys.get('id')
  50. signal_system = make_subelement(signal_control, 'signalSystem', {'refId': sig_sys_id})
  51. sig_sys_controller = make_subelement(signal_system, 'signalSystemController')
  52. controller_identifier = make_subelement(sig_sys_controller, 'controllerIdentifier')
  53. controller_identifier.text = 'DefaultPlanbasedSignalSystemController'
  54. signal_plan = make_subelement(sig_sys_controller, 'signalPlan', {'id': '1'})
  55. make_subelement(signal_plan, 'start', {'daytime': '06:00:00'})
  56. make_subelement(signal_plan, 'stop', {'daytime': '18:00:00'})
  57. make_subelement(signal_plan, 'cycleTime', {'sec': str(CYCLE_TIME)})
  58. make_subelement(signal_plan, 'offset', {'sec': '0'})
  59. sig_groups = signal_groups.xpath("/signalGroups/signalSystem[@refId='{}']/signalGroup".format(sig_sys_id))
  60. for n_sig_grp, sig_grp in enumerate(sig_groups):
  61. sig_group_settings = make_subelement(signal_plan, 'signalGroupSettings', {'refId': sig_grp.get('id')})
  62. delta_sec = int(CYCLE_TIME / len(sig_groups))
  63. onset_sec = delta_sec * n_sig_grp
  64. dropping_sec = onset_sec + delta_sec - 5
  65. make_subelement(sig_group_settings, 'onset', {'sec': str(onset_sec)})
  66. make_subelement(sig_group_settings, 'dropping', {'sec': str(dropping_sec)})
  67. return signal_control
  68. if __name__ == '__main__':
  69. # network nodes
  70. NETWORK_ROOT = etree.parse(INPUT_NETWORK)
  71. # signals nodes
  72. SIGNALS_ROOT = etree.parse(INPUT_SIGNALS)
  73. SIGNALS = SIGNALS_ROOT.xpath('/osm/node')
  74. # xml trees
  75. SIGNAL_SYSTEMS = make_signal_systems(NETWORK_ROOT, SIGNALS)
  76. SIGNAL_GROUPS = make_signal_groups(SIGNAL_SYSTEMS)
  77. SIGNAL_CONTROL = make_signal_control(SIGNAL_SYSTEMS, SIGNAL_GROUPS)
  78. # output
  79. TREE_SIGNAL_SYSTEMS = etree.ElementTree(SIGNAL_SYSTEMS)
  80. TREE_SIGNAL_GROUPS = etree.ElementTree(SIGNAL_GROUPS)
  81. TREE_SIGNAL_CONTROL = etree.ElementTree(SIGNAL_CONTROL)
  82. TREE_SIGNAL_SYSTEMS.write(OUT_SIGNAL_SYSTEMS, pretty_print=True, xml_declaration=True, encoding="utf-8")
  83. TREE_SIGNAL_GROUPS.write(OUT_SIGNAL_GROUPS, pretty_print=True, xml_declaration=True, encoding="utf-8")
  84. TREE_SIGNAL_CONTROL.write(OUT_SIGNAL_CONTROL, pretty_print=True, xml_declaration=True, encoding="utf-8")