"""Discover devices on a local IPv4 network.

Combines three techniques: a concurrent ping sweep (to populate the OS ARP
cache), parsing of the system ``arp -a`` output, and an optional scapy ARP
broadcast scan.
"""

import contextlib
import io
import ipaddress
import logging
import re
import subprocess
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

_log = logging.getLogger(__name__)

# Windows: prevent subprocess from opening visible console windows
_NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0

_BROADCAST_MAC = "ff:ff:ff:ff:ff:ff"

# Windows `arp -a` entry:
#   192.168.4.1           cc-2d-21-a5-85-b0     dynamic
_ARP_WIN_RE = re.compile(
    r"(\d+\.\d+\.\d+\.\d+)\s+"
    r"((?:[0-9a-fA-F]{2}-){5}[0-9a-fA-F]{2})\s+"
    r"(dynamic|static)",
    re.IGNORECASE,
)

# macOS/Linux `arp -a` entry:
#   ? (192.168.1.1) at aa:bb:cc:dd:ee:ff on en0
_ARP_UNIX_RE = re.compile(
    r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]+)"
)


def _scan_with_scapy(network, timeout=3):
    """Scan using scapy (requires root/sudo, and Npcap on Windows).

    Broadcasts an ARP request for every address in ``network`` and returns
    one ``{"ip": ..., "mac": ...}`` dict per answer received.

    ``timeout`` is passed straight to scapy's ``srp``. Any exception raised
    by importing or running scapy propagates to the caller.
    """
    from scapy.all import ARP, Ether, srp

    packet = Ether(dst=_BROADCAST_MAC) / ARP(pdst=str(network))
    answered = srp(packet, timeout=timeout, verbose=0)[0]
    return [
        {"ip": received.psrc, "mac": received.hwsrc}
        for _sent, received in answered
    ]


def _ping_one(ip, is_win):
    """Ping a single IP to populate ARP table.

    Best-effort: the ping result is ignored, and a missing ``ping`` binary
    or a timeout is silently tolerated.
    """
    if is_win:
        cmd = ["ping", "-n", "1", "-w", "600", str(ip)]
    else:
        # NOTE(review): "-W 1" means 1 second on Linux (iputils) but is read
        # as milliseconds by macOS/BSD ping -- confirm the intended wait.
        cmd = ["ping", "-c", "1", "-W", "1", str(ip)]
    try:
        subprocess.run(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=3,
            creationflags=_NO_WINDOW,  # 0 (a no-op) outside Windows
        )
    except (OSError, subprocess.SubprocessError):
        pass


def _ping_sweep(network, progress_cb=None):
    """Ping all IPs in network concurrently to populate ARP table.

    Calls progress_cb(done, total) after each ping completes if provided.
    The callback runs on worker threads, but calls are serialized and
    ``done`` increases by exactly one per call. Exceptions raised by the
    callback are logged at debug level and do not abort the sweep.

    Networks with more than 256 addresses are skipped entirely (no pings,
    no callbacks). Raises ValueError if ``network`` is not a valid network.
    """
    net = ipaddress.ip_network(network, strict=False)
    # Only ping sweep for /24 or smaller to avoid flooding
    if net.num_addresses > 256:
        return

    is_win = sys.platform == "win32"
    hosts = list(net.hosts())
    total = len(hosts)
    done = 0
    # Guards the counter: an unsynchronized "+= 1" from many threads can
    # lose updates and report progress out of order.
    lock = threading.Lock()

    def _ping_and_track(ip):
        nonlocal done
        _ping_one(ip, is_win)
        with lock:
            done += 1
            if progress_cb:
                progress_cb(done, total)

    with ThreadPoolExecutor(max_workers=70) as executor:
        futures = [executor.submit(_ping_and_track, ip) for ip in hosts]
        for future in as_completed(futures):
            error = future.exception()
            if error is not None:
                _log.debug("ping sweep task failed: %r", error)


def _read_arp_table():
    """Return the raw text of ``arp -a``, or "" if it cannot be obtained."""
    try:
        return subprocess.check_output(
            ["arp", "-a"], text=True, creationflags=_NO_WINDOW
        )
    except (OSError, subprocess.SubprocessError, ValueError) as exc:
        # ValueError covers output that cannot be decoded as text.
        _log.debug("could not read ARP table: %r", exc)
        return ""


