Files
Mira_Firmware_Loader/core/scanner.py

173 lines
5.7 KiB
Python

import re
import subprocess
import sys
import ipaddress
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# Windows: prevent subprocess from opening visible console windows
_NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
# Concurrent ping workers per platform
_MAX_WORKERS_WIN = 50
_MAX_WORKERS_OTHER = 64
def _ping_one(ip, is_win):
"""Ping a single IP. Returns True if host responds."""
try:
if is_win:
# Capture stdout to check for TTL= — more reliable than returncode
# on Windows (returncode can be 0 even for "Destination unreachable")
r = subprocess.run(
["ping", "-n", "1", "-w", "500", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3,
creationflags=_NO_WINDOW
)
return r.returncode == 0 and b"TTL=" in r.stdout
elif sys.platform == "darwin":
r = subprocess.run(
["ping", "-c", "1", "-W", "300", str(ip)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=2
)
else:
r = subprocess.run(
["ping", "-c", "1", "-W", "1", str(ip)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=2
)
return r.returncode == 0
except Exception:
return False
def _get_mac_from_arp(ip):
"""
Lấy MAC address của một IP qua ARP table.
Chỉ được gọi SAU KHI IP đã phản hồi ping thành công — đảm bảo ARP
cache vừa được OS cập nhật với thông tin mới nhất (không bị stale).
Trả về chuỗi MAC dạng 'AA:BB:CC:DD:EE:FF' hoặc 'N/A' nếu không tra được.
"""
try:
if sys.platform == "win32":
# arp -a <ip> → " 192.168.4.5 aa-bb-cc-dd-ee-ff dynamic"
r = subprocess.run(
["arp", "-a", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3,
creationflags=_NO_WINDOW
)
output = r.stdout.decode(errors="ignore")
# Dạng Windows: aa-bb-cc-dd-ee-ff
match = re.search(
r"([0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}"
r"[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2})",
output
)
else:
# macOS / Linux: arp -n <ip>
# macOS output: "? (192.168.4.5) at aa:bb:cc:dd:ee:ff on en0 ..."
# Linux output: "192.168.4.5 ether aa:bb:cc:dd:ee:ff C eth0"
r = subprocess.run(
["arp", "-n", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3
)
output = r.stdout.decode(errors="ignore")
match = re.search(
r"([0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}"
r"[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2})",
output
)
if match:
# Chuẩn hoá sang dấu ':' và chữ hoa
mac = match.group(1).replace("-", ":").upper()
return mac
except Exception:
pass
return "N/A"
def _ping_sweep(network, progress_cb=None):
"""Ping tất cả host trong network đồng thời.
Trả về list IP đã phản hồi. Gọi progress_cb(done, total) sau mỗi ping.
"""
net = ipaddress.ip_network(network, strict=False)
if net.num_addresses > 256:
return []
is_win = sys.platform == "win32"
hosts = list(net.hosts())
total = len(hosts)
workers = min(_MAX_WORKERS_WIN if is_win else _MAX_WORKERS_OTHER, len(hosts))
done_count = [0]
lock = threading.Lock()
alive = []
def _ping_and_track(ip):
ok = _ping_one(ip, is_win)
with lock:
done_count[0] += 1
current = done_count[0]
if progress_cb:
progress_cb(current, total)
return (str(ip), ok)
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(_ping_and_track, ip) for ip in hosts]
for f in as_completed(futures):
ip_str, ok = f.result()
if ok:
alive.append(ip_str)
return alive
def scan_network(network, progress_cb=None, stage_cb=None):
"""Scan network: ping → lấy MAC từ ARP (chỉ cho IP đang online).
Flow:
1. Ping sweep — xác định thiết bị online
2. MAC lookup — query ARP cho từng IP vừa phản hồi ping (song song)
=> ARP cache vừa được OS cập nhật sau ping, không bị stale.
"""
# ── Stage 1: Ping ──
if stage_cb:
stage_cb("ping")
alive_ips = _ping_sweep(network, progress_cb)
# ── Stage 2: MAC lookup ──
if stage_cb:
stage_cb("mac")
results = []
if alive_ips:
# Chạy song song để tra MAC nhanh hơn
mac_workers = min(32, len(alive_ips))
macs = {}
with ThreadPoolExecutor(max_workers=mac_workers) as executor:
future_to_ip = {executor.submit(_get_mac_from_arp, ip): ip for ip in alive_ips}
for future in as_completed(future_to_ip):
ip = future_to_ip[future]
try:
macs[ip] = future.result()
except Exception:
macs[ip] = "N/A"
for ip_str in alive_ips:
results.append({"ip": ip_str, "mac": macs.get(ip_str, "N/A")})
return sorted(results, key=lambda d: ipaddress.ip_address(d["ip"]))