Last active
November 30, 2024 17:59
-
-
Save mkeen/4f092ddd660585ebe1968c8dc8dac083 to your computer and use it in GitHub Desktop.
DSX Beta 3.1 + Gemini for AI-Driven Custom Haptics
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import re
import socket
import sys
import time
from ctypes import windll
from enum import IntEnum
from typing import List, Optional, Tuple, Dict

import google.generativeai
import requests
import win32con
import win32gui
import win32ui
from mss import mss
from PIL import Image
class Config:
    """Static settings for the DSX connection, screen capture and AI models."""

    # UDP endpoint that send_command() targets with JSON instruction packets.
    DSX_IP = '127.0.0.1'
    DSX_PORT = 6969

    # File the captured window image is saved to, and the pause (in seconds)
    # between two captures in the main loop.
    SCREENSHOT_FILE = "star_citizen.png"
    SCREENSHOT_INTERVAL = 1  # seconds

    # API key handed to google.generativeai.configure(). Taken from the
    # GOOGLE_API_KEY environment variable when it is set, so a real key does
    # not have to be written into this file; otherwise the placeholder is used.
    API_KEY = os.environ.get("GOOGLE_API_KEY", "YOUR KEY HERE")
    MODEL_NAME = "gemini-1.5-pro"
    FLASH_MODEL = "gemini-1.5-flash"

    # First parameter of every instruction packet built by DSCommands.
    CONTROLLER_INDEX = 0

    # The window title to look for. It is compared for exact equality, so the
    # trailing space is significant.
    WINDOW_TITLE = "Star Citizen "
    RETRY_INTERVAL = 2  # seconds to wait between window checks
class InstructionType(IntEnum):
    """Numeric codes stored in the "type" field of an instruction packet."""

    # Trigger behaviour.
    TriggerUpdate = 1
    TriggerThreshold = 4

    # Lights.
    RGBUpdate = 2
    PlayerLED = 3

    # Rumble motors.
    Rumble = 5
class Trigger(IntEnum):
    """Selects which trigger a trigger packet is addressed to."""

    Left = 1   # default target of DSCommands.create_trigger_packet
    Right = 2
class TriggerMode(IntEnum):
    """Trigger effect identifiers sent as the mode parameter of a trigger packet.

    The numeric values are part of the packet format and are therefore
    spelled out explicitly rather than generated.
    """

    # Modes 0-7
    Normal = 0
    GameCube = 1
    VerySoft = 2
    Soft = 3
    Hard = 4
    VeryHard = 5
    Hardest = 6
    Rigid = 7

    # Modes 8-12
    VibrateTrigger = 8
    Choppy = 9
    Medium = 10
    VibrateTriggerPulse = 11
    CustomTriggerValue = 12

    # Modes 13-18
    Resistance = 13
    Bow = 14
    Galloping = 15
    SemiAutomaticGun = 16
    AutomaticGun = 17
    Machine = 18
class DSCommands:
    """Builders for the JSON-serialisable instruction packets sent to DSX.

    Every builder returns ``{"instructions": [{"type": ..., "parameters": [...]}]}``
    with ``Config.CONTROLLER_INDEX`` as the first parameter.
    """

    @staticmethod
    def _clamp_byte(value: int) -> int:
        """Coerce *value* to an int limited to the 0-255 range."""
        return max(0, min(255, int(value)))

    @staticmethod
    def _single_instruction(instruction_type, parameters: list) -> dict:
        """Wrap one instruction in the packet envelope shared by all builders."""
        return {"instructions": [{"type": instruction_type, "parameters": parameters}]}

    @staticmethod
    def create_trigger_packet(mode: TriggerMode, trigger: Trigger = Trigger.Left, *args) -> dict:
        """Creates a packet exactly matching the C# format

        Args:
            mode: Trigger effect to apply.
            trigger: Trigger to apply it to (left by default).
            *args: Extra mode-specific values appended verbatim after the mode.
        """
        return DSCommands._single_instruction(
            InstructionType.TriggerUpdate,
            [Config.CONTROLLER_INDEX, int(trigger), int(mode), *args],
        )

    @staticmethod
    def create_rgb_packet(r: int, g: int, b: int) -> dict:
        """Creates a packet for RGB LED control

        Each channel is clamped to 0-255, because the values may come
        straight from parsed AI output.
        """
        clamp = DSCommands._clamp_byte
        return DSCommands._single_instruction(
            InstructionType.RGBUpdate,
            [Config.CONTROLLER_INDEX, clamp(r), clamp(g), clamp(b)],
        )

    @staticmethod
    def get_test_commands() -> List[dict]:
        """Generate a single simple test command"""
        # Just test the VibrateTrigger mode with full strength on the left
        # trigger. Built through create_trigger_packet so it follows the
        # enums and the configured controller index instead of duplicating
        # their numeric values.
        return [
            DSCommands.create_trigger_packet(TriggerMode.VibrateTrigger, Trigger.Left, 255)
        ]

    @staticmethod
    def create_rumble_packet(light_motor: int, heavy_motor: int) -> dict:
        """Creates a packet for rumble control

        Args:
            light_motor: Intensity for the light (high-frequency) motor (0-255)
            heavy_motor: Intensity for the heavy (low-frequency) motor (0-255)

        Values outside 0-255 are clamped to that range.
        """
        clamp = DSCommands._clamp_byte
        return DSCommands._single_instruction(
            InstructionType.Rumble,
            [Config.CONTROLLER_INDEX, clamp(light_motor), clamp(heavy_motor)],
        )
class StarCitizenHaptics:
    """Drive controller effects through DSX from AI analysis of game screenshots.

    The main loop captures the Star Citizen window, uploads the image to a
    generative model, parses the yes/no answers and the colour from the reply,
    and sends the matching trigger / LED packets to DSX over UDP.
    """

    def __init__(self):
        """Set up the AI models and the socket, then reset both triggers to Normal."""
        self.setup_ai()
        self.setup_socket()
        self.screen = mss()
        self.last_color = None  # last colour sent, used to skip duplicate LED updates
        self.send_command(DSCommands.create_trigger_packet(TriggerMode.Normal, Trigger.Left))
        self.send_command(DSCommands.create_trigger_packet(TriggerMode.Normal, Trigger.Right))

    def setup_ai(self) -> None:
        """Configure the AI API. Exits the process with status 1 on failure."""
        try:
            google.generativeai.configure(api_key=Config.API_KEY)
            self.model = google.generativeai.GenerativeModel(model_name=Config.MODEL_NAME)
            self.flash_model = google.generativeai.GenerativeModel(Config.FLASH_MODEL)
            print("AI models initialized successfully")
        except Exception as e:
            print(f"Failed to initialize AI: {e}")
            sys.exit(1)

    def setup_socket(self) -> None:
        """Initialize UDP socket for DSX communication. Exits with status 1 on failure."""
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            print(f"Socket initialized. Target: {Config.DSX_IP}:{Config.DSX_PORT}")
        except Exception as e:
            print(f"Failed to create socket: {e}")
            sys.exit(1)

    def list_window_names(self):
        """Print the handle and title of every visible top-level window (debug aid)."""
        def winEnumHandler(hwnd, ctx):
            if win32gui.IsWindowVisible(hwnd):
                print(hex(hwnd), '"' + win32gui.GetWindowText(hwnd) + '"')

        win32gui.EnumWindows(winEnumHandler, None)

    def get_window_rect(self) -> Optional[Dict[str, int]]:
        """
        Find the Star Citizen window and return its coordinates, even when not focused.
        Returns None if window is not found.

        The result has the keys 'left', 'top', 'width' and 'height' and
        describes the client area, with border and title bar subtracted.
        """
        found = {'rect': None}

        def on_window(hwnd, store):
            title = win32gui.GetWindowText(hwnd)
            # Exact match instead of 'in', so similarly named windows are skipped.
            if title != Config.WINDOW_TITLE:
                return
            print(f"Found Star Citizen window: '{title}'")  # Debug message
            outer = win32gui.GetWindowRect(hwnd)
            client = win32gui.GetClientRect(hwnd)
            # The horizontal surplus is split evenly between the two side
            # borders; the vertical surplus minus one border is the title bar.
            border_width = ((outer[2] - outer[0]) - client[2]) // 2
            title_height = ((outer[3] - outer[1]) - client[3]) - border_width
            store['rect'] = {
                'left': outer[0] + border_width,
                'top': outer[1] + title_height,
                'width': client[2],
                'height': client[3],
            }

        win32gui.EnumWindows(on_window, found)
        return found['rect']

    @staticmethod
    def _release_gdi(hwnd, hwnd_dc, mfc_dc, save_dc, save_bitmap) -> None:
        """Free whichever capture resources were created; failures are only logged."""
        steps = []
        if save_bitmap is not None:
            steps.append(lambda: win32gui.DeleteObject(save_bitmap.GetHandle()))
        if save_dc is not None:
            steps.append(save_dc.DeleteDC)
        if mfc_dc is not None:
            steps.append(mfc_dc.DeleteDC)
        if hwnd_dc is not None:
            steps.append(lambda: win32gui.ReleaseDC(hwnd, hwnd_dc))
        for step in steps:
            try:
                step()
            except Exception as e:
                print(f"Failed to release capture resource: {e}")

    def take_screenshot(self) -> Optional[str]:
        """Capture Star Citizen window content even when not visible.

        Returns:
            Config.SCREENSHOT_FILE after the image has been saved, or None
            when the window is missing, has an empty client area, or the
            capture fails.
        """
        hwnd = hwnd_dc = mfc_dc = save_dc = save_bitmap = None
        try:
            # Find the Star Citizen window
            hwnd = win32gui.FindWindow(None, Config.WINDOW_TITLE)
            if not hwnd:
                return None

            # Get the window's dimensions
            client = win32gui.GetClientRect(hwnd)
            width, height = client[2], client[3]
            if width <= 0 or height <= 0:
                # Nothing to capture, and a zero-sized bitmap cannot be created.
                return None

            # Create device contexts and bitmap
            hwnd_dc = win32gui.GetWindowDC(hwnd)
            mfc_dc = win32ui.CreateDCFromHandle(hwnd_dc)
            save_dc = mfc_dc.CreateCompatibleDC()
            save_bitmap = win32ui.CreateBitmap()
            save_bitmap.CreateCompatibleBitmap(mfc_dc, width, height)
            save_dc.SelectObject(save_bitmap)

            # Print window content directly to our bitmap
            # PW_CLIENTONLY = 1 | PW_RENDERFULLCONTENT = 2
            # Combined flags (3) ensure we get the full window content regardless of visibility
            if not windll.user32.PrintWindow(hwnd, save_dc.GetSafeHdc(), 3):
                raise Exception("PrintWindow failed to capture window content")

            # Convert bitmap to PIL Image
            bmpinfo = save_bitmap.GetInfo()
            bmpstr = save_bitmap.GetBitmapBits(True)
            image = Image.frombuffer(
                'RGB',
                (bmpinfo['bmWidth'], bmpinfo['bmHeight']),
                bmpstr, 'raw', 'BGRX', 0, 1)
            image.save(Config.SCREENSHOT_FILE)
            return Config.SCREENSHOT_FILE
        except Exception as e:
            print(f"Failed to take screenshot: {e}")
            return None
        finally:
            # Runs on success and on failure, so handles are not leaked when
            # a step in the middle of the capture raises.
            self._release_gdi(hwnd, hwnd_dc, mfc_dc, save_dc, save_bitmap)

    def parse_ai_response(self, text: str) -> Tuple[List[bool], Optional[Tuple[int, int, int]]]:
        """Parse both boolean responses and RGB color from AI response.

        Lines numbered 1-3 are read as yes/no answers; an answer counts as
        yes when it starts with "yes" in any letter case. A line containing
        "RGB:" supplies the colour from the first three integers after the
        marker, each clamped to 0-255.

        Returns:
            (flags, color). flags always holds exactly three booleans, with
            unanswered questions False. color is None when no usable RGB
            line was found. On unexpected input ([False] * 3, None) is
            returned.
        """
        try:
            answers = {}
            color = None
            for raw_line in text.splitlines():
                line = raw_line.strip()

                if 'RGB:' in line:
                    # Format "RGB: (r, g, b)"; a malformed line is skipped
                    # without discarding the yes/no answers.
                    numbers = re.findall(r'-?\d+', line.split('RGB:', 1)[1])
                    if len(numbers) >= 3:
                        color = tuple(max(0, min(255, int(n))) for n in numbers[:3])
                    continue

                number, dot, answer = line.partition('.')
                if dot and number.strip() in ('1', '2', '3'):
                    is_yes = answer.strip().strip('*').lower().startswith('yes')
                    # First answer wins if the model repeats a number.
                    answers.setdefault(int(number) - 1, is_yes)

            return [answers.get(i, False) for i in range(3)], color
        except Exception as e:
            print(f"Failed to parse response: {e}")
            return [False] * 3, None

    def analyze_screenshot(self, image_file: str) -> Tuple[List[bool], Optional[Tuple[int, int, int]]]:
        """Process screenshot through AI vision analysis.

        Args:
            image_file: Image part placed first in the model query; run()
                passes the object returned by google.generativeai.upload_file.

        Returns:
            The parsed (flags, color) pair from parse_ai_response, or
            ([False] * 3, None) when the request fails.
        """
        try:
            query = [
                image_file,
                "\n\n",
                "I am playing Star Citizen. It is a game and I want you to infer some things about the current state of the game. This is a screenshot of the game right now. "
                "Answer the following questions directly and suscinctly with either yes no answers or strictly formatted responses depending on if the question calls for it. Absolutely no extra commentary or information please.\n\n"
                "1. Does the screenshot show that the game character is in flight mode of a ship rather than on foot? Some visual cues to look for are if you see Move (Lateral) or Move (Vertical) or Strafe (Up) or Strafe (Down) in the bottom right hud area. If you don't at least one of those, answer No to this question. Really make sure that hud stuff is there if you suspect we might be in flight mode. You will see the pilot's seat in view sometimes, although you will not be in it. Always check the hud. Answer no if you do not see the hud items referenced in this question. Answer no if the player character is on a ship but not in the pilot seat also.",
                "2. Does the screenshot show that the game character is holding a weapon in their hand?",
                "3. If the answer to #2 is yes, is the weapon in automatic mode? This is indicated by 3 bullets stacked on top of each other in the lower right area of the on-screen hud. That area is right next to the ammo count area that shows how much ammo is in the current clip vs the max capacity of the clip. If you just see a single bullet indicated or don't see anything, answer no",
                # Questions 4-6 carry unique numbers so that only questions
                # 1-3 feed the three boolean flags read by the parser.
                "4. Does the screenshot show that the game character is driving a ground vehicle?\n",
                "5. What is the average or dominant color of the screenshot I provided? If you don't know then respond with a black color. Provide the answer in RGB format like this: RGB: (r, g, b)\n",
                "6. If I am in flight mode based on question #1, please also check if the words 'ONLINE' or 'READY' appear in the middle of the screen area. The words QT engage in the bottom right hud area are also an indicator. Do those words appear? If this question is not applicable respond with no"
            ]
            result = self.model.generate_content(query)
            full_text = "".join(chunk.text or "" for chunk in result)
            print(full_text)
            return self.parse_ai_response(full_text)
        except Exception as e:
            print(f"Failed to process image: {e}")
            return [False] * 3, None

    def update_led_color(self, color: tuple) -> None:
        """Update the controller LED color if it has changed"""
        if not color or color == self.last_color:
            return
        r, g, b = color
        self.send_command(DSCommands.create_rgb_packet(r, g, b))
        self.last_color = color
        print(f"Updated LED color to RGB: {color}")

    def send_command(self, packet: dict) -> bool:
        """Send UDP command to DSX in exact C# format.

        Returns:
            True when the datagram was handed to the socket, False on error.
        """
        try:
            command_str = json.dumps(packet)
            bytes_sent = self.sock.sendto(command_str.encode('utf-8'), (Config.DSX_IP, Config.DSX_PORT))
            print(f"Sent {bytes_sent} bytes to DSX: {command_str}")
            return True
        except Exception as e:
            print(f"Error sending command: {e}")
            return False

    @staticmethod
    def _discard_upload(uploaded) -> None:
        """Best-effort removal of an uploaded screenshot from the AI file store."""
        try:
            google.generativeai.delete_file(uploaded.name)
        except Exception as e:
            print(f"Failed to delete uploaded screenshot: {e}")

    def _apply_trigger_effects(self, result: List[bool]) -> None:
        """Send the right-trigger effect matching the three analysis flags."""
        # Pad defensively so a short list can never raise IndexError here.
        in_flight, holding_weapon, automatic = (list(result) + [False] * 3)[:3]

        if in_flight:  # Sitting in ship seat
            print("Pilot seat detected - Sending Machine gun effect")
            self.send_command(DSCommands.create_trigger_packet(TriggerMode.VibrateTrigger, Trigger.Right, 10))
        elif holding_weapon:
            if automatic:
                print("Automatic fire mode is active")
                self.send_command(DSCommands.create_trigger_packet(TriggerMode.VibrateTrigger, Trigger.Right, 10))
            else:
                print("Single fire mode is active")
                self.send_command(DSCommands.create_trigger_packet(TriggerMode.CustomTriggerValue, Trigger.Right, 30, 30, 30))

    def run(self) -> None:
        """Main loop with window detection and game state analysis.

        Runs until interrupted with Ctrl+C. Any other exception inside an
        iteration is printed and the loop continues after a one-second pause.
        """
        print("Star Citizen DualSense Haptics Controller starting...")
        while True:
            try:
                # Check if the Star Citizen window exists
                if not self.get_window_rect():
                    print(f"\rWaiting for {Config.WINDOW_TITLE} window...", end='')
                    time.sleep(Config.RETRY_INTERVAL)
                    continue

                print("\nStar Citizen window found! Starting capture loop...")

                # Main capture loop
                while True:
                    screenshot = self.take_screenshot()
                    if not screenshot:  # Window was closed or minimized
                        print("\nLost connection to Star Citizen window")
                        break

                    uploaded = google.generativeai.upload_file(screenshot)
                    try:
                        result, color = self.analyze_screenshot(uploaded)
                    finally:
                        # One file is uploaded per capture; delete it so
                        # uploads do not pile up on the service.
                        self._discard_upload(uploaded)

                    print("\nVision Analysis Results:", result)
                    print("Detected Color:", color)

                    # Update LED color
                    if color:
                        self.update_led_color(color)

                    self._apply_trigger_effects(result)
                    time.sleep(Config.SCREENSHOT_INTERVAL)

                # The window can still exist while capturing fails; pause so
                # the outer loop does not spin at full speed in that case.
                time.sleep(Config.RETRY_INTERVAL)
            except KeyboardInterrupt:
                print("\nShutting down...")
                break
            except Exception as e:
                print(f"Loop iteration failed: {e}")
                time.sleep(1)

    def __del__(self):
        """Cleanup resources."""
        # The socket may be missing when construction did not get that far.
        sock = getattr(self, "sock", None)
        if sock is None:
            return
        try:
            sock.close()
        except Exception:
            # Best-effort: nothing useful can be done during finalisation.
            pass
if __name__ == "__main__":
    # Script entry point: build the controller, then run its capture loop
    # until it is interrupted.
    haptics = StarCitizenHaptics()
    haptics.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment