import subprocess import re import sys import time import ipaddress from concurrent.futures import ThreadPoolExecutor, as_completed # Windows: prevent subprocess from opening visible console windows _NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 def _ping_one(ip, is_win): """Ping a single IP to populate ARP table.""" try: if is_win: subprocess.run( ["ping", "-n", "1", "-w", "300", str(ip)], # 300ms (was 600ms) stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2, creationflags=_NO_WINDOW ) elif sys.platform == "darwin": subprocess.run( ["ping", "-c", "1", "-W", "500", str(ip)], # 500ms — macOS: -W unit là ms stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2 ) else: subprocess.run( ["ping", "-c", "1", "-W", "1", str(ip)], # 1s — Linux: -W unit là giây stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2 ) except Exception: pass def _ping_sweep(network, progress_cb=None): """Ping tất cả host trong network đồng thời để điền ARP cache. Gọi progress_cb(done, total) sau mỗi ping nếu được cung cấp. """ net = ipaddress.ip_network(network, strict=False) # Chỉ ping sweep cho /24 hoặc nhỏ hơn if net.num_addresses > 256: return is_win = sys.platform == "win32" hosts = list(net.hosts()) total = len(hosts) done_count = [0] def _ping_and_track(ip): _ping_one(ip, is_win) done_count[0] += 1 if progress_cb: progress_cb(done_count[0], total) # Tất cả host cùng lúc — loại bỏ overhead batching (was max_workers=100) with ThreadPoolExecutor(max_workers=len(hosts)) as executor: futures = [executor.submit(_ping_and_track, ip) for ip in hosts] for f in as_completed(futures): pass def scan_network(network, progress_cb=None, stage_cb=None): """Scan network: ping sweep → ARP table + Scapy song song.""" # Phase 1: Ping sweep if stage_cb: stage_cb("ping") _ping_sweep(network, progress_cb) time.sleep(0.3) # Giảm từ 1s xuống 0.3s # Phase 2 + 3: ARP table và Scapy chạy song song if stage_cb: stage_cb("arp") def _collect_arp(): result = {} try: output = subprocess.check_output( ["arp", "-a"], text=True, creationflags=_NO_WINDOW ) net = ipaddress.ip_network(network, strict=False) if sys.platform == "win32": pattern = re.compile( r"(\d+\.\d+\.\d+\.\d+)\s+" r"([0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-" r"[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2})\s+" r"(dynamic|static)", re.IGNORECASE ) for line in output.splitlines(): m = pattern.search(line) if m: ip_str = m.group(1) mac = m.group(2).replace("-", ":") if mac.upper() != "FF:FF:FF:FF:FF:FF": try: if ipaddress.ip_address(ip_str) in net: result[ip_str] = {"ip": ip_str, "mac": mac} except ValueError: pass else: pattern = re.compile( r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]+)" ) for line in output.splitlines(): m = pattern.search(line) if m: ip_str, mac = m.group(1), m.group(2) if mac.lower() not in ("(incomplete)", "ff:ff:ff:ff:ff:ff"): try: if ipaddress.ip_address(ip_str) in net: result[ip_str] = {"ip": ip_str, "mac": mac} except ValueError: pass except Exception: pass return result def _collect_scapy(): result = {} try: import io _stderr = sys.stderr sys.stderr = io.StringIO() try: from scapy.all import ARP, Ether, srp finally: sys.stderr = _stderr arp = ARP(pdst=str(network)) ether = Ether(dst="ff:ff:ff:ff:ff:ff") raw = srp(ether / arp, timeout=1, verbose=0)[0] # Giảm từ 2s xuống 1s for _, received in raw: result[received.psrc] = {"ip": received.psrc, "mac": received.hwsrc} except Exception: pass return result # Chạy ARP read và Scapy đồng thời with ThreadPoolExecutor(max_workers=2) as executor: f_arp = executor.submit(_collect_arp) f_scapy = executor.submit(_collect_scapy) seen = f_arp.result() # Scapy bổ sung những IP chưa có trong ARP table for ip, dev in f_scapy.result().items(): if ip not in seen: seen[ip] = dev return sorted(seen.values(), key=lambda d: ipaddress.ip_address(d["ip"]))