Created
April 13, 2025 19:23
-
-
Save saravr/989b43939613e88f14502522b0416378 to your computer and use it in GitHub Desktop.
Sample MCP OpenAI call
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Courtesy: Perplexity.AI | |
# | |
import json | |
from openai import OpenAI | |
class AutoMCPClient: | |
def __init__(self, api_key, model="gpt-4-0613"): | |
self.client = OpenAI(api_key=api_key) | |
self.model = model | |
self.functions = [ | |
{ | |
"name": "mcp_query", | |
"description": "General-purpose MCP protocol handler", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"operation": {"type": "string"}, | |
"parameters": {"type": "object"} | |
}, | |
"required": ["operation"] | |
} | |
} | |
] | |
def execute_mcp_operation(self, operation, parameters): | |
"""Mock MCP operation handler - replace with actual MCP server calls""" | |
print(f"Executing MCP operation: {operation}") | |
return { | |
"current_weather": {"temperature": 72, "unit": "F", "description": "sunny"}, | |
"stock_price": {"price": 150.25, "currency": "USD"}, | |
}.get(operation, {"error": "Unknown operation"}) | |
def get_response(self, prompt): | |
messages = [{"role": "user", "content": prompt}] | |
while True: | |
response = self.client.chat.completions.create( | |
model=self.model, | |
messages=messages, | |
functions=self.functions, | |
function_call="auto" | |
) | |
message = response.choices[0].message | |
messages.append(message) | |
if message.function_call: | |
# Parse MCP request | |
args = json.loads(message.function_call.arguments) | |
result = self.execute_mcp_operation( | |
args["operation"], | |
args.get("parameters", {}) | |
) | |
# Add function result to conversation | |
messages.append({ | |
"role": "function", | |
"name": message.function_call.name, | |
"content": json.dumps(result) | |
}) | |
else: | |
# Final response reached | |
try: | |
return json.loads(message.content.strip()) | |
except json.JSONDecodeError: | |
return {"error": "Invalid JSON", "raw_response": message.content} | |
# Usage Example | |
client = AutoMCPClient(api_key="your-api-key") | |
# First query requiring MCP operation | |
response = client.get_response( | |
"What's the current weather in San Francisco and AAPL stock price?" | |
) | |
print(response) | |
# Direct query without MCP calls | |
response = client.get_response("Just return a test JSON: {'status': 'ok'}") | |
print(response) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment