Created
February 18, 2023 15:02
-
-
Save oyi77/7ce8dbf1918586126c58941f6b12b256 to your computer and use it in GitHub Desktop.
Modem Pool Communication
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from serial.tools import list_ports | |
import serial | |
import os, sys, inspect | |
from loguru import logger as logging | |
import datetime | |
import re | |
import json | |
import time | |
from gsmmodem.modem import GsmModem | |
from time import sleep | |
from pyin import * | |
# PENTING DAN OJOK DIUBAH | |
proj_path = resource_path() + "/" | |
if proj_path not in sys.path: | |
sys.path.insert(0, proj_path) | |
portList_file = proj_path+'datastore/portList.json' | |
simList_file = proj_path+'datastore/simList.json' | |
portList = [] | |
portListTemp = [] | |
simList = [] | |
simListTemp = [] | |
portDetail = {"portLoc": "", "status": ""} | |
simDetail = {"nomer": "", "pulsa": "", "port": "", "waktu": ""} | |
def _connect_modem(proty, mode="init"): | |
try: | |
modem = GsmModem(proty, 115200) | |
modem.connect(None) | |
modem.waitForNetworkCoverage(10) | |
portDetail = {"portLoc": proty, "status": "good"} | |
portListTemp.append(portDetail.copy()) | |
return modem | |
except Exception as e: | |
portDetail = {"portLoc": proty, "status": "error"} | |
portListTemp.append(portDetail.copy()) | |
logging.warning("[" + proty + "][CONNECT MODEM FAILED] " + str(e)) | |
return False | |
def _connect_modem2(proty, mode="init"): | |
try: | |
s = serial.Serial(port=proty, baudrate=115200, timeout=5) | |
s.write("ATZ\r".encode()) | |
s.write("AT+CPIN?\r".encode()) | |
res = s.readall() | |
res = res.decode() | |
res = res.replace(" ", "") | |
if "ERROR" in res: | |
portDetail = {"portLoc": proty, "status": "error"} | |
portListTemp.append(portDetail.copy()) | |
s.close() | |
return False | |
else: | |
if mode == "init": | |
delSMS(s) | |
portDetail = {"portLoc": proty, "status": "good"} | |
portListTemp.append(portDetail.copy()) | |
return s | |
except Exception as e: | |
logging.error("["+proty+"][CONNECT MODEM FAILED] "+str(e)) | |
return False | |
def sendUSSD(ser,ussd): | |
""" Kirim Message """ | |
try: | |
waktu = datetime.datetime.now() | |
format_waktu = waktu.strftime('%Y-%m-%d %H:%M:%S') | |
balesan = ser.sendUssd(ussd) | |
needed_answer = balesan.message.replace(" ","") | |
x = re.search("(\+62|62|0)8[1-9][0-9]{6,9}", needed_answer) | |
y = re.search(r"PulsaRp(.*?)s/d", needed_answer) | |
ser.close() | |
balesan = {"nomer": x.group(), "pulsa": y.group(1), "waktu": str(format_waktu)} | |
return balesan | |
except Exception as e: | |
logging.error(f"Send Ussd failed: {e}") | |
return False | |
def readSMS(ser): | |
""" Read Message """ | |
try: | |
waktu = datetime.datetime.now() | |
format_waktu = waktu.strftime('%Y-%m-%d %H:%M:%S') | |
srlcmd = 'AT+CMGF=1\r' | |
ser.write(srlcmd.encode()) | |
srlcmd = 'AT+CMGL="ALL"\r' | |
ser.write(srlcmd.encode()) | |
balesan = (ser.readall()).decode() | |
needed_answer = balesan.replace(" ","") | |
y = re.search(r"verifikasi:(.*?)\(berlaku", needed_answer) | |
ser.close() | |
balesan = y.group(1) | |
return balesan | |
except Exception as e: | |
logging.error("Send Message Error: "+str(e)) | |
return False | |
def delSMS(ser,port=""): | |
try: | |
if port != "": | |
ser = _connect_modem2(port, "otp") | |
ser.write('AT+CMGF=0\r\n'.encode()) | |
ser.write('AT+CMGD=0,4\r\n'.encode()) | |
ser.write('AT+CMGF=1\r\n'.encode()) | |
balesan = (ser.readall()).decode() | |
return balesan | |
except Exception as e: | |
logging.error("Delete message failed"+str(e)) | |
return False | |
def getInfo(port, mutex=None): | |
balesan = "" | |
try: | |
s = _connect_modem(port, "init") | |
if s: | |
balesan = sendUSSD(s,"*888#") | |
if balesan is not False: | |
balesan["port"] = port | |
balesan["status"] = "good" | |
#balesan["waktu"] = "" | |
simDetail = balesan | |
mutex.acquire(1) | |
simListTemp.append(simDetail.copy()) | |
mutex.release() | |
else: | |
balesan = "No simcard available/Error in Port: " + port | |
raise Exception(balesan) | |
except Exception as e: | |
logging.error("[-] ERROR GET CARD INFO : " + str(e) + " [-]") | |
pass | |
finally: | |
return balesan | |
def getOtp(port): | |
s = _connect_modem2(port, "otp") | |
if s: | |
balesan = readSMS(s) | |
updByNum(port,"0",balesan) | |
return balesan | |
else: | |
return False | |
def find_all_port(): | |
all_port_tuples = list(list_ports.comports()) | |
all_ports = [] | |
for port in all_port_tuples: | |
if "Exar" in port.manufacturer and "UART" in port.description: | |
if port.device.startswith("COM") or port.device.startswith("ttyACM"): | |
all_ports.append({"portLoc": port.device, "status": ""}) | |
if len(all_ports) == 0: | |
logging.error("No valid port detected!. Possibly, device not plugged/detected.") | |
pass | |
return all_ports | |
def _load_exPort(): | |
f = open (portList_file, "r") | |
data = json.loads(f.read()) | |
f.close() | |
return data | |
@logging.catch | |
def save_exPort(data): | |
dataL = [] | |
try: | |
for s_xdata in data: | |
dataL.append(s_xdata.copy()) | |
with open(portList_file, 'w') as outfile: | |
a = json.dumps(dataL) | |
outfile.write(a) | |
return True | |
except Exception as e: | |
logging.error("[-] ERROR SAVING PORT LIST : " + str(e) + " [-]") | |
return False | |
@logging.catch | |
def save_sim(data): | |
dataL = [] | |
try: | |
for s_xdata in data: | |
dataL.append(s_xdata.copy()) | |
with open(simList_file, 'w') as outfile: | |
a = json.dumps(dataL) | |
outfile.write(a) | |
return True | |
except Exception as e: | |
logging.error("[-] ERROR SAVING SIM LIST : " + str(e) + " [-]") | |
return False | |
@logging.catch | |
def save_sim_update(data): | |
dataL = [] | |
try: | |
for s_xdata in data: | |
dataL.append(s_xdata.copy()) | |
with open(simList_file, 'w') as outfile: | |
a = json.dumps(dataL) | |
outfile.write(a) | |
return True | |
except Exception as e: | |
logging.error("[-] ERROR SAVING SIM LIST : " + str(e) + " [-]") | |
return False | |
def chkByNum(port,numb=""): | |
try: | |
f = open (simList_file, "r") | |
datas = json.loads(f.read()) | |
f.close() | |
for data in datas: | |
if port != "0": | |
if data["port"] == str(port): | |
return data | |
else: | |
if data["nomer"] == str(numb): | |
return data | |
except Exception as e: | |
logging.error("Checking Simcard by Number Failed : "+str(e)) | |
return False | |
def chkFileAge(): | |
x=os.stat(portList_file) | |
Result=(time.time()-x.st_mtime) | |
return Result | |
def showAllSim(): | |
try: | |
f = open (simList_file, "r") | |
datas = json.loads(f.read()) | |
f.close() | |
return datas | |
except Exception as e: | |
logging.error("Get Simcard database failed : "+str(e)) | |
return False | |
def updByNum(port,numb="",otp=""): | |
try: | |
f = open (simList_file, "r") | |
datas = json.loads(f.read()) | |
f.close() | |
for data in datas: | |
if port != "0": | |
if data["port"] == str(port): | |
if isinstance(otp, str): | |
data["otp"] = otp | |
data["status"] = "good" | |
else: | |
data["otp"] = otp | |
data["status"] = "blacklist" | |
else: | |
if data["nomer"] == str(numb): | |
data["otp"] = otp | |
data["status"] = "good" | |
simList = datas | |
save_sim(simList) | |
except Exception as e: | |
logging.error("Checking by number failed : "+str(e)) | |
return False | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment