Created
December 21, 2019 09:10
-
-
Save snuke/492557822a885430ccd3deeb553c1556 to your computer and use it in GitHub Desktop.
天下一GameBattleContest(β)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Sample program that claims edges at random and computes the score.
A python3 environment is required to run it.
Rewrite the TOKEN variable before running.
"""
import copy
import heapq
import os
import random
import time
from typing import Dict, List, NamedTuple
from urllib.request import urlopen
# Address of the game server (overridable through the GAME_SERVER env var).
GAME_SERVER = os.getenv('GAME_SERVER', 'https://obt.tenka1.klab.jp')
# Your API token (overridable through the TOKEN env var).
# NOTE(review): a real-looking token is hard-coded as the fallback value;
# prefer supplying it via the environment and removing it from the source.
TOKEN = os.getenv('TOKEN', '79c60c45bcc95e16226801122af377975f09d52b92e18db8ac4a09a5d0383562')
# Number of vertices and edges of the board graph.
NUM_NODES = 400
NUM_EDGES = 760
# Indices of edges whose claim request returned "ok" during the current game.
edge_set = set()
def call_api(x):
    """Fetch ``GAME_SERVER + x`` and return the response body as text.

    Args:
        x: Path part of the URL (for example ``'/api/game'``).

    Returns:
        The response body decoded with ``bytes.decode()`` defaults.

    Raises:
        Whatever ``urlopen`` raises for network or HTTP failures; nothing is
        caught here.
    """
    with urlopen(f'{GAME_SERVER}{x}') as res:
        return res.read().decode()
def main():
    """Run the client loop: follow the current game and claim one edge per cycle.

    The outer loop asks the server for the current game and stops when the
    reported game id is negative. For each game, the inner loop repeatedly
    downloads the stage, downloads the claim status, lets ``solve`` pick an
    edge, and sends the claim. It then waits until at least 0.54 s have
    passed since the previous cycle before handling the reply.

    ``remaining_ms`` is read once per game and is not refreshed inside the
    inner loop, so a running game is only left when a claim request answers
    ``"game_finished"``.
    """
    global edge_set
    while True:
        # GET /api/game
        game = call_api('/api/game')
        game_id, remaining_ms = map(int, game.split())
        print(f"ゲームID {game_id}")
        print(f"残り時間 {remaining_ms}ms")
        if game_id < 0:
            break

        edge_set = set()
        timer = time.time()
        suc_num, tot_num = 0, 0
        # Start from "nothing claimed" so the first cycle still has usable
        # data when /api/edges does not answer "ok" (previously a NameError).
        claim_counts = [0] * NUM_EDGES
        my_claims = [0] * NUM_EDGES

        while 0 < remaining_ms:
            # GET /api/stage/<game_id>
            stage_resp = call_api(f'/api/stage/{game_id}')
            stage = list(map(int, stage_resp.split()))
            num_source = stage[0]
            num_sink = stage[1]
            sources = stage[2:2 + num_source]
            sinks = stage[2 + num_source:2 + num_source + num_sink]
            caps = stage[2 + num_source + num_sink:]
            assert len(caps) == NUM_EDGES

            # Check the claim status of every edge and compute the score.
            # GET /api/edges/<token>/<game_id>
            edges_resp = call_api(f'/api/edges/{TOKEN}/{game_id}').strip().split('\n')
            if edges_resp[0] == "ok":
                claim_counts = list(map(int, edges_resp[1].split()))
                my_claims = list(map(int, edges_resp[2].split()))
                # Only consumed by a debug print that is currently disabled.
                score = calculate_score(sources, sinks, caps, claim_counts, my_claims)

            # Overlay edges this process claimed successfully but that the
            # status reply does not list as ours yet.
            for ei in edge_set:
                if my_claims[ei] == 0:
                    my_claims[ei] = 1
                    claim_counts[ei] += 1

            # Choose the edge to claim.
            edge_index = solve(sources, sinks, caps, claim_counts, my_claims)
            # GET /api/claim/<token>/<game_id>/<edge_index>
            claim_resp = call_api(f'/api/claim/{TOKEN}/{game_id}/{edge_index}').strip()

            # Wait a little because of the API rate limit: at least 0.54 s
            # between consecutive cycles, polled in 10 ms steps.
            while time.time() - timer < 0.54:
                time.sleep(0.01)
            pre_timer, timer = timer, time.time()
            delta = timer - pre_timer
            print(delta)

            tot_num += 1
            if claim_resp == "ok":
                edge_set.add(edge_index)
                suc_num += 1
            elif claim_resp == "game_finished":
                break
            else:
                print(f"辺取得失敗 {edge_index} {claim_resp} !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            # NOTE(review): the original nesting of this reset and of the
            # summary print below was lost; the reset is assumed to close the
            # inner loop and the summary to be printed once per game.
            timer = time.time()

        print(f'{suc_num} / {tot_num} : {suc_num/max(1,tot_num)}')
# cf. https://github.com/spaghetti-source/algorithm/blob/9cca6b826f19ed7e42dd326a4fbbb9f4d34f04d3/graph/maximum_flow_dinic.cc
INF = 1 << 50  # stands in for an unbounded capacity / path width
MX = 20  # largest layer index tracked by Graph.solve()


class Edge(NamedTuple):
    """One directed arc stored in a ``Graph`` adjacency list."""

    src: int  # tail vertex
    dst: int  # head vertex
    capacity: int  # 0 for the paired reverse arc created by add_edge
    rev: int  # position of the paired arc inside adj[dst]
    l: int  # value of Graph.now_l at the time the arc was added
    i: int  # position of the arc inside Graph.es


class Graph:
    """Flow network with Dinic max-flow and a layered widest-path search.

    Every arc is stamped with the ``now_l`` value that was current when it was
    added; ``solve`` uses that stamp as the arc's layer cost.
    """

    def __init__(self, n: int, sources: List[int], sinks: List[int]):
        """Create an empty graph on ``n`` vertices; source/sink lists are copied."""
        self.n = n
        self.adj: List[List[Edge]] = [[] for _ in range(n)]
        self.flow: Dict[Edge, int] = {}
        self.sources = sources[:]
        self.sinks = sinks[:]
        self.now_l = 0
        self.es: List[Edge] = []

    def add_edge(self, src: int, dst: int, capacity: int):
        """Add the arc ``src -> dst`` plus its zero-capacity reverse arc."""
        forward = Edge(src, dst, capacity, len(self.adj[dst]), self.now_l, len(self.es))
        self.adj[src].append(forward)
        self.flow[forward] = 0
        self.es.append(forward)
        backward = Edge(dst, src, 0, len(self.adj[src]) - 1, self.now_l, len(self.es))
        self.adj[dst].append(backward)
        self.flow[backward] = 0
        self.es.append(backward)

    def max_flow(self, s: int, t: int):
        """Push a maximum flow from ``s`` to ``t`` and return its value.

        The per-arc flow is accumulated in ``self.flow``.
        """
        level = [-1] * self.n
        next_edge = [0] * self.n  # per-vertex cursor into adj[] for augment()

        def levelize():
            # BFS over residual arcs; returns level[t], negative if t is unreachable.
            level[:] = [-1] * self.n
            level[s] = 0
            queue = [s]
            head = 0  # read cursor: O(1) dequeue instead of re-slicing the list
            while head < len(queue):
                u = queue[head]
                head += 1
                if u == t:
                    break
                for e in self.adj[u]:
                    if e.capacity > self.flow[e] and level[e.dst] < 0:
                        queue.append(e.dst)
                        level[e.dst] = level[u] + 1
            return level[t]

        def augment(u, cur):
            # DFS along strictly increasing levels; pushes at most ``cur`` units.
            if u == t:
                return cur
            while next_edge[u] < len(self.adj[u]):
                e = self.adj[u][next_edge[u]]
                r = self.adj[e.dst][e.rev]
                if e.capacity > self.flow[e] and level[u] < level[e.dst]:
                    f = augment(e.dst, min(cur, e.capacity - self.flow[e]))
                    if f > 0:
                        self.flow[e] += f
                        self.flow[r] -= f
                        return f
                next_edge[u] += 1
            return 0

        flow = 0
        while levelize() >= 0:
            next_edge[:] = [0] * self.n
            while True:
                f = augment(s, INF)
                if f <= 0:
                    break
                flow += f
        return flow

    def scores(self):
        """Return ``{(src, dst): flow}`` over all arcs.

        When several arcs share the same ``(src, dst)`` pair, the one visited
        last wins.
        """
        res = {}
        for v in range(self.n):
            for e in self.adj[v]:
                res[(e.src, e.dst)] = self.flow[e]
        return res

    def solve(self):
        """Find the best source-to-sink residual path and return its ``l == 1`` arcs.

        ``dp[v][k]`` holds ``(width, arc index)``: the largest bottleneck
        residual capacity found for reaching ``v`` from any source along arcs
        whose ``l`` stamps sum to ``k``, and the last arc of such a path. The
        chosen path maximises ``width / k`` over all sinks and ``k`` in
        ``1..MX``.

        Returns:
            A set of ``(src, dst)`` pairs for the arcs with ``l == 1`` on the
            chosen path; empty when no sink is reachable with positive width.
        """
        dp = [[(0, -1)] * (MX + 1) for _ in range(self.n)]
        queues = [[] for _ in range(MX + 1)]
        for v in self.sources:
            dp[v][0] = (INF, -1)
            # heapq is a min-heap; widths are stored negated so that the
            # widest label is popped first.
            heapq.heappush(queues[0], (-INF, v))

        # Labels that land in layer MX are recorded but never expanded.
        for layer in range(MX):
            heap = queues[layer]
            while heap:
                neg_width, v = heapq.heappop(heap)
                width = dp[v][layer][0]
                if width != -neg_width:
                    continue  # stale entry: the label was improved afterwards
                for e in self.adj[v]:
                    cand = min(width, e.capacity - self.flow[e])
                    nxt = layer + e.l
                    if dp[e.dst][nxt][0] >= cand:
                        continue
                    dp[e.dst][nxt] = (cand, e.i)
                    heapq.heappush(queues[nxt], (-cand, e.dst))

        best = (0, 0, 0)
        for v in self.sinks:
            for layer in range(1, MX + 1):
                width = dp[v][layer][0]
                # Unreached labels have no predecessor arc and must not win.
                if width > 0:
                    best = max(best, (width / layer, v, layer))

        _, v, layer = best
        res = set()
        while layer > 0:
            ei = dp[v][layer][1]
            if ei < 0:
                break  # no predecessor recorded; never index es[-1]
            e = self.es[ei]
            if e.l == 1:
                res.add((e.src, e.dst))
            v = e.src
            layer -= e.l
        return res
# Run a max flow and compute the score.
def calculate_score(sources, sinks, caps, claim_counts, my_claims):
    """Return the max flow through the edges marked in ``my_claims``.

    Args:
        sources: Vertex indices linked from the super source.
        sinks: Vertex indices linked to the super sink.
        caps: Capacity of each edge, indexed by edge index.
        claim_counts: Per-edge number by which the capacity is divided.
        my_claims: Per-edge flag; truthy entries are the edges that are used.

    Returns:
        The max-flow value as a float. Capacities are scaled by 1000 and
        floor-divided by the claim count before the flow is computed, and the
        result is divided by 1000 again.
    """
    ratio = 1000
    g = Graph(NUM_NODES + 2, sources, sinks)
    # Super source (vertex NUM_NODES) feeds every source with unbounded capacity.
    for source_node in sources:
        g.add_edge(NUM_NODES, source_node, INF)
    # Every sink drains into the super sink (vertex NUM_NODES + 1).
    for sink_node in sinks:
        g.add_edge(sink_node, NUM_NODES + 1, INF)
    # Add the edges we own, in both directions.
    for edge_index in range(NUM_EDGES):
        if not my_claims[edge_index]:
            continue
        v1, v2 = getV(edge_index)
        cap = caps[edge_index] * ratio // claim_counts[edge_index]
        g.add_edge(v1, v2, cap)
        g.add_edge(v2, v1, cap)
    return g.max_flow(NUM_NODES, NUM_NODES + 1) / ratio
# solve
def getV(edge_index):
    """Return the two endpoint vertices ``(v1, v2)`` of edge ``edge_index``.

    Edge indices are split into groups of 39. Within group ``q``, offsets
    0..18 join vertex ``20*q + r`` to the next vertex (``+1``), and offsets
    19..38 join vertex ``20*q + (r - 19)`` to the vertex 20 further on.
    """
    q, r = divmod(edge_index, 39)
    if r < 19:
        v1 = 20 * q + r
        return v1, v1 + 1
    v1 = 20 * q + r - 19
    return v1, v1 + 20
def solve(sources, sinks, caps, claim_counts, my_claims):
    """Pick the index of the next edge to claim.

    A max flow is first pushed through the edges already owned (stamped as
    layer 0). Every edge not yet owned is then added with ``now_l = 1`` and
    the capacity it would have after one more claim. ``Graph.solve`` returns
    the new edges on its best residual path, and one of them is returned,
    chosen in random order.

    Args:
        sources: Vertex indices linked from the super source.
        sinks: Vertex indices linked to the super sink.
        caps: Capacity of each edge, indexed by edge index.
        claim_counts: Per-edge number of claims.
        my_claims: Per-edge flag; truthy entries are edges already owned.

    Returns:
        An edge index. When the path search suggests nothing, a random
        unclaimed edge is returned; only if every edge is already owned does
        it fall back to a uniformly random index.
    """
    ratio = 1000
    g = Graph(NUM_NODES + 2, sources, sinks)
    # Super source (vertex NUM_NODES) feeds every source with unbounded capacity.
    for source_node in sources:
        g.add_edge(NUM_NODES, source_node, INF)
    # Every sink drains into the super sink (vertex NUM_NODES + 1).
    for sink_node in sinks:
        g.add_edge(sink_node, NUM_NODES + 1, INF)
    # Add the edges we own, in both directions.
    for edge_index in range(NUM_EDGES):
        if not my_claims[edge_index]:
            continue
        v1, v2 = getV(edge_index)
        cap = caps[edge_index] * ratio // claim_counts[edge_index]
        g.add_edge(v1, v2, cap)
        g.add_edge(v2, v1, cap)
    g.max_flow(NUM_NODES, NUM_NODES + 1)

    # Candidate edges: stamped with layer 1, capacity as if we had joined.
    g.now_l = 1
    candidates = [ei for ei in range(NUM_EDGES) if not my_claims[ei]]
    for edge_index in candidates:
        v1, v2 = getV(edge_index)
        cap = caps[edge_index] * ratio // (claim_counts[edge_index] + 1)
        g.add_edge(v1, v2, cap)
        g.add_edge(v2, v1, cap)

    random.shuffle(candidates)
    st = g.solve()
    for edge_index in candidates:
        v1, v2 = getV(edge_index)
        if (v1, v2) in st or (v2, v1) in st:
            return edge_index
    # No suggestion: avoid re-claiming an edge we already own when possible.
    if candidates:
        return candidates[0]
    return random.randint(0, NUM_EDGES - 1)
# Run the client only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
間違えて最小ヒープを使っているので計算量が壊れているということに気づきました。
一応正しい答えは出すし、ランダムな小さいグラフなので大丈夫ではありますが。