Last active
April 12, 2024 18:56
-
-
Save sourceperl/10288663 to your computer and use it in GitHub Desktop.
Python script for doing multi-threaded pings
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# ping a list of hosts with threads to increase speed
# uses the standard linux /bin/ping utility
from threading import Thread | |
import subprocess | |
try: | |
import queue | |
except ImportError: | |
import Queue as queue | |
import re | |
# some global vars
# number of worker threads pinging in parallel
num_threads = 15
# work queue: IP addresses waiting to be pinged
ips_q = queue.Queue()
# result queue: one message per host that answered
out_q = queue.Queue()

# build IP array: 192.168.0.1 .. 192.168.0.199
ips = ["192.168.0." + str(host) for host in range(1, 200)]
# thread code : wraps system ping command | |
def thread_pinger(i, q):
    """Worker loop: ping every host taken from *q*.

    i -- worker index (not used by the loop itself)
    q -- queue of IP addresses; task_done() is called once per item

    For each host that answers, the message "OK <ip> rtt= <avg>" is put
    on the module-level out_q.  Hosts that do not answer produce no message.
    The loop never returns; it is meant to run in a daemon thread.
    """
    while True:
        # get an IP item from queue (blocks until one is available)
        ip = q.get()
        try:
            # ping it: one echo request, 1 s reply timeout
            args = ['/bin/ping', '-c', '1', '-W', '1', str(ip)]
            # universal_newlines=True gives text on both Python 2 and 3,
            # so the regex below never runs on a bytes repr
            p_ping = subprocess.Popen(args,
                                      shell=False,
                                      stdout=subprocess.PIPE,
                                      universal_newlines=True)
            # communicate() reads stdout to EOF and waits for the process,
            # so returncode is set afterwards
            p_ping_out = p_ping.communicate()[0]
            if p_ping.returncode == 0:
                # rtt min/avg/max/mdev = 22.293/22.293/22.293/0.000 ms
                search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',
                                   p_ping_out, re.M | re.I)
                # the summary line may be absent or formatted differently
                if search is not None:
                    ping_rtt = search.group(2)
                    out_q.put("OK " + str(ip) + " rtt= " + ping_rtt)
        except Exception as exc:
            # keep the worker alive: a dead worker would leave queued hosts
            # unprocessed and the caller blocked in q.join()
            print("ping failed for " + str(ip) + ": " + str(exc))
        finally:
            # update queue : this ip is processed (even on failure)
            q.task_done()
# start the thread pool
for i in range(num_threads):
    worker = Thread(target=thread_pinger, args=(i, ips_q))
    # daemon threads: the workers loop forever, so they must not keep the
    # interpreter alive (attribute form replaces the deprecated setDaemon())
    worker.daemon = True
    worker.start()

# fill queue
for ip in ips:
    ips_q.put(ip)

# wait until every queued IP has been marked as processed
ips_q.join()

# print result: drain the output queue until it is empty
while True:
    try:
        msg = out_q.get_nowait()
    except queue.Empty:
        break
    print(msg)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# ping a list of hosts with threads to increase speed
