Mix . install ( [
{ :kino_k8s , "~> 2.0" } ,
{ :kubereq , "~> 0.4.1" } ,
{ :thousand_island , github: "mruoss/thousand_island" , ref: "handler-refactoring" } ,
] )
Defining the PortForward Module
defmodule PortForward do
use Kubereq.Connect
require ThousandIsland.Handler
require Logger
@ behaviour ThousandIsland.Handler
@ data_channel 0x00
@ error_channel 0x01
def child_spec ( init_args ) do
default = % {
id: __MODULE__ ,
start: { __MODULE__ , :start_link , [ init_args ] } ,
restart: :temporary
}
Supervisor . child_spec ( default , [ ] )
end
@ spec start_link ( { handler_options :: term ( ) , GenServer . options ( ) } ) :: GenServer . on_start ( )
def start_link ( { handler_options , _genserver_options } ) do
{ req , handler_options } = Keyword . pop! ( handler_options , :req )
{ target_port , handler_options } = Keyword . pop! ( handler_options , :target_port )
{ path_params , handler_options } = Keyword . split ( handler_options , [ :namespace , :name ] )
opts =
Keyword . merge ( handler_options ,
subresource: "portforward" ,
params: [ ports: target_port ] ,
path_params: path_params
)
req = Req . merge ( req , opts )
Kubereq.Connect . start_link ( __MODULE__ , req , % { target_port: target_port } )
end
Module . eval_quoted ( __MODULE__ , ThousandIsland.Handler . genserver_impl ( ) )
Module . eval_quoted ( __MODULE__ , ThousandIsland.Handler . handler_impl ( ) )
@ impl ThousandIsland.Handler
def handle_data ( msg , _socket , state ) do
Kubereq.Connect . send_frame ( self ( ) , { :text , << @ data_channel , msg :: binary >> } )
{ :continue , state }
end
@ impl ThousandIsland.Handler
def handle_close ( _socket , _state ) do
Kubereq.Connect . close ( self ( ) , 1000 , "" )
:ok
end
@ impl Kubereq.Connect
def handle_frame (
{ :binary , << @ data_channel , target_port :: little - size ( 16 ) >> } ,
{ _socket , % { target_port: target_port } } = state
) do
# noop
{ :noreply , state }
end
def handle_frame (
{ :binary , << @ error_channel , target_port :: little - size ( 16 ) >> } ,
{ _socket , % { target_port: target_port } } = state
) do
# noop
{ :noreply , state }
end
def handle_frame ( { :binary , << @ data_channel , data :: binary >> } , { socket , state } ) do
ThousandIsland.Socket . send ( socket , data )
{ :noreply , { socket , state } }
end
def handle_frame ( { :binary , << @ error_channel , data :: binary >> } , { socket , state } ) do
Logger . error ( data )
{ :noreply , { socket , state } }
end
def handle_frame ( { :close , _reason , _code } , { socket , state } ) do
dbg ( { :remote_close , self ( ) } )
ThousandIsland.Socket . close ( socket )
{ :stop , :normal , state }
end
end
Using kino_k8s
to select the pod we want to forward traffic to. Use the GET
operation and select a Pod in your Kubernetes cluster:
req = Req . new ( ) |> Kubereq . attach ( context: "old-infra" , api_version: "v1" , kind: "Pod" )
% { body: pod } = Kubereq . get! ( req , "promtail" , "promtail-6qdww" )
Kino.Tree . new ( pod )
ports =
get_in ( pod , [ "spec" , "containers" , Access . all ( ) , "ports" ] )
|> List . flatten ( )
|> Enum . map ( & { & 1 [ "containerPort" ] , "#{ & 1 [ "name" ] } (#{ & 1 [ "containerPort" ] } )" } )
form =
Kino.Control . form (
[
client_port: Kino.Input . number ( "Local Port" ) ,
target_port: Kino.Input . select ( "Remote Port on Pod #{ pod [ "metadata" ] [ "namespace" ] } /#{ pod [ "metadata" ] [ "name" ] } " , ports ) ,
] ,
submit: "Start Port-Forwarding"
)
listener = Kino . listen ( form , fn event ->
ThousandIsland . start_link (
port: event . data . client_port ,
handler_module: PortForward ,
handler_options: [
req: req ,
namespace: pod [ "metadata" ] [ "namespace" ] ,
name: pod [ "metadata" ] [ "name" ] ,
target_port: event . data . target_port
] ,
transport_options: [ packet: :raw ]
)
Kino.Markdown . new ( "Connection Established: http://127.0.0.1:#{ event . data . client_port } " ) |> Kino . render ( )
end )
form
Stop the listener (and close all opened sockets)
Kino . terminate_child ( listener )