Skip to content

Instantly share code, notes, and snippets.

@ailinykh
Last active October 8, 2025 07:25
Show Gist options
  • Select an option

  • Save ailinykh/36a3c4fb0234aa1d173b4559121907fa to your computer and use it in GitHub Desktop.

Select an option

Save ailinykh/36a3c4fb0234aa1d173b4559121907fa to your computer and use it in GitHub Desktop.
arping
import logging
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging import Logger
from typing import List, Tuple
from datetime import datetime, timezone
def get_logger() -> logging.Logger:
    """Return the module logger, writing to stdout and to a log file.

    The logger itself is set to DEBUG. Two handlers share one format:

    * a stream handler on ``sys.stdout`` that emits INFO and above;
    * a file handler on ``logs/ip_<UTC timestamp>.log`` that emits DEBUG
      and above.

    The ``logs`` directory is created when missing, and a logger that
    already carries handlers is returned unchanged so that repeated calls
    do not duplicate every record.
    """
    import os  # local import: the surrounding module header does not import ``os``

    logger = logging.getLogger(__name__)
    if logger.handlers:
        # Already configured by an earlier call.
        return logger

    log_formatter = logging.Formatter("%(asctime)s [%(levelname)-5.5s] %(message)s")
    logger.setLevel(logging.DEBUG)

    # write to stdout
    console_handler = logging.StreamHandler(stream=sys.stdout)
    console_handler.setFormatter(log_formatter)
    console_handler.setLevel(logging.INFO)
    logger.addHandler(console_handler)

    # write to file; FileHandler does not create missing parent directories
    os.makedirs("logs", exist_ok=True)
    timestamp = datetime.now(tz=timezone.utc).strftime("%Y%m%d_%H%M%S")
    file_handler = logging.FileHandler(f"logs/ip_{timestamp}.log")
    file_handler.setFormatter(log_formatter)
    file_handler.setLevel(logging.DEBUG)
    logger.addHandler(file_handler)

    return logger
def get_ip_addresses() -> List[str]:
    """Build the list of IPv4 addresses to probe.

    Every host number 0-255 is generated for each selected third octet.
    Only ``88`` is selected here (the author's home MikroTik subnet,
    192.168.88.0/24); per the author's note, use ``range(256)`` for the
    third octet to cover 0-255 as well.
    """
    third_octets = range(88, 89)
    return [
        f'192.168.{net}.{host}'  # 192.168.88.0 .. 192.168.88.255
        for net in third_octets
        for host in range(256)
    ]