# designed to exchange data with an SQL database
# uses the standard linux /bin/ping utility
from threading import Thread | |
import mysql.connector | |
import subprocess | |
try: | |
import queue | |
except ImportError: | |
import Queue as queue | |
import time | |
import re | |
# some global vars
# number of worker threads pinging in parallel
num_threads = 30
# work queue: dicts describing the hosts waiting to be pinged
ips_q = queue.Queue()
# result queue: the same dicts, completed with the ping result
out_q = queue.Queue()
# thread code : wraps system ping command | |
def thread_pinger(i, q):
    """Worker loop: ping every host description taken from *q*.

    i -- worker index (not used by the loop itself)
    q -- queue of dicts with the keys 'ip' and 'timeout'

    Each dict is completed with 'up' (bool) and, when the host answered,
    'rtt' (average round-trip time as text, or None if the summary line
    could not be parsed), then put on the module-level out_q.
    task_done() is called once per item.  The loop never returns; it is
    meant to run in a daemon thread.
    """
    while True:
        # get an item from queue (blocks until one is available)
        item = q.get()
        try:
            # assume down until the ping proves otherwise
            item['up'] = False
            # ping it: one echo request with the per-host timeout
            args = ['/bin/ping', '-c', '1', '-W', str(item['timeout']),
                    str(item['ip'])]
            # universal_newlines=True gives text on both Python 2 and 3,
            # so the regex below never runs on a bytes repr
            p_ping = subprocess.Popen(args,
                                      shell=False,
                                      stdout=subprocess.PIPE,
                                      universal_newlines=True)
            # communicate() reads stdout to EOF and waits for the process,
            # so returncode is set afterwards
            p_ping_out = p_ping.communicate()[0]
            # ping return 0 if up
            if p_ping.returncode == 0:
                # rtt min/avg/max/mdev = 22.293/22.293/22.293/0.000 ms
                search = re.search(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',
                                   p_ping_out, re.M | re.I)
                item['up'] = True
                # the summary line may be absent or formatted differently
                item['rtt'] = search.group(2) if search is not None else None
        except Exception as exc:
            # keep the worker alive: a dead worker would leave queued hosts
            # unprocessed and the caller blocked in q.join()
            print("ping failed for " + str(item) + ": " + str(exc))
        finally:
            # update output queue
            out_q.put(item)
            # update queue : this item is processed (even on failure)
            q.task_done()
# start the thread pool
for i in range(num_threads):
    worker = Thread(target=thread_pinger, args=(i, ips_q))
    # daemon threads: the workers loop forever, so they must not keep the
    # interpreter alive (attribute form replaces the deprecated setDaemon())
    worker.daemon = True
    worker.start()

# build IP array: 192.168.0.1 .. 192.168.0.199
ips = ["192.168.0." + str(host) for host in range(1, 200)]
# main loop: one ping sweep over all hosts per iteration
while True:
    # retrieve data from DB
    # add SQL here

    # remember when this sweep started
    start = time.time()

    # queue one job per host
    for ip in ips:
        ips_q.put({'ip': ip, 'timeout': 1})

    # block until the workers have processed every queued job
    ips_q.join()

    # display result: empty the output queue, showing only hosts that are up
    print("next:")
    try:
        while True:
            msg = out_q.get_nowait()
            if msg['up']:
                print(msg)
    except queue.Empty:
        pass

    # sweep duration, rounded to 1/100 s
    end = time.time()
    loop_time = round(end - start, 2)
    print("loop time: %s" % (loop_time))

    # update DB
    # add SQL here

    # wait 5s before next cycle
    time.sleep(5.0)
Thanks for this script @sourceperl
For those asking about Windows, you change the subprocess args like this:
# Windows ping: -n is the number of echo requests, -w the reply timeout in
# milliseconds (1000 ms matches the 1 s "-W 1" of the Linux version)
args = ["PING.EXE", "-n", "1", "-w", "1000", str(ip)]
Also replace this block of code with the following:
if p_ping.wait() == 0:
    # Minimum = 1ms, Maximum = 1ms, Average = 1ms
    search = re.search(
        r"Minimum = (.*)ms, Maximum = (.*)ms, Average = (.*)ms",
        p_ping_out,
        re.M | re.I,
    )
    # the statistics line can be missing even with exit code 0 (presumably
    # when no reply carried a round-trip time, or the output is localized),
    # so only report hosts for which it was found
    if search is not None:
        ping_rtt = search.group(3)
        out_q.put(f"OK {str(ip)} rtt (avg)={ping_rtt}ms")
Thanks @adrianyorke. I resorted to using: https://github.com/digineo/go-ping/tree/master/cmd/ping-test for Windows.
Hello @sourceperl, thanks for the code. I'm writing this:
from threading import Thread, Lock
_db_lock = Lock()
import threading, time
import subprocess
import queue
import re
from redistimeseries.client import Client
from redis import StrictRedis, ConnectionError
import json
import sys
import os
import time, json
# module-wide settings
# size of the ping worker pool
num_threads = 15
# host address handed to both Redis clients
ipRedis = '192.168.1.100'
# work queue (hosts to ping) followed by the two result queues
ips_q, out_qUp, out_qLow = queue.Queue(), queue.Queue(), queue.Queue()
def decode_redis(src):
    """Recursively decode a Redis reply from bytes to str.

    src -- bytes, a list of decodable values, or a dict whose keys are
           bytes and whose values are decodable

    Returns the same structure with every bytes object decoded
    (default codec of bytes.decode()).
    Raises TypeError for any other type; the message names the type.
    """
    if isinstance(src, list):
        return [decode_redis(item) for item in src]
    if isinstance(src, dict):
        return {key.decode(): decode_redis(value) for key, value in src.items()}
    if isinstance(src, bytes):
        return src.decode()
    # type(src) is not a str, so use its name to build the message
    raise TypeError("type not handled: " + type(src).__name__)
def doQuerySUBS():
    """Load the subscriber rows stored in the Redis hash 'infraYI'.

    Reads the hash through the module-level client r, decodes it and
    parses the JSON text held in its 'data' field.
    Assumes that field is a JSON array of rows with at least 16 columns
    each -- TODO confirm against the producer of the hash.

    Returns a list of 16-item tuples, one per row whose columns 0, 9, 10
    and 11 are all non-empty.  Returns an empty list when the hash is
    empty or missing.
    """
    print("Get data subscribers...")
    data = decode_redis(r.hgetall('infraYI'))
    if not data:
        # nothing stored: there are no subscribers to report
        return []
    rows = json.loads(data['data'])
    subs = []
    for row in rows:
        if row[0] != '' and row[9] != '' and row[10] != '' and row[11] != '':
            # keep exactly the first 16 columns; explicit indexing still
            # raises IndexError for a short row
            subs.append(tuple(row[col] for col in range(16)))
    return subs
# RedisTimeSeries client: the ping workers store their samples through it
# NOTE(review): if construction fails only the error is printed and the
# name stays undefined -- later uses would then raise NameError
try:
    rts = Client(
        host=ipRedis,
        port=6379,
        socket_keepalive=True,
        retry_on_timeout=True,
    )
except Exception as err:
    print(err)

# plain Redis client: used for the subscriber query and by the listener
try:
    r = StrictRedis(
        host=ipRedis,
        port=6379,
        db=0,
        health_check_interval=30,
        socket_keepalive=True,
    )
except Exception as err:
    print(err)
# thread code : wraps system ping command
def thread_pinger(i, q):
    """Worker loop: ping every host taken from *q* and record the result.

    i -- worker index (not used by the loop itself)
    q -- queue of IP addresses; task_done() is called once per item

    A host that answers is stored in the time series (module-level rts)
    with its average round-trip time and reported as "UP ...".  A host
    that does not answer is stored with the value 0.0 and reported as
    "DOWN ...".  The loop never returns; it is meant to run in a daemon
    thread.
    """
    # compiled once: the same pattern is used for every ping
    rtt_re = re.compile(r'rtt min/avg/max/mdev = (.*)/(.*)/(.*)/(.*) ms',
                        re.M | re.I)
    while True:
        # get an IP item from queue (blocks until one is available)
        ip = q.get()
        try:
            # ping it: one echo request, 1 s reply timeout
            args = ['/bin/ping', '-c', '1', '-W', '1', str(ip)]
            p_ping = subprocess.Popen(args, close_fds=True, shell=False,
                                      stdout=subprocess.PIPE)
            # communicate() reads stdout to EOF and waits for the process,
            # so returncode is set afterwards
            p_ping_out = p_ping.communicate()[0].decode('utf-8', 'replace')
            if p_ping.returncode == 0:
                ######## DEVICES UP ########
                search = rtt_re.search(p_ping_out)
                if search is None:
                    # summary line absent or formatted differently
                    print("[ERROR IP UP]########### no rtt found for " + str(ip))
                else:
                    ping_rtt = search.group(2)
                    out_qUp.put("UP " + str(ip) + " rtt= " + ping_rtt + ' ms')
                    try:
                        rts.add(str(ip), int(time.time()), float(ping_rtt))
                        print(out_qUp.get_nowait())
                    except Exception as e:
                        print("[ERROR IP UP]########### rts.create " + str(e))
            else:
                ######## DEVICES DOWN ########
                # second word of ping's output is used as the series key
                p_ping_outS = p_ping_out.split(' ')
                try:
                    rts.add(str(p_ping_outS[1]), int(time.time()), float(0.0))
                    print("DOWN " + str(p_ping_outS[1]) + ' 0.0' + ' ms')
                except Exception as e:
                    # stay silent when ping printed nothing usable
                    if len(p_ping_outS) > 1:
                        print("[ERROR IP DOWN]########### rts.create " + str(e))
            # NOTE(review): 1 s pause per host kept from the original;
            # confirm it is still wanted
            time.sleep(1)
        except Exception as e:
            # keep the worker alive: a dead worker would leave queued hosts
            # unprocessed and the caller blocked in q.join()
            print("[ERROR PING]########### " + str(ip) + " " + str(e))
        finally:
            # update queue : this ip is processed (even on failure)
            q.task_done()