def _parse_arp_output(output, net, is_win):
    """Extract devices inside ``net`` from ``arp -a`` output.

    ``is_win`` selects the Windows line format (dash-separated MAC followed
    by dynamic/static); otherwise the macOS/Linux ``(ip) at mac`` format is
    expected. Windows MACs are converted to colon-separated form. Broadcast
    entries (ff:ff:ff:ff:ff:ff, any case) and addresses outside ``net`` are
    dropped. Returns a list of ``{"ip": ..., "mac": ...}`` dicts in output
    order; duplicate IPs are not collapsed.
    """
    pattern = _ARP_WIN_RE if is_win else _ARP_UNIX_RE
    devices = []
    for line in output.splitlines():
        match = pattern.search(line)
        if match is None:
            # Also skips "(incomplete)" / "<incomplete>" entries: neither
            # pattern can match a line without a real MAC.
            continue
        ip_str, mac = match.group(1), match.group(2)
        if is_win:
            # cc-2d-21-a5-85-b0 -> cc:2d:21:a5:85:b0
            mac = mac.replace("-", ":")
        if mac.lower() == _BROADCAST_MAC:
            continue
        try:
            in_network = ipaddress.ip_address(ip_str) in net
        except ValueError:
            # Looked like an IP to the regex but is not one (e.g. 999.1.1.1)
            continue
        if in_network:
            devices.append({"ip": ip_str, "mac": mac})
    return devices


def _scan_with_arp_table(network):
    """Fallback: read ARP table using system 'arp -a' (no root needed).

    Supports both macOS/Linux and Windows output formats. Returns a list of
    ``{"ip": ..., "mac": ...}`` dicts, or an empty list if ``arp -a`` could
    not be run.
    """
    # Ping sweep first to populate ARP table with active devices
    _ping_sweep(network)
    # Brief pause to let the OS finalize ARP cache entries
    time.sleep(1)

    net = ipaddress.ip_network(network, strict=False)
    return _parse_arp_output(_read_arp_table(), net, sys.platform == "win32")


def scan_network(network, progress_cb=None, stage_cb=None):
    """Scan network: ping sweep first, then merge scapy ARP + arp table.

    ``progress_cb(done, total)`` is forwarded to the ping sweep.
    ``stage_cb(name)`` is called with "ping", "arp" and "scapy" as each
    phase starts. The ARP-table and scapy phases are best-effort: their
    failures are logged at debug level and leave the result unchanged.

    Returns a list of ``{"ip": ..., "mac": ...}`` dicts, one per IP, sorted
    by IP address. ARP-table entries take precedence over scapy answers.
    Raises ValueError if ``network`` is not a valid network.
    """
    # Phase 1: Ping sweep -- wake up devices and populate ARP cache
    if stage_cb:
        stage_cb("ping")
    _ping_sweep(network, progress_cb)
    time.sleep(1)

    seen = {}  # ip -> device dict

    # Phase 2: ARP table (populated by ping sweep above)
    if stage_cb:
        stage_cb("arp")
    net = ipaddress.ip_network(network, strict=False)
    is_win = sys.platform == "win32"
    for device in _parse_arp_output(_read_arp_table(), net, is_win):
        seen[device["ip"]] = device  # last entry for an IP wins

    # Phase 3: scapy ARP scan (if Npcap available) -- fills in any gaps
    if stage_cb:
        stage_cb("scapy")
    try:
        # Import once with stderr captured so scapy's import-time warnings
        # are hidden; the import inside _scan_with_scapy is then a cache hit.
        with contextlib.redirect_stderr(io.StringIO()):
            import scapy.all  # noqa: F401
        for device in _scan_with_scapy(network, timeout=2):
            seen.setdefault(device["ip"], device)
    except Exception as exc:
        # scapy missing, no privileges, no Npcap, ... -- keep ARP results.
        _log.debug("scapy scan skipped: %r", exc)

    return sorted(seen.values(), key=lambda d: ipaddress.ip_address(d["ip"]))