def get_arping(ip: str, logger: Logger) -> Tuple[str, bool]:
    """Probe ``ip`` with ``arping -c 3`` and report whether it answered.

    Args:
        ip: address handed to the ``arping`` command.
        logger: receives arping's raw stdout and stderr at DEBUG level.

    Returns:
        ``(ip, answered)``; ``answered`` is True when the last line of
        arping's stdout starts with ``rtt ``. Empty output counts as
        "not answered".

    Raises:
        OSError: propagated from ``subprocess`` when ``arping`` cannot be
            started (for example, when it is not installed).
    """
    # subprocess.run waits for the child and drains both pipes together, so
    # no process or pipe is left behind and a chatty stderr cannot stall us.
    completed = subprocess.run(
        ['arping', '-c', '3', ip],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout = completed.stdout.strip().decode('utf-8')
    stderr = completed.stderr.strip().decode('utf-8')

    # just in case, keep everything arping printed in the logs
    logger.debug('stdout for %s: %s', ip, stdout)
    logger.debug('stderr for %s: %s', ip, stderr)

    # Sample output for a device that answers:
    #
    #   ARPING 192.168.88.1
    #   42 bytes from dc:2c:6e:f6:66:6e (192.168.88.1): index=0 time=36.133 msec
    #   42 bytes from dc:2c:6e:f6:66:6e (192.168.88.1): index=1 time=503.349 msec
    #   42 bytes from dc:2c:6e:f6:66:6e (192.168.88.1): index=2 time=5.833 msec
    #   42 bytes from dc:2c:6e:f6:66:6e (192.168.88.1): index=3 time=2.486 msec
    #   --- 192.168.88.1 statistics ---
    #   3 packets transmitted, 4 packets received, 0% unanswered (1 extra)
    #   rtt min/avg/max/std-dev = 2.486/136.950/503.349/211.946 ms
    parts = stdout.splitlines()

    # If the device did not answer there is no `rtt` line. Guard against
    # empty output as well, which would otherwise raise IndexError.
    return ip, bool(parts) and parts[-1].startswith('rtt ')
def check_sync(ips: List[str], logger: Logger):
    """Probe the addresses one after another and log each outcome.

    Each address is passed to ``get_arping``; the result is reported at
    INFO level as either "answered" or "not answered".
    """
    for address in ips:
        device, answered = get_arping(ip=address, logger=logger)
        status = 'answered' if answered else 'not answered'
        logger.info('device %s %s', device, status)
def check_async(ips: List[str], logger: Logger):
    """Probe the addresses concurrently and log each outcome.

    One ``get_arping`` call per address is submitted to a thread pool of
    default size; outcomes are logged at INFO level in completion order,
    not in input order.
    """
    with ThreadPoolExecutor(max_workers=None) as pool:
        pending = [pool.submit(get_arping, address, logger) for address in ips]
        for finished in as_completed(pending):
            device, answered = finished.result()
            status = 'answered' if answered else 'not answered'
            logger.info('device %s %s', device, status)
def main():
    """Scan the configured addresses concurrently and log how long it took."""
    logger = get_logger()
    ips = get_ip_addresses()
    logger.info('got %d ips', len(ips))
    logger.debug(ips)

    # ``check_sync(ips=ips, logger=logger)`` is the sequential counterpart;
    # it can be timed the same way to compare against the threaded scan.
    started_at = time.time()
    check_async(ips=ips, logger=logger)
    elapsed = time.time() - started_at
    logger.info('async check took: %.2f seconds', elapsed)


if __name__ == '__main__':
    main()
import logging
import os.path
import re
import sqlite3
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging import Logger
from typing import List, Tuple, Optional
from datetime import datetime, timezone
def get_logger() -> logging.Logger:
    """Return the module logger, writing to stdout and to a log file.

    The logger itself is set to DEBUG. Two handlers share one format:

    * a stream handler on ``sys.stdout`` that emits INFO and above;
    * a file handler on ``logs/ip_<UTC timestamp>.log`` that emits DEBUG
      and above.

    The ``logs`` directory is created when missing, and a logger that
    already carries handlers is returned unchanged so that repeated calls
    do not duplicate every record.
    """
    logger = logging.getLogger(__name__)
    if logger.handlers:
        # Already configured by an earlier call.
        return logger

    log_formatter = logging.Formatter("%(asctime)s [%(levelname)-5.5s] %(message)s")
    logger.setLevel(logging.DEBUG)

    # write to stdout
    console_handler = logging.StreamHandler(stream=sys.stdout)
    console_handler.setFormatter(log_formatter)
    console_handler.setLevel(logging.INFO)
    logger.addHandler(console_handler)

    # write to file; FileHandler does not create missing parent directories
    os.makedirs("logs", exist_ok=True)
    timestamp = datetime.now(tz=timezone.utc).strftime("%Y%m%d_%H%M%S")
    file_handler = logging.FileHandler(f"logs/ip_{timestamp}.log")
    file_handler.setFormatter(log_formatter)
    file_handler.setLevel(logging.DEBUG)
    logger.addHandler(file_handler)

    return logger
def get_ip_addresses() -> List[str]:
    """Build the list of IPv4 addresses to probe.

    Every host number 0-255 is generated for each selected third octet.
    Only ``88`` is selected here (the author's home MikroTik subnet,
    192.168.88.0/24); per the author's note, use ``range(256)`` for the
    third octet to cover 0-255 as well.
    """
    addresses: List[str] = []
    for net in range(88, 89):
        addresses.extend(f'192.168.{net}.{host}' for host in range(256))
    return addresses
def get_arping(ip: str, logger: Logger) -> Tuple[str, Optional[str]]:
    """Probe ``ip`` with ``arping -c 3`` and extract the responder's MAC.

    Args:
        ip: address handed to the ``arping`` command.
        logger: receives arping's raw stdout and stderr at DEBUG level.

    Returns:
        ``(ip, mac)`` where ``mac`` is the first token that follows
        `` from `` in arping's stdout, or ``None`` when there is no such
        token (no reply).

    Raises:
        OSError: propagated from ``subprocess`` when ``arping`` cannot be
            started (for example, when it is not installed).
    """
    # subprocess.run waits for the child and drains both pipes together, so
    # no process or pipe is left behind and a chatty stderr cannot stall us.
    completed = subprocess.run(
        ['arping', '-c', '3', ip],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout = completed.stdout.strip().decode('utf-8')
    stderr = completed.stderr.strip().decode('utf-8')

    # just in case, keep everything arping printed in the logs
    logger.debug('stdout for %s: %s', ip, stdout)
    logger.debug('stderr for %s: %s', ip, stderr)

    # Sample output for a device that answers:
    #
    #   ARPING 192.168.88.1
    #   42 bytes from dc:2c:6e:f6:66:6e (192.168.88.1): index=0 time=36.133 msec
    #   42 bytes from dc:2c:6e:f6:66:6e (192.168.88.1): index=1 time=503.349 msec
    #   42 bytes from dc:2c:6e:f6:66:6e (192.168.88.1): index=2 time=5.833 msec
    #   42 bytes from dc:2c:6e:f6:66:6e (192.168.88.1): index=3 time=2.486 msec
    #   --- 192.168.88.1 statistics ---
    #   3 packets transmitted, 4 packets received, 0% unanswered (1 extra)
    #   rtt min/avg/max/std-dev = 2.486/136.950/503.349/211.946 ms
    #
    # NOTE(review): the pattern relies on the MAC directly following
    # " from " as in the sample above; confirm for other arping variants.
    mac_match = re.search(r' from (\S+)', stdout)
    return ip, (mac_match.group(1) if mac_match else None)
def check_async(ips: List[str], logger: Logger) -> List[Tuple[str, str]]:
    """Probe the addresses concurrently and collect those that answered.

    One ``get_arping`` call per address is submitted to a thread pool of
    default size. Each outcome is logged at INFO level as it completes.

    Returns:
        ``(ip, mac)`` pairs for the addresses that yielded a MAC, in
        completion order rather than input order.
    """
    responders = []
    with ThreadPoolExecutor(max_workers=None) as pool:
        pending = [pool.submit(get_arping, address, logger) for address in ips]
        for finished in as_completed(pending):
            device, mac = finished.result()
            logger.info('device %s %s', device, mac or 'not answered')
            if not mac:
                continue
            responders.append((device, mac))
    return responders
def save_collected_ips(
    answered: List[Tuple[str, str]],
    database_name: str = 'mydb.db',
):
    """Write ``(IP, MAC)`` pairs into a freshly created SQLite database.

    Any existing file at ``database_name`` is deleted first, so the
    database only ever holds the latest scan. The pairs go into table
    ``mytable`` (columns ``id``, ``IP``, ``MAC``).

    Inspect the result with::

        sqlite3 mydb.db "SELECT * FROM mytable;"

    Args:
        answered: ``(ip, mac)`` pairs to store.
        database_name: path of the SQLite file to (re)create.
    """
    # Remove the previous database directly instead of checking first, which
    # avoids a race between the existence check and the removal.
    try:
        os.remove(database_name)
    except FileNotFoundError:
        pass

    conn = sqlite3.connect(database_name)
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # not close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute(
                'CREATE TABLE mytable '
                '(id INTEGER PRIMARY KEY, IP TEXT, MAC TEXT)'
            )
            conn.executemany(
                'INSERT INTO mytable (IP, MAC) VALUES (?, ?)',
                answered,
            )
    finally:
        conn.close()
def main():
    """Scan the configured addresses, store the responders, log the duration.

    The logged duration covers both the concurrent scan and the database
    write.
    """
    logger = get_logger()
    ips = get_ip_addresses()
    logger.info('got %d ips', len(ips))
    logger.debug(ips)

    started_at = time.time()
    responders = check_async(ips=ips, logger=logger)
    save_collected_ips(answered=responders)
    elapsed = time.time() - started_at
    logger.info('async check took: %.2f seconds', elapsed)


if __name__ == '__main__':
    main()
import logging
import os.path
import re
import sqlite3
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from logging import Logger
from typing import List, Tuple, Set
from datetime import datetime, timezone
def get_logger() -> logging.Logger:
    """Return the module logger, writing to stdout and to a log file.

    The logger itself is set to DEBUG. Two handlers share one format:

    * a stream handler on ``sys.stdout`` that emits INFO and above;
    * a file handler on ``logs/ip_<UTC timestamp>.log`` that emits DEBUG
      and above.

    The ``logs`` directory is created when missing, and a logger that
    already carries handlers is returned unchanged so that repeated calls
    do not duplicate every record.
    """
    logger = logging.getLogger(__name__)
    if logger.handlers:
        # Already configured by an earlier call.
        return logger

    log_formatter = logging.Formatter("%(asctime)s [%(levelname)-5.5s] %(message)s")
    logger.setLevel(logging.DEBUG)

    # write to stdout
    console_handler = logging.StreamHandler(stream=sys.stdout)
    console_handler.setFormatter(log_formatter)
    console_handler.setLevel(logging.INFO)
    logger.addHandler(console_handler)

    # write to file; FileHandler does not create missing parent directories
    os.makedirs("logs", exist_ok=True)
    timestamp = datetime.now(tz=timezone.utc).strftime("%Y%m%d_%H%M%S")
    file_handler = logging.FileHandler(f"logs/ip_{timestamp}.log")
    file_handler.setFormatter(log_formatter)
    file_handler.setLevel(logging.DEBUG)
    logger.addHandler(file_handler)

    return logger
def get_arping(ip: str, logger: Logger) -> Tuple[str, Set[str]]:
    """Probe ``ip`` with ``arping -c 3`` and collect every responding MAC.

    Args:
        ip: address handed to the ``arping`` command.
        logger: receives arping's raw stdout and stderr at DEBUG level.

    Returns:
        ``(ip, macs)`` where ``macs`` is the set of distinct tokens that
        follow `` from `` in arping's stdout; the set is empty when nothing
        replied.

    Raises:
        OSError: propagated from ``subprocess`` when ``arping`` cannot be
            started (for example, when it is not installed).
    """
    # subprocess.run waits for the child and drains both pipes together, so
    # no process or pipe is left behind and a chatty stderr cannot stall us.
    completed = subprocess.run(
        ["arping", "-c", "3", ip],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout = completed.stdout.strip().decode("utf-8")
    stderr = completed.stderr.strip().decode("utf-8")

    # just in case, keep everything arping printed in the logs
    logger.debug("stdout for %s: %s", ip, stdout)
    logger.debug("stderr for %s: %s", ip, stderr)

    # findall returns [] when nothing matches, so set() covers both cases.
    return ip, set(re.findall(r" from (\S+)", stdout))
def check_async(ips: List[str], logger: Logger) -> List[Tuple[str, Set[str]]]:
    """Probe the addresses concurrently and keep those that answered.

    One ``get_arping`` call per address is submitted to a thread pool of
    default size.

    Args:
        ips: addresses to probe.
        logger: passed through to ``get_arping``.

    Returns:
        ``(ip, macs)`` pairs whose MAC set is non-empty, in completion
        order rather than input order.
    """
    answered = []
    with ThreadPoolExecutor(max_workers=None) as executor:
        # Build the future list directly rather than appending from inside a
        # throwaway list comprehension.
        promises = [executor.submit(get_arping, ip, logger) for ip in ips]
        for promise in as_completed(promises):
            device, macs = promise.result()
            if macs:
                answered.append((device, macs))
    return answered
def save_collected_ips(
    collected: List[Tuple[str, Set[str]]],
    database_name: str = "mydb.db",
):
    """Write one ``(IP, MAC)`` row per collected MAC into a new SQLite file.

    Any existing file at ``database_name`` is deleted first, so the
    database only ever holds the latest scan. Rows go into table
    ``mytable`` (columns ``id``, ``IP``, ``MAC``) in the order of
    ``collected``; MACs of the same IP are written in sorted order so the
    result does not depend on set iteration order.

    Inspect the result with::

        sqlite3 mydb.db "SELECT * FROM mytable;"

    Args:
        collected: ``(ip, macs)`` pairs; every MAC in ``macs`` gets a row.
        database_name: path of the SQLite file to (re)create.
    """
    # Remove the previous database directly instead of checking first, which
    # avoids a race between the existence check and the removal.
    try:
        os.remove(database_name)
    except FileNotFoundError:
        pass

    rows = [(ip, mac) for (ip, macs) in collected for mac in sorted(macs)]

    conn = sqlite3.connect(database_name)
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # not close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute(
                "CREATE TABLE mytable "
                "(id INTEGER PRIMARY KEY, IP TEXT, MAC TEXT)"
            )
            conn.executemany(
                "INSERT INTO mytable (IP, MAC) VALUES (?, ?)",
                rows,
            )
    finally:
        conn.close()
def get_devices_for_subnet(
    subnet: int,
    logger: Logger,
) -> List[Tuple[str, Set[str]]]:
    """Scan ``192.168.<subnet>.0`` .. ``192.168.<subnet>.255`` for devices.

    Args:
        subnet: third octet of the network to scan.
        logger: receives progress messages and is passed on to
            ``check_async``.

    Returns:
        Whatever ``check_async`` returns for the 256 generated addresses.
    """
    # 256 addresses sharing the first three octets form a /24 network; the
    # message previously labelled it /16.
    logger.info("subnet 192.168.%d.0/24 lookup", subnet)
    ips = [f"192.168.{subnet}.{i}" for i in range(256)]

    start_time = time.time()
    answered = check_async(ips=ips, logger=logger)
    execution_time = time.time() - start_time

    logger.info("%d devices answered for %.2f seconds", len(answered), execution_time)
    return answered
def main():
    """Scan every configured subnet and store the responders sorted by IP."""
    logger = get_logger()
    subnets = range(88, 89)

    # A plain loop, since ``extend`` is called purely for its side effect.
    collected = []
    for subnet in subnets:
        collected.extend(
            get_devices_for_subnet(
                subnet=subnet,
                logger=logger,
            )
        )

    def ip_sort_key(entry):
        # Compare addresses octet by octet as integers, so that e.g.
        # 192.168.88.9 sorts before 192.168.88.10.
        return tuple(int(octet) for octet in entry[0].split("."))

    save_collected_ips(
        collected=sorted(collected, key=ip_sort_key),
    )


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment