Skip to content

Instantly share code, notes, and snippets.

@AstroEngineeer
Last active July 30, 2024 12:32
Show Gist options
  • Save AstroEngineeer/642575a95f1b4a2786e8fe97a7a41ec3 to your computer and use it in GitHub Desktop.
Save AstroEngineeer/642575a95f1b4a2786e8fe97a7a41ec3 to your computer and use it in GitHub Desktop.
Private chat room using TCP socket.
import socket
import select
import sys
import uuid
# Chat client: identifies itself to the server by MAC address, then relays
# lines typed on stdin to the server and prints whatever the server sends.

# Render the 48-bit node id reported by uuid.getnode() as six colon-separated
# hex octets, most-significant octet first (e.g. "aa:bb:cc:dd:ee:ff").
_node = uuid.getnode()
mac_add = ':'.join('{:02x}'.format((_node >> shift) & 0xff)
                   for shift in range(40, -8, -8))

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
IP_address = "192.168.48.128"
Port = 20001

try:
    client.connect((IP_address, Port))
    # The MAC address is the first thing sent; the server answers either
    # "Access denied" or its greeting.
    client.sendall(mac_add.encode("utf-8"))
    reply = client.recv(2048).decode("utf-8", errors="replace")
    if reply == "Access denied":
        print("Access denied")
    else:
        if reply:
            # Show the greeting instead of silently discarding it.
            print(reply)
        # An empty first reply means the server closed the connection.
        connected = bool(reply)
        while connected:
            # NOTE: select() on sys.stdin works on Unix only; on Windows
            # select() accepts sockets exclusively.
            read_sockets, _, _ = select.select([sys.stdin, client], [], [])
            for socks in read_sockets:
                if socks is client:
                    data = client.recv(2048)
                    if not data:
                        # recv() returning b"" means the server hung up;
                        # without this check the loop would spin forever.
                        print("Disconnected from server")
                        connected = False
                        break
                    print(data.decode("utf-8", errors="replace"))
                else:
                    try:
                        message = input()
                    except EOFError:
                        # stdin was closed (e.g. Ctrl-D): leave the chat.
                        connected = False
                        break
                    client.sendall(message.encode("utf-8"))
                    print("<You>", end=" ")
                    print(message)
finally:
    # Always release the socket, whichever way the session ended.
    client.close()
import socket
import emoji
from _thread import start_new_thread
import csv
# Address and port the chat server listens on.
IP_address = "192.168.48.128"
Port = 20001

# Sockets of the clients currently admitted to the chat room.
list_of_clients = []
# Declared alongside the client list; not referenced by the rest of this script.
list_of_approved_client = []

# Listening TCP socket. SO_REUSEADDR is set before bind() so the option
# applies to this bind.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((IP_address, Port))
# Backlog of up to 100 pending connections.
server.listen(100)
def clientthread(conn, addr):
    """Serve one admitted chat client until it disconnects.

    Sends the welcome banner, then relays every message received on
    ``conn`` to all other clients through ``broadcast``, prefixed with the
    sender's IP address. When the peer closes the connection or the socket
    fails, the client is removed from the room and its socket is closed.

    Args:
        conn: Connected socket for this client.
        addr: Peer address tuple; ``addr[0]`` is used as the display label.
    """
    try:
        conn.send(bytes("Welcome to Vicky's chatroom!", "utf-8"))
        while True:
            data = conn.recv(2048)
            if not data:
                # recv() returning b"" means the peer closed the connection.
                # Looping again would return b"" immediately forever.
                break
            try:
                message = data.decode("utf-8")
                print("<" + addr[0] + "> " + emoji.emojize(message))
            except UnicodeError:
                # Undecodable payload, or the console cannot render the
                # text: drop this message but keep the session alive.
                continue
            message_to_send = "<" + addr[0] + "> " + message
            broadcast(message_to_send, conn)
    except OSError:
        # Connection reset / broken pipe: treat it as a disconnect.
        pass
    finally:
        remove(conn)
        conn.close()
def broadcast(message, connection):
    """Send ``message`` to every connected client except ``connection``.

    Clients whose socket fails during the send are closed and removed from
    ``list_of_clients``.

    Args:
        message: Text to deliver; encoded as UTF-8 once for all recipients.
        connection: Socket of the sender, which is skipped.
    """
    payload = bytes(message, "utf-8")
    # Iterate over a snapshot: remove() mutates list_of_clients, and removing
    # from the live list mid-iteration would skip the client that follows.
    for recipient in list(list_of_clients):
        if recipient == connection:
            continue
        try:
            # sendall() retries until the whole payload has been written.
            recipient.sendall(payload)
        except OSError:
            recipient.close()
            remove(recipient)
def remove(connection):
    """Drop ``connection`` from ``list_of_clients``; do nothing if absent."""
    try:
        list_of_clients.remove(connection)
    except ValueError:
        # Not in the list (never added, or already removed).
        pass
def _load_macs(path):
    """Return the set of first-column values (MAC addresses) in the CSV at ``path``.

    A missing file is treated as an empty list, and blank rows are skipped.
    """
    try:
        # newline='' is what the csv module requires for files it reads/writes.
        with open(path, newline='') as mac_list:
            return {row[0] for row in csv.reader(mac_list) if row}
    except FileNotFoundError:
        return set()


def _append_mac(path, mac, ip):
    """Append a ``mac,ip`` row to the CSV at ``path``, creating it if needed."""
    with open(path, mode='a', newline='') as mac_list:
        csv.writer(mac_list, delimiter=',').writerow([mac, ip])


def auth_mac(mac, ip):
    """Decide whether the client identified by ``mac`` may join the chat room.

    Black-listed addresses are rejected and white-listed addresses accepted
    without prompting. For an unknown address the operator is asked on the
    console; the answer is persisted to the matching CSV file.

    NOTE(review): ``mac`` is whatever the client chose to send, so this is
    an operator convenience rather than real authentication.

    Args:
        mac: MAC address string sent by the client.
        ip: Client IP address, stored next to the MAC when a list is updated.

    Returns:
        True if the client is allowed in, False otherwise.
    """
    # The black list takes precedence over the white list.
    if mac in _load_macs('macs_black_listed.csv'):
        return False
    if mac in _load_macs('macs_white_listed.csv'):
        return True

    if input("{} wants to join your chat room.Allow(Y/N): ".format(mac)).lower() == "y":
        _append_mac('macs_white_listed.csv', mac, ip)
        return True

    if input("Add mac address to black list?(Y/N): ").lower() == "y":
        _append_mac('macs_black_listed.csv', mac, ip)
    return False
# Accept loop: each new connection must send its MAC address first; admitted
# clients get their own handler thread, everyone else is told "Access denied".
print("Waiting for clients to join!")
try:
    while True:
        conn, addr = server.accept()
        try:
            mac = conn.recv(2048).decode("utf-8")
        except (OSError, UnicodeDecodeError):
            # A broken or garbage handshake from one client must not take
            # the whole server down.
            conn.close()
            continue
        if not mac:
            # Peer closed before identifying itself; nothing to authorise.
            conn.close()
            continue
        if auth_mac(mac, addr[0]):
            list_of_clients.append(conn)
            print(addr[0] + " has connected to your chatroom.")
            start_new_thread(clientthread, (conn, addr))
        else:
            try:
                conn.send(bytes("Access denied", "utf-8"))
            except OSError:
                pass
            finally:
                # Rejected connections are always closed, even if the
                # notification could not be delivered.
                conn.close()
finally:
    # Runs when the loop is interrupted (e.g. Ctrl-C); code placed after
    # the infinite loop would never execute.
    server.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment