import re import subprocess import sys import ipaddress import threading from concurrent.futures import ThreadPoolExecutor, as_completed # Windows: prevent subprocess from opening visible console windows _NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 # Concurrent ping workers per platform _MAX_WORKERS_WIN = 50 _MAX_WORKERS_OTHER = 64 def _ping_one(ip, is_win): """Ping a single IP. Returns True if host responds.""" try: if is_win: # Capture stdout to check for TTL= — more reliable than returncode # on Windows (returncode can be 0 even for "Destination unreachable") r = subprocess.run( ["ping", "-n", "1", "-w", "500", str(ip)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=3, creationflags=_NO_WINDOW ) return r.returncode == 0 and b"TTL=" in r.stdout elif sys.platform == "darwin": r = subprocess.run( ["ping", "-c", "1", "-W", "300", str(ip)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2 ) else: r = subprocess.run( ["ping", "-c", "1", "-W", "1", str(ip)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2 ) return r.returncode == 0 except Exception: return False def _get_mac_from_arp(ip): """ Lấy MAC address của một IP qua ARP table. Chỉ được gọi SAU KHI IP đã phản hồi ping thành công — đảm bảo ARP cache vừa được OS cập nhật với thông tin mới nhất (không bị stale). Trả về chuỗi MAC dạng 'AA:BB:CC:DD:EE:FF' hoặc 'N/A' nếu không tra được. """ try: if sys.platform == "win32": # arp -a → " 192.168.4.5 aa-bb-cc-dd-ee-ff dynamic" r = subprocess.run( ["arp", "-a", str(ip)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=3, creationflags=_NO_WINDOW ) output = r.stdout.decode(errors="ignore") # Dạng Windows: aa-bb-cc-dd-ee-ff match = re.search( r"([0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}" r"[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2})", output ) else: # macOS / Linux: arp -n # macOS output: "? (192.168.4.5) at aa:bb:cc:dd:ee:ff on en0 ..." # Linux output: "192.168.4.5 ether aa:bb:cc:dd:ee:ff C eth0" r = subprocess.run( ["arp", "-n", str(ip)], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=3 ) output = r.stdout.decode(errors="ignore") match = re.search( r"([0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}" r"[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2})", output ) if match: # Chuẩn hoá sang dấu ':' và chữ hoa mac = match.group(1).replace("-", ":").upper() return mac except Exception: pass return "N/A" def _ping_sweep(network, progress_cb=None): """Ping tất cả host trong network đồng thời. Trả về list IP đã phản hồi. Gọi progress_cb(done, total) sau mỗi ping. """ net = ipaddress.ip_network(network, strict=False) if net.num_addresses > 256: return [] is_win = sys.platform == "win32" hosts = list(net.hosts()) total = len(hosts) workers = min(_MAX_WORKERS_WIN if is_win else _MAX_WORKERS_OTHER, len(hosts)) done_count = [0] lock = threading.Lock() alive = [] def _ping_and_track(ip): ok = _ping_one(ip, is_win) with lock: done_count[0] += 1 current = done_count[0] if progress_cb: progress_cb(current, total) return (str(ip), ok) with ThreadPoolExecutor(max_workers=workers) as executor: futures = [executor.submit(_ping_and_track, ip) for ip in hosts] for f in as_completed(futures): ip_str, ok = f.result() if ok: alive.append(ip_str) return alive def scan_network(network, progress_cb=None, stage_cb=None): """Scan network: ping → lấy MAC từ ARP (chỉ cho IP đang online). Flow: 1. Ping sweep — xác định thiết bị online 2. MAC lookup — query ARP cho từng IP vừa phản hồi ping (song song) => ARP cache vừa được OS cập nhật sau ping, không bị stale. """ # ── Stage 1: Ping ── if stage_cb: stage_cb("ping") alive_ips = _ping_sweep(network, progress_cb) # ── Stage 2: MAC lookup ── if stage_cb: stage_cb("mac") results = [] if alive_ips: # Chạy song song để tra MAC nhanh hơn mac_workers = min(32, len(alive_ips)) macs = {} with ThreadPoolExecutor(max_workers=mac_workers) as executor: future_to_ip = {executor.submit(_get_mac_from_arp, ip): ip for ip in alive_ips} for future in as_completed(future_to_ip): ip = future_to_ip[future] try: macs[ip] = future.result() except Exception: macs[ip] = "N/A" for ip_str in alive_ips: results.append({"ip": ip_str, "mac": macs.get(ip_str, "N/A")}) return sorted(results, key=lambda d: ipaddress.ip_address(d["ip"]))