class Listener1(threading.Thread):
    """Thread that repeatedly pings all subscriber IPs.

    r        -- Redis client; used to create the pubsub object and for
                the reconnect ping
    channels -- channel names subscribed to at construction time
    """

    def __init__(self, r, channels):
        threading.Thread.__init__(self)
        self.redis, self.init = r, 0
        # the ping worker pool is created lazily, exactly once
        self._workers_started = False
        self.pubsub = self.redis.pubsub()
        print('Listener1...')
        try:
            self.pubsub.subscribe(channels)
        except Exception as e:
            print(e)

    def _start_workers(self):
        """Start the ping worker pool on the first call only."""
        if self._workers_started:
            return
        for i in range(num_threads):
            worker = Thread(target=thread_pinger, args=(i, ips_q))
            # daemon threads: the workers loop forever
            worker.daemon = True
            worker.start()
        self._workers_started = True

    def work(self):
        """Run one ping cycle over all subscriber IPs, then pause 5 s."""
        try:
            # column 9 of each subscriber row is the address to ping
            ips = [sub[9] for sub in doQuerySUBS()]
            # The workers never exit, so the pool must be reused.
            # Starting a new pool on every cycle made threads (and the
            # ping subprocesses they run at the same time) grow without
            # bound.
            self._start_workers()
            # fill queue
            for ip in ips:
                ips_q.put(ip)
            # wait until every queued IP has been processed
            ips_q.join()
        except Exception as e:
            print(e)
        time.sleep(5)

    def run(self):
        """Call work() forever; try to reconnect after a ConnectionError."""
        while True:
            try:
                self.work()
            except ConnectionError:
                # NOTE(review): work() already catches Exception, so this
                # branch looks unreachable -- confirm the intended flow
                print('[lost connection]')
                while True:
                    print('trying to reconnect...')
                    try:
                        self.redis.ping()
                    except ConnectionError:
                        time.sleep(10)
                    else:
                        # NOTE(review): only 'last_session' is re-subscribed,
                        # not the channels given to __init__ -- confirm
                        self.pubsub.subscribe(['last_session'])
                        break
            time.sleep(0.001)  # be nice to the system :)
# create the listener thread (it subscribes to the two channels in its
# constructor) and start it; run() then loops forever
client = Listener1(r, ['last_session','LT01TP0LT'])
client.start()
... to run your th_pinger.py example
every 5 seconds, but after a few minutes this error appears:
OSError: [Errno 24] Too many open files
File "/usr/lib/python3.8/subprocess.py", line 1605, in _execute_child
errpipe_read, errpipe_write = os.pipe()
OSError: [Errno 24] Too many open files
Do you know why this occurs?
Unfortunately not, do you have the same problem with basic code like this one: https://gist.github.com/sourceperl/0ef3719e8fef2c95d98c590ff1e7cefd ?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
hello, how would i go about modifying it for windows?
also, what is the difference in using queue here rather than async?
thanks