Skip to content

Instantly share code, notes, and snippets.

@daniestevez
Last active December 16, 2024 08:09
Show Gist options
  • Save daniestevez/a7ff5472317f13af325056cb5f4fe79b to your computer and use it in GitHub Desktop.
Save daniestevez/a7ff5472317f13af325056cb5f4fe79b to your computer and use it in GitHub Desktop.
Testing UNIX domain socket sizes
#!/usr/bin/env python3
import argparse
import os
import socket
def parse_args():
    """Parse the command-line options of the packet-reading client.

    Returns:
        argparse.Namespace with the attributes ``timeout`` (seconds),
        ``set_buf_size`` (int, or None when not given) and ``socket_path``.
    """
    parser = argparse.ArgumentParser()
    # type=float: without it a value given on the command line stays a str,
    # which socket.settimeout() rejects with TypeError.
    parser.add_argument(
        '--timeout', type=float, default=1,
        help='Timeout for socket read [default=%(default)r seconds]')
    parser.add_argument(
        '--set_buf_size', type=int, default=None,
        help='Set SO_SNDBUF/SO_RCVBUF [default=%(default)r]')
    parser.add_argument(
        '--socket_path', default='/tmp/custom-connector/custom.sock',
        help='Path to the UNIX domain socket [default=%(default)r]')
    return parser.parse_args()
def set_buf(s, size):
    """Set both SO_SNDBUF and SO_RCVBUF of socket `s` to `size`.

    Nothing is changed when `size` is None.
    """
    if size is not None:
        # send buffer first, then receive buffer
        s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
def main():
    """Connect to the UNIX socket, wait for Enter, then drain and report.

    After the user presses Enter, packets are read until a read times out
    (``--timeout``) or the peer closes the connection. The number of packets
    read and their total size are then printed.
    """
    args = parse_args()
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) as s:
        # The buffer sizes are applied both before and after connect().
        set_buf(s, args.set_buf_size)
        s.connect(args.socket_path)
        set_buf(s, args.set_buf_size)
        input('Press Enter to start reading packets from socket > ')
        s.settimeout(args.timeout)
        packets = []
        print('Reading packets from socket')
        while True:
            try:
                packet = s.recv(4096)
            except socket.timeout:
                # socket.timeout is an alias of TimeoutError on 3.10+ and a
                # separate OSError subclass before, so this works on both.
                print('Read timed out')
                break
            if not packet:
                # An empty read means the peer closed the connection; without
                # this check the loop would spin forever appending b''.
                print('Connection closed by peer')
                break
            packets.append(packet)
    total_size = sum(len(p) for p in packets)
    print(f'Read {len(packets)}, with a total size of {total_size}')


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
import argparse
import os
import socket
def parse_args():
    """Return the parsed command line; the only option is --socket_path."""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--socket_path',
        default='/tmp/custom-connector/custom.sock',
        help='Path to the UNIX domain socket [default=%(default)r]',
    )
    namespace = cli.parse_args()
    return namespace
def main():
    """Connect to the UNIX socket and print every packet received.

    Runs until the peer closes the connection (an empty read) or the
    process is interrupted.
    """
    args = parse_args()
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) as s:
        s.connect(args.socket_path)
        while True:
            packet = s.recv(4096)
            if not packet:
                # An empty read means the peer closed the connection; without
                # this check the loop would print b'' forever.
                break
            print(packet)


if __name__ == '__main__':
    main()
#!/usr/bin/env python3
import argparse
import os
import socket
def parse_args():
    """Parse the command-line options of the datagram sender.

    Returns:
        argparse.Namespace with the attributes ``dgram_length`` (int),
        ``set_buf_size`` (int, or None when not given) and the positional
        ``socket_path``.
    """
    parser = argparse.ArgumentParser()
    # type=int: without it a value given on the command line stays a str,
    # and building the datagram with b'\x00' * length raises TypeError.
    parser.add_argument(
        '--dgram_length', type=int, default=1510,
        help='Length of each datagram [default=%(default)r]')
    parser.add_argument(
        '--set_buf_size', type=int, default=None,
        help='Set SO_SNDBUF/SO_RCVBUF [default=%(default)r]')
    parser.add_argument(
        'socket_path', help='Path to the UNIX domain socket')
    return parser.parse_args()
def set_buf(s, size):
    """Set both SO_SNDBUF and SO_RCVBUF of socket `s` to `size`.

    Nothing is changed when `size` is None.
    """
    if size is not None:
        # send buffer first, then receive buffer
        s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
def main():
    """Create a connected SOCK_SEQPACKET pair in-process and send forever.

    A listening socket is bound to ``socket_path``, a client socket in this
    same process connects to it, and fixed-size datagrams are then sent on
    the accepted connection in an endless loop, printing a counter before
    each send.
    """
    args = parse_args()
    # make sure that the socket does not exist already
    try:
        os.unlink(args.socket_path)
    except FileNotFoundError:
        pass
    # Context managers close all three sockets on every exit path, including
    # an exception raised while sending.
    with socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) as sock_server:
        sock_server.bind(args.socket_path)
        sock_server.listen(1)
        with socket.socket(
                socket.AF_UNIX, socket.SOCK_SEQPACKET) as sock_client:
            sock_client.connect(args.socket_path)
            conn, _ = sock_server.accept()
            with conn:
                set_buf(conn, args.set_buf_size)
                datagram = b'\x00' * args.dgram_length
                n = 0
                # NOTE(review): sock_client is never read in this script, so
                # sendall() presumably blocks once the buffers fill up; the
                # last counter printed then shows how many datagrams fit.
                while True:
                    print(f'Sending datagram {n} to socket')
                    conn.sendall(datagram)
                    n += 1


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment