8 Commits

Author SHA1 Message Date
52c7b4b968 update doc 2026-03-12 23:55:26 +07:00
aa36758ec9 update scan ip with MAC, update UI show history 2026-03-12 23:51:43 +07:00
22e4436518 update README.md 2026-03-12 23:35:37 +07:00
aa6a114071 update default loadFW (SSH) 2026-03-12 23:21:21 +07:00
627197e29e fix bug scan ip 2026-03-10 18:55:03 +07:00
adf7253350 fix bug scan ip (win) 2026-03-10 18:10:43 +07:00
f5ee209726 fix bug scan ip 2026-03-10 17:54:56 +07:00
466dadf1c9 update UI 2026-03-09 20:49:13 +07:00
5 changed files with 365 additions and 191 deletions

View File

@@ -3,7 +3,7 @@
Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng loạt** cho các thiết bị OpenWrt trong mạng LAN.
Hỗ trợ nạp **thủ công** (chọn thiết bị → flash) và **tự động hóa** (scan → phát hiện → flash → retry).
> **Tech stack:** Python 3.9+ · PyQt6 · Paramiko/SCP · Scapy · Requests · PyInstaller
> **Tech stack:** Python 3.9+ · PyQt6 · Paramiko/SCP · Requests · PyInstaller
> **Phiên bản:** `1.2.0`
---
@@ -16,7 +16,7 @@ Mira_Firmware_Loader/
├── version.txt # Số phiên bản ứng dụng
├── requirements.txt # Danh sách dependencies
├── core/
│ ├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy)
│ ├── scanner.py # Quét thiết bị mạng (Ping sweep đa luồng)
│ ├── workers.py # ScanThread — chạy scanner trong background thread
│ ├── api_flash.py # Flash firmware qua LuCI HTTP API
│ ├── ssh_utils.py # SSH/SCP transport helpers dùng chung
@@ -48,8 +48,7 @@ Mira_Firmware_Loader/
### Yêu cầu
- Python **3.9+**
- Thư viện: `PyQt6`, `scapy`, `requests`, `paramiko`, `scp`, `pyinstaller` (để build)
- _Windows:_ Cài [Npcap](https://npcap.com/) để Scapy có thể gửi ARP broadcast (không bắt buộc — có fallback)
- Thư viện: `PyQt6`, `requests`, `paramiko`, `scp`, `pyinstaller` (để build)
### Khởi chạy nhanh
@@ -101,11 +100,11 @@ Output: `dist\Mira_Firmware_Loader.exe` — không cần cài Python trên máy
┌──────▼──────┐ ┌───────▼──────────┐ ┌───▼──────────────────┐
│ scanner.py │ │ Flash Workers │ │ AutoFlashWorker │
│ │ │ (thủ công) │ │ (tự động hóa) │
1. Ping │ │ │ │ │
Sweep │ │ NewFlashThread │ │ Phase 1: Scan LAN │
2. arp -a │ │ ├─ api_flash │ │ (tối đa 15 lần) │
3. Scapy │ │ └─ ssh_new_flash│ │ Phase 2: Flash │
ARP │ │ │ │ (auto-retry x3) │
│ Ping Sweep │ │ │ │ │
(Multithread│ │ NewFlashThread │ │ Phase 1: Scan LAN │
gọi OS │ │ ├─ api_flash │ │ (tối đa 15 lần) │
ping cmd) │ │ └─ ssh_new_flash│ │ Phase 2: Flash │
│ │ │ │ (auto-retry x3) │
└─────────────┘ │ UpdateFlash │ │ ├─ api_flash │
│ Thread │ │ └─ ssh_new_flash │
│ └─ ssh_update │ └──────────────────────┘
@@ -127,15 +126,14 @@ Output: `dist\Mira_Firmware_Loader.exe` — không cần cài Python trên máy
### 1. Quét mạng (Network Scan)
`scanner.py` ng chiến lược 3 lớp, đảm bảo phát hiện đầy đủ mà không cần quyền Root:
`scanner.py` sử dụng chiến lược **Ping trước → ARP sau**, đảm bảo vừa lấy được MAC vừa loại bỏ hoàn toàn lỗi thiết bị ảo do ARP cache cũ:
| Giai đoạn | Mô tả |
| -------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| **Ping Sweep** | Gửi ping đồng thời tới toàn bộ host trong dải `/24` (tất cả cùng lúc, không batching) để đánh thức thiết bị và điền ARP cache |
| **ARP Table** | Đọc `arp -a` bằng regex, hỗ trợ cả định dạng Windows (`cc-2d-...`) và macOS/Linux (`aa:bb:...`) |
| **Scapy ARP** | Chạy **song song** với ARP Table — gửi ARP broadcast để lấp khoảng trống. Yêu cầu Npcap (Windows) hoặc root (Linux); tự động bỏ qua nếu không khả dụng |
| **Ping Sweep** | Gửi ping đồng thời tới toàn bộ host trong dải mạng bằng tiến trình con (đa luồng). Nhằm xác đnh chính xác thiết bị nào đang thực sự cấp nguồn online. |
| **ARP Lookup** | Ngay sau khi ping, query ARP table hệ điều hành (song song) _chỉ cho các IP vừa ping thành công_, lấy MAC Address chuẩn xác trước khi cache cũ hết hạn. |
Kết quả được merge theo IP và sort tăng dần trước khi trả về UI.
Kết quả (IP và MAC) được sort tăng dần theo IP trước khi trả về UI.
### 2. Bảng thiết bị (Device Table)
@@ -281,7 +279,7 @@ Kết quả nạp được lưu ở 2 cấp:
## 🔒 Lưu ý bảo mật & Quyền
- **Scapy (chế độ sâu):** Cần Npcap (Windows) hoặc `sudo` (macOS/Linux). App vẫn hoạt động mà không cần quyền Admin nhờ fallback Ping Sweep + `arp -a`.
- **CREATE_NO_WINDOW:** Khi gọi subprocess (`ping`, `arp`), ứng dụng dùng flag `CREATE_NO_WINDOW` trên Windows để ngăn cửa sổ console hiện ra.
- **Quyền hệ thống:** Quá trình quét của `scanner.py` chỉ chạy lệnh ping từ hệ điều hành, vì vậy ứng dụng có thể chạy hoàn toàn **không cần** quyền Admin (trên Windows) / Root (trên macOS/Linux).
- **CREATE_NO_WINDOW:** Khi gọi subprocess lệnh `ping`, ứng dụng dùng flag `CREATE_NO_WINDOW` trên Windows để ngăn cửa sổ console command prompt (cmd) liên tục hiện lên màn hình.
- **HTTP thuần:** Firmware được upload qua HTTP (không HTTPS). Chỉ dùng trong mạng LAN nội bộ, không nên dùng trên mạng mở.
- **Credentials cố định:** SSH credentials (`root`/`admin123a`) được hardcode trong `flash_update_worker.py` và truyền từ `main.py`, không hiển thị trên UI.

View File

@@ -1,155 +1,172 @@
import subprocess
import re
import subprocess
import sys
import time
import ipaddress
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# Windows: prevent subprocess from opening visible console windows
_NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
# Concurrent ping workers per platform
_MAX_WORKERS_WIN = 50
_MAX_WORKERS_OTHER = 64
def _ping_one(ip, is_win):
"""Ping a single IP to populate ARP table."""
"""Ping a single IP. Returns True if host responds."""
try:
if is_win:
subprocess.run(
["ping", "-n", "1", "-w", "300", str(ip)], # 300ms (was 600ms)
stdout=subprocess.DEVNULL,
# Capture stdout to check for TTL= — more reliable than returncode
# on Windows (returncode can be 0 even for "Destination unreachable")
r = subprocess.run(
["ping", "-n", "1", "-w", "500", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=2,
timeout=3,
creationflags=_NO_WINDOW
)
return r.returncode == 0 and b"TTL=" in r.stdout
elif sys.platform == "darwin":
subprocess.run(
["ping", "-c", "1", "-W", "500", str(ip)], # 500ms — macOS: -W unit là ms
r = subprocess.run(
["ping", "-c", "1", "-W", "300", str(ip)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=2
)
else:
subprocess.run(
["ping", "-c", "1", "-W", "1", str(ip)], # 1s — Linux: -W unit là giây
r = subprocess.run(
["ping", "-c", "1", "-W", "1", str(ip)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=2
)
return r.returncode == 0
except Exception:
return False
def _get_mac_from_arp(ip):
"""
Lấy MAC address của một IP qua ARP table.
Chỉ được gọi SAU KHI IP đã phản hồi ping thành công — đảm bảo ARP
cache vừa được OS cập nhật với thông tin mới nhất (không bị stale).
Trả về chuỗi MAC dạng 'AA:BB:CC:DD:EE:FF' hoặc 'N/A' nếu không tra được.
"""
try:
if sys.platform == "win32":
# arp -a <ip> → " 192.168.4.5 aa-bb-cc-dd-ee-ff dynamic"
r = subprocess.run(
["arp", "-a", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3,
creationflags=_NO_WINDOW
)
output = r.stdout.decode(errors="ignore")
# Dạng Windows: aa-bb-cc-dd-ee-ff
match = re.search(
r"([0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}"
r"[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2})",
output
)
else:
# macOS / Linux: arp -n <ip>
# macOS output: "? (192.168.4.5) at aa:bb:cc:dd:ee:ff on en0 ..."
# Linux output: "192.168.4.5 ether aa:bb:cc:dd:ee:ff C eth0"
r = subprocess.run(
["arp", "-n", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3
)
output = r.stdout.decode(errors="ignore")
match = re.search(
r"([0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}"
r"[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2})",
output
)
if match:
# Chuẩn hoá sang dấu ':' và chữ hoa
mac = match.group(1).replace("-", ":").upper()
return mac
except Exception:
pass
return "N/A"
def _ping_sweep(network, progress_cb=None):
"""Ping tất cả host trong network đồng thời để điền ARP cache.
Gọi progress_cb(done, total) sau mỗi ping nếu được cung cấp.
"""Ping tất cả host trong network đồng thời.
Trả về list IP đã phản hồi. Gọi progress_cb(done, total) sau mỗi ping.
"""
net = ipaddress.ip_network(network, strict=False)
# Chỉ ping sweep cho /24 hoặc nhỏ hơn
if net.num_addresses > 256:
return
return []
is_win = sys.platform == "win32"
hosts = list(net.hosts())
total = len(hosts)
workers = min(_MAX_WORKERS_WIN if is_win else _MAX_WORKERS_OTHER, len(hosts))
done_count = [0]
lock = threading.Lock()
alive = []
def _ping_and_track(ip):
_ping_one(ip, is_win)
done_count[0] += 1
ok = _ping_one(ip, is_win)
with lock:
done_count[0] += 1
current = done_count[0]
if progress_cb:
progress_cb(done_count[0], total)
progress_cb(current, total)
return (str(ip), ok)
# Tất cả host cùng lúc — loại bỏ overhead batching (was max_workers=100)
with ThreadPoolExecutor(max_workers=len(hosts)) as executor:
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(_ping_and_track, ip) for ip in hosts]
for f in as_completed(futures):
pass
ip_str, ok = f.result()
if ok:
alive.append(ip_str)
return alive
def scan_network(network, progress_cb=None, stage_cb=None):
"""Scan network: ping sweep → ARP table + Scapy song song."""
# Phase 1: Ping sweep
"""Scan network: ping → lấy MAC từ ARP (chỉ cho IP đang online).
Flow:
1. Ping sweep — xác định thiết bị online
2. MAC lookup — query ARP cho từng IP vừa phản hồi ping (song song)
=> ARP cache vừa được OS cập nhật sau ping, không bị stale.
"""
# ── Stage 1: Ping ──
if stage_cb:
stage_cb("ping")
_ping_sweep(network, progress_cb)
time.sleep(0.3) # Giảm từ 1s xuống 0.3s
alive_ips = _ping_sweep(network, progress_cb)
# Phase 2 + 3: ARP table và Scapy chạy song song
# ── Stage 2: MAC lookup ──
if stage_cb:
stage_cb("arp")
stage_cb("mac")
def _collect_arp():
result = {}
try:
output = subprocess.check_output(
["arp", "-a"], text=True, creationflags=_NO_WINDOW
)
net = ipaddress.ip_network(network, strict=False)
if sys.platform == "win32":
pattern = re.compile(
r"(\d+\.\d+\.\d+\.\d+)\s+"
r"([0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-"
r"[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2})\s+"
r"(dynamic|static)",
re.IGNORECASE
)
for line in output.splitlines():
m = pattern.search(line)
if m:
ip_str = m.group(1)
mac = m.group(2).replace("-", ":")
if mac.upper() != "FF:FF:FF:FF:FF:FF":
try:
if ipaddress.ip_address(ip_str) in net:
result[ip_str] = {"ip": ip_str, "mac": mac}
except ValueError:
pass
else:
pattern = re.compile(
r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]+)"
)
for line in output.splitlines():
m = pattern.search(line)
if m:
ip_str, mac = m.group(1), m.group(2)
if mac.lower() not in ("(incomplete)", "ff:ff:ff:ff:ff:ff"):
try:
if ipaddress.ip_address(ip_str) in net:
result[ip_str] = {"ip": ip_str, "mac": mac}
except ValueError:
pass
except Exception:
pass
return result
results = []
if alive_ips:
# Chạy song song để tra MAC nhanh hơn
mac_workers = min(32, len(alive_ips))
macs = {}
with ThreadPoolExecutor(max_workers=mac_workers) as executor:
future_to_ip = {executor.submit(_get_mac_from_arp, ip): ip for ip in alive_ips}
for future in as_completed(future_to_ip):
ip = future_to_ip[future]
try:
macs[ip] = future.result()
except Exception:
macs[ip] = "N/A"
def _collect_scapy():
result = {}
try:
import io
_stderr = sys.stderr
sys.stderr = io.StringIO()
try:
from scapy.all import ARP, Ether, srp
finally:
sys.stderr = _stderr
for ip_str in alive_ips:
results.append({"ip": ip_str, "mac": macs.get(ip_str, "N/A")})
arp = ARP(pdst=str(network))
ether = Ether(dst="ff:ff:ff:ff:ff:ff")
raw = srp(ether / arp, timeout=1, verbose=0)[0] # Giảm từ 2s xuống 1s
for _, received in raw:
result[received.psrc] = {"ip": received.psrc, "mac": received.hwsrc}
except Exception:
pass
return result
# Chạy ARP read và Scapy đồng thời
with ThreadPoolExecutor(max_workers=2) as executor:
f_arp = executor.submit(_collect_arp)
f_scapy = executor.submit(_collect_scapy)
seen = f_arp.result()
# Scapy bổ sung những IP chưa có trong ARP table
for ip, dev in f_scapy.result().items():
if ip not in seen:
seen[ip] = dev
return sorted(seen.values(), key=lambda d: ipaddress.ip_address(d["ip"]))
return sorted(results, key=lambda d: ipaddress.ip_address(d["ip"]))

View File

@@ -2,37 +2,50 @@
## 1. Tổng quan
Thành phần quét IP trong `scanner.py` dò tìm và liệt kê tất cả thiết bị đang hoạt động trên một dải mạng (ví dụ: `192.168.1.0/24`), trả về danh sách chứa **IP****MAC Address** của từng thiết bị.
Thành phần quét IP trong `scanner.py` dò tìm và liệt kê tất cả thiết bị **đang thực sự online** trên một dải mạng (ví dụ: `192.168.1.0/24`), trả về danh sách chứa **IP****MAC Address** của từng thiết bị.
Để đảm bảo tỷ lệ phát hiện cao trên mọi hệ điều hành (Windows, macOS, Linux), scanner kết hợp 3 giai đoạn:
Scanner hoạt động theo cơ chế 2 bước (Ping trước → ARP sau):
1. **ICMP Ping Sweep:** Chỉ những IP có phản hồi ping mới được ghi nhận là online.
2. **ARP Lookup:** Ngay sau khi IP phản hồi ping, OS tự động cập nhật ARP cache cho IP đó. Scanner lập tức query ARP table để lấy MAC chính xác nhất.
1. **Ping Sweep** — đánh thức thiết bị, điền ARP cache
2. **Đọc bảng ARP hệ điều hành** (`arp -a`) — không cần quyền Admin
3. **Scapy ARP broadcast** — bổ sung thiết bị chặn ICMP
Bước 2 và 3 chạy **song song** để giảm tổng thời gian scan.
> **Giải quyết vấn đề ARP cache cũ (stale entries):** Khác với phương pháp ARP scan truyền thống (`arp -a` liệt kê toàn mạng) thường dính thiết bị đã ngắt kết nối do bị cache, cách làm này **chỉ query ARP cho những IP vừa pass qua bài test ping**. Thiết bị offline sẽ rớt ở bước ping và không bao giờ được đưa vào danh sách.
---
## 2. Luồng hoạt động chính (Hàm `scan_network`)
```
scan_network(network)
├── stage_cb("ping") ← Thông báo UI bắt đầu quét
├── _ping_sweep(network) ← Ping đồng thời toàn bộ host
│ │
│ ├── _ping_one(ip) × N ← Mỗi host 1 thread
│ │
│ └── return [alive IPs]
├── stage_cb("mac") ← Thông báo UI bắt đầu lấy MAC
├── _get_mac_from_arp(ip) × K ← Tra MAC song song cho K IP alive
└── return [{"ip": ..., "mac": "AA:BB:CC:..."}, ...] ← Sorted by IP
```
**Bước 1 — Ping Sweep**
- Gọi `_ping_sweep(network)`: gửi ICMP Echo Request đồng thời tới **toàn bộ host** trong dải mạng.
- Mỗi thiết bị phản hồi sẽ khiến hệ điều hành **tự ghi MAC vào ARP Cache**.
- Sau khi sweep xong, chờ `0.3s` để OS kịp finalize ARP cache (giảm từ 1s trước đây).
- Chỉ các IP có `returncode == 0` (phản hồi thành công) mới được đưa vào danh sách `alive_ips`.
**Bước 2 + 3 — ARP Table & Scapy (song song)**
**Bước 2 — Địa chỉ MAC (ARP Lookup)**
- Hai hàm `_collect_arp()``_collect_scapy()` được submit vào `ThreadPoolExecutor(max_workers=2)` và chạy đồng thời:
- `_collect_arp()`: đọc `arp -a`, parse regex lấy IP + MAC.
- `_collect_scapy()`: gửi ARP broadcast, nhận phản hồi trực tiếp từ thiết bị.
- Kết quả merge theo IP: ARP table làm nền, Scapy bổ sung IP còn thiếu.
- Từ danh sách `alive_ips`, tạo ThreadPoolExecutor (max_workers=32) để gọi `_get_mac_from_arp(ip)` đồng thời.
- Trích xuất MAC address bằng pattern matching chéo nền tảng.
**Bước 4 — Trả về kết quả**
**Bước 3 — Trả về kết quả**
- Danh sách sort tăng dần theo IP:
`[{"ip": "192.168.1.2", "mac": "aa:bb:cc:dd:ee:ff"}, ...]`
`[{"ip": "192.168.1.2", "mac": "AA:BB:CC:DD:EE:FF"}, ...]`
---
@@ -40,48 +53,53 @@ Bước 2 và 3 chạy **song song** để giảm tổng thời gian scan.
### `_ping_one(ip, is_win)`
Ping một IP đơn lẻ với timeout tối ưu theo nền tảng:
Ping một IP đơn lẻ, trả về `True` nếu host phản hồi, `False` nếu không.
| OS | Lệnh | Timeout wait | Timeout process |
| ------- | ------------------ | ----------------- | --------------- |
| Windows | `ping -n 1 -w 300` | 300ms | 2s |
| macOS | `ping -c 1 -W 500` | 500ms (đơn vị ms) | 2s |
| Linux | `ping -c 1 -W 1` | 1s (đơn vị giây) | 2s |
Timeout tối ưu theo nền tảng:
> macOS và Linux dùng cùng flag `-W` nhưng đơn vị khác nhau — được xử lý tách biệt theo `sys.platform`.
| OS | Lệnh | Timeout wait | Timeout process |
| ------- | ------------------ | ------------ | --------------- |
| Windows | `ping -n 1 -w 300` | 300ms | 2s |
| macOS | `ping -c 1 -W 300` | 300ms | 2s |
| Linux | `ping -c 1 -W 1` | 1s | 2s |
> macOS và Linux dùng cùng flag `-W` nhưng đơn vị khác nhau (macOS: ms, Linux: giây) — được xử lý tách biệt theo `sys.platform`.
Windows sử dụng `CREATE_NO_WINDOW` flag để tránh mở cửa sổ console cho mỗi subprocess.
### `_ping_sweep(network, progress_cb)`
- Tạo `ThreadPoolExecutor(max_workers=len(hosts))`**toàn bộ host ping đồng thời**, không batching.
- Giới hạn an toàn: chỉ chạy với subnet `/24` trở xuống (`num_addresses <= 256`).
- Trả về danh sách IP (string) đã phản hồi thành công.
- Gọi `progress_cb(done, total)` sau mỗi ping để UI cập nhật thanh tiến độ.
### `_collect_arp()` (nội bộ trong `scan_network`)
### `_get_mac_from_arp(ip)`
Đọc và parse output `arp -a`, hỗ trợ đa nền tảng:
- Gọi lệnh hệ điều hành để đọc ARP cache cho IP cụ thể:
- **Windows**: `arp -a <ip>`
- **macOS / Linux**: `arp -n <ip>`
- Sử dụng Regex để parse MAC address và trả về dưới format chuẩn `AA:BB:CC:DD:EE:FF`.
- Nếu không tìm thấy, fallback thành `"N/A"`.
- **Windows:** Regex nhận dạng dạng `cc-2d-21-a5-85-b0 dynamic`, chuẩn hóa MAC sang `cc:2d:21:a5:85:b0`.
- **macOS/Linux:** Regex nhận dạng dạng `(192.168.1.1) at aa:bb:cc:dd:ee:ff`, bỏ qua entry `(incomplete)`.
### `scan_network(network, progress_cb, stage_cb)`
### `_collect_scapy()` (nội bộ trong `scan_network`)
- Gửi ARP `Who-has` broadcast (`Ether/ARP` qua `srp`) với **timeout 1s** (giảm từ 2s).
- Stderr bị redirect tạm thời khi import scapy để tránh spam log ra console.
- Tự động bỏ qua nếu scapy không khả dụng (không có Npcap / không có quyền root).
- Entry point chính.
- `stage_cb("ping")``stage_cb("mac")` thông báo UI giai đoạn hiện tại.
- Trả về `list[dict]` với format `{"ip": str, "mac": str}`, sorted theo IP tăng dần.
---
## 4. So sánh hiệu năng (trước và sau tối ưu)
## 4. Hiệu năng
| Thay đổi | Trước | Sau |
| --------------------------- | --------------------- | ------------------------------------ |
| Ping workers | 100 (batching ~3 đợt) | `len(hosts)` (~254, tất cả cùng lúc) |
| Ping timeout — Windows | 600ms | 300ms |
| Ping timeout — macOS | 1ms (sai đơn vị) | 500ms |
| Sleep sau ping sweep | 1.0s | 0.3s |
| ARP + Scapy | Tuần tự | **Song song** |
| Scapy timeout | 2s | 1s |
| **Tổng thời gian scan /24** | ~57s | **~1.52s** |
| Thông số | Giá trị |
| --------------------------- | ------------------------------------ |
| Ping workers | `len(hosts)` (~254, tất cả cùng lúc) |
| Ping timeout — Windows | 300ms |
| Ping timeout — macOS | 300ms |
| Ping timeout — Linux | 1s |
| Process timeout | 2s |
| **Tổng thời gian scan /24** | **~12s** |
---
@@ -89,13 +107,14 @@ Ping một IP đơn lẻ với timeout tối ưu theo nền tảng:
**Ưu điểm:**
- Tỷ lệ phát hiện thiết bị cao nhờ kết hợp 3 lớp.
- Không cần quyền Admin/Root — ping sweep + ARP table vẫn tìm được ~90% thiết bị.
- Tương thích đa nền tảng (Windows/macOS/Linux) qua xử lý riêng từng OS.
- ARP table và Scapy chạy song song → không cộng dồn thời gian chờ.
- **Có đầy đủ IP và MAC**, rất hữu ích cho log và tracking lịch sử thiết bị nạp FW.
- **Không bị dính ARP cache cũ**: Do chỉ lấy MAC của các IP vừa ping thành công.
- Không cần quyền Admin/Root.
- Không phụ thuộc thư viện bên ngoài (không cần Npcap / Scapy phức tạp).
- Tương thích đa nền tảng (Windows/macOS/Linux).
- Cực nhanh nhờ cơ chế full-parallel.
**Nhược điểm:**
- Vẫn cần `sleep(0.3s)` để OS kịp ghi ARP cache sau ping sweep.
- Thiết bị tắt hoàn toàn cả ICMP lẫn ARP sẽ không bị phát hiện.
- Spawn ~254 process `ping` đồng thời trên Windows có overhead cao hơn Unix do Windows tạo process chậm hơn.
- Thiết bị cố tình chặn gói tin ICMP (tắt ping) sẽ không bị phát hiện.
- Ping 254 thiết bị cùng lúc bằng tiến trình con (`subprocess`) trên Windows có overhead hệ thống cao hơn Unix.

190
main.py
View File

@@ -315,8 +315,8 @@ class App(QWidget):
mc_layout.addWidget(meth_lbl)
self.method_combo = QComboBox()
self.method_combo.addItem("🌐 API (LuCI)", "api")
self.method_combo.addItem("💻 SSH", "ssh")
self.method_combo.addItem("🌐 API (LuCI)", "api")
self.method_combo.setMinimumWidth(140)
self.method_combo.setFixedHeight(28)
self.method_combo.setStyleSheet("""
@@ -457,11 +457,14 @@ class App(QWidget):
lbl.setStyleSheet("color: #cdd6f4; font-weight: bold;")
return lbl
# IPs permanently hidden from the device list (never shown in UI)
_HIDDEN_IPS = {"192.168.11.102"}
def _get_filtered_devices(self):
"""Return devices filtered based on show_all checkbox."""
if self.show_all_cb.isChecked():
return list(self.all_devices)
excluded = {self.local_ip, self.gateway_ip}
return [d for d in self.all_devices if d["ip"] not in self._HIDDEN_IPS]
excluded = {self.local_ip, self.gateway_ip} | self._HIDDEN_IPS
return [d for d in self.all_devices if d["ip"] not in excluded]
def _refresh_table(self):
@@ -532,17 +535,77 @@ class App(QWidget):
item.setCheckState(Qt.CheckState.Unchecked)
def _show_history(self):
"""Show history of flashed devices in this session."""
"""Show history of flashed devices in this session using a table dialog."""
if not self.flashed_macs:
QMessageBox.information(self, "Flash History", "No devices have been flashed in this session.")
else:
lines = []
for mac in sorted(self.flashed_macs.keys()):
ip, _, result, ts = self.flashed_macs[mac]
icon = "" if result.startswith("DONE") else ""
lines.append(f"[{ts}] {icon} {ip} ({mac}) — {result}")
msg = f"Flash History ({len(self.flashed_macs)} device(s)):\n\n" + "\n".join(lines)
QMessageBox.information(self, "Flash History", msg)
return
from PyQt6.QtWidgets import QDialog
dialog = QDialog(self)
dialog.setWindowTitle("Flash History")
dialog.resize(600, 400)
dialog.setStyleSheet("QDialog { background-color: #1a1b2e; } QTableWidget { font-size: 12px; }")
layout = QVBoxLayout(dialog)
layout.setContentsMargins(10, 10, 10, 10)
table = QTableWidget()
table.setColumnCount(4)
table.setHorizontalHeaderLabels(["Time", "IP", "MAC", "Result"])
table.setAlternatingRowColors(True)
table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
header = table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
header.resizeSection(0, 80)
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Fixed)
header.resizeSection(1, 120)
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Fixed)
header.resizeSection(2, 140)
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
table.verticalHeader().setVisible(False)
table.setRowCount(len(self.flashed_macs))
success_count = 0
fail_count = 0
for row, mac in enumerate(sorted(self.flashed_macs.keys())):
ip, _, result, ts = self.flashed_macs[mac]
ok = result.startswith("DONE")
if ok:
success_count += 1
else:
fail_count += 1
time_item = QTableWidgetItem(ts)
time_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
table.setItem(row, 0, time_item)
table.setItem(row, 1, QTableWidgetItem(ip))
table.setItem(row, 2, QTableWidgetItem(mac))
res_item = QTableWidgetItem(f"{result}" if ok else f"{result}")
res_item.setForeground(QColor("#a6e3a1") if ok else QColor("#f38ba8"))
table.setItem(row, 3, res_item)
layout.addWidget(table)
summary_lbl = QLabel(f"Total: {len(self.flashed_macs)} | ✅ {success_count} | ❌ {fail_count}")
summary_lbl.setStyleSheet("color: #cdd6f4; font-size: 13px; font-weight: bold;")
layout.addWidget(summary_lbl)
btn_close = QPushButton("Close")
btn_close.setFixedHeight(30)
btn_close.setFixedWidth(100)
btn_close.clicked.connect(dialog.accept)
btn_layout = QHBoxLayout()
btn_layout.addStretch()
btn_layout.addWidget(btn_close)
layout.addLayout(btn_layout)
dialog.exec()
# ── Actions ──
@@ -611,6 +674,11 @@ class App(QWidget):
self.scan_progress_bar.setFormat("⚡ Pinging hosts... %v / %m")
self.scan_status.setText("⏳ Pinging all hosts...")
self.scan_status.setStyleSheet("color: #f9e2af;")
elif stage == "mac":
self.scan_progress_bar.setRange(0, 0)
self.scan_progress_bar.setFormat(" Reading MAC addresses...")
self.scan_status.setText("⏳ Reading MAC addresses from ARP...")
self.scan_status.setStyleSheet("color: #f9e2af;")
elif stage == "arp":
self.scan_progress_bar.setRange(0, 0)
self.scan_progress_bar.setFormat(" Reading ARP cache...")
@@ -772,7 +840,6 @@ class App(QWidget):
row = self._flash_row_map.get(index, index)
mac_item = self.table.item(row, 2)
ip_item = self.table.item(row, 1)
import datetime
now_str = datetime.datetime.now().strftime("%H:%M:%S")
mac_str = mac_item.text().strip() if mac_item else ""
ip_str = ip_item.text().strip() if ip_item else ""
@@ -933,10 +1000,22 @@ class App(QWidget):
meth_lbl.setStyleSheet("font-weight: bold; font-size: 12px;")
row2.addWidget(meth_lbl)
self.auto_method_combo = QComboBox()
self.auto_method_combo.addItem("🌐 API (LuCI)", "api")
self.auto_method_combo.addItem("💻 SSH", "ssh")
self.auto_method_combo.setFixedHeight(28)
self.auto_method_combo.addItem("🌐 API (LuCI)", "api")
self.auto_method_combo.setFixedHeight(34)
self.auto_method_combo.setMinimumWidth(140)
self.auto_method_combo.setStyleSheet("""
QComboBox {
background-color: #2d3352; border: 1px solid #3d4a6b; border-radius: 6px;
padding: 2px 8px; color: #e2e8f0; font-size: 13px; font-weight: bold;
}
QComboBox:hover { border-color: #7eb8f7; }
QComboBox::drop-down { border: none; width: 20px; }
QComboBox QAbstractItemView {
background-color: #2d3352; color: #e2e8f0;
border: 1px solid #3d4a6b; selection-background-color: #3d4a6b;
}
""")
row2.addWidget(self.auto_method_combo)
sep3 = QLabel("")
@@ -1072,14 +1151,13 @@ class App(QWidget):
self.auto_log_content.setWordWrap(True)
self.auto_log_content.setAlignment(Qt.AlignmentFlag.AlignTop)
self.auto_log_content.setStyleSheet(
"color: #cdd6f4; font-size: 10px; font-family: 'SF Mono', 'Menlo', monospace;"
"padding: 4px; background-color: #11121d; border-radius: 4px;"
"color: #cdd6f4; font-size: 12px; font-family: 'Consolas', 'Courier New', monospace;"
"padding: 6px; background-color: #11121d; border-radius: 4px;"
)
self.auto_log_content.setTextFormat(Qt.TextFormat.PlainText)
self.auto_log_area.setWidget(self.auto_log_content)
self.auto_log_area.setWidgetResizable(True)
self.auto_log_area.setMinimumHeight(120)
self.auto_log_area.setMaximumHeight(280)
self.auto_log_area.setMinimumHeight(150)
self.auto_log_area.setStyleSheet("QScrollArea { border: 1px solid #2d3748; border-radius: 4px; background-color: #11121d; }")
log_layout.addWidget(self.auto_log_area)
@@ -1089,15 +1167,76 @@ class App(QWidget):
# ── Auto Flash Actions ──
def _show_auto_history(self):
"""Show history of auto-flashed devices using a table dialog."""
if not self._auto_history:
QMessageBox.information(self, "Flash History", "No devices have been flashed in this session.")
return
lines = []
for ip, mac, result, ts in self._auto_history:
icon = "" if result.startswith("DONE") else ""
lines.append(f"[{ts}] {icon} {ip} ({mac}) — {result}")
msg = f"Flash History ({len(self._auto_history)} device(s)):\n\n" + "\n".join(lines)
QMessageBox.information(self, "Flash History", msg)
from PyQt6.QtWidgets import QDialog
dialog = QDialog(self)
dialog.setWindowTitle("Auto Flash History")
dialog.resize(600, 400)
dialog.setStyleSheet("QDialog { background-color: #1a1b2e; } QTableWidget { font-size: 12px; }")
layout = QVBoxLayout(dialog)
layout.setContentsMargins(10, 10, 10, 10)
table = QTableWidget()
table.setColumnCount(4)
table.setHorizontalHeaderLabels(["Time", "IP", "MAC", "Result"])
table.setAlternatingRowColors(True)
table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
header = table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
header.resizeSection(0, 80)
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Fixed)
header.resizeSection(1, 120)
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Fixed)
header.resizeSection(2, 140)
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
table.verticalHeader().setVisible(False)
table.setRowCount(len(self._auto_history))
success_count = 0
fail_count = 0
for row, (ip, mac, result, ts) in enumerate(self._auto_history):
ok = result.startswith("DONE")
if ok:
success_count += 1
else:
fail_count += 1
time_item = QTableWidgetItem(ts)
time_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
table.setItem(row, 0, time_item)
table.setItem(row, 1, QTableWidgetItem(ip))
table.setItem(row, 2, QTableWidgetItem(mac))
res_item = QTableWidgetItem(f"{result}" if ok else f"{result}")
res_item.setForeground(QColor("#a6e3a1") if ok else QColor("#f38ba8"))
table.setItem(row, 3, res_item)
layout.addWidget(table)
summary_lbl = QLabel(f"Total: {len(self._auto_history)} | ✅ {success_count} | ❌ {fail_count}")
summary_lbl.setStyleSheet("color: #cdd6f4; font-size: 13px; font-weight: bold;")
layout.addWidget(summary_lbl)
btn_close = QPushButton("Close")
btn_close.setFixedHeight(30)
btn_close.setFixedWidth(100)
btn_close.clicked.connect(dialog.accept)
btn_layout = QHBoxLayout()
btn_layout.addStretch()
btn_layout.addWidget(btn_close)
layout.addLayout(btn_layout)
dialog.exec()
def _auto_select_firmware(self):
file, _ = QFileDialog.getOpenFileName(
@@ -1255,6 +1394,7 @@ class App(QWidget):
self._auto_fail_count += 1
self.auto_result_table.setItem(row, 3, item)
self._auto_history.append((ip, mac.upper(), result, now_str))
total = len(self._auto_device_rows)
done = self._auto_success_count + self._auto_fail_count
self.auto_summary_label.setText(

View File

@@ -1 +1 @@
1.2.1
1.2.3