Skip to content

Instantly share code, notes, and snippets.

@FrankRuns
Created December 1, 2024 10:02
Show Gist options
  • Save FrankRuns/d385c2dd2400ef345c2087236651af27 to your computer and use it in GitHub Desktop.
Save FrankRuns/d385c2dd2400ef345c2087236651af27 to your computer and use it in GitHub Desktop.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2
from scipy.spatial.distance import cdist
import matplotlib.cm as cm
# Set random seed for reproducibility
np.random.seed(42)
class ScenarioSettings:
def __init__(self, name='Scenario',
transit_speed_scenario='baseline',
unload_time_scenario='baseline',
driver_shift_range_scenario='baseline'):
self.name = name
self.transit_speed_scenario = transit_speed_scenario
self.unload_time_scenario = unload_time_scenario
self.driver_shift_range_scenario = driver_shift_range_scenario
def generate_stops(num_stops=100, random_seed=42):
# Generate synthetic coordinates for locations within a city grid (0 to 100 units)
np.random.seed(random_seed)
x_coords = np.random.uniform(0, 100, size=num_stops)
y_coords = np.random.uniform(0, 100, size=num_stops)
# Create a DataFrame for stops
stops = pd.DataFrame({
'Stop_ID': np.arange(1, num_stops + 1), # Stop IDs starting from 1
'X': x_coords,
'Y': y_coords
})
# Assign location types
location_types = ['Urban', 'Suburban', 'Rural']
stops['Location_Type'] = np.random.choice(location_types, size=num_stops, p=[0.5, 0.3, 0.2])
# Generate unload times based on location type
unload_time_means = {'Urban': 20, 'Suburban': 30, 'Rural': 40} # in minutes
unload_time_std = {'Urban': 5, 'Suburban': 7, 'Rural': 10}
stops['Unload_Time'] = stops['Location_Type'].apply(
lambda lt: np.random.normal(unload_time_means[lt], unload_time_std[lt]))
# Ensure unload times are positive
stops['Unload_Time'] = stops['Unload_Time'].clip(lower=5)
# Add depot at position (0,0)
depot = pd.DataFrame({'Stop_ID': [0],
'X': [0],
'Y': [0],
'Location_Type': ['Depot'],
'Unload_Time': [0]})
stops = pd.concat([depot, stops], ignore_index=True)
stops.reset_index(drop=True, inplace=True)
return stops, unload_time_means, unload_time_std
def compute_distance_matrix(stops):
# Compute the distance matrix
coordinates = stops[['X', 'Y']].values
distance_matrix = cdist(coordinates, coordinates, metric='euclidean')
distance_matrix_miles = distance_matrix # in miles
return distance_matrix_miles
def compute_transit_time_matrix(stops, distance_matrix_miles, settings):
if settings.transit_speed_scenario == 'baseline':
# Use baseline transit speed
average_speed = 30 # mph
distance_matrix_minutes = (distance_matrix_miles / average_speed) * 60 # Convert to minutes
distance_matrix_minutes = distance_matrix_minutes.astype(int)
transit_time_matrix = distance_matrix_minutes
elif settings.transit_speed_scenario == 'alternative':
# Use alternative transit speed
# Define average speeds based on location types
average_speeds = {
'Urban': 20, # mph
'Suburban': 30, # mph
'Rural': 60 # mph
}
# Define variation parameters
speed_variation = 0.1 # 10% standard deviation
# Create a speed matrix based on location types with variation
num_locations = len(stops)
speed_matrix = np.zeros((num_locations, num_locations))
for i in range(num_locations):
for j in range(num_locations):
loc_type_i = stops.loc[i, 'Location_Type']
loc_type_j = stops.loc[j, 'Location_Type']
# Get base speeds
speed_i = average_speeds.get(loc_type_i, 30) # Default to 30 mph if unknown
speed_j = average_speeds.get(loc_type_j, 30)
# Calculate average speed with variation
avg_speed = (speed_i + speed_j) / 2
# Add random variation using normal distribution
varied_speed = np.random.normal(avg_speed, avg_speed * speed_variation)
speed_matrix[i][j] = max(varied_speed, 5) # Ensure speed doesn't go below 5 mph
# Compute the transit times based on the varied speed matrix
transit_time_matrix = np.zeros((num_locations, num_locations), dtype=int)
for i in range(num_locations):
for j in range(num_locations):
distance = distance_matrix_miles[i][j]
avg_speed = speed_matrix[i][j]
if avg_speed == 0:
transit_time = 0
else:
transit_time = (distance / avg_speed) * 60 # in minutes
transit_time_matrix[i][j] = int(transit_time)
else:
raise ValueError(f"Unknown transit speed scenario: {settings.transit_speed_scenario}")
return transit_time_matrix
def compute_service_times(stops, unload_time_means, settings):
if settings.unload_time_scenario == 'baseline':
# BASELINE for unload time (fixed at 75th percentile of all unload times which is 35)
baseline_unload_time = 35 # Fixed at 35 minutes
service_times = [baseline_unload_time] * len(stops) # Fixed service time for all stops
elif settings.unload_time_scenario == 'alternative':
# ALTERNATIVE for unload time (average value for each location type)
service_times = stops['Location_Type'].apply(
lambda lt: 0 if lt == 'Depot' else unload_time_means[lt]
).tolist()
else:
raise ValueError(f"Unknown unload time scenario: {settings.unload_time_scenario}")
return service_times
def compute_driver_shift_duration(settings):
if settings.driver_shift_range_scenario == 'baseline':
driver_ontime_range = 60
elif settings.driver_shift_range_scenario == 'alternative':
driver_ontime_range = 10
else:
raise ValueError(f"Unknown driver shift range scenario: {settings.driver_shift_range_scenario}")
# Compute driver shift duration
driver_shift_duration = 14 * 60 - driver_ontime_range # 14 hours in minutes
return driver_shift_duration
def create_data_model(distance_matrix_miles, transit_time_matrix, service_times, adjusted_time_windows, num_stops, max_num_vehicles, depot_index):
data = {}
data['distance_matrix'] = distance_matrix_miles.tolist()
data['time_matrix'] = transit_time_matrix.tolist()
data['num_vehicles'] = max_num_vehicles
data['depot'] = depot_index
data['service_times'] = service_times
data['time_windows'] = adjusted_time_windows
data['demands'] = [0] + [1] * num_stops # Depot has demand 0
data['vehicle_capacities'] = [num_stops] * max_num_vehicles # Vehicles can serve all stops
data['fixed_cost'] = 10000 # Fixed cost for using a vehicle
return data
def print_driver_schedule(vehicle_id, routing, solution, manager, time_dimension, data, num_used_vehicles, stops, depot_index, distance_matrix_miles):
index = routing.Start(vehicle_id)
route = []
while not routing.IsEnd(index):
node_index = manager.IndexToNode(index)
route.append(node_index)
index = solution.Value(routing.NextVar(index))
if len(route) <= 1:
print(f"Driver {vehicle_id + 1} was not used.")
return
print(f"\nDetailed Schedule for Driver {vehicle_id + 1}:")
index = routing.Start(vehicle_id)
plan_output = ''
route_distance = 0
route_time = 0
while not routing.IsEnd(index):
node_index = manager.IndexToNode(index)
cumulative_time = solution.Value(time_dimension.CumulVar(index))
if node_index == depot_index:
location_type = 'Depot'
else:
location_type = stops.loc[node_index, 'Location_Type']
arrival_time = cumulative_time
service_time = data['service_times'][node_index]
departure_time = arrival_time + service_time
plan_output += f"Location {stops.loc[node_index, 'Stop_ID']} ({location_type})\n"
plan_output += f" - Arrival Time: {arrival_time} minutes\n"
plan_output += f" - Service Time: {service_time} minutes\n"
plan_output += f" - Departure Time: {departure_time} minutes\n"
previous_index = index
index = solution.Value(routing.NextVar(index))
if not routing.IsEnd(index):
next_node_index = manager.IndexToNode(index)
travel_time = data['time_matrix'][node_index][next_node_index]
distance = distance_matrix_miles[node_index][next_node_index]
plan_output += f" - Travel Time to Next: {travel_time} minutes\n"
plan_output += f" - Distance to Next: {distance:.2f} miles\n\n"
route_time += travel_time + service_time
route_distance += distance
else:
# Return to depot
travel_time = data['time_matrix'][node_index][depot_index]
distance = distance_matrix_miles[node_index][depot_index]
route_time += travel_time + service_time
route_distance += distance
plan_output += f" - Travel Time to Depot: {travel_time} minutes\n"
plan_output += f" - Distance to Depot: {distance:.2f} miles\n"
cumulative_time = solution.Value(time_dimension.CumulVar(index))
plan_output += f"Return to Depot\n"
plan_output += f" - Arrival Time at Depot: {cumulative_time} minutes\n"
print(plan_output)
print(f"Total Distance for Driver {vehicle_id + 1}: {route_distance:.2f} miles")
print(f"Total Time for Driver {vehicle_id + 1}: {route_time:.2f} minutes")
def plot_routes(routes, stops, num_used_vehicles, scenario_name):
plt.figure(figsize=(12, 10))
plt.scatter(stops['X'][1:], stops['Y'][1:], c='gray', label='Delivery Locations') # Exclude depot
plt.scatter(stops['X'][0], stops['Y'][0], c='red', label='Depot')
colors = cm.rainbow(np.linspace(0, 1, num_used_vehicles))
for idx, route_info in enumerate(routes):
route = route_info['route']
route_coords = stops.loc[route, ['X', 'Y']].values
plt.plot(route_coords[:, 0], route_coords[:, 1], color=colors[idx], label=f'Route {idx+1}')
plt.scatter(route_coords[1:-1, 0], route_coords[1:-1, 1], color=colors[idx]) # Stops excluding depot
plt.title(f'{scenario_name} Vehicle Routes')
plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.legend(loc='upper right')
plt.grid(True)
plt.show()
def main():
import itertools # Import itertools for generating combinations
# Possible values for each setting
transit_speed_options = ['baseline', 'alternative']
unload_time_options = ['baseline', 'alternative']
driver_shift_range_options = ['baseline', 'alternative']
# Generate all combinations of settings
combinations = list(itertools.product(transit_speed_options,
unload_time_options,
driver_shift_range_options))
# Create scenarios based on combinations
scenarios = []
for idx, (transit_speed, unload_time, driver_shift_range) in enumerate(combinations):
scenario_name = f"Scenario_{idx+1}"
scenarios.append(
ScenarioSettings(
name=scenario_name,
transit_speed_scenario=transit_speed,
unload_time_scenario=unload_time,
driver_shift_range_scenario=driver_shift_range
)
)
for settings in scenarios:
print(f"\nRunning scenario: {settings.name}")
print(f" - Transit Speed Scenario: {settings.transit_speed_scenario}")
print(f" - Unload Time Scenario: {settings.unload_time_scenario}")
print(f" - Driver Shift Range Scenario: {settings.driver_shift_range_scenario}")
# Generate data
num_stops = 100
stops, unload_time_means, unload_time_std = generate_stops(num_stops=num_stops, random_seed=42)
distance_matrix_miles = compute_distance_matrix(stops)
transit_time_matrix = compute_transit_time_matrix(stops, distance_matrix_miles, settings)
service_times = compute_service_times(stops, unload_time_means, settings)
driver_shift_duration = compute_driver_shift_duration(settings)
adjusted_time_windows = [(0, driver_shift_duration) for _ in range(len(stops))]
# Vehicle and depot information
max_num_vehicles = 20 # Set a reasonable maximum number of vehicles
depot_index = 0 # Index of the depot in the stops DataFrame
data = create_data_model(distance_matrix_miles, transit_time_matrix, service_times,
adjusted_time_windows, num_stops, max_num_vehicles, depot_index)
# SHOW VISUALIZATIONS HERE OF HISTOGRAMS FOR UNLOAD TIMES AND TRANSIT TIMES
# 1. Histogram of Unload Times
plt.figure(figsize=(8, 6))
plt.hist(stops['Unload_Time'][stops['Unload_Time'] > 0], bins=20, color='skyblue', edgecolor='black')
plt.title(f"Histogram of Unload Times ({settings.name})")
plt.xlabel('Unload Time (minutes)')
plt.ylabel('Frequency')
plt.grid(axis='y')
plt.show()
# 2. Histogram of Transit Times
# Flatten the transit time matrix and filter out zeros (self-loops)
transit_times = np.array(transit_time_matrix).flatten()
transit_times = transit_times[transit_times > 0]
plt.figure(figsize=(8, 6))
plt.hist(transit_times, bins=50, color='lightgreen', edgecolor='black')
plt.title(f"Histogram of Transit Times ({settings.name})")
plt.xlabel('Transit Time (minutes)')
plt.ylabel('Frequency')
plt.grid(axis='y')
plt.show()
# Create the routing index manager
manager = pywrapcp.RoutingIndexManager(
len(data['distance_matrix']),
data['num_vehicles'],
[data['depot']] * data['num_vehicles'], # List of start nodes
[data['depot']] * data['num_vehicles']) # List of end nodes
# Create Routing Model
routing = pywrapcp.RoutingModel(manager)
# Define cost of each arc (distance)
def distance_callback(from_index, to_index):
"""Returns the travel distance between the two nodes."""
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
return data['distance_matrix'][from_node][to_node]
transit_callback_index = routing.RegisterTransitCallback(distance_callback)
routing.SetArcCostEvaluatorOfAllVehicles(transit_callback_index)
# Add Capacity constraint
def demand_callback(from_index):
"""Returns the demand of the node."""
from_node = manager.IndexToNode(from_index)
return data['demands'][from_node]
demand_callback_index = routing.RegisterUnaryTransitCallback(demand_callback)
routing.AddDimensionWithVehicleCapacity(
demand_callback_index,
0, # Null capacity slack
data['vehicle_capacities'], # Vehicle maximum capacities
True, # Start cumul to zero
'Capacity')
# Add Time Windows constraint
def time_callback(from_index, to_index):
"""Returns the total time between the two nodes."""
from_node = manager.IndexToNode(from_index)
to_node = manager.IndexToNode(to_index)
travel_time = data['time_matrix'][from_node][to_node]
service_time = data['service_times'][from_node]
return travel_time + service_time
time_callback_index = routing.RegisterTransitCallback(time_callback)
routing.AddDimension(
time_callback_index,
30, # Allow waiting time of up to 30 minutes at each location
driver_shift_duration, # Maximum time per vehicle (driver shift duration)
True, # Fix start cumul to zero (shift starts at time 0)
'Time')
time_dimension = routing.GetDimensionOrDie('Time')
# Add time window constraints for each location
for location_idx in range(len(stops)):
index = manager.NodeToIndex(location_idx)
time_dimension.CumulVar(index).SetRange(
adjusted_time_windows[location_idx][0],
adjusted_time_windows[location_idx][1])
# Penalize the use of vehicles by adding a fixed cost
for vehicle_id in range(data['num_vehicles']):
routing.SetFixedCostOfVehicle(data['fixed_cost'], vehicle_id)
# Setting first solution heuristic and metaheuristic
search_parameters = pywrapcp.DefaultRoutingSearchParameters()
search_parameters.first_solution_strategy = (
routing_enums_pb2.FirstSolutionStrategy.SAVINGS)
search_parameters.local_search_metaheuristic = (
routing_enums_pb2.LocalSearchMetaheuristic.GUIDED_LOCAL_SEARCH)
search_parameters.time_limit.seconds = 60 # Adjust time limit as needed
search_parameters.log_search = False
# Solve the problem
solution = routing.SolveWithParameters(search_parameters)
if solution:
# Extract routes and compute metrics
total_distance = 0
total_time = 0
num_used_vehicles = 0
routes = []
for vehicle_id in range(data['num_vehicles']):
index = routing.Start(vehicle_id)
route_distance = 0
route_time = 0
route = []
while not routing.IsEnd(index):
node_index = manager.IndexToNode(index)
route.append(node_index)
previous_index = index
index = solution.Value(routing.NextVar(index))
if not routing.IsEnd(index):
next_node_index = manager.IndexToNode(index)
travel_time = data['time_matrix'][node_index][next_node_index]
service_time = data['service_times'][node_index]
route_time += travel_time + service_time
distance = data['distance_matrix'][node_index][next_node_index]
route_distance += distance
else:
# Return to depot
travel_time = data['time_matrix'][node_index][depot_index]
service_time = data['service_times'][node_index]
route_time += travel_time + service_time
distance = data['distance_matrix'][node_index][depot_index]
route_distance += distance
if len(route) > 2:
# Vehicle services at least one stop (excluding depot)
num_used_vehicles += 1
total_distance += route_distance
total_time += route_time
routes.append({
'vehicle_id': vehicle_id,
'route': route + [depot_index], # Add depot at the end
'distance': route_distance,
'time': route_time
})
if num_used_vehicles > 0:
average_shift_time = total_time / num_used_vehicles
average_distance = total_distance / num_used_vehicles
else:
average_shift_time = 0
average_distance = 0
print(f"Number of drivers needed: {num_used_vehicles}")
print(f"All stops met: {'Yes' if num_used_vehicles > 0 else 'No'}")
print(f"Average driver shift time: {average_shift_time:.2f} minutes")
print(f"Average miles driven per driver: {average_distance:.2f} miles")
else:
print('No solution found!')
# Print schedules for all used drivers
# for idx, route_info in enumerate(routes):
# vehicle_id = route_info['vehicle_id']
# print_driver_schedule(
# vehicle_id=vehicle_id,
# routing=routing,
# solution=solution,
# manager=manager,
# time_dimension=time_dimension,
# data=data,
# num_used_vehicles=num_used_vehicles,
# stops=stops,
# depot_index=depot_index,
# distance_matrix_miles=distance_matrix_miles
# )
# Plot the routes on the map
plot_routes(routes, stops, num_used_vehicles, settings.name)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment