Skip to content

Instantly share code, notes, and snippets.

@chrisbloom7
Last active November 18, 2024 16:51
Show Gist options
  • Save chrisbloom7/d2d36c8ca158a99c2a09bee1509b0e57 to your computer and use it in GitHub Desktop.
Save chrisbloom7/d2d36c8ca158a99c2a09bee1509b0e57 to your computer and use it in GitHub Desktop.
Improved implementation of a websocket connector for Slack using the `async-websockets` gem

Implenting a Slack WebSocket Client in Ruby using the async-websockets Gem

See https://api.slack.com/apis/socket-mode#implementing for more info about implementing a custom websocket client for Slack.

A previous version of this was posted to https://gist.github.com/chrisbloom7/52bc954b4df09b9cb829a73cfd5466c0, but that version no longer works with newer versions of the async-websockets gem. (It also wasn't easily testable.)

Setup

Add async-websockets to your gemfile

gem "async-websockets", "~> 0.30.0"

If you are using rspec for testing, also add the async-rspec gem to your test group

group :test do
  gem "async-rspec"
end

See also the test helpers for the sus testing framework.

# frozen_string_literal: true
namespace :slack do
namespace :socket_mode do
desc "Start a socket mode connection to Slack and listen for messages"
task :start, [:debug] => [:environment] do |_task, args|
debug = args[:debug].present? && args[:debug] != "false"
Slack::SocketMode.start(debug:)
rescue Interrupt => _error
puts "Interrupted. Exiting..."
end
end
end
# frozen_string_literal: true
module Slack
module SocketMode
module_function
def start(debug: false)
client = Slack::SocketMode::Client.new(debug:)
client.listen do |parsed_payload|
Slack::SocketMode::Handler.process(parsed_payload)
end
ensure
client&.close
end
end
end
# frozen_string_literal: true
require "async"
require "async/http/endpoint"
require "async/websocket/client"
module Slack
module SocketMode
class Client
ConnectionError = Class.new(StandardError)
InvalidPayloadFormatError = Class.new(StandardError)
attr_reader :connection, :debug, :url
alias debug? debug
# `url` should be a temporary Socket Mode WebSocket URL fetched from
# https://api.slack.com/methods/apps.connections.open
def initialize(url:, debug: false)
@connection = nil
@debug = debug
@url = url
end
def listen
connect! do
while (payload = connection.read)
logger.debug("Received payload", payload:)
parsed_payload = parse_payload(payload)
acknowledge!(parsed_payload)
yield parsed_payload
end
end
ensure
close
end
def connect!
return @connection if @connection
logger.debug("Requesting connection to", endpoint_url:)
endpoint = Async::HTTP::Endpoint.parse(endpoint_url)
Async::WebSocket::Client.connect(endpoint) do |conn|
logger.debug("Confirming connection", connection: conn)
say_hello!(conn.read)
@connection = conn
yield
end
rescue ConnectionError
close
raise
end
private :connect!
def endpoint_url
return @endpoint_url if defined?(@endpoint_url)
@endpoint_url = url
@endpoint_url += "&debug_reconnects=true" if debug?
@endpoint_url
end
# The first message is always the hello message. It doesn't have an
# envelope_id and we don't need to acknowledge it, but it has some
# useful information in it so we should log it.
def say_hello!(hello_payload)
hello = parse_payload(hello_payload)
raise(ConnectionError, "Missing hello message") unless hello[:type].to_s == "hello"
logger.info("Connected to Slack", hello:)
end
private :say_hello!
def parse_payload(payload)
parsed_payload = {}
logger.debug "Parsing payload", payload: payload.inspect
if payload.is_a?(Protocol::WebSocket::Message)
parsed_payload = payload.parse(Protocol::WebSocket::Coder::JSON.new(symbolize_names: true))
logger.debug "Payload parsed as Protocol::WebSocket::Message", envelope_id: parsed_payload[:envelope_id]
elsif payload.is_a?(Hash)
parsed_payload = payload.transform_keys(&:to_sym)
logger.debug "Payload parsed as Hash", envelope_id: parsed_payload[:envelope_id]
else
# Log this, but don't raise an error that breaks the connection
logger.warn "Unrecognized payload format", payload: payload.inspect
end
logger.debug("Parsed payload", parsed_payload:)
parsed_payload
end
def acknowledge!(parsed_payload)
if parsed_payload[:envelope_id]
logger.debug "Acknowledging message", envelope_id: parsed_payload[:envelope_id]
Protocol::WebSocket::TextMessage.generate({ envelope_id: parsed_payload[:envelope_id] }).send(connection)
logger.debug "Message acknowledged", envelope_id: parsed_payload[:envelope_id]
else
# Log this, but don't raise an error that breaks the connection
logger.warn "No envelope_id to acknowledge!", payload: parsed_payload
end
end
private :acknowledge!
def close
if connection && !connection.closed?
# Finish sending any buffered messages
connection.flush
# Tell the server we're closing the connection
connection.send_close
# Close the client connection, but don't reset @connection to nil; we'll
# need a new URL if we want to reconnect in which case we should open a
# new client instance.
connection.close
end
end
end
end
end
# frozen_string_literal: true
require "rails_helper"
# Async testing contexts
require "async/rspec"
require "async/websocket/adapters/http"
RSpec.describe Slack::SocketMode::Client do
include_context Async::RSpec::Reactor
let(:client) { described_class.new(url:) }
let(:handler) { Slack::SocketMode::Handler }
# Port 999 is currently unused in the test environment, but may need to be
# changed in the future or for CI.
let(:url) { "http://0.0.0.0:999/?asdf=123" }
let(:protocol) { Async::HTTP::Protocol::HTTP1 }
let(:endpoint) { Async::HTTP::Endpoint.parse(url, timeout: 0.8, reuse_port: true, protocol:) }
let(:hello_message) { { type: "hello" } }
let(:events_api_message) { { type: "events_api", envelope_id: generate(:envelope_id) } }
let(:interactive_message) { { type: "interactive", envelope_id: generate(:envelope_id) } }
let(:no_envelope_id_message) { { type: "events_api" } }
let(:invalid_message) { "not json" }
# For some reason we can send 3 messages fine, but anything else sent beyond
# that isn't received by the client.
# See https://github.com/socketry/async-websocket/discussions/71#discussioncomment-10708059
let(:payload_proc) do
->(connection) do
connection.send_text(hello_message.to_json)
connection.send_text(events_api_message.to_json)
connection.send_text(interactive_message.to_json)
end
end
let(:app) do
Protocol::HTTP::Middleware.for do |request|
Async::WebSocket::Adapters::HTTP.open(request) do |connection|
payload_proc.call(connection)
rescue Protocol::WebSocket::ClosedError
# Ignore this error.
ensure
connection.close
end or Protocol::HTTP::Response[404, {}, []]
end
end
before do
# Suppress WebMock's auto-hijacking of Async::HTTP::Clients
# Uncomment next line if using Webmock
# WebMock::HttpLibAdapters::AsyncHttpClientAdapter.disable!
# Bind the endpoint before running the server so that we know incoming
# connections will be accepted
@bound_endpoint = endpoint.bound
# Make the bound endpoint quack like a regular endpoint for the server
# which expects an unbound endpoint.
@bound_endpoint.extend(SingleForwardable)
@bound_endpoint.def_delegators(:endpoint, :authority, :path, :protocol, :scheme)
# Bind an async server to the bound endpoint using our async websocket app
@server = Async::HTTP::Server.new(app, @bound_endpoint)
@server_task = Async do
@server.run
end
# As with the server endpoint, we need a bound client endpoint that quacks
# like an regular endpoint for the sake of the client.
@client_endpoint = @bound_endpoint.local_address_endpoint(timeout: endpoint.timeout)
@client_endpoint.instance_variable_set(:@endpoint, endpoint)
@client_endpoint.extend(SingleForwardable)
@client_endpoint.def_delegators(:@endpoint, :authority, :path, :protocol, :scheme)
# Configure the websocket client to use our bound client endpoint
allow(Async::WebSocket::Client).to receive(:connect).and_wrap_original do |original_method, *arguments, &block|
original_method.call(@client_endpoint, *arguments[1..], &block)
end
end
after do
# Use a timeout that is slightly longer than the endpoint timeout to avoid
# hanging when closing the client.
Async::Task.current.with_timeout(1) do
@server_task&.stop
end
rescue RuntimeError
# Ignore the error that is raised if there is no current async task running
nil
ensure
# Close our websocket client connection
client.close
# Close the bound endpoint to free up the address for the next test
@bound_endpoint&.close
# Re-enable WebMock's auto-hijacking of Async::HTTP::Clients
# Uncomment next line if using Webmock
# WebMock::HttpLibAdapters::AsyncHttpClientAdapter.enable!
end
describe "#listen" do
it "reads from the connection until closed" do
messages = []
client.listen do |payload|
messages << payload
end
expect(messages).to contain_exactly(events_api_message, interactive_message)
end
it "sends an acknowledgement for each message" do
acknowledgements = []
expect(Protocol::WebSocket::TextMessage).to receive(:generate).twice.and_wrap_original do |original_method, *arguments, &block|
acknowledgements << arguments.first
acknowledgement = original_method.call(*arguments, &block)
expect(client.connection).to be_a(Async::WebSocket::Connection)
expect(acknowledgement).to receive(:send).with(client.connection).and_call_original
acknowledgement
end
client.listen {}
expect(acknowledgements.map(&:to_h)).to contain_exactly(
{ envelope_id: events_api_message[:envelope_id] },
{ envelope_id: interactive_message[:envelope_id] }
)
end
context "when the payload does not include an envelope ID" do
let(:payload_proc) do
->(connection) do
connection.send_text(hello_message.to_json)
connection.send_text(no_envelope_id_message.to_json)
end
end
it "reports an error to failbot" do
client.listen {}
expect(Failbot.reports.count).to eq(1)
report = Failbot.reports.first
expect(report["exception_detail"].count).to eq(1)
expect(report["exception_detail"].first).to include(
"type" => described_class::InvalidPayloadFormatError.name,
"value" => "Missing envelope ID"
)
expect(report).to include("payload" => no_envelope_id_message.inspect)
end
end
context "when the first message is not a hello" do
let(:payload_proc) do
->(connection) do
connection.send_text(events_api_message.to_json)
end
end
it "raises an error" do
expect { client.listen {} }.to raise_error(described_class::ConnectionError)
end
end
end
describe "#endpoint_url" do
it "returns the URL" do
expect(client.endpoint_url).to eq url
end
context "when the debug option is set" do
it "adds debug info to the URL" do
client = described_class.new(url:, debug: true)
expect(client.endpoint_url).to eq "#{url}&debug_reconnects=true"
end
end
end
describe "#parse_payload" do
context "when the payload is a Protocol::WebSocket::Message" do
it "parses the Protocol::WebSocket::Message" do
message = Protocol::WebSocket::Message.new(hello_message.to_json)
parsed_payload = client.parse_payload(message)
expect(parsed_payload).to eq hello_message
end
end
context "when the payload is a Hash" do
it "passes the Hash through" do
parsed_payload = client.parse_payload(hello_message)
expect(parsed_payload).to eq hello_message
end
it "symbolizes the keys" do
parsed_payload = client.parse_payload(hello_message.stringify_keys)
expect(hello_message.keys).to all(be_a(Symbol))
expect(parsed_payload).to eq hello_message
end
end
context "when the payload is neither a Message nor a Hash" do
it "reports an error to failbot" do
client.parse_payload(invalid_message)
expect(Failbot.reports.count).to eq(1)
report = Failbot.reports.first
expect(report["exception_detail"].count).to eq(1)
expect(report["exception_detail"].first).to include(
"type" => described_class::InvalidPayloadFormatError.name,
"value" => "Unrecognized payload format"
)
expect(report).to include("payload" => invalid_message.inspect)
end
end
end
end
# frozen_string_literal: true
module Slack
module SocketMode
class Handler
InvalidPayloadTypeError = Class.new(StandardError)
VALID_PAYLOAD_TYPES = %i[disconnect events_api interactive].freeze
class << self
def process(parsed_payload)
payload_type = parsed_payload[:type]&.to_sym
if VALID_PAYLOAD_TYPES.include?(payload_type)
send(payload_type, parsed_payload)
else
# Log this, but don't raise an error that breaks the connection
logger.warn("Unrecognized payload", payload: parsed_payload)
end
end
private
def disconnect(parsed_payload)
logger.info("Disconnect message", payload: parsed_payload)
# Handle any disconnect steps you wish to take here.
# See https://api.slack.com/apis/socket-mode#disconnect
end
def events_api(parsed_payload)
logger.debug("Events API parsed_payload", payload: parsed_payload)
# Handle any registered event API events here.
# See https://api.slack.com/apis/events-api
rescue => error
# Log this, but don't raise an error that breaks the connection
logger.warn("events_api message handling error", { payload: parsed_payload }, error)
end
def interactive(parsed_payload)
logger.debug("Interactive message", payload: parsed_payload)
# Handle any registered interactive events here.
# See https://api.slack.com/reference/interaction-payloads
rescue => error
# Log this, but don't raise an error that breaks the connection
logger.warn("interactive message handling error", { payload: parsed_payload }, error)
end
end
end
end
end
# frozen_string_literal: true
require "rails_helper"
RSpec.describe Slack::SocketMode::Handler do
describe ".process" do
context "when the payload type is events_api" do
let(:message) { { type: "events_api", envelope_id: generate(:envelope_id) } }
it "handles events_api messages" do
# Test whatever you need to test here
# described_class.process(message)
end
end
context "when the payload type is interactive" do
let(:message) { { type: "interactive", envelope_id: generate(:envelope_id) } }
it "handles interactive messages" do
# Test whatever you need to test here
# described_class.process(message)
end
end
context "when the payload type is disconnect" do
let(:message) { { type: "disconnect", envelope_id: generate(:envelope_id) } }
it "handles disconnect messages" do
# Test whatever you need to test here
# described_class.process(message)
end
end
end
end
# frozen_string_literal: true
require "rails_helper"
RSpec.describe Slack::SocketMode do
let(:client) { Slack::SocketMode::Client }
let(:handler) { Slack::SocketMode::Handler }
let(:payload) { { type: :events_api } }
let(:client_instance) { instance_double(client, close: nil) }
describe ".start" do
it "instantiates a socket mode client" do
expect(Slack::SocketMode::Client).to receive(:new).with(debug: false).and_return(client_instance)
expect(client_instance).to receive(:listen).and_yield(payload)
expect(handler).to receive(:process).with(payload)
described_class.start
end
it "can request a debug mode connection" do
expect(Slack::SocketMode::Client).to receive(:new).with(debug: true).and_return(client_instance)
expect(client_instance).to receive(:listen).and_yield(payload)
expect(handler).to receive(:process).with(payload)
described_class.start(debug: true)
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment