Skip to content

Instantly share code, notes, and snippets.

@ukazap
Created February 15, 2026 00:27
Show Gist options
  • Select an option

  • Save ukazap/473202c435a22d6ed23bf30df9fde139 to your computer and use it in GitHub Desktop.

Select an option

Save ukazap/473202c435a22d6ed23bf30df9fde139 to your computer and use it in GitHub Desktop.
PiKVM quick switches

PiKVM

Mix.install([
  {:req, "~> 0.5.17"},
  {:nimble_totp, "~> 1.0"},
  {:kino, "~> 0.18.0"},
  {:websockex, "~> 0.5"}
])

defmodule PiKVM do
  @moduledoc """
  Connection settings for a PiKVM device, taken from environment variables.

  The following variables are required:

    * `PIKVM_HOST` - host used to build the HTTPS and WebSocket URLs
    * `PIKVM_USER` - value of the KVMD user header
    * `PIKVM_PASSWD` - password; the current TOTP code is appended to it
    * `PIKVM_OTP_SECRET` - Base32-encoded TOTP secret

  The variables are read every time a function is called rather than being
  captured in module attributes, so the secrets are not baked into the
  compiled module and a changed environment is picked up without recompiling.
  A missing variable raises at call time.
  """

  @doc """
  Builds a `Req` request pointed at `https://$PIKVM_HOST` with the KVMD
  authentication headers set.

  The password header contains the password followed by a freshly generated
  TOTP code, so build a new request for every call instead of caching it.
  """
  def req do
    Req.new(
      base_url: "https://#{host()}",
      connect_options: [
        transport_opts: [
          # self-signed certificate
          verify: :verify_none
        ]
      ],
      headers: %{
        "X-KVMD-User" => user(),
        "X-KVMD-Passwd" => password_with_totp()
      }
    )
  end

  @doc """
  Returns the WebSocket URL (`wss://$PIKVM_HOST/api/ws?stream=0`).
  """
  def websocket_url do
    "wss://#{host()}/api/ws?stream=0"
  end

  @doc """
  Returns the connection options passed to WebSockex: the authentication
  headers (with a freshly generated TOTP code) and `insecure: true`.
  """
  def websocket_opts do
    [
      extra_headers: [
        {"x-kvmd-user", user()},
        {"x-kvmd-passwd", password_with_totp()}
      ],
      insecure: true
    ]
  end

  defp host, do: System.fetch_env!("PIKVM_HOST")

  defp user, do: System.fetch_env!("PIKVM_USER")

  # Password immediately followed by the current TOTP code.
  defp password_with_totp do
    otp_secret = "PIKVM_OTP_SECRET" |> System.fetch_env!() |> Base.decode32!()
    System.fetch_env!("PIKVM_PASSWD") <> NimbleTOTP.verification_code(otp_secret)
  end
end

defmodule PiKVMWebSocket do
  @moduledoc """
  WebSockex client for the PiKVM event stream.

  It remembers the last ATX power LED value seen in an `"atx"` event and sends
  `{:atx_status_changed, power}` to every subscribed process whenever that
  value changes.
  """

  use WebSockex
  require Logger

  # How long get_current_state/0 waits for the socket process to answer.
  @state_reply_timeout 1000
  # Upper bound for the delay between reconnect attempts.
  @max_backoff_ms 10_000

  @doc """
  Starts the client registered under the module name.

  The URL and connection options come from `PiKVM`. The argument is ignored.
  """
  def start_link(_opts \\ []) do
    url = PiKVM.websocket_url()
    ws_opts = PiKVM.websocket_opts()

    state = %{
      subscribers: [],
      last_atx_state: nil
    }

    WebSockex.start_link(url, __MODULE__, state, ws_opts ++ [name: __MODULE__])
  end

  @doc """
  Subscribes `pid` to `{:atx_status_changed, power}` messages.

  Subscribing the same pid twice has no additional effect, and a subscriber is
  dropped automatically once its process exits.
  """
  def subscribe(pid) do
    WebSockex.cast(__MODULE__, {:subscribe, pid})
  end

  @doc """
  Returns the last known ATX power state.

  Returns `nil` when no state has been seen yet or when the socket process does
  not answer within #{@state_reply_timeout} ms.
  """
  def get_current_state do
    # The reference ties the reply to this particular call, so a late answer
    # to an earlier timed-out call is never mistaken for the current one.
    ref = make_ref()
    WebSockex.cast(__MODULE__, {:get_state, {self(), ref}})

    receive do
      {:current_state, ^ref, state} -> state
    after
      @state_reply_timeout -> nil
    end
  end

  # WebSockex callbacks

  @impl true
  def handle_connect(_conn, state) do
    Logger.info("WebSocket connected!")
    {:ok, state}
  end

  @impl true
  def handle_frame({:text, msg}, state) do
    case Jason.decode(msg) do
      {:ok, %{"event_type" => "atx", "event" => event}} ->
        handle_atx_event(event, state)

      {:ok, %{"event_type" => type}} ->
        Logger.debug("Received event: #{type}")
        {:ok, state}

      _ ->
        {:ok, state}
    end
  end

  def handle_frame(_frame, state) do
    {:ok, state}
  end

  @impl true
  def handle_cast({:subscribe, pid}, state) do
    if pid in state.subscribers do
      {:ok, state}
    else
      # Monitor the subscriber so it can be removed when it goes away
      # (see the :DOWN clause of handle_info/2).
      Process.monitor(pid)
      {:ok, %{state | subscribers: [pid | state.subscribers]}}
    end
  end

  def handle_cast({:get_state, {pid, ref}}, state) when is_pid(pid) and is_reference(ref) do
    send(pid, {:current_state, ref, state.last_atx_state})
    {:ok, state}
  end

  # Untagged request format, kept for callers that cast {:get_state, pid} directly.
  def handle_cast({:get_state, pid}, state) when is_pid(pid) do
    send(pid, {:current_state, state.last_atx_state})
    {:ok, state}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    {:ok, %{state | subscribers: List.delete(state.subscribers, pid)}}
  end

  def handle_info(msg, state) do
    Logger.warning("Unexpected message: #{inspect(msg)}")
    {:ok, state}
  end

  @impl true
  def handle_disconnect(%{reason: reason} = status, state) do
    Logger.error("WebSocket disconnected: #{inspect(reason)}")

    # Wait a little longer after each failed attempt instead of retrying in a
    # tight loop while the device is unreachable.
    attempt = Map.get(status, :attempt_number, 1)
    Process.sleep(min(attempt * 1000, @max_backoff_ms))

    # The connection created in start_link/1 carries a TOTP code generated at
    # that moment. Reconnecting with it would present a stale code, so build a
    # new connection with freshly generated headers.
    {:ok, uri} = WebSockex.Conn.parse_url(PiKVM.websocket_url())
    {:reconnect, WebSockex.Conn.new(uri, PiKVM.websocket_opts()), state}
  end

  # Records the power LED value from an ATX event and notifies subscribers
  # when it differs from the last known value.
  defp handle_atx_event(%{"leds" => %{"power" => power}}, state) do
    if power != state.last_atx_state do
      Logger.info("ATX state changed: #{inspect(power)}")

      Enum.each(state.subscribers, fn pid ->
        send(pid, {:atx_status_changed, power})
      end)
    end

    {:ok, %{state | last_atx_state: power}}
  end

  # An event without a `leds.power` field carries no power information, so the
  # last known state is kept instead of being reset to nil.
  defp handle_atx_event(_event, state) do
    {:ok, state}
  end
end

:ok

ATX switches

