package main

import (
	"bufio"
	"context"
	"encoding/json"
	"io"
	"log"
	"net"
	"net/http"
	"sync"
)

// handler is the http.Handler that forwards HTTP requests to varlink.
//
// It also tracks which varlink connection was opened for which HTTP
// connection so that the varlink side can be closed when the HTTP side goes
// away.
type handler struct {
	// mu guards varlinkConns; the map is accessed from the server's
	// ConnContext and ConnState callbacks.
	mu sync.Mutex

	// varlinkConns maps an accepted HTTP connection to the varlink (unix
	// socket) connection dialed on its behalf.
	varlinkConns map[net.Conn]net.Conn
}

// varlinkConnKey is the context key under which the per-HTTP-connection
// varlink net.Conn is stored. Using a private, dedicated type means the key
// cannot collide with context values set by other code.
type varlinkConnKey struct{}

func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
|
if err := context.Cause(r.Context()); err != nil { |
|
http.Error(w, err.Error(), http.StatusInternalServerError) |
|
return |
|
} |
|
|
|
// reuse varlink connection across requests on same HTTP 1.1 / 2 connection |
|
varlinkConn := r.Context().Value(varlinkConnKey{}).(net.Conn) |
|
|
|
// map path to varlink method |
|
method := r.PathValue("method") |
|
|
|
log.Print(method) |
|
|
|
// map request body to varlink parameters |
|
parameterBytes, err := io.ReadAll(r.Body) |
|
if err != nil { |
|
http.Error(w, err.Error(), http.StatusInternalServerError) |
|
return |
|
} |
|
parameters := json.RawMessage(parameterBytes) |
|
|
|
req := struct { |
|
Method string `json:"method"` |
|
Parameters json.RawMessage `json:"parameters"` |
|
More bool `json:"more"` |
|
}{ |
|
Method: method, |
|
Parameters: parameters, |
|
More: true, |
|
} |
|
reqBytes, err := json.Marshal(req) |
|
if err != nil { |
|
http.Error(w, err.Error(), http.StatusBadRequest) |
|
return |
|
} |
|
|
|
// send varlink request |
|
if _, err := varlinkConn.Write(reqBytes); err != nil { |
|
http.Error(w, err.Error(), http.StatusInternalServerError) |
|
return |
|
} |
|
varlinkConn.Write([]byte{0}) |
|
|
|
// stream back (potentially multiple!) varlink responses (separated by nul byte), |
|
// flushing the HTTP stream after each nul byte, mapping nul bytes to \r\n to ease use with HTTP |
|
flusher, _ := w.(http.Flusher) |
|
br := bufio.NewReader(varlinkConn) |
|
for { |
|
msg, err := br.ReadBytes(0) |
|
if len(msg) > 0 { |
|
body := msg[:len(msg)-1] |
|
w.Write(body) |
|
w.Write([]byte("\r\n")) |
|
if flusher != nil { |
|
flusher.Flush() |
|
} |
|
var resp struct { |
|
Continues bool `json:"continues"` |
|
} |
|
json.Unmarshal(body, &resp) |
|
if !resp.Continues { |
|
return |
|
} |
|
} |
|
if err != nil { |
|
if err != io.EOF { |
|
log.Print(err) |
|
} |
|
return |
|
} |
|
} |
|
} |
|
|
|
func main() { |
|
h := &handler{ |
|
varlinkConns: make(map[net.Conn]net.Conn), |
|
} |
|
|
|
mux := http.NewServeMux() |
|
|
|
mux.Handle("POST /{method}", h) |
|
server := http.Server{ |
|
Addr: ":8080", |
|
Handler: mux, |
|
ConnState: func(conn net.Conn, state http.ConnState) { |
|
if state == http.StateClosed { |
|
h.mu.Lock() |
|
unixConn, ok := h.varlinkConns[conn] |
|
if ok { |
|
delete(h.varlinkConns, conn) |
|
} |
|
h.mu.Unlock() |
|
if ok { |
|
log.Printf("closing %s", unixConn.RemoteAddr()) |
|
unixConn.Close() |
|
} |
|
} |
|
}, |
|
ConnContext: func(ctx context.Context, c net.Conn) context.Context { |
|
unixConn, err := net.Dial("unix", "/run/systemd/userdb/io.systemd.Multiplexer") |
|
if err != nil { |
|
ctx, cancel := context.WithCancelCause(ctx) |
|
cancel(err) |
|
return ctx |
|
} |
|
h.mu.Lock() |
|
h.varlinkConns[c] = unixConn |
|
h.mu.Unlock() |
|
log.Printf("connecting to %s", unixConn.RemoteAddr()) |
|
return context.WithValue(ctx, varlinkConnKey{}, unixConn) |
|
}, |
|
} |
|
if err := server.ListenAndServe(); err != nil { |
|
log.Fatal(err) |
|
} |
|
} |