80 lines
2.4 KiB
Python
80 lines
2.4 KiB
Python
import subprocess
|
|
import sys
|
|
import ipaddress
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# Windows: prevent subprocess from opening visible console windows
|
|
_NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
|
|
|
|
|
|
def _ping_one(ip, is_win):
|
|
"""Ping a single IP. Returns True if host responds."""
|
|
try:
|
|
if is_win:
|
|
r = subprocess.run(
|
|
["ping", "-n", "1", "-w", "300", str(ip)],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=2,
|
|
creationflags=_NO_WINDOW
|
|
)
|
|
elif sys.platform == "darwin":
|
|
r = subprocess.run(
|
|
["ping", "-c", "1", "-W", "300", str(ip)],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=2
|
|
)
|
|
else:
|
|
r = subprocess.run(
|
|
["ping", "-c", "1", "-W", "1", str(ip)],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=2
|
|
)
|
|
return r.returncode == 0
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _ping_sweep(network, progress_cb=None):
|
|
"""Ping tất cả host trong network đồng thời.
|
|
Trả về list IP đã phản hồi. Gọi progress_cb(done, total) sau mỗi ping.
|
|
"""
|
|
net = ipaddress.ip_network(network, strict=False)
|
|
|
|
if net.num_addresses > 256:
|
|
return []
|
|
|
|
is_win = sys.platform == "win32"
|
|
hosts = list(net.hosts())
|
|
total = len(hosts)
|
|
done_count = [0]
|
|
alive = []
|
|
|
|
def _ping_and_track(ip):
|
|
ok = _ping_one(ip, is_win)
|
|
done_count[0] += 1
|
|
if progress_cb:
|
|
progress_cb(done_count[0], total)
|
|
return (str(ip), ok)
|
|
|
|
with ThreadPoolExecutor(max_workers=len(hosts)) as executor:
|
|
futures = [executor.submit(_ping_and_track, ip) for ip in hosts]
|
|
for f in as_completed(futures):
|
|
ip_str, ok = f.result()
|
|
if ok:
|
|
alive.append(ip_str)
|
|
|
|
return alive
|
|
|
|
|
|
def scan_network(network, progress_cb=None, stage_cb=None):
|
|
"""Scan network: chỉ dùng ping để xác định thiết bị online."""
|
|
if stage_cb:
|
|
stage_cb("ping")
|
|
alive_ips = _ping_sweep(network, progress_cb)
|
|
|
|
results = [{"ip": ip_str, "mac": "N/A"} for ip_str in alive_ips]
|
|
return sorted(results, key=lambda d: ipaddress.ip_address(d["ip"]))
|