Last active
November 12, 2021 05:30
-
-
Save itsecurityco/9b0ed66dded62ec8b184ff6fe17d53ff to your computer and use it in GitHub Desktop.
Main CTF Ekoparty 2021 - PLC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import socket programming library | |
# @itsecurityco (Juan) | |
import socket | |
# import thread module | |
from _thread import * | |
door_closed = b""" | |
______________ | |
|\ ___________ /| | |
| | _ _ _ _ | | | |
| | | | | | | | | | |
| | |-+-+-+-| | | | |
| | |-+-+=+%| | | | |
| | |_|_|_|_| | | | |
| | ___ | | | |
| | [___] ()| | | |
| | ||| | | |
| | ()| | | |
| | | | | |
| | | | | |
| | | | | |
|_|___________|_| The door is closed | |
""" | |
door_has_been_closed = b""" | |
______________ | |
|\ ___________ /| | |
| | _ _ _ _ | | | |
| | | | | | | | | | |
| | |-+-+-+-| | | | |
| | |-+-+=+%| | | | |
| | |_|_|_|_| | | | |
| | ___ | | | |
| | [___] ()| | | |
| | ||| | | |
| | ()| | | |
| | | | | |
| | | | | |
| | | | | |
|_|___________|_| The door has been closed | |
""" | |
function_code_five_hint = b"You are close! read the protocol :)" | |
door_open = b""" | |
______________ | |
|\ ___________ /| | |
| | /|,| | | | | |
| | |,x,| | | | | |
| | |,x,' | | | | |
| | |,x | | | | |
| | |/ | | | | |
| | /] | | | | |
| | [/ ()\ | | | |
| | | \,^%--- | |
| | | <\/ \ The door has been open | |
| | |< | | | |
| | ,'< | | | |
| | ,' ^^|\ | | | |
|_|,'_____/__\|_| EKO{Write_Single_Coil_ON} | |
""" | |
# thread function | |
def threaded(c): | |
try: | |
c.settimeout(5) | |
# data received from client | |
data = c.recv(255) | |
if not data: | |
print('Bye') | |
c.close() | |
uid = data[6:7] # Unit Identifier | |
fn_code = data[7:8] # 05 (0x05) Write Single Coil | |
out_addr = data[8:10] # Output Address | |
out_val = data[10:12] # Output Value | |
mei = data[8:9] # MEI type: Read Device Identification (14) | |
print(""" | |
uid: %s | |
fn_code: %s | |
out_addr: %s | |
out_val: %s | |
mei: %s | |
""" % (uid, fn_code, out_addr, out_val, mei)) | |
if fn_code == b"\x01" and uid == b"\xff" and out_addr == b"\x00\x00" and out_val == b"\x00\x01": | |
# Function code 1 (Read coils) | |
# Se verifica si utiliza la funcion 1 | |
# Se verifica que el uid (Unit Identifier/SlaveID) sea 255 | |
# Se verifica si el out_addr (Starting Address a leer) es 0x0000 | |
# Se verifica si el out_val (Quantity of coils a leer) es 1 | |
# | |
# Los valores de UID, Starting Address y Quantity of coils los va sacar de la Funcion 43 (read device identificacion)... Ver el ultimo elif... | |
data = door_closed | |
elif fn_code == b"\x05" and out_val == b"\xff\x00": | |
# AQUI SE LE DA LA FLAG AL PARTICIPANTE | |
# Function code 5 (Write single coil) | |
# Se verifica si utiliza la funcion 5 y si envia el output value con valor ON (\xff\x00) | |
data = door_open | |
elif fn_code == b"\x05" and out_val == b"\x00\x00": | |
# Function code 5 (Write single coil) | |
# Aqui se verifica si utiliza la funcion 5 pero envia el output value con valor OFF (\x00\x00) | |
# Se le da un ascii cerrando (OFF) la puerta para que sepa que va por bien camino | |
data = door_has_been_closed | |
elif fn_code == b"\x05" and out_val == b"\x00\x01": | |
# Function code 5 (Write single coil) | |
# Aqui se verifica si utiliza la funcion 5 pero envia mal el output value | |
# Se le da una pista para que sepa que va por bien camino | |
data = function_code_five_hint | |
elif fn_code == b"\x05" and out_val == b"\xff\xff": | |
# Function code 5 (Write single coil) | |
# Aqui se verifica si utiliza la funcion 5 pero envia mal el output value | |
# Se le da una pista para que sepa que va por bien camino | |
data = function_code_five_hint | |
elif fn_code == b"\x2b" and mei == b"\x0e": | |
# Function code 43 (Read Device Identification) Hints via banner grabbing | |
# Hint sale con msf -> scanner/scada/modbus_banner_grabbing | |
# Object 1 | |
obj_one_s = "NULL Life PLC" | |
obj_one_l = (len(obj_one_s)).to_bytes(1, byteorder='big') | |
obj_one = bytearray([ord(x) for x in obj_one_s]) | |
# Object 2 | |
obj_two_s = "Start with function code 1" | |
obj_two_l = (len(obj_two_s)).to_bytes(1, byteorder='big') | |
obj_two = bytearray([ord(x) for x in obj_two_s]) | |
# Object 3 | |
obj_three_s = "\x00\x01\x00\x00\x00\x06\xff\x01\x00\x00\x00\x01" | |
obj_three_l = (48).to_bytes(1, byteorder='big') | |
obj_three = bytearray('\\x'.join(['%02x' % i for i in [ord(x) for x in obj_three_s]]).encode('utf-8')) | |
data = b"\x00\x01" # Transaction Identifier | |
data += b"\x00\x00" # Protocol Identifier | |
data += ( | |
10 + | |
len(obj_one_s) + | |
len(obj_two_s) + | |
int.from_bytes(obj_three_l, byteorder='big') | |
).to_bytes(2, byteorder='big') # Length | |
data += b"\xff" # Unit Identifier | |
data += b"\x2b" # 0010 1011 = Function Code: Encapsulated Interface Transport (43) | |
data += b"\x0e" # MEI Type: Read Device Identification (14) | |
data += b"\x01" # Read Device ID code: Basic Device Identification (1) | |
data += b"\x81" # Conformity level | |
data += b"\x00" # More Follows | |
data += b"\x00" # Next Object Id | |
data += b"\x03" # Number of Objets (3) | |
# List Of | |
data += b"\x00" # Object ID | |
data += obj_one_l # Object length | |
data += obj_one # Object Value | |
data += b"\x01" # Object ID | |
data += obj_two_l # Object length | |
data += obj_two # Object Value | |
data += b"\x02" # Object ID | |
data += obj_three_l # Object length | |
data += obj_three # Object Value | |
else: | |
data = b"I don't know that function code" | |
# send back to client | |
c.send(data) | |
# connection closed | |
c.close() | |
except: | |
print('Bye') | |
c.close() | |
def Main(): | |
host = "" | |
# reverse a port on your computer | |
# in our case it is 12345 but it | |
# can be anything | |
port = 65001 | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.bind((host, port)) | |
print("socket binded to port", port) | |
# put the socket into listening mode | |
s.listen(20) | |
print("socket is listening") | |
# a forever loop until client wants to exit | |
while True: | |
try: | |
# establish connection with client | |
c, addr = s.accept() | |
# lock acquired by client | |
#print_lock.acquire() | |
print('Connected to :', addr[0], ':', addr[1]) | |
# Start a new thread and return its identifier | |
start_new_thread(threaded, (c,)) | |
except KeyboardInterrupt: | |
s.close() | |
s.close() | |
if __name__ == '__main__': | |
Main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment