8 Commits

Author SHA1 Message Date
52c7b4b968 update doc 2026-03-12 23:55:26 +07:00
aa36758ec9 update scan ip with MAC, update UI show history 2026-03-12 23:51:43 +07:00
22e4436518 update README.md 2026-03-12 23:35:37 +07:00
aa6a114071 update default loadFW (SSH) 2026-03-12 23:21:21 +07:00
627197e29e fix bug scan ip 2026-03-10 18:55:03 +07:00
adf7253350 fix bug scan ip (win) 2026-03-10 18:10:43 +07:00
f5ee209726 fix bug scan ip 2026-03-10 17:54:56 +07:00
466dadf1c9 update UI 2026-03-09 20:49:13 +07:00
5 changed files with 365 additions and 191 deletions

View File

@@ -3,7 +3,7 @@
Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng loạt** cho các thiết bị OpenWrt trong mạng LAN. Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng loạt** cho các thiết bị OpenWrt trong mạng LAN.
Hỗ trợ nạp **thủ công** (chọn thiết bị → flash) và **tự động hóa** (scan → phát hiện → flash → retry). Hỗ trợ nạp **thủ công** (chọn thiết bị → flash) và **tự động hóa** (scan → phát hiện → flash → retry).
> **Tech stack:** Python 3.9+ · PyQt6 · Paramiko/SCP · Scapy · Requests · PyInstaller > **Tech stack:** Python 3.9+ · PyQt6 · Paramiko/SCP · Requests · PyInstaller
> **Phiên bản:** `1.2.0` > **Phiên bản:** `1.2.0`
--- ---
@@ -16,7 +16,7 @@ Mira_Firmware_Loader/
├── version.txt # Số phiên bản ứng dụng ├── version.txt # Số phiên bản ứng dụng
├── requirements.txt # Danh sách dependencies ├── requirements.txt # Danh sách dependencies
├── core/ ├── core/
│ ├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy) │ ├── scanner.py # Quét thiết bị mạng (Ping sweep đa luồng)
│ ├── workers.py # ScanThread — chạy scanner trong background thread │ ├── workers.py # ScanThread — chạy scanner trong background thread
│ ├── api_flash.py # Flash firmware qua LuCI HTTP API │ ├── api_flash.py # Flash firmware qua LuCI HTTP API
│ ├── ssh_utils.py # SSH/SCP transport helpers dùng chung │ ├── ssh_utils.py # SSH/SCP transport helpers dùng chung
@@ -48,8 +48,7 @@ Mira_Firmware_Loader/
### Yêu cầu ### Yêu cầu
- Python **3.9+** - Python **3.9+**
- Thư viện: `PyQt6`, `scapy`, `requests`, `paramiko`, `scp`, `pyinstaller` (để build) - Thư viện: `PyQt6`, `requests`, `paramiko`, `scp`, `pyinstaller` (để build)
- _Windows:_ Cài [Npcap](https://npcap.com/) để Scapy có thể gửi ARP broadcast (không bắt buộc — có fallback)
### Khởi chạy nhanh ### Khởi chạy nhanh
@@ -101,11 +100,11 @@ Output: `dist\Mira_Firmware_Loader.exe` — không cần cài Python trên máy
┌──────▼──────┐ ┌───────▼──────────┐ ┌───▼──────────────────┐ ┌──────▼──────┐ ┌───────▼──────────┐ ┌───▼──────────────────┐
│ scanner.py │ │ Flash Workers │ │ AutoFlashWorker │ │ scanner.py │ │ Flash Workers │ │ AutoFlashWorker │
│ │ │ (thủ công) │ │ (tự động hóa) │ │ │ │ (thủ công) │ │ (tự động hóa) │
1. Ping │ │ │ │ │ │ Ping Sweep │ │ │ │ │
Sweep │ │ NewFlashThread │ │ Phase 1: Scan LAN │ (Multithread│ │ NewFlashThread │ │ Phase 1: Scan LAN │
2. arp -a │ │ ├─ api_flash │ │ (tối đa 15 lần) │ gọi OS │ │ ├─ api_flash │ │ (tối đa 15 lần) │
3. Scapy │ │ └─ ssh_new_flash│ │ Phase 2: Flash │ ping cmd) │ │ └─ ssh_new_flash│ │ Phase 2: Flash │
ARP │ │ │ │ (auto-retry x3) │ │ │ │ │ (auto-retry x3) │
└─────────────┘ │ UpdateFlash │ │ ├─ api_flash │ └─────────────┘ │ UpdateFlash │ │ ├─ api_flash │
│ Thread │ │ └─ ssh_new_flash │ │ Thread │ │ └─ ssh_new_flash │
│ └─ ssh_update │ └──────────────────────┘ │ └─ ssh_update │ └──────────────────────┘
@@ -127,15 +126,14 @@ Output: `dist\Mira_Firmware_Loader.exe` — không cần cài Python trên máy
### 1. Quét mạng (Network Scan) ### 1. Quét mạng (Network Scan)
`scanner.py` ng chiến lược 3 lớp, đảm bảo phát hiện đầy đủ mà không cần quyền Root: `scanner.py` sử dụng chiến lược **Ping trước → ARP sau**, đảm bảo vừa lấy được MAC vừa loại bỏ hoàn toàn lỗi thiết bị ảo do ARP cache cũ:
| Giai đoạn | Mô tả | | Giai đoạn | Mô tả |
| -------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | | -------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
| **Ping Sweep** | Gửi ping đồng thời tới toàn bộ host trong dải `/24` (tất cả cùng lúc, không batching) để đánh thức thiết bị và điền ARP cache | | **Ping Sweep** | Gửi ping đồng thời tới toàn bộ host trong dải mạng bằng tiến trình con (đa luồng). Nhằm xác đnh chính xác thiết bị nào đang thực sự cấp nguồn online. |
| **ARP Table** | Đọc `arp -a` bằng regex, hỗ trợ cả định dạng Windows (`cc-2d-...`) và macOS/Linux (`aa:bb:...`) | | **ARP Lookup** | Ngay sau khi ping, query ARP table hệ điều hành (song song) _chỉ cho các IP vừa ping thành công_, lấy MAC Address chuẩn xác trước khi cache cũ hết hạn. |
| **Scapy ARP** | Chạy **song song** với ARP Table — gửi ARP broadcast để lấp khoảng trống. Yêu cầu Npcap (Windows) hoặc root (Linux); tự động bỏ qua nếu không khả dụng |
Kết quả được merge theo IP và sort tăng dần trước khi trả về UI. Kết quả (IP và MAC) được sort tăng dần theo IP trước khi trả về UI.
### 2. Bảng thiết bị (Device Table) ### 2. Bảng thiết bị (Device Table)
@@ -281,7 +279,7 @@ Kết quả nạp được lưu ở 2 cấp:
## 🔒 Lưu ý bảo mật & Quyền ## 🔒 Lưu ý bảo mật & Quyền
- **Scapy (chế độ sâu):** Cần Npcap (Windows) hoặc `sudo` (macOS/Linux). App vẫn hoạt động mà không cần quyền Admin nhờ fallback Ping Sweep + `arp -a`. - **Quyền hệ thống:** Quá trình quét của `scanner.py` chỉ chạy lệnh ping từ hệ điều hành, vì vậy ứng dụng có thể chạy hoàn toàn **không cần** quyền Admin (trên Windows) / Root (trên macOS/Linux).
- **CREATE_NO_WINDOW:** Khi gọi subprocess (`ping`, `arp`), ứng dụng dùng flag `CREATE_NO_WINDOW` trên Windows để ngăn cửa sổ console hiện ra. - **CREATE_NO_WINDOW:** Khi gọi subprocess lệnh `ping`, ứng dụng dùng flag `CREATE_NO_WINDOW` trên Windows để ngăn cửa sổ console command prompt (cmd) liên tục hiện lên màn hình.
- **HTTP thuần:** Firmware được upload qua HTTP (không HTTPS). Chỉ dùng trong mạng LAN nội bộ, không nên dùng trên mạng mở. - **HTTP thuần:** Firmware được upload qua HTTP (không HTTPS). Chỉ dùng trong mạng LAN nội bộ, không nên dùng trên mạng mở.
- **Credentials cố định:** SSH credentials (`root`/`admin123a`) được hardcode trong `flash_update_worker.py` và truyền từ `main.py`, không hiển thị trên UI. - **Credentials cố định:** SSH credentials (`root`/`admin123a`) được hardcode trong `flash_update_worker.py` và truyền từ `main.py`, không hiển thị trên UI.

View File

@@ -1,155 +1,172 @@
import subprocess
import re import re
import subprocess
import sys import sys
import time
import ipaddress import ipaddress
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
# Windows: prevent subprocess from opening visible console windows # Windows: prevent subprocess from opening visible console windows
_NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 _NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
# Concurrent ping workers per platform
_MAX_WORKERS_WIN = 50
_MAX_WORKERS_OTHER = 64
def _ping_one(ip, is_win): def _ping_one(ip, is_win):
"""Ping a single IP to populate ARP table.""" """Ping a single IP. Returns True if host responds."""
try: try:
if is_win: if is_win:
subprocess.run( # Capture stdout to check for TTL= — more reliable than returncode
["ping", "-n", "1", "-w", "300", str(ip)], # 300ms (was 600ms) # on Windows (returncode can be 0 even for "Destination unreachable")
stdout=subprocess.DEVNULL, r = subprocess.run(
["ping", "-n", "1", "-w", "500", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
timeout=2, timeout=3,
creationflags=_NO_WINDOW creationflags=_NO_WINDOW
) )
return r.returncode == 0 and b"TTL=" in r.stdout
elif sys.platform == "darwin": elif sys.platform == "darwin":
subprocess.run( r = subprocess.run(
["ping", "-c", "1", "-W", "500", str(ip)], # 500ms — macOS: -W unit là ms ["ping", "-c", "1", "-W", "300", str(ip)],
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
timeout=2 timeout=2
) )
else: else:
subprocess.run( r = subprocess.run(
["ping", "-c", "1", "-W", "1", str(ip)], # 1s — Linux: -W unit là giây ["ping", "-c", "1", "-W", "1", str(ip)],
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
timeout=2 timeout=2
) )
return r.returncode == 0
except Exception:
return False
def _get_mac_from_arp(ip):
"""
Lấy MAC address của một IP qua ARP table.
Chỉ được gọi SAU KHI IP đã phản hồi ping thành công — đảm bảo ARP
cache vừa được OS cập nhật với thông tin mới nhất (không bị stale).
Trả về chuỗi MAC dạng 'AA:BB:CC:DD:EE:FF' hoặc 'N/A' nếu không tra được.
"""
try:
if sys.platform == "win32":
# arp -a <ip> → " 192.168.4.5 aa-bb-cc-dd-ee-ff dynamic"
r = subprocess.run(
["arp", "-a", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3,
creationflags=_NO_WINDOW
)
output = r.stdout.decode(errors="ignore")
# Dạng Windows: aa-bb-cc-dd-ee-ff
match = re.search(
r"([0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}"
r"[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2})",
output
)
else:
# macOS / Linux: arp -n <ip>
# macOS output: "? (192.168.4.5) at aa:bb:cc:dd:ee:ff on en0 ..."
# Linux output: "192.168.4.5 ether aa:bb:cc:dd:ee:ff C eth0"
r = subprocess.run(
["arp", "-n", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3
)
output = r.stdout.decode(errors="ignore")
match = re.search(
r"([0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}"
r"[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2})",
output
)
if match:
# Chuẩn hoá sang dấu ':' và chữ hoa
mac = match.group(1).replace("-", ":").upper()
return mac
except Exception: except Exception:
pass pass
return "N/A"
def _ping_sweep(network, progress_cb=None): def _ping_sweep(network, progress_cb=None):
"""Ping tất cả host trong network đồng thời để điền ARP cache. """Ping tất cả host trong network đồng thời.
Gọi progress_cb(done, total) sau mỗi ping nếu được cung cấp. Trả về list IP đã phản hồi. Gọi progress_cb(done, total) sau mỗi ping.
""" """
net = ipaddress.ip_network(network, strict=False) net = ipaddress.ip_network(network, strict=False)
# Chỉ ping sweep cho /24 hoặc nhỏ hơn
if net.num_addresses > 256: if net.num_addresses > 256:
return return []
is_win = sys.platform == "win32" is_win = sys.platform == "win32"
hosts = list(net.hosts()) hosts = list(net.hosts())
total = len(hosts) total = len(hosts)
workers = min(_MAX_WORKERS_WIN if is_win else _MAX_WORKERS_OTHER, len(hosts))
done_count = [0] done_count = [0]
lock = threading.Lock()
alive = []
def _ping_and_track(ip): def _ping_and_track(ip):
_ping_one(ip, is_win) ok = _ping_one(ip, is_win)
with lock:
done_count[0] += 1 done_count[0] += 1
current = done_count[0]
if progress_cb: if progress_cb:
progress_cb(done_count[0], total) progress_cb(current, total)
return (str(ip), ok)
# Tất cả host cùng lúc — loại bỏ overhead batching (was max_workers=100) with ThreadPoolExecutor(max_workers=workers) as executor:
with ThreadPoolExecutor(max_workers=len(hosts)) as executor:
futures = [executor.submit(_ping_and_track, ip) for ip in hosts] futures = [executor.submit(_ping_and_track, ip) for ip in hosts]
for f in as_completed(futures): for f in as_completed(futures):
pass ip_str, ok = f.result()
if ok:
alive.append(ip_str)
return alive
def scan_network(network, progress_cb=None, stage_cb=None): def scan_network(network, progress_cb=None, stage_cb=None):
"""Scan network: ping sweep → ARP table + Scapy song song.""" """Scan network: ping → lấy MAC từ ARP (chỉ cho IP đang online).
# Phase 1: Ping sweep
Flow:
1. Ping sweep — xác định thiết bị online
2. MAC lookup — query ARP cho từng IP vừa phản hồi ping (song song)
=> ARP cache vừa được OS cập nhật sau ping, không bị stale.
"""
# ── Stage 1: Ping ──
if stage_cb: if stage_cb:
stage_cb("ping") stage_cb("ping")
_ping_sweep(network, progress_cb) alive_ips = _ping_sweep(network, progress_cb)
time.sleep(0.3) # Giảm từ 1s xuống 0.3s
# Phase 2 + 3: ARP table và Scapy chạy song song # ── Stage 2: MAC lookup ──
if stage_cb: if stage_cb:
stage_cb("arp") stage_cb("mac")
def _collect_arp(): results = []
result = {} if alive_ips:
# Chạy song song để tra MAC nhanh hơn
mac_workers = min(32, len(alive_ips))
macs = {}
with ThreadPoolExecutor(max_workers=mac_workers) as executor:
future_to_ip = {executor.submit(_get_mac_from_arp, ip): ip for ip in alive_ips}
for future in as_completed(future_to_ip):
ip = future_to_ip[future]
try: try:
output = subprocess.check_output( macs[ip] = future.result()
["arp", "-a"], text=True, creationflags=_NO_WINDOW
)
net = ipaddress.ip_network(network, strict=False)
if sys.platform == "win32":
pattern = re.compile(
r"(\d+\.\d+\.\d+\.\d+)\s+"
r"([0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-"
r"[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2})\s+"
r"(dynamic|static)",
re.IGNORECASE
)
for line in output.splitlines():
m = pattern.search(line)
if m:
ip_str = m.group(1)
mac = m.group(2).replace("-", ":")
if mac.upper() != "FF:FF:FF:FF:FF:FF":
try:
if ipaddress.ip_address(ip_str) in net:
result[ip_str] = {"ip": ip_str, "mac": mac}
except ValueError:
pass
else:
pattern = re.compile(
r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]+)"
)
for line in output.splitlines():
m = pattern.search(line)
if m:
ip_str, mac = m.group(1), m.group(2)
if mac.lower() not in ("(incomplete)", "ff:ff:ff:ff:ff:ff"):
try:
if ipaddress.ip_address(ip_str) in net:
result[ip_str] = {"ip": ip_str, "mac": mac}
except ValueError:
pass
except Exception: except Exception:
pass macs[ip] = "N/A"
return result
def _collect_scapy(): for ip_str in alive_ips:
result = {} results.append({"ip": ip_str, "mac": macs.get(ip_str, "N/A")})
try:
import io
_stderr = sys.stderr
sys.stderr = io.StringIO()
try:
from scapy.all import ARP, Ether, srp
finally:
sys.stderr = _stderr
arp = ARP(pdst=str(network)) return sorted(results, key=lambda d: ipaddress.ip_address(d["ip"]))
ether = Ether(dst="ff:ff:ff:ff:ff:ff")
raw = srp(ether / arp, timeout=1, verbose=0)[0] # Giảm từ 2s xuống 1s
for _, received in raw:
result[received.psrc] = {"ip": received.psrc, "mac": received.hwsrc}
except Exception:
pass
return result
# Chạy ARP read và Scapy đồng thời
with ThreadPoolExecutor(max_workers=2) as executor:
f_arp = executor.submit(_collect_arp)
f_scapy = executor.submit(_collect_scapy)
seen = f_arp.result()
# Scapy bổ sung những IP chưa có trong ARP table
for ip, dev in f_scapy.result().items():
if ip not in seen:
seen[ip] = dev
return sorted(seen.values(), key=lambda d: ipaddress.ip_address(d["ip"]))

View File

@@ -2,37 +2,50 @@
## 1. Tổng quan ## 1. Tổng quan
Thành phần quét IP trong `scanner.py` dò tìm và liệt kê tất cả thiết bị đang hoạt động trên một dải mạng (ví dụ: `192.168.1.0/24`), trả về danh sách chứa **IP****MAC Address** của từng thiết bị. Thành phần quét IP trong `scanner.py` dò tìm và liệt kê tất cả thiết bị **đang thực sự online** trên một dải mạng (ví dụ: `192.168.1.0/24`), trả về danh sách chứa **IP****MAC Address** của từng thiết bị.
Để đảm bảo tỷ lệ phát hiện cao trên mọi hệ điều hành (Windows, macOS, Linux), scanner kết hợp 3 giai đoạn: Scanner hoạt động theo cơ chế 2 bước (Ping trước → ARP sau):
1. **ICMP Ping Sweep:** Chỉ những IP có phản hồi ping mới được ghi nhận là online.
2. **ARP Lookup:** Ngay sau khi IP phản hồi ping, OS tự động cập nhật ARP cache cho IP đó. Scanner lập tức query ARP table để lấy MAC chính xác nhất.
1. **Ping Sweep** — đánh thức thiết bị, điền ARP cache > **Giải quyết vấn đề ARP cache cũ (stale entries):** Khác với phương pháp ARP scan truyền thống (`arp -a` liệt kê toàn mạng) thường dính thiết bị đã ngắt kết nối do bị cache, cách làm này **chỉ query ARP cho những IP vừa pass qua bài test ping**. Thiết bị offline sẽ rớt ở bước ping và không bao giờ được đưa vào danh sách.
2. **Đọc bảng ARP hệ điều hành** (`arp -a`) — không cần quyền Admin
3. **Scapy ARP broadcast** — bổ sung thiết bị chặn ICMP
Bước 2 và 3 chạy **song song** để giảm tổng thời gian scan.
--- ---
## 2. Luồng hoạt động chính (Hàm `scan_network`) ## 2. Luồng hoạt động chính (Hàm `scan_network`)
```
scan_network(network)
├── stage_cb("ping") ← Thông báo UI bắt đầu quét
├── _ping_sweep(network) ← Ping đồng thời toàn bộ host
│ │
│ ├── _ping_one(ip) × N ← Mỗi host 1 thread
│ │
│ └── return [alive IPs]
├── stage_cb("mac") ← Thông báo UI bắt đầu lấy MAC
├── _get_mac_from_arp(ip) × K ← Tra MAC song song cho K IP alive
└── return [{"ip": ..., "mac": "AA:BB:CC:..."}, ...] ← Sorted by IP
```
**Bước 1 — Ping Sweep** **Bước 1 — Ping Sweep**
- Gọi `_ping_sweep(network)`: gửi ICMP Echo Request đồng thời tới **toàn bộ host** trong dải mạng. - Gọi `_ping_sweep(network)`: gửi ICMP Echo Request đồng thời tới **toàn bộ host** trong dải mạng.
- Mỗi thiết bị phản hồi sẽ khiến hệ điều hành **tự ghi MAC vào ARP Cache**. - Chỉ các IP có `returncode == 0` (phản hồi thành công) mới được đưa vào danh sách `alive_ips`.
- Sau khi sweep xong, chờ `0.3s` để OS kịp finalize ARP cache (giảm từ 1s trước đây).
**Bước 2 + 3 — ARP Table & Scapy (song song)** **Bước 2 — Địa chỉ MAC (ARP Lookup)**
- Hai hàm `_collect_arp()``_collect_scapy()` được submit vào `ThreadPoolExecutor(max_workers=2)` và chạy đồng thời: - Từ danh sách `alive_ips`, tạo ThreadPoolExecutor (max_workers=32) để gọi `_get_mac_from_arp(ip)` đồng thời.
- `_collect_arp()`: đọc `arp -a`, parse regex lấy IP + MAC. - Trích xuất MAC address bằng pattern matching chéo nền tảng.
- `_collect_scapy()`: gửi ARP broadcast, nhận phản hồi trực tiếp từ thiết bị.
- Kết quả merge theo IP: ARP table làm nền, Scapy bổ sung IP còn thiếu.
**Bước 4 — Trả về kết quả** **Bước 3 — Trả về kết quả**
- Danh sách sort tăng dần theo IP: - Danh sách sort tăng dần theo IP:
`[{"ip": "192.168.1.2", "mac": "aa:bb:cc:dd:ee:ff"}, ...]` `[{"ip": "192.168.1.2", "mac": "AA:BB:CC:DD:EE:FF"}, ...]`
--- ---
@@ -40,48 +53,53 @@ Bước 2 và 3 chạy **song song** để giảm tổng thời gian scan.
### `_ping_one(ip, is_win)` ### `_ping_one(ip, is_win)`
Ping một IP đơn lẻ với timeout tối ưu theo nền tảng: Ping một IP đơn lẻ, trả về `True` nếu host phản hồi, `False` nếu không.
Timeout tối ưu theo nền tảng:
| OS | Lệnh | Timeout wait | Timeout process | | OS | Lệnh | Timeout wait | Timeout process |
| ------- | ------------------ | ----------------- | --------------- | | ------- | ------------------ | ------------ | --------------- |
| Windows | `ping -n 1 -w 300` | 300ms | 2s | | Windows | `ping -n 1 -w 300` | 300ms | 2s |
| macOS | `ping -c 1 -W 500` | 500ms (đơn vị ms) | 2s | | macOS | `ping -c 1 -W 300` | 300ms | 2s |
| Linux | `ping -c 1 -W 1` | 1s (đơn vị giây) | 2s | | Linux | `ping -c 1 -W 1` | 1s | 2s |
> macOS và Linux dùng cùng flag `-W` nhưng đơn vị khác nhau — được xử lý tách biệt theo `sys.platform`. > macOS và Linux dùng cùng flag `-W` nhưng đơn vị khác nhau (macOS: ms, Linux: giây) — được xử lý tách biệt theo `sys.platform`.
Windows sử dụng `CREATE_NO_WINDOW` flag để tránh mở cửa sổ console cho mỗi subprocess.
### `_ping_sweep(network, progress_cb)` ### `_ping_sweep(network, progress_cb)`
- Tạo `ThreadPoolExecutor(max_workers=len(hosts))`**toàn bộ host ping đồng thời**, không batching. - Tạo `ThreadPoolExecutor(max_workers=len(hosts))`**toàn bộ host ping đồng thời**, không batching.
- Giới hạn an toàn: chỉ chạy với subnet `/24` trở xuống (`num_addresses <= 256`). - Giới hạn an toàn: chỉ chạy với subnet `/24` trở xuống (`num_addresses <= 256`).
- Trả về danh sách IP (string) đã phản hồi thành công.
- Gọi `progress_cb(done, total)` sau mỗi ping để UI cập nhật thanh tiến độ. - Gọi `progress_cb(done, total)` sau mỗi ping để UI cập nhật thanh tiến độ.
### `_collect_arp()` (nội bộ trong `scan_network`) ### `_get_mac_from_arp(ip)`
Đọc và parse output `arp -a`, hỗ trợ đa nền tảng: - Gọi lệnh hệ điều hành để đọc ARP cache cho IP cụ thể:
- **Windows**: `arp -a <ip>`
- **macOS / Linux**: `arp -n <ip>`
- Sử dụng Regex để parse MAC address và trả về dưới format chuẩn `AA:BB:CC:DD:EE:FF`.
- Nếu không tìm thấy, fallback thành `"N/A"`.
- **Windows:** Regex nhận dạng dạng `cc-2d-21-a5-85-b0 dynamic`, chuẩn hóa MAC sang `cc:2d:21:a5:85:b0`. ### `scan_network(network, progress_cb, stage_cb)`
- **macOS/Linux:** Regex nhận dạng dạng `(192.168.1.1) at aa:bb:cc:dd:ee:ff`, bỏ qua entry `(incomplete)`.
### `_collect_scapy()` (nội bộ trong `scan_network`) - Entry point chính.
- `stage_cb("ping")``stage_cb("mac")` thông báo UI giai đoạn hiện tại.
- Gửi ARP `Who-has` broadcast (`Ether/ARP` qua `srp`) với **timeout 1s** (giảm từ 2s). - Trả về `list[dict]` với format `{"ip": str, "mac": str}`, sorted theo IP tăng dần.
- Stderr bị redirect tạm thời khi import scapy để tránh spam log ra console.
- Tự động bỏ qua nếu scapy không khả dụng (không có Npcap / không có quyền root).
--- ---
## 4. So sánh hiệu năng (trước và sau tối ưu) ## 4. Hiệu năng
| Thay đổi | Trước | Sau | | Thông số | Giá trị |
| --------------------------- | --------------------- | ------------------------------------ | | --------------------------- | ------------------------------------ |
| Ping workers | 100 (batching ~3 đợt) | `len(hosts)` (~254, tất cả cùng lúc) | | Ping workers | `len(hosts)` (~254, tất cả cùng lúc) |
| Ping timeout — Windows | 600ms | 300ms | | Ping timeout — Windows | 300ms |
| Ping timeout — macOS | 1ms (sai đơn vị) | 500ms | | Ping timeout — macOS | 300ms |
| Sleep sau ping sweep | 1.0s | 0.3s | | Ping timeout — Linux | 1s |
| ARP + Scapy | Tuần tự | **Song song** | | Process timeout | 2s |
| Scapy timeout | 2s | 1s | | **Tổng thời gian scan /24** | **~12s** |
| **Tổng thời gian scan /24** | ~57s | **~1.52s** |
--- ---
@@ -89,13 +107,14 @@ Ping một IP đơn lẻ với timeout tối ưu theo nền tảng:
**Ưu điểm:** **Ưu điểm:**
- Tỷ lệ phát hiện thiết bị cao nhờ kết hợp 3 lớp. - **Có đầy đủ IP và MAC**, rất hữu ích cho log và tracking lịch sử thiết bị nạp FW.
- Không cần quyền Admin/Root — ping sweep + ARP table vẫn tìm được ~90% thiết bị. - **Không bị dính ARP cache cũ**: Do chỉ lấy MAC của các IP vừa ping thành công.
- Tương thích đa nền tảng (Windows/macOS/Linux) qua xử lý riêng từng OS. - Không cần quyền Admin/Root.
- ARP table và Scapy chạy song song → không cộng dồn thời gian chờ. - Không phụ thuộc thư viện bên ngoài (không cần Npcap / Scapy phức tạp).
- Tương thích đa nền tảng (Windows/macOS/Linux).
- Cực nhanh nhờ cơ chế full-parallel.
**Nhược điểm:** **Nhược điểm:**
- Vẫn cần `sleep(0.3s)` để OS kịp ghi ARP cache sau ping sweep. - Thiết bị cố tình chặn gói tin ICMP (tắt ping) sẽ không bị phát hiện.
- Thiết bị tắt hoàn toàn cả ICMP lẫn ARP sẽ không bị phát hiện. - Ping 254 thiết bị cùng lúc bằng tiến trình con (`subprocess`) trên Windows có overhead hệ thống cao hơn Unix.
- Spawn ~254 process `ping` đồng thời trên Windows có overhead cao hơn Unix do Windows tạo process chậm hơn.

188
main.py
View File

@@ -315,8 +315,8 @@ class App(QWidget):
mc_layout.addWidget(meth_lbl) mc_layout.addWidget(meth_lbl)
self.method_combo = QComboBox() self.method_combo = QComboBox()
self.method_combo.addItem("🌐 API (LuCI)", "api")
self.method_combo.addItem("💻 SSH", "ssh") self.method_combo.addItem("💻 SSH", "ssh")
self.method_combo.addItem("🌐 API (LuCI)", "api")
self.method_combo.setMinimumWidth(140) self.method_combo.setMinimumWidth(140)
self.method_combo.setFixedHeight(28) self.method_combo.setFixedHeight(28)
self.method_combo.setStyleSheet(""" self.method_combo.setStyleSheet("""
@@ -457,11 +457,14 @@ class App(QWidget):
lbl.setStyleSheet("color: #cdd6f4; font-weight: bold;") lbl.setStyleSheet("color: #cdd6f4; font-weight: bold;")
return lbl return lbl
# IPs permanently hidden from the device list (never shown in UI)
_HIDDEN_IPS = {"192.168.11.102"}
def _get_filtered_devices(self): def _get_filtered_devices(self):
"""Return devices filtered based on show_all checkbox.""" """Return devices filtered based on show_all checkbox."""
if self.show_all_cb.isChecked(): if self.show_all_cb.isChecked():
return list(self.all_devices) return [d for d in self.all_devices if d["ip"] not in self._HIDDEN_IPS]
excluded = {self.local_ip, self.gateway_ip} excluded = {self.local_ip, self.gateway_ip} | self._HIDDEN_IPS
return [d for d in self.all_devices if d["ip"] not in excluded] return [d for d in self.all_devices if d["ip"] not in excluded]
def _refresh_table(self): def _refresh_table(self):
@@ -532,17 +535,77 @@ class App(QWidget):
item.setCheckState(Qt.CheckState.Unchecked) item.setCheckState(Qt.CheckState.Unchecked)
def _show_history(self): def _show_history(self):
"""Show history of flashed devices in this session.""" """Show history of flashed devices in this session using a table dialog."""
if not self.flashed_macs: if not self.flashed_macs:
QMessageBox.information(self, "Flash History", "No devices have been flashed in this session.") QMessageBox.information(self, "Flash History", "No devices have been flashed in this session.")
else: return
lines = []
for mac in sorted(self.flashed_macs.keys()): from PyQt6.QtWidgets import QDialog
dialog = QDialog(self)
dialog.setWindowTitle("Flash History")
dialog.resize(600, 400)
dialog.setStyleSheet("QDialog { background-color: #1a1b2e; } QTableWidget { font-size: 12px; }")
layout = QVBoxLayout(dialog)
layout.setContentsMargins(10, 10, 10, 10)
table = QTableWidget()
table.setColumnCount(4)
table.setHorizontalHeaderLabels(["Time", "IP", "MAC", "Result"])
table.setAlternatingRowColors(True)
table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
header = table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
header.resizeSection(0, 80)
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Fixed)
header.resizeSection(1, 120)
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Fixed)
header.resizeSection(2, 140)
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
table.verticalHeader().setVisible(False)
table.setRowCount(len(self.flashed_macs))
success_count = 0
fail_count = 0
for row, mac in enumerate(sorted(self.flashed_macs.keys())):
ip, _, result, ts = self.flashed_macs[mac] ip, _, result, ts = self.flashed_macs[mac]
icon = "" if result.startswith("DONE") else "" ok = result.startswith("DONE")
lines.append(f"[{ts}] {icon} {ip} ({mac}) — {result}") if ok:
msg = f"Flash History ({len(self.flashed_macs)} device(s)):\n\n" + "\n".join(lines) success_count += 1
QMessageBox.information(self, "Flash History", msg) else:
fail_count += 1
time_item = QTableWidgetItem(ts)
time_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
table.setItem(row, 0, time_item)
table.setItem(row, 1, QTableWidgetItem(ip))
table.setItem(row, 2, QTableWidgetItem(mac))
res_item = QTableWidgetItem(f"{result}" if ok else f"{result}")
res_item.setForeground(QColor("#a6e3a1") if ok else QColor("#f38ba8"))
table.setItem(row, 3, res_item)
layout.addWidget(table)
summary_lbl = QLabel(f"Total: {len(self.flashed_macs)} | ✅ {success_count} | ❌ {fail_count}")
summary_lbl.setStyleSheet("color: #cdd6f4; font-size: 13px; font-weight: bold;")
layout.addWidget(summary_lbl)
btn_close = QPushButton("Close")
btn_close.setFixedHeight(30)
btn_close.setFixedWidth(100)
btn_close.clicked.connect(dialog.accept)
btn_layout = QHBoxLayout()
btn_layout.addStretch()
btn_layout.addWidget(btn_close)
layout.addLayout(btn_layout)
dialog.exec()
# ── Actions ── # ── Actions ──
@@ -611,6 +674,11 @@ class App(QWidget):
self.scan_progress_bar.setFormat("⚡ Pinging hosts... %v / %m") self.scan_progress_bar.setFormat("⚡ Pinging hosts... %v / %m")
self.scan_status.setText("⏳ Pinging all hosts...") self.scan_status.setText("⏳ Pinging all hosts...")
self.scan_status.setStyleSheet("color: #f9e2af;") self.scan_status.setStyleSheet("color: #f9e2af;")
elif stage == "mac":
self.scan_progress_bar.setRange(0, 0)
self.scan_progress_bar.setFormat(" Reading MAC addresses...")
self.scan_status.setText("⏳ Reading MAC addresses from ARP...")
self.scan_status.setStyleSheet("color: #f9e2af;")
elif stage == "arp": elif stage == "arp":
self.scan_progress_bar.setRange(0, 0) self.scan_progress_bar.setRange(0, 0)
self.scan_progress_bar.setFormat(" Reading ARP cache...") self.scan_progress_bar.setFormat(" Reading ARP cache...")
@@ -772,7 +840,6 @@ class App(QWidget):
row = self._flash_row_map.get(index, index) row = self._flash_row_map.get(index, index)
mac_item = self.table.item(row, 2) mac_item = self.table.item(row, 2)
ip_item = self.table.item(row, 1) ip_item = self.table.item(row, 1)
import datetime
now_str = datetime.datetime.now().strftime("%H:%M:%S") now_str = datetime.datetime.now().strftime("%H:%M:%S")
mac_str = mac_item.text().strip() if mac_item else "" mac_str = mac_item.text().strip() if mac_item else ""
ip_str = ip_item.text().strip() if ip_item else "" ip_str = ip_item.text().strip() if ip_item else ""
@@ -933,10 +1000,22 @@ class App(QWidget):
meth_lbl.setStyleSheet("font-weight: bold; font-size: 12px;") meth_lbl.setStyleSheet("font-weight: bold; font-size: 12px;")
row2.addWidget(meth_lbl) row2.addWidget(meth_lbl)
self.auto_method_combo = QComboBox() self.auto_method_combo = QComboBox()
self.auto_method_combo.addItem("🌐 API (LuCI)", "api")
self.auto_method_combo.addItem("💻 SSH", "ssh") self.auto_method_combo.addItem("💻 SSH", "ssh")
self.auto_method_combo.setFixedHeight(28) self.auto_method_combo.addItem("🌐 API (LuCI)", "api")
self.auto_method_combo.setFixedHeight(34)
self.auto_method_combo.setMinimumWidth(140) self.auto_method_combo.setMinimumWidth(140)
self.auto_method_combo.setStyleSheet("""
QComboBox {
background-color: #2d3352; border: 1px solid #3d4a6b; border-radius: 6px;
padding: 2px 8px; color: #e2e8f0; font-size: 13px; font-weight: bold;
}
QComboBox:hover { border-color: #7eb8f7; }
QComboBox::drop-down { border: none; width: 20px; }
QComboBox QAbstractItemView {
background-color: #2d3352; color: #e2e8f0;
border: 1px solid #3d4a6b; selection-background-color: #3d4a6b;
}
""")
row2.addWidget(self.auto_method_combo) row2.addWidget(self.auto_method_combo)
sep3 = QLabel("") sep3 = QLabel("")
@@ -1072,14 +1151,13 @@ class App(QWidget):
self.auto_log_content.setWordWrap(True) self.auto_log_content.setWordWrap(True)
self.auto_log_content.setAlignment(Qt.AlignmentFlag.AlignTop) self.auto_log_content.setAlignment(Qt.AlignmentFlag.AlignTop)
self.auto_log_content.setStyleSheet( self.auto_log_content.setStyleSheet(
"color: #cdd6f4; font-size: 10px; font-family: 'SF Mono', 'Menlo', monospace;" "color: #cdd6f4; font-size: 12px; font-family: 'Consolas', 'Courier New', monospace;"
"padding: 4px; background-color: #11121d; border-radius: 4px;" "padding: 6px; background-color: #11121d; border-radius: 4px;"
) )
self.auto_log_content.setTextFormat(Qt.TextFormat.PlainText) self.auto_log_content.setTextFormat(Qt.TextFormat.PlainText)
self.auto_log_area.setWidget(self.auto_log_content) self.auto_log_area.setWidget(self.auto_log_content)
self.auto_log_area.setWidgetResizable(True) self.auto_log_area.setWidgetResizable(True)
self.auto_log_area.setMinimumHeight(120) self.auto_log_area.setMinimumHeight(150)
self.auto_log_area.setMaximumHeight(280)
self.auto_log_area.setStyleSheet("QScrollArea { border: 1px solid #2d3748; border-radius: 4px; background-color: #11121d; }") self.auto_log_area.setStyleSheet("QScrollArea { border: 1px solid #2d3748; border-radius: 4px; background-color: #11121d; }")
log_layout.addWidget(self.auto_log_area) log_layout.addWidget(self.auto_log_area)
@@ -1089,15 +1167,76 @@ class App(QWidget):
# ── Auto Flash Actions ── # ── Auto Flash Actions ──
def _show_auto_history(self): def _show_auto_history(self):
"""Show history of auto-flashed devices using a table dialog."""
if not self._auto_history: if not self._auto_history:
QMessageBox.information(self, "Flash History", "No devices have been flashed in this session.") QMessageBox.information(self, "Flash History", "No devices have been flashed in this session.")
return return
lines = []
for ip, mac, result, ts in self._auto_history: from PyQt6.QtWidgets import QDialog
icon = "" if result.startswith("DONE") else "" dialog = QDialog(self)
lines.append(f"[{ts}] {icon} {ip} ({mac}) — {result}") dialog.setWindowTitle("Auto Flash History")
msg = f"Flash History ({len(self._auto_history)} device(s)):\n\n" + "\n".join(lines) dialog.resize(600, 400)
QMessageBox.information(self, "Flash History", msg) dialog.setStyleSheet("QDialog { background-color: #1a1b2e; } QTableWidget { font-size: 12px; }")
layout = QVBoxLayout(dialog)
layout.setContentsMargins(10, 10, 10, 10)
table = QTableWidget()
table.setColumnCount(4)
table.setHorizontalHeaderLabels(["Time", "IP", "MAC", "Result"])
table.setAlternatingRowColors(True)
table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
header = table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
header.resizeSection(0, 80)
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Fixed)
header.resizeSection(1, 120)
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Fixed)
header.resizeSection(2, 140)
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
table.verticalHeader().setVisible(False)
table.setRowCount(len(self._auto_history))
success_count = 0
fail_count = 0
for row, (ip, mac, result, ts) in enumerate(self._auto_history):
ok = result.startswith("DONE")
if ok:
success_count += 1
else:
fail_count += 1
time_item = QTableWidgetItem(ts)
time_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
table.setItem(row, 0, time_item)
table.setItem(row, 1, QTableWidgetItem(ip))
table.setItem(row, 2, QTableWidgetItem(mac))
res_item = QTableWidgetItem(f"{result}" if ok else f"{result}")
res_item.setForeground(QColor("#a6e3a1") if ok else QColor("#f38ba8"))
table.setItem(row, 3, res_item)
layout.addWidget(table)
summary_lbl = QLabel(f"Total: {len(self._auto_history)} | ✅ {success_count} | ❌ {fail_count}")
summary_lbl.setStyleSheet("color: #cdd6f4; font-size: 13px; font-weight: bold;")
layout.addWidget(summary_lbl)
btn_close = QPushButton("Close")
btn_close.setFixedHeight(30)
btn_close.setFixedWidth(100)
btn_close.clicked.connect(dialog.accept)
btn_layout = QHBoxLayout()
btn_layout.addStretch()
btn_layout.addWidget(btn_close)
layout.addLayout(btn_layout)
dialog.exec()
def _auto_select_firmware(self): def _auto_select_firmware(self):
file, _ = QFileDialog.getOpenFileName( file, _ = QFileDialog.getOpenFileName(
@@ -1255,6 +1394,7 @@ class App(QWidget):
self._auto_fail_count += 1 self._auto_fail_count += 1
self.auto_result_table.setItem(row, 3, item) self.auto_result_table.setItem(row, 3, item)
self._auto_history.append((ip, mac.upper(), result, now_str)) self._auto_history.append((ip, mac.upper(), result, now_str))
total = len(self._auto_device_rows) total = len(self._auto_device_rows)
done = self._auto_success_count + self._auto_fail_count done = self._auto_success_count + self._auto_fail_count
self.auto_summary_label.setText( self.auto_summary_label.setText(

View File

@@ -1 +1 @@
1.2.1 1.2.3