# Stop any existing WebSocket connection.
# The pid is looked up once (a second lookup could return nil if the process
# died in between), and a monitor is used to wait until the process is really
# gone instead of sleeping for a fixed interval.
case Process.whereis(PiKVMWebSocket) do
  nil ->
    :ok

  old_pid ->
    ref = Process.monitor(old_pid)
    Process.exit(old_pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
    after
      1_000 -> Process.demonitor(ref, [:flush])
    end
end

# Start new WebSocket connection
{:ok, _pid} = PiKVMWebSocket.start_link()

"WebSocket connected!"
defmodule ATXUIManager do
  @moduledoc """
  GenServer that renders the ATX power status and the power/reset buttons into
  a `Kino.Frame`.

  It subscribes to `PiKVMWebSocket` and redraws the frame whenever an
  `{:atx_status_changed, power_state}` message arrives, and also on `:redraw`,
  which the button handlers schedule after each click.
  """

  use GenServer
  require Logger

  # Delay between a button click and the follow-up redraw.
  @redraw_delay :timer.seconds(10)

  @doc """
  Starts the manager registered under the module name, rendering into `frame`.
  """
  def start_link(frame) do
    GenServer.start_link(__MODULE__, frame, name: __MODULE__)
  end

  @doc """
  Replaces the content of `frame` with a "please wait" placeholder.
  """
  def show_waiting(frame) do
    Kino.Frame.render(frame, Kino.Markdown.new("⏳ Please wait..."))
  end

  @impl true
  def init(frame) do
    # Subscribe to WebSocket
    PiKVMWebSocket.subscribe(self())

    # Get initial state
    power_state = PiKVMWebSocket.get_current_state()

    # The buttons and their listeners are created exactly once and the same
    # controls are re-rendered on every redraw. Creating new buttons and
    # listeners per render would leave one more set of listeners behind each
    # time the frame is redrawn.
    buttons = [
      {Kino.Control.button("Power (Short Press)"), "power"},
      {Kino.Control.button("Power (Long Press)"), "power_long"},
      {Kino.Control.button("Reset"), "reset"}
    ]

    Enum.each(buttons, fn {control, atx_button} ->
      listen_for_clicks(control, frame, atx_button)
    end)

    controls = Enum.map(buttons, fn {control, _atx_button} -> control end)

    # Render initial UI
    render_ui(frame, power_state, controls)

    {:ok, %{frame: frame, power_state: power_state, controls: controls}}
  end

  @impl true
  def handle_info({:atx_status_changed, power_state}, state) do
    render_ui(state.frame, power_state, state.controls)
    {:noreply, %{state | power_state: power_state}}
  end

  def handle_info(:redraw, state) do
    power_state = PiKVMWebSocket.get_current_state()
    render_ui(state.frame, power_state, state.controls)
    {:noreply, %{state | power_state: power_state}}
  end

  # Anything else (for example a state reply that arrives after
  # get_current_state/0 already timed out) is ignored instead of crashing
  # the server with a FunctionClauseError.
  def handle_info(msg, state) do
    Logger.debug("Ignoring unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  # Renders the status indicator followed by the buttons, one per row.
  defp render_ui(frame, power_state, controls) do
    Kino.Frame.render(
      frame,
      Kino.Layout.grid(
        [Kino.Markdown.new(status_icon(power_state)) | controls],
        columns: 1
      )
    )
  end

  # Green for on, red for off, yellow while the state is unknown.
  defp status_icon(true), do: "🟢"
  defp status_icon(false), do: "🔴"
  defp status_icon(_unknown), do: "🟡"

  # Wires `control` to the ATX click endpoint for `atx_button`.
  defp listen_for_clicks(control, frame, atx_button) do
    Kino.listen(control, fn _event ->
      show_waiting(frame)

      # Req.post/2 (non-raising) is used so that a transport error or an
      # unexpected response body is reported in the frame and the redraw below
      # is still scheduled; otherwise the frame would stay on "Please wait...".
      case Req.post(PiKVM.req(), url: "/api/atx/click?button=#{atx_button}&wait=true") do
        {:ok, %{body: %{"ok" => ok}}} when ok not in [nil, false] ->
          :ok

        {:ok, %{body: body}} ->
          report_failure(frame, inspect(body))

        {:error, exception} ->
          report_failure(frame, Exception.message(exception))
      end

      Process.send_after(__MODULE__, :redraw, @redraw_delay, [])
    end)
  end

  # Appends a failure note below the "please wait" placeholder.
  defp report_failure(frame, details) do
    Kino.Frame.append(frame, Kino.Markdown.new("❌ Failed: #{details}"))
  end
end
# Tear down the UI manager left over from a previous evaluation, if there is one
case Process.whereis(ATXUIManager) do
  nil ->
    :ok

  _running ->
    GenServer.stop(ATXUIManager)
    Process.sleep(100)
end

# Frame the UI manager renders into
frame = Kino.Frame.new()

# Launch a fresh UI manager bound to that frame
{:ok, _pid} = ATXUIManager.start_link(frame)

frame
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment