# Benchmarking power system optimization: CPU (using PyPSA) and GPU (using custom solution)
# pip install pypsa cupy-cuda12x clarabel cvxpy
import numpy as np
import cupy as cp
import cvxpy as cvx
import pandas as pd
import time
import pypsa
import matplotlib.pyplot as plt
from tqdm.auto import tqdm  # auto picks the notebook or terminal frontend
import warnings
warnings.filterwarnings('ignore')
class PowerSystem:
    def __init__(self):
        self.buses = {}
        self.generators = {}
        self.loads = {}
        self.lines = {}

    def add_bus(self, bus_id, carrier='AC'):
        self.buses[bus_id] = {
            'generators': [],
            'loads': [],
            'lines': [],
            'carrier': carrier
        }

    def add_generator(self, gen_id, bus_id, p_nom, cost):
        if bus_id not in self.buses:
            raise ValueError(f"Bus {bus_id} does not exist")
        self.generators[gen_id] = {
            'bus': bus_id,
            'p_nom': p_nom,
            'cost': cost
        }
        self.buses[bus_id]['generators'].append(gen_id)

    def set_load(self, bus_id, p):
        if bus_id not in self.buses:
            raise ValueError(f"Bus {bus_id} does not exist")
        self.loads[bus_id] = p

    def add_line(self, line_id, bus0, bus1, x, r, capacity):
        if bus0 not in self.buses or bus1 not in self.buses:
            raise ValueError("Bus does not exist")
        self.lines[line_id] = {
            'bus0': bus0,
            'bus1': bus1,
            'x': x,
            'r': r,
            'capacity': capacity
        }
        self.buses[bus0]['lines'].append(line_id)
        self.buses[bus1]['lines'].append(line_id)
class GPUOptimizer:
    """Economic dispatch in CVXPY: generator limits plus per-bus power balance
    (line flows are not modelled). CuPy is used for device context, syncs and
    memory-pool management."""

    def __init__(self, power_system):
        self.ps = power_system
        self.setup_problem()

    def setup_problem(self):
        n_buses = len(self.ps.buses)
        n_gens = len(self.ps.generators)
        # Build the problem inside the GPU device context so any CuPy
        # allocations land on device 0
        with cp.cuda.Device(0):
            self.p_gen = cvx.Variable(n_gens)
            self.costs = []
            self.constraints = []
            # Process generators in batches of 100
            batch_size = 100
            for i in range(0, n_gens, batch_size):
                batch_end = min(i + batch_size, n_gens)
                batch_gens = list(self.ps.generators.items())[i:batch_end]
                for j, (gen_id, gen) in enumerate(batch_gens):
                    idx = i + j
                    self.costs.append(gen['cost'] * self.p_gen[idx])
                    self.constraints.extend([
                        0 <= self.p_gen[idx],
                        self.p_gen[idx] <= gen['p_nom']
                    ])
            # Power balance: generation at each bus must equal its load
            for bus_id in self.ps.buses:
                bus_gens = []
                for i, (gen_id, gen) in enumerate(self.ps.generators.items()):
                    if gen['bus'] == bus_id:
                        bus_gens.append(self.p_gen[i])
                if bus_gens:
                    total_gen = sum(bus_gens)
                    load = self.ps.loads.get(bus_id, 0)
                    self.constraints.append(total_gen == load)
            self.objective = cvx.Minimize(sum(self.costs))
            self.prob = cvx.Problem(self.objective, self.constraints)

    def optimize(self, skip_transfer=False):
        if not skip_transfer:
            cp.cuda.Stream.null.synchronize()
        self.prob.solve(solver=cvx.OSQP, eps_abs=1e-4, eps_rel=1e-4)
        return {
            'status': self.prob.status,
            'cost': self.prob.value,
            'generation': self.p_gen.value
        }
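# Minimal, hand-checkable usage sketch (illustrative only; the helper name and
# the numbers below are assumptions, not part of the benchmark). Because the
# model balances each bus against its own load, the optimum is easy to verify.
def sanity_check_two_bus():
    ps = PowerSystem()
    ps.add_bus('b0')
    ps.add_bus('b1')
    ps.add_generator('g0', 'b0', p_nom=100, cost=10)  # cheap unit at b0
    ps.add_generator('g1', 'b1', p_nom=100, cost=20)  # expensive unit at b1
    ps.set_load('b0', 60)
    ps.set_load('b1', 30)
    ps.add_line('l0', 'b0', 'b1', x=0.1, r=0.01, capacity=100)
    res = GPUOptimizer(ps).optimize()
    # Expected cost: 60*10 + 30*20 = 1200 (no inter-bus flows in this model)
    print(res['status'], res['cost'])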
def benchmark_systems(sizes=[10, 100, 500, 1000, 2000, 5000, 10000, 20000]):
    results = []
    for size in tqdm(sizes, desc="Testing system sizes"):
        try:
            # Create test system
            ps = PowerSystem()
            for i in range(size):
                ps.add_bus(f'b{i}', carrier='AC')
                ps.add_generator(f'g{i}', f'b{i}',
                                 p_nom=1000,
                                 cost=50 + i / size * 100)
                ps.set_load(f'b{i}', 500 + 100 * np.sin(i / size * 2 * np.pi))
                if i > 0:
                    ps.add_line(f'l{i}', f'b{i}', f'b{i-1}',
                                x=0.1,
                                r=0.01,
                                capacity=1000)
            # Create equivalent PyPSA network
            n = pypsa.Network()
            for i in range(size):
                n.add("Bus", f"b{i}", carrier="AC")
                n.add("Generator", f"g{i}",
                      bus=f"b{i}",
                      p_nom=1000,
                      marginal_cost=50 + i / size * 100)
                n.add("Load", f"l{i}",
                      bus=f"b{i}",
                      p_set=500 + 100 * np.sin(i / size * 2 * np.pi))
                if i > 0:
                    n.add("Line", f"l{i}",
                          bus0=f"b{i}",
                          bus1=f"b{i-1}",
                          x=0.1,
                          r=0.01,
                          s_nom=1000)
            # GPU timing (problem build + solve)
            start = time.perf_counter()
            gpu_opt = GPUOptimizer(ps)
            gpu_results = gpu_opt.optimize()
            gpu_time = time.perf_counter() - start
            # Force GPU sync and free CuPy memory pools between runs
            cp.cuda.Stream.null.synchronize()
            mempool = cp.get_default_memory_pool()
            pinned_mempool = cp.get_default_pinned_memory_pool()
            mempool.free_all_blocks()
            pinned_mempool.free_all_blocks()
            # CPU timing (PyPSA)
            start = time.perf_counter()
            n.optimize()
            cpu_time = time.perf_counter() - start
            results.append({
                'size': size,
                'cpu_time': cpu_time,
                'gpu_time': gpu_time,
                'speedup': cpu_time / gpu_time,
                'cpu_obj': float(n.objective),
                'gpu_obj': float(gpu_results['cost']),
                'status': 'success'
            })
        except Exception as e:
            print(f"Error at size {size}: {str(e)}")
            results.append({
                'size': size,
                'cpu_time': None,
                'gpu_time': None,
                'speedup': None,
                'cpu_obj': None,
                'gpu_obj': None,
                'status': 'failed'
            })
    return pd.DataFrame(results)
if __name__ == "__main__":
    print("Running extended benchmarks...")
    sizes = [10, 100, 500, 1000, 2000, 5000, 10000, 20000]
    results = benchmark_systems(sizes)

    print("\nDetailed Results:")
    print(results.round(3))

    # Visualize timings and speedup for the runs that succeeded
    successful_results = results[results['status'] == 'success']
    plt.figure(figsize=(15, 5))

    plt.subplot(121)
    plt.plot(successful_results['size'], successful_results['cpu_time'], 'b-', label='CPU Time')
    plt.plot(successful_results['size'], successful_results['gpu_time'], 'r-', label='GPU Time')
    plt.xlabel('System Size (nodes)')
    plt.ylabel('Time (seconds)')
    plt.title('Execution Time vs System Size')
    plt.legend()
    plt.xscale('log')
    plt.yscale('log')

    plt.subplot(122)
    plt.plot(successful_results['size'], successful_results['speedup'], 'g-')
    plt.xlabel('System Size (nodes)')
    plt.ylabel('Speedup (x)')
    plt.title('GPU Speedup vs System Size')
    plt.xscale('log')

    plt.tight_layout()
    plt.show()

    print("\nGPU Memory Stats:")
    print(cp.cuda.runtime.memGetInfo())  # (free_bytes, total_bytes) on the current device