Craft agents proxy to AWS Bedrock Claude models
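# How to run (a minimal sketch, assuming this file is saved as bedrock_proxy.py
# and AWS credentials are available to boto3):
#
#   pip install fastapi uvicorn boto3 sse-starlette
#   uvicorn bedrock_proxy:app --host 127.0.0.1 --port 8000
#
# Then point your Anthropic-compatible client (e.g. Craft) at
# http://127.0.0.1:8000 as its API base URL.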
import json
import os
import time
import uuid
from typing import Any, Dict, List, Optional

import boto3
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from sse_starlette.sse import EventSourceResponse
# us-west-2 has broad Bedrock model availability, so it is the default here.
AWS_REGION = os.getenv("AWS_REGION", "us-west-2")
BEDROCK_MODEL_ID = os.getenv("BEDROCK_MODEL_ID", "anthropic.claude-sonnet-4-5-20250929-v1:0")
# Replace 123123 with your AWS account ID, and my-claude45-test with your own
# inference profile name.
INFERENCE_PROFILE_ARN = os.getenv("INFERENCE_PROFILE_ARN", "arn:aws:bedrock:us-west-2:123123:application-inference-profile/my-claude45-test")  # optional

if not (BEDROCK_MODEL_ID or INFERENCE_PROFILE_ARN):
    raise RuntimeError("Set BEDROCK_MODEL_ID or INFERENCE_PROFILE_ARN")

bedrock = boto3.client("bedrock-runtime", region_name=AWS_REGION)
app = FastAPI(title="Anthropic /v1/messages -> Bedrock Converse Proxy")
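# Note: boto3 resolves AWS credentials via its default chain (environment
# variables, shared config/credentials files, or an instance/container role);
# this file never reads keys directly.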
# ----- Helpers -----

def build_anthropic_message_response(
    *,
    text: str,
    model: str,
    input_tokens: Optional[int],
    output_tokens: Optional[int],
    inference_geo: Optional[str] = None,
    stop_reason: str = "end_turn",
) -> Dict[str, Any]:
    return {
        "id": f"msg_{uuid.uuid4().hex}",
        "type": "message",
        "role": "assistant",
        "model": model,
        "content": [
            {
                "type": "text",
                "text": text,
                # The Craft/Anthropic schema allows citations; an empty list is ok.
                "citations": [],
            }
        ],
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            # Craft's example response includes these; keep them even if you
            # don't use caching.
            "cache_creation": {
                "ephemeral_1h_input_tokens": 0,
                "ephemeral_5m_input_tokens": 0,
            },
            "cache_creation_input_tokens": 0,
            "cache_read_input_tokens": 0,
            "inference_geo": inference_geo or None,
            "input_tokens": input_tokens or 0,
            "output_tokens": output_tokens or 0,
            "server_tool_use": {
                "web_search_requests": 0,
            },
            "service_tier": "standard",
        },
    }

def extract_bedrock_text_usage(resp: Dict[str, Any]) -> tuple[str, Optional[int], Optional[int], Optional[str], str]:
    """
    Returns: (text, input_tokens, output_tokens, inference_geo, stop_reason)
    Works for:
      - a bedrock-runtime converse response
      - the wrapped "ModelInvocationLog" shape (outputBodyJson...)
    """
    # Standard converse response
    out_content = (
        (resp.get("output", {}) or {})
        .get("message", {})
        .get("content", [])
    )
    usage = resp.get("usage", {}) or {}
    stop = resp.get("stopReason")

    # Wrapped invocation-log shape
    if not out_content and "outputBodyJson" in resp:
        obj = resp.get("outputBodyJson", {}) or {}
        out_content = (
            (obj.get("output", {}) or {})
            .get("message", {})
            .get("content", [])
        )
        usage = obj.get("usage", {}) or usage
        stop = obj.get("stopReason", stop)
        inference_geo = obj.get("inference_geo") or resp.get("inferenceRegion")
    else:
        inference_geo = resp.get("inference_geo") or resp.get("inferenceRegion")

    text = "".join(c.get("text", "") for c in out_content if isinstance(c, dict) and "text" in c)
    input_tokens = usage.get("inputTokens")
    output_tokens = usage.get("outputTokens")
    stop_reason = _stop_reason_from_bedrock(stop) if isinstance(stop, str) else "end_turn"
    return text, input_tokens, output_tokens, inference_geo, stop_reason
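
# For reference, a typical converse() response looks roughly like this
# (abridged; see the Bedrock Runtime docs for the full shape):
#
#   {
#       "output": {"message": {"role": "assistant",
#                              "content": [{"text": "Hello!"}]}},
#       "stopReason": "end_turn",
#       "usage": {"inputTokens": 12, "outputTokens": 5, "totalTokens": 17},
#   }
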
def _extract_text_from_content(content: Any) -> str:
    """
    Accepts:
      - "string"
      - [{"type": "text", "text": "..."}] (and ignores extra keys)
    Returns concatenated text.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for item in content:
            if isinstance(item, dict) and item.get("type") == "text":
                parts.append(item.get("text", ""))
        return "".join(parts)
    return ""
def _anthropic_content_to_bedrock_content(content: Any) -> List[Dict[str, Any]]:
    """
    Bedrock Converse expects [{"text": "..."}] for text.
    We only support text for now.
    """
    text = _extract_text_from_content(content)
    return [{"text": text}]


def _system_to_bedrock(system: Any) -> Optional[List[Dict[str, Any]]]:
    if system is None:
        return None
    text = _extract_text_from_content(system)
    if text == "":
        return None
    return [{"text": text}]

def _stop_reason_from_bedrock(sr: Optional[str]) -> str:
    if not sr:
        return "end_turn"
    s = sr.lower()
    if "max" in s:
        return "max_tokens"
    if "stop" in s:
        return "stop_sequence"
    return "end_turn"
def _estimate_tokens(text: str) -> int:
    """
    Rough token estimator for planning.
    Anthropic tokens differ, but Craft mainly needs a stable estimate.
    Rule of thumb: ~4 chars/token (English); Hebrew often a bit fewer chars/token.
    We'll clamp to at least 1 when text is non-empty.
    """
    if not text:
        return 0
    return max(1, int(len(text) / 4))
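
# Worked examples:
#   _estimate_tokens("")        -> 0
#   _estimate_tokens("hi")      -> 1    (clamped: int(2 / 4) would be 0)
#   _estimate_tokens("a" * 400) -> 100  (~4 chars per token)
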
def _get_model_id(requested_model: Optional[str]) -> str:
    # If you want alias mapping, add it here, e.g.:
    # MODEL_MAP = {"claude-3-5-sonnet-latest": "anthropic.claude-3-5-sonnet-20240620-v1:0"}
    # return INFERENCE_PROFILE_ARN or MODEL_MAP.get(requested_model, BEDROCK_MODEL_ID)
    return INFERENCE_PROFILE_ARN or (requested_model or BEDROCK_MODEL_ID)

# ----- Endpoints -----

@app.post("/v1/messages/count_tokens")
async def count_tokens(req: Request):
    """
    Anthropic-compatible: POST /v1/messages/count_tokens
    Craft calls it with ?beta=true (ignored).
    Returns: {"input_tokens": N}
    """
    body = await req.json()
    # print("CRAFT BODY:", body)

    # Craft sends tools for agentic flows. This is a simple chat proxy,
    # so ignore tools and force plain-text answers.
    body.pop("tools", None)
    body.pop("tool_choice", None)

    # Force a system prompt that forbids tool use.
    forced_system = (
        "You are a plain chat assistant. "
        "Do NOT call tools or request tool use. "
        "Respond with normal text only."
    )
    existing_system = body.get("system")
    if existing_system is None:
        body["system"] = forced_system
    elif isinstance(existing_system, str):
        body["system"] = forced_system + "\n\n" + existing_system
    elif isinstance(existing_system, list):
        # Convert the existing list to text-only and prepend
        body["system"] = [{"type": "text", "text": forced_system}] + existing_system
    else:
        body["system"] = forced_system

    system_text = _extract_text_from_content(body.get("system"))
    msgs = body.get("messages") or []
    msg_texts = []
    if isinstance(msgs, list):
        for m in msgs:
            if isinstance(m, dict):
                msg_texts.append(_extract_text_from_content(m.get("content")))
    all_text = system_text + "\n" + "\n".join(msg_texts)
    return {"input_tokens": _estimate_tokens(all_text)}
@app.post("/v1/messages")
async def v1_messages(req: Request):
"""
Anthropic-compatible: POST /v1/messages
Craft calls it with ?beta=true (ignored).
We parse loosely to avoid 422 from strict schemas.
"""
body = await req.json()
requested_model = body.get("model")
model_id = _get_model_id(requested_model)
max_tokens = body.get("max_tokens")
if not isinstance(max_tokens, int) or max_tokens <= 0:
max_tokens = 1024
temperature = body.get("temperature")
top_p = body.get("top_p")
stop_sequences = body.get("stop_sequences")
# Convert messages
msgs = body.get("messages")
if not isinstance(msgs, list) or len(msgs) == 0:
raise HTTPException(status_code=400, detail="Missing or invalid 'messages' array")
bedrock_messages: List[Dict[str, Any]] = []
for m in msgs:
if not isinstance(m, dict):
continue
role = m.get("role")
# Craft sometimes includes odd roles; handle lightly
if role == "system":
# If they send system as a message, merge it into system text
# (We'll just ignore here; prefer the top-level 'system' key)
continue
if role not in ("user", "assistant"):
# Ignore unknown roles instead of hard-failing (helps compatibility)
continue
bedrock_messages.append(
{
"role": role,
"content": _anthropic_content_to_bedrock_content(m.get("content")),
}
)
if not bedrock_messages:
raise HTTPException(status_code=400, detail="No supported messages found (need user/assistant roles)")
converse_kwargs: Dict[str, Any] = {
"modelId": model_id,
"messages": bedrock_messages,
"inferenceConfig": {"maxTokens": max_tokens},
}
if isinstance(temperature, (int, float)):
converse_kwargs["inferenceConfig"]["temperature"] = float(temperature)
if isinstance(top_p, (int, float)):
converse_kwargs["inferenceConfig"]["topP"] = float(top_p)
if isinstance(stop_sequences, list) and all(isinstance(s, str) for s in stop_sequences):
converse_kwargs["inferenceConfig"]["stopSequences"] = stop_sequences
system_content = _system_to_bedrock(body.get("system"))
if system_content is not None:
converse_kwargs["system"] = system_content
try:
resp = bedrock.converse(**converse_kwargs)
except Exception as e:
raise HTTPException(status_code=502, detail=f"Bedrock error: {type(e).__name__}: {str(e)}")
text, in_tok, out_tok, inference_geo, stop_reason = extract_bedrock_text_usage(resp)
payload = build_anthropic_message_response(
text=text if text.strip() else "(empty)",
model=requested_model or model_id,
input_tokens=in_tok,
output_tokens=out_tok,
inference_geo=inference_geo,
stop_reason=stop_reason,
)
return JSONResponse(content=payload, media_type="application/json")
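
# Non-streaming smoke test (hypothetical values; the requested model name is
# passed straight to Bedrock unless INFERENCE_PROFILE_ARN is set):
#
#   curl -s http://127.0.0.1:8000/v1/messages \
#        -H 'content-type: application/json' \
#        -d '{"model": "anthropic.claude-sonnet-4-5-20250929-v1:0", "max_tokens": 256,
#             "messages": [{"role": "user", "content": "Hello"}]}'
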
# Streaming-capable variant. Note: this is registered on the same path as
# v1_messages above, and FastAPI dispatches to the first matching route, so
# this handler is effectively disabled; remove or rename the handler above
# to enable it.
@app.post("/v1/messages")
async def v1_messages_streaming(req: Request):
    body = await req.json()
    # Many clients add beta=true in the querystring; ignore it.
    stream = bool(body.get("stream", False))
    requested_model = body.get("model")
    model_id = _get_model_id(requested_model)
    max_tokens = body.get("max_tokens")
    if not isinstance(max_tokens, int) or max_tokens <= 0:
        max_tokens = 1024

    # Ignore Craft tools for now (text-only proxy)
    body.pop("tools", None)
    body.pop("tool_choice", None)

    # Force text-only
    forced_system = "Respond with normal text only. Do NOT call tools."
    existing_system = body.get("system")
    if existing_system is None:
        body["system"] = forced_system
    elif isinstance(existing_system, str):
        body["system"] = forced_system + "\n\n" + existing_system

    msgs = body.get("messages") or []
    bedrock_messages = []
    for m in msgs:
        if isinstance(m, dict) and m.get("role") in ("user", "assistant"):
            bedrock_messages.append(
                {"role": m["role"], "content": _anthropic_content_to_bedrock_content(m.get("content"))}
            )
    if not bedrock_messages:
        raise HTTPException(status_code=400, detail="No supported messages found")

    inference_cfg = {"maxTokens": max_tokens}
    if isinstance(body.get("temperature"), (int, float)):
        inference_cfg["temperature"] = float(body["temperature"])
    if isinstance(body.get("top_p"), (int, float)):
        inference_cfg["topP"] = float(body["top_p"])
    if isinstance(body.get("stop_sequences"), list):
        inference_cfg["stopSequences"] = [s for s in body["stop_sequences"] if isinstance(s, str)]

    kwargs = {
        "modelId": model_id,
        "messages": bedrock_messages,
        "inferenceConfig": inference_cfg,
    }
    sys_block = _system_to_bedrock(body.get("system"))
    if sys_block is not None:
        kwargs["system"] = sys_block
    # -------------------------
    # Streaming (SSE) response
    # -------------------------
    if stream or ("text/event-stream" in (req.headers.get("accept") or "")):
        msg_id = f"msg_{uuid.uuid4().hex}"
        created = int(time.time())

        async def gen():
            # sse_starlette stringifies the "data" field, so serialize each
            # payload with json.dumps; otherwise clients would receive a
            # Python repr instead of valid JSON.
            # 1) message_start
            yield {
                "event": "message_start",
                "data": json.dumps({
                    "type": "message_start",
                    "message": {
                        "id": msg_id,
                        "type": "message",
                        "role": "assistant",
                        "model": requested_model or model_id,
                        "content": [],
                        "stop_reason": None,
                        "stop_sequence": None,
                        "usage": {"input_tokens": None, "output_tokens": None},
                        "created": created,
                    },
                }),
            }
            # 2) content_block_start (text block index 0)
            yield {
                "event": "content_block_start",
                "data": json.dumps({
                    "type": "content_block_start",
                    "index": 0,
                    "content_block": {"type": "text", "text": ""},
                }),
            }

            out_text = []
            stop_reason = "end_turn"
            usage_in = None
            usage_out = None
            try:
                resp_stream = bedrock.converse_stream(**kwargs)
            except Exception as e:
                yield {
                    "event": "error",
                    "data": json.dumps({"type": "error", "error": {"message": str(e)}}),
                }
                return

            # Bedrock stream events vary a bit; handle the common shapes.
            for ev in resp_stream.get("stream", []):
                if not isinstance(ev, dict):
                    continue
                # text delta
                cbd = ev.get("contentBlockDelta")
                if isinstance(cbd, dict):
                    delta = cbd.get("delta") or {}
                    if isinstance(delta, dict) and "text" in delta:
                        t = delta["text"]
                        out_text.append(t)
                        yield {
                            "event": "content_block_delta",
                            "data": json.dumps({
                                "type": "content_block_delta",
                                "index": 0,
                                "delta": {"type": "text_delta", "text": t},
                            }),
                        }
                # stop / metadata (usage)
                ms = ev.get("messageStop")
                if isinstance(ms, dict):
                    # Sometimes includes stopReason
                    sr = ms.get("stopReason")
                    if isinstance(sr, str) and sr:
                        stop_reason = _stop_reason_from_bedrock(sr)
                md = ev.get("metadata")
                if isinstance(md, dict):
                    u = md.get("usage")
                    if isinstance(u, dict):
                        usage_in = u.get("inputTokens")
                        usage_out = u.get("outputTokens")

            # 3) content_block_stop
            yield {
                "event": "content_block_stop",
                "data": json.dumps({"type": "content_block_stop", "index": 0}),
            }
            # 4) message_delta (IMPORTANT for some clients)
            yield {
                "event": "message_delta",
                "data": json.dumps({
                    "type": "message_delta",
                    "delta": {
                        "stop_reason": stop_reason,
                        "stop_sequence": None,
                    },
                    "usage": {
                        "output_tokens": usage_out,
                    },
                }),
            }
            # 5) message_stop
            yield {
                "event": "message_stop",
                "data": json.dumps({"type": "message_stop"}),
            }

        # Important: EventSourceResponse handles SSE framing correctly
        return EventSourceResponse(gen())
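
    # On the wire, each yielded item becomes one SSE frame, e.g.:
    #
    #   event: content_block_delta
    #   data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hel"}}
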
    # -------------------------
    # Non-streaming response
    # -------------------------
    try:
        resp = bedrock.converse(**kwargs)
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"Bedrock error: {type(e).__name__}: {e}")

    out_msg = resp.get("output", {}).get("message", {})
    out_content = out_msg.get("content", [])
    text = "".join(c.get("text", "") for c in out_content if isinstance(c, dict) and "text" in c)
    usage = resp.get("usage", {}) or {}
    payload = {
        "id": f"msg_{uuid.uuid4().hex}",
        "type": "message",
        "role": "assistant",
        "model": requested_model or model_id,
        "content": [{"type": "text", "text": text}],
        "stop_reason": _stop_reason_from_bedrock(resp.get("stopReason")),
        "stop_sequence": None,
        "usage": {
            "input_tokens": usage.get("inputTokens"),
            "output_tokens": usage.get("outputTokens"),
        },
        "created": int(time.time()),
    }
    return JSONResponse(content=payload, media_type="application/json")
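
# Streaming smoke test (hypothetical values; note the route-shadowing caveat
# above: the streaming handler only runs if the earlier /v1/messages handler
# is removed or renamed):
#
#   curl -N http://127.0.0.1:8000/v1/messages \
#        -H 'content-type: application/json' \
#        -d '{"model": "anthropic.claude-sonnet-4-5-20250929-v1:0", "max_tokens": 256,
#             "stream": true, "messages": [{"role": "user", "content": "Hello"}]}'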