import subprocess import sys import ipaddress import threading from concurrent.futures import ThreadPoolExecutor, as_completed # Windows: prevent subprocess from opening visible console windows _NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 # Concurrent ping workers per platform _MAX_WORKERS_WIN = 50 _MAX_WORKERS_OTHER = 64 def _ping_one(ip, is_win): """Ping a single IP. Returns True if host responds.""" try: if is_win: # Capture stdout to check for TTL= — more reliable than returncode # on Windows (returncode can be 0 even for "Destination unreachable") r = subprocess.run( ["ping", "-n", "1", "-w", "500", str(ip)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=3, creationflags=_NO_WINDOW ) return r.returncode == 0 and b"TTL=" in r.stdout elif sys.platform == "darwin": r = subprocess.run( ["ping", "-c", "1", "-W", "300", str(ip)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2 ) else: r = subprocess.run( ["ping", "-c", "1", "-W", "1", str(ip)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2 ) return r.returncode == 0 except Exception: return False def _ping_sweep(network, progress_cb=None): """Ping tất cả host trong network đồng thời. Trả về list IP đã phản hồi. Gọi progress_cb(done, total) sau mỗi ping. """ net = ipaddress.ip_network(network, strict=False) if net.num_addresses > 256: return [] is_win = sys.platform == "win32" hosts = list(net.hosts()) total = len(hosts) workers = min(_MAX_WORKERS_WIN if is_win else _MAX_WORKERS_OTHER, len(hosts)) done_count = [0] lock = threading.Lock() alive = [] def _ping_and_track(ip): ok = _ping_one(ip, is_win) with lock: done_count[0] += 1 current = done_count[0] if progress_cb: progress_cb(current, total) return (str(ip), ok) with ThreadPoolExecutor(max_workers=workers) as executor: futures = [executor.submit(_ping_and_track, ip) for ip in hosts] for f in as_completed(futures): ip_str, ok = f.result() if ok: alive.append(ip_str) return alive def scan_network(network, progress_cb=None, stage_cb=None): """Scan network: chỉ dùng ping để xác định thiết bị online.""" if stage_cb: stage_cb("ping") alive_ips = _ping_sweep(network, progress_cb) results = [{"ip": ip_str, "mac": "N/A"} for ip_str in alive_ips] return sorted(results, key=lambda d: ipaddress.ip_address(d["ip"]))