@Nate-Wessel
Created December 22, 2017 13:50
OxD multithreaded requests to OTP itinerary API
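Before kicking off thousands of these requests, it can help to confirm that the OTP server answers as the script expects. Below is a minimal sanity check against the same plan endpoint, reading only the response fields the script uses; the coordinates are hypothetical placeholders, and the router name 'jv', host, and port are taken from the script itself.

# Stand-alone sanity check for the OTP plan endpoint (a sketch;
# the fromPlace/toPlace coordinates are hypothetical placeholders)
import requests
params = {
	'fromPlace': '43.65,-79.38',  # hypothetical origin lat,lon
	'toPlace': '43.66,-79.40',    # hypothetical destination lat,lon
	'date': '11-10-2017', 'time': '08:00AM',
	'mode': 'TRANSIT,WALK', 'numItineraries': 1
}
resp = requests.get('http://localhost:8080/otp/routers/jv/plan', params=params).json()
if 'error' in resp:
	print resp['error']['message']
else:
	itin = resp['plan']['itineraries'][0]
	# startTime/endTime are epoch milliseconds; legs carry mode, distance, route and stop IDs
	print itin['startTime'], itin['endTime'], [leg['mode'] for leg in itin['legs']]

If that request comes back with a plan, the batch script below should run against the same server.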
import requests, json, psycopg2, multiprocessing, time, os
from random import shuffle
from datetime import datetime, timedelta
fileroot = 'itins/jv/sched-all-stops/'
# define the start time
start_time = datetime( year=2017, month=11, day=10 )
print 'from ', start_time
# and go for how long?
end_time = start_time + timedelta(hours=72)
print 'until', end_time
# how many processors to use?
max_procs = int(raw_input('processes to use -->'))
# connect and establish a cursor
conn_string = ("host='localhost' dbname='' user='' password=''")
connection = psycopg2.connect(conn_string)
cursor = connection.cursor()
# get the set of points to route amongst
cursor.execute("""
SELECT
uid,lat,lon
FROM jv_od_points
""")
points = {}
for uid,lat,lon in cursor:
	points[uid] = {'lat':lat,'lon':lon}
cursor.close()
def calc_od(od_tuple):
	"""calculate all shortest itineraries for this OD pair in the time
	period of interest"""
	function_start_time = time.time() # for timing the function
	# get the OD geometries
	oid,did = od_tuple
	o,d = points[oid], points[did]
	# define the static OTP routing parameters
	options = {
		'mode':'TRANSIT,WALK',
		'maxWalkDistance':3000, # meters
		'wheelchair':'false',
		'arriveBy':'false', # meaning departAt
		'maxTransfers':2,
		'numItineraries':1,
		'transferPenalty':300, # five minutes, in seconds
		'walkReluctance':1.2,
		'waitReluctance':2,
		'fromPlace':( str(o['lat'])+','+str(o['lon']) ),
		'toPlace':( str(d['lat'])+','+str(d['lon']) )
	}
	# start at the start time
	t = start_time
	# results dictionary, keyed by arrival times
	results = {}
	while t <= end_time:
		#print os.getpid(),oid,'->',did, t
		# set the time of this query
		options['date'] = t.strftime('%m-%d-%Y')
		options['time'] = t.strftime('%I:%M%p')
		# make the request
		response = requests.get(
			"http://localhost:8080/otp/routers/jv/plan",
			params = options
		)
		response = json.loads(response.text)
		# check for errors
		if 'error' in response:
			# print an error message
			#print oid,'-->',did,' -- ',response['error']['message'],'at',t
			# then add a good chunk of time before trying again
			t += timedelta(minutes=30)
			continue
		# now we have an actual result from the router
		itinerary = response['plan']['itineraries'][0]
		# get details of the route
		legs = []
		for leg in itinerary['legs']:
			if leg['mode'] == 'WALK':
				legs.append('w_'+str(int(leg['distance'])))
			elif leg['mode'] == 'BUS':
				legs.append('stop_'+leg['from']['stopId'])
				legs.append('route_'+leg['route'])
				legs.append('stop_'+leg['to']['stopId'])
		# overwrite any previous trip that may have had the same arrival time
		# time from milliseconds to seconds
		departure = itinerary['startTime'] / 1000.0
		arrival = itinerary['endTime'] / 1000.0
		results[arrival] = {
			'departure':departure,
			'itinerary':legs
		}
		# in case this is a walk-only result, increment by more than a minute
		# or we'll be here all day
		if len(itinerary['legs']) == 1 and itinerary['legs'][0]['mode'] == 'WALK':
			t = datetime.fromtimestamp(departure) + timedelta(minutes=20)
		else:
			# set the next request for just after the last departure time given here
			t = datetime.fromtimestamp(departure) + timedelta(minutes=1)
	# after all the results are in, output a file, even an empty one, to
	# show that the work is done
	outfile = open(fileroot+str(oid)+'/'+str(did)+'.csv','w')
	outfile.write('o,d,depart,arrive,itinerary')
	for arrival, result in results.items():
		outfile.write(
			'\n'+str(oid)+','+str(did)+','+
			# cast these to datetime; departure comes before arrival to match the header
			str(datetime.fromtimestamp(result['departure']))+','+
			str(datetime.fromtimestamp(arrival))+','+
			# format as a postgresql array literal
			'"{'+','.join(result['itinerary'])+'}"'
		)
	outfile.close()
	print 'pid',os.getpid(),'for',oid,'->',did,'took',round(time.time()-function_start_time,2),'& found', len(results)
# build out a list of every O->D combination and make sure
# there is a directory structure to store the results
all_OD_pairs = []
for oid, op in points.items():
	# see if the results folder exists; if not, make it
	if not os.path.exists(fileroot+str(oid)):
		os.makedirs(fileroot+str(oid))
	for did, dp in points.items():
		# skip reflexive pairs, and results already found
		fpath = fileroot+str(oid)+'/'+str(did)+'.csv'
		if oid != did and not os.path.exists(fpath):
			# append a tuple with the IDs
			all_OD_pairs.append( (oid,did) )
# create the process pool
pool = multiprocessing.Pool(max_procs)
# and get it started
pool.map(calc_od,all_OD_pairs,chunksize=1)
print 'COMPLETED!'
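Each output CSV has a header row (o,d,depart,arrive,itinerary) and writes the itinerary column as a PostgreSQL array literal, so the results can be pulled straight back into the database. A minimal sketch of doing that with COPY follows; the table name jv_itineraries and the column types are assumptions, not part of the original script.

# Sketch: load the per-pair CSVs back into PostgreSQL
# (the table name 'jv_itineraries' and column types are assumptions)
import glob, psycopg2
connection = psycopg2.connect("host='localhost' dbname='' user='' password=''")
cursor = connection.cursor()
cursor.execute("""
	CREATE TABLE IF NOT EXISTS jv_itineraries (
		o text, d text,
		depart timestamp, arrive timestamp,
		itinerary text[]
	)
""")
for path in glob.glob('itins/jv/sched-all-stops/*/*.csv'):
	with open(path) as f:
		# each file has a header row; the itinerary field is already an array literal
		cursor.copy_expert("COPY jv_itineraries FROM STDIN WITH (FORMAT csv, HEADER true)", f)
connection.commit()
cursor.close()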