Last active
March 21, 2025 15:08
-
-
Save bendem/edf6cdf37aabf659ddbb0f77d9b50e2f to your computer and use it in GitHub Desktop.
Bitwarden scripts to keep a session open without the need to export env variables.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import grp | |
import os | |
import pwd | |
import socket | |
import sys | |
import subprocess | |
import struct | |
import typing | |
import logging | |
logging.basicConfig( | |
format="%(asctime)s %(levelname)s - %(message)s", level=os.getenv("LOG_LEVEL", logging.DEBUG) | |
) | |
BW_SOCKET = os.environ.get("BW_SOCKET", "/bw-agent/bw-agent.sock") | |
PROPAGATED_ENV_VARIABLES = ["BW_SESSION", "NODE_EXTRA_CA_CERTS", "PATH"] | |
BW_ENV = {name: os.environ[name] for name in PROPAGATED_ENV_VARIABLES} | |
UCRED_FORMAT = "III" | |
AUDIT_L = logging.getLogger("audit") | |
SERVER_L = logging.getLogger("server") | |
def read_all(connection: socket.socket): | |
data = b"" | |
while True: | |
read = connection.recv(1024) | |
if read == b"": | |
break | |
data += read | |
return data.decode("utf-8").strip() | |
def pid_to_process_tree(pid: int) -> typing.List[typing.List[str]]: | |
cmdlines: typing.List[typing.List[str]] = [] | |
while pid > 1: | |
with open(f"/proc/{pid}/cmdline") as cmdline: | |
cmdlines.append(cmdline.read().strip("\0 ").split("\0")) | |
with open(f"/proc/{pid}/stat") as stat: | |
pid = int(stat.read().split(" ", 4)[3]) | |
cmdlines.reverse() | |
return cmdlines | |
def audit_connection(connection: socket.socket): | |
ucred = connection.getsockopt( | |
socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize(UCRED_FORMAT) | |
) | |
pid, uid, gid = struct.unpack(UCRED_FORMAT, ucred) | |
try: | |
cmdlines = pid_to_process_tree(pid) | |
except: # noqa E722 | |
cmdlines = None | |
if not cmdlines: | |
AUDIT_L.warning("No data from caller") | |
return | |
user = pwd.getpwuid(uid) | |
group = grp.getgrgid(gid) | |
user_group = user.pw_name | |
if user.pw_name != group.gr_name: | |
user_group += f":{group.gr_name}" | |
AUDIT_L.info( | |
f"connection from pid {pid} by {uid}:{gid} -> {cmdlines[-1][0]} by {user_group}" | |
) | |
AUDIT_L.debug("%s", "\n-> ".join(str(x) for x in cmdlines)) | |
def execute_command(cmd: str): | |
AUDIT_L.info(f'running "{cmd}"') | |
p = subprocess.run( | |
["sh", "-c", cmd], stdout=subprocess.PIPE, stderr=subprocess.STDOUT | |
) | |
return p.returncode, p.stdout | |
def run_server(sock: socket.socket): | |
while True: | |
connection = None | |
try: | |
connection, _ = sock.accept() | |
audit_connection(connection) | |
data = read_all(connection) | |
if not data or data == ":": | |
# client is just checking if the server is listening | |
SERVER_L.debug("no data read, closing") | |
continue | |
code, response = execute_command(data) | |
connection.sendall((str(code) + "\n").encode("ascii") + response) | |
except BrokenPipeError: | |
SERVER_L.debug("connection lost") | |
finally: | |
if connection: | |
connection.close() | |
def main(): | |
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
try: | |
os.unlink(BW_SOCKET) | |
except OSError: | |
if os.path.exists(BW_SOCKET): | |
raise | |
try: | |
sock.bind(BW_SOCKET) | |
except socket.error as e: | |
SERVER_L.error(e) | |
sys.exit(1) | |
sock.listen(2) | |
try: | |
run_server(sock) | |
except KeyboardInterrupt: | |
SERVER_L.info(f"closing {BW_SOCKET}") | |
sock.close() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
set -Eeuo pipefail | |
BW_SOCKET="${BW_SOCKET:-$HOME/.bw-agent.sock}" | |
ret=0 | |
socat UNIX-CONNECT:"$BW_SOCKET" - <<< ':' &> /dev/null || { | |
ret=$? | |
} | |
if (( ret != 0 )); then | |
if (( ret == 1)); then | |
rm -f "$BW_SOCKET" | |
fi | |
if (( $# != 0 )); then | |
echo "> bw-agent is not started or its socket is not at $BW_SOCKET." >&2 | |
fi | |
exit 150 | |
fi | |
if (( $# == 0 )); then | |
exit | |
fi | |
command=(bw) | |
post=(cat) | |
case "$1" in | |
--vault-id) | |
command+=(get password "ansible://$2") | |
;; | |
search) | |
command+=(list items --search "$2") | |
if [[ $# -ge 3 ]]; then | |
# shellcheck disable=SC2016 | |
post=(jq -r --arg url "$2" --arg user "$3" '.[].login | select((.uris[].uri | startswith($url)) and .username == $user) | .password') | |
else | |
# shellcheck disable=SC2016 | |
post=(jq -r --arg url "$2" '.[].login | select((.uris[].uri | startswith($url)) and .username == null) | .password') | |
fi | |
;; | |
*) | |
command+=("$@") | |
esac | |
socat -t 10 "UNIX-CONNECT:$BW_SOCKET" - <<< "${command[@]@Q}" | grep -v -e 'mac failed' -e 'MAC comparison failed' | { | |
IFS= read -r code | |
if [[ $code != "0" ]]; then | |
# echo "Got code $code" >&2 | |
cat - >&2 | |
exit "$code" | |
fi | |
if { | |
IFS=$'\n' read -r -d '' CAPTURED_STDERR; | |
IFS=$'\n' read -r -d '' CAPTURED_STDOUT; | |
(IFS=$'\n' read -r -d '' _ERRNO_; exit "$_ERRNO_"); | |
} < <((printf '\0%s\0%d\0' "$("${post[@]}")" "${?}" 1>&2) 2>&1); then | |
echo "$CAPTURED_STDOUT" | |
else | |
err=$? | |
echo "Failed to postprocess:" "${post[@]@Q}" >&2 | |
echo "Input:" >&2 | |
echo "-----------------" >&2 | |
echo "$CAPTURED_STDOUT" >&2 | |
echo "-----------------" >&2 | |
echo "Output:" >&2 | |
echo "-----------------" >&2 | |
echo "$CAPTURED_STDERR" >&2 | |
echo "-----------------" >&2 | |
exit "$err" | |
fi | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment