18 Commits
V1.1.2 ... main

Author SHA1 Message Date
50c20c841d update UI 2026-03-13 17:26:09 +07:00
52c7b4b968 update doc 2026-03-12 23:55:26 +07:00
aa36758ec9 update scan ip with MAC, update UI show history 2026-03-12 23:51:43 +07:00
22e4436518 update README.md 2026-03-12 23:35:37 +07:00
aa6a114071 update default loadFW (SSH) 2026-03-12 23:21:21 +07:00
627197e29e fix bug scan ip 2026-03-10 18:55:03 +07:00
adf7253350 fix bug scan ip (win) 2026-03-10 18:10:43 +07:00
f5ee209726 fix bug scan ip 2026-03-10 17:54:56 +07:00
466dadf1c9 update UI 2026-03-09 20:49:13 +07:00
b4745214f2 update UI 2026-03-09 20:29:48 +07:00
910307967b fix bug UI 2026-03-09 19:51:45 +07:00
042c50536c update UI, version 2026-03-09 19:33:00 +07:00
19febeaf3f update version 2026-03-09 17:20:30 +07:00
47f3320c3d Thêm tính năng tự động hóa nạp FW 2026-03-09 17:20:07 +07:00
56b688766e update doc 2026-03-09 14:12:43 +07:00
69b620f832 update doc 2026-03-09 14:11:44 +07:00
a7c41a7235 update scan ip 2026-03-09 13:53:10 +07:00
594d13c0cc update README.md 2026-03-09 13:19:00 +07:00
8 changed files with 2075 additions and 477 deletions

302
README.md
View File

@@ -1,31 +1,44 @@
# ⚡ IoT Firmware Loader (MiraV3) # ⚡ Mira Firmware Loader
Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng loạt** cho các thiết bị OpenWrt (qua giao diện LuCI) trong mạng LAN. Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng loạt** cho các thiết bị OpenWrt trong mạng LAN.
Hỗ trợ nạp **thủ công** (chọn thiết bị → flash) và **tự động hóa** (scan → phát hiện → flash → retry).
> **Tech stack:** Python 3.9+ · PyQt6 · Scapy · Requests · PyInstaller > **Tech stack:** Python 3.9+ · PyQt6 · Paramiko/SCP · Requests · PyInstaller
> **Phiên bản:** `1.2.0`
--- ---
## 📁 Cấu trúc dự án ## 📁 Cấu trúc dự án
```text ```text
iot_fw_loader/ Mira_Firmware_Loader/
├── main.py # UI chính (PyQt6) ├── main.py # UI chính (PyQt6) — App + AutoFlashWindow
├── core/ # Các thành phần cốt lõi và xử lý đa luồng ├── version.txt # Số phiên bản ứng dụng
│ ├── workers.py # Quản lý luồng dùng chung (ScanThread, FlashThread) ├── requirements.txt # Danh sách dependencies
│ ├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy) ├── core/
│ ├── flasher.py # Flash firmware và tự động hóa qua OpenWrt LuCI bằng API │ ├── scanner.py # Quét thiết bị mạng (Ping sweep đa luồng)
── ssh_flasher.py # Load/Update firmware qua đường dẫn SSH ── workers.py # ScanThread — chạy scanner trong background thread
├── ui/ # Các component thiết kế giao diện │ ├── api_flash.py # Flash firmware qua LuCI HTTP API
│ ├── components.py # Custom Qt Widgets (CollapsibleGroupBox, etc.) │ ├── ssh_utils.py # SSH/SCP transport helpers dùng chung
── styles.py # Các cấu hình Stylesheet ── ssh_new_flash.py # Luồng SSH cho chế độ Nạp Mới (Telnet → set passwd → SSH)
├── utils/ # Các hàm helper tiện ích │ ├── ssh_update_flash.py # Luồng SSH cho chế độ Update (SSH trực tiếp)
│ ├── network.py # Các hàm xử lý IP, Hostname │ ├── flash_new_worker.py # NewFlashThread — QThread điều phối Nạp Mới FW
── system.py # Các hàm lấy thông tin máy và resources ── flash_update_worker.py # UpdateFlashThread — QThread điều phối Update FW
├── run.sh # Script khởi chạy (macOS/Linux) │ └── auto_flash_worker.py # AutoFlashWorker — QThread tự động scan → flash → retry
├── run.bat # Script khởi chạy (Windows) ├── ui/
├── build_windows.bat # Script đóng gói thành file .exe độc lập (Windows) │ ├── components.py # Custom Qt Widgets (CollapsibleGroupBox)
└── venv/ # Python virtual environment (tự tạo) │ └── styles.py # Stylesheet toàn ứng dụng (STYLE + AUTO_STYLE)
├── utils/
│ ├── network.py # Helper IP / network (get_local_ip, get_default_network)
│ └── system.py # Lấy thông tin máy, resource path cho PyInstaller
├── docs/
│ ├── api_flash_docs.md # Tài liệu kỹ thuật LuCI API flash
│ ├── load_fw_ssh_docs.md # Tài liệu kỹ thuật SSH flash (cả 2 luồng)
│ ├── scanner_docs.md # Tài liệu kỹ thuật scanner
│ └── auto_flash_docs.md # Tài liệu kỹ thuật tự động hóa nạp FW
├── run.sh # Script khởi chạy (macOS/Linux)
├── run.bat # Script khởi chạy (Windows)
└── build_windows.bat # Script đóng gói thành .exe (Windows, PyInstaller)
``` ```
--- ---
@@ -34,11 +47,10 @@ iot_fw_loader/
### Yêu cầu ### Yêu cầu
- Python 3.9+ - Python **3.9+**
- Các thư viện: `PyQt6`, `scapy`, `requests`, `pyinstaller` (để build). - Thư viện: `PyQt6`, `requests`, `paramiko`, `scp`, `pyinstaller` (để build)
- _Trên Windows:_ Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng).
### Khởi chạy nhanh (Môi trường Dev) ### Khởi chạy nhanh
**macOS / Linux:** **macOS / Linux:**
@@ -54,48 +66,58 @@ run.bat
> Script tự tạo `venv` và cài dependencies nếu chưa có. > Script tự tạo `venv` và cài dependencies nếu chưa có.
### 📦 Build ra file chạy độc lập (.exe) cho Windows ### 📦 Build file chạy độc lập (.exe) cho Windows
Chạy script sau để tự động đóng gói ứng dụng thành 1 file `.exe` duy nhất (không cần cài Python trên máy đích):
```bat ```bat
build_windows.bat build_windows.bat
``` ```
File nhận được sẽ nằm ở: `dist\IoT_Firmware_Loader.exe`. Output: `dist\Mira_Firmware_Loader.exe` — không cần cài Python trên máy đích.
--- ---
## 🏗 Kiến trúc hệ thống ## 🏗 Kiến trúc hệ thống
```text ```
┌───────────────────────────────────────────────────────────── ┌──────────────────────────────────────────────────────────┐
│ main.py (UI) main.py (UI) │
┌───────────┐ ┌───────────┐ ┌────────────────────────────┐
Machine │ Firmware │ Network Scan (QThread) │ Machine Info │ Firmware Selector │ Network Scan
│ Info │ │ Selector │ │ Tham số: Network (CIDR) │ ─────────────────────────────────────────────────────
└───────────┘ └───────────┘ └────────┬───────────────────┘ Device Table [ ] IP │ MAC │ Status
────────────────────────────────────┼─────────────────── │ ─────────────────────────────────────────────────────
│ Device Table Flash Controls
[ ] IP │ Name │ MAC │ Status │ Flash Mode: [ New Flash | Update Firmware ]
(Hỗ trợ lọc ẩn Gateway & Self) │ Method: [ API (LuCI) | SSH (paramiko) ]
└────────────────────────────────────┼───────────────────┘ Concurrent devices: [SpinBox]
┌────────────────────────────────────┼───────────────────┐ [ ⚡ FLASH SELECTED DEVICES ]
│ Flash Controls + Progress Bar │ │ ─────────────────────────────────────────────────────
│ (ThreadPoolExecutor) │ [ 🤖 Tự động hóa nạp FW ]
└────────────────────────────────────┼───────────────────┘ │ └────────────────────────┬─────────────────────────────────
└───────────────────────────────────────┼─────────────────────┘
┌─────────────────┼─────────────────┐
┌───────────────────┼───────────────────┐ │ │
│ │ ┌──────▼──────┐ ┌───────▼──────────┐ ┌───▼──────────────────┐
┌─────▼─────┐ ┌─────▼─────┐ scanner.py │ Flash Workers AutoFlashWorker │
scanner.py│ │ flasher.py │ (thủ công) (tự động hóa)
│ Ping Sweep │ │ │
│ 1. Ping │ │ LuCI HTTP │ (Multithread│ │ NewFlashThread Phase 1: Scan LAN
│ Sweep │ │ /cgi-bin/ gọi OS │ │ ├─ api_flash (tối đa 15 lần)
│ 2. arp -a │ │ luci ping cmd) │ │ └─ ssh_new_flash│ │ Phase 2: Flash
3. Scapy │ (auto-retry x3)
└───────────┘ └───────────┘ ─────────── UpdateFlash ├─ api_flash │
│ Thread │ │ └─ ssh_new_flash │
│ └─ ssh_update │ └──────────────────────┘
└──────────────────┘
┌──────────▼──────────┐
│ ssh_utils.py │
│ (Transport Layer) │
│ _create_ssh_client │
│ _upload_firmware │
│ _verify_firmware │
│ _sync_and_sysupgr │
└─────────────────────┘
``` ```
--- ---
@@ -104,44 +126,160 @@ File nhận được sẽ nằm ở: `dist\IoT_Firmware_Loader.exe`.
### 1. Quét mạng (Network Scan) ### 1. Quét mạng (Network Scan)
Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát hiện đa dạng các thiết bị mạng mà vẫn duy trì khả năng tránh spam/lag cho OS: `scanner.py` sử dụng chiến lược **Ping trước → ARP sau**, đảm bảo vừa lấy được MAC vừa loại bỏ hoàn toàn lỗi thiết bị ảo do ARP cache cũ:
1. **Ping Sweep:** Gửi gói tin Ping song song (tối đa 50 threads) để đánh thức thiết bị và điền IP/MAC vào bảng ARP Cache của hệ điều hành. | Giai đoạn | Mô tả |
2. **ARP Table Fallback:** Đọc bảng ARP nội bộ của OS (`arp -a`) bằng Regex. Hoạt động đa nền tảng (Windows/macOS/Linux) mà không cần quyền Admin/Root. | -------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ |
3. **Scapy ARP (Tính năng nâng cao):** Gửi các gói tin ARP Broadcast trực tiếp để đảm bảo bao phủ gap nếu có. Yêu cầu quyền Root trên Linux/macOS hoặc Npcap trên Windows. | **Ping Sweep** | Gửi ping đồng thời tới toàn bộ host trong dải mạng bằng tiến trình con (đa luồng). Nhằm xác định chính xác thiết bị nào đang thực sự cấp nguồn online. |
_Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị._ | **ARP Lookup** | Ngay sau khi ping, query ARP table hệ điều hành (song song) _chỉ cho các IP vừa ping thành công_, lấy MAC Address chuẩn xác trước khi cache cũ hết hạn. |
### 2. Giao diện thiết bị (Device Table) Kết quả (IP và MAC) được sort tăng dần theo IP trước khi trả về UI.
- Thiết bị được thu thập sẽ hiện ra trên UI dạng bảng, với tính năng tuỳ chọn hiển thị (checkbox ẩn Gateway mạng hiện tại và chính máy tính đang quét phần mềm). ### 2. Bảng thiết bị (Device Table)
- Trạng thái các luồng UI được tách rời bằng QThread + QPropertyAnimation cho hộp Collapsible nhằm tự động cuộn khi ẩn hiện nội dung, không làm treo ứng dụng.
### 3. Nạp Firmware (OpenWrt Flasher) - Hiển thị cột: checkbox, IP, MAC, Status
- Mặc định ẩn gateway và IP máy tính đang chạy (có thể bật "Show all")
- Thiết bị đã flash trong session được đánh dấu "Already Flashed" và tự bỏ tick
Từ phiên bản hiện tại, phương thức ESP32 OTA không còn được áp dụng, thay vào đó module `flasher.py` tự động hóa quá trình update của router **OpenWrt (Barrier Breaker 14.07)**: ### 3. Flash Firmware (Thủ công)
- **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`. Có 2 chế độ và 2 method, tổng cộng 3 luồng thực thi khác nhau:
- **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`.
- **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có). #### Chế độ `New Flash` — dùng cho thiết bị vừa reset cứng
- **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application.
- 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh. | Method | Luồng | Mô tả |
| -------------- | ------------------ | ------------------------------------------------------------------------------------------- |
| **API (LuCI)** | `api_flash.py` | Đăng nhập LuCI → upload firmware → Proceed. Hỗ trợ Barrier Breaker 14.07 và OpenWrt mới hơn |
| **SSH** | `ssh_new_flash.py` | Kết nối Telnet → đặt password mới → SSH → SCP upload → sysupgrade |
**Luồng SSH New Flash chi tiết:**
1. Telnet port 23 (thiết bị vừa reset, chưa có pass)
2. Đặt password `admin123a` qua lệnh `passwd`
3. SSH vào với password vừa đặt
4. SCP upload firmware lên `/tmp/`
5. Verify file tồn tại
6. `sync && sysupgrade -F -v -n` → thiết bị reboot (connection drop = DONE)
#### Chế độ `Update Firmware` — dùng cho thiết bị đang chạy OpenWrt
Luồng `ssh_update_flash.py`, SSH trực tiếp (không qua Telnet):
1. SSH kết nối với `root` / `admin123a` (fallback: `admin`)
2. SCP upload firmware lên `/tmp/`
3. Verify file tồn tại
4. `sync && sysupgrade -F -v -n` → thiết bị reboot
> ⚠️ Update Mode hiển thị cảnh báo nếu IP thiết bị khác `192.168.11.102` và yêu cầu xác nhận.
### 4. 🤖 Tự động hóa nạp FW (MỚI)
Tính năng nạp FW hoàn toàn tự động — chỉ cần cấu hình và nhấn bắt đầu:
```
Cấu hình → Auto Scan LAN → Phát hiện đủ thiết bị → Auto Flash → Auto Retry → Thông báo
```
#### 4.1. Quy trình
| Phase | Mô tả |
| --------------------- | ------------------------------------------------------------------------- |
| **Phase 1: Scan LAN** | Scan mạng liên tục mỗi 5 giây, tối đa **15 lần**, cho đến khi đủ thiết bị |
| **Phase 2: Flash** | Nạp FW song song qua ThreadPoolExecutor, tự động **retry tối đa 3 lần** |
#### 4.2. Cấu hình
| Tham số | Mô tả | Mặc định |
| --------------- | --------------------------------------------- | ------------------- |
| **Firmware** | File firmware (.bin/.hex/.uf2) | Lấy từ cửa sổ chính |
| **Mạng** | Dải mạng LAN cần scan | Tự suy từ IP host |
| **Số lượng** | Số thiết bị cần nạp | 5 |
| **Phương thức** | API (LuCI) hoặc SSH | API (LuCI) |
| **Song song** | Số thiết bị nạp cùng lúc (0 = không giới hạn) | 10 |
#### 4.3. Auto-Retry nạp FW
Khi một thiết bị nạp thất bại, hệ thống tự động retry:
```
Lần 1 → FAIL: Connection timeout
⚠️ Log cảnh báo, chờ 2 giây...
Lần 2 → FAIL: Upload error
⚠️ Log cảnh báo, chờ 2 giây...
Lần 3 → DONE ✅ (hoặc ❌ báo lỗi sau 3 lần)
```
- Tối đa **3 lần retry** mỗi thiết bị (`MAX_FLASH_RETRIES`)
- Chờ **2 giây** giữa mỗi lần retry để thiết bị ổn định
- Nếu hết 3 lần vẫn fail → đánh dấu ❌, tiếp tục thiết bị tiếp theo
#### 4.4. Scan Timeout
- Nếu scan **15 lần** mà chưa đủ thiết bị → dừng và hiện cảnh báo
- Gợi ý kiểm tra: thiết bị đã bật chưa, dải mạng có đúng không
#### 4.5. Quy tắc bảo vệ IP `192.168.11.102`
| Chế độ | Được nạp 192.168.11.102? | Cơ chế |
| ------------------------ | :----------------------: | ----------------------------------- |
| **New Flash (thủ công)** | ❌ Không | Chặn trước khi flash, hiện cảnh báo |
| **Update FW (thủ công)** | ✅ Có | Cho phép bình thường |
| **Tự động hóa** | ❌ Không | Tự động lọc khỏi kết quả scan |
#### 4.6. Lịch sử nạp
Kết quả nạp được lưu ở 2 cấp:
| Nơi lưu | Phạm vi | Format |
| ------------------------------- | ---------------------- | ------------------------------------ |
| `AutoFlashWindow._auto_history` | Phiên tự động hiện tại | `list[(ip, mac, result, timestamp)]` |
| `App.flashed_macs` | Toàn bộ session app | `dict{MAC: (ip, mac, result, ts)}` |
- Cả thành công ✅ lẫn thất bại ❌ đều được ghi lại
- Nút "📋 Lịch sử nạp" hiển thị cùng format ở cả 2 cửa sổ: `[HH:MM:SS] ✅/❌ IP (MAC) — result`
### 5. Xử lý song song
| Tham số | Mô tả |
| ---------------------- | ------------------------------------------------------------------------ |
| **Concurrent devices** | Số thiết bị flash đồng thời (`ThreadPoolExecutor`). `0` = không giới hạn |
--- ---
## ⚙️ Cấu hình ## ⚙️ Cấu hình mặc định
| Tham số | Mô tả | | Tham số | Giá trị mặc định | Mô tả |
| ---------------------- | ------------------------------------------------------------------- | | ---------------------- | ------------------------------------ | --------------------------------- |
| **Network** | Dải mạng quét (vd: `192.168.1.0/24`). Tự động tính từ local IP app. | | **Network** | Tự suy ra từ local IP (`x.x.x.0/24`) | Dải mạng để quét |
| **Show all** | Tuỳ chọn cho phép liệt kê cả Gateway IP máy chủ. | | **Flash Mode** | `New Flash` | Nạp mới hoặc Update |
| **Concurrent devices** | Số luồng để đĩa flash song song. `0` = Không giới hạn. | | **Method** | `API (LuCI)` | Phương thức flash cho New Flash |
| **Firmware filter** | Mặc định hiển thị và hỗ trợ `.bin`, `.hex`, `.uf2`. | | **SSH User** | `root` | Hardcoded, không hiển thị trên UI |
| **SSH Password** | `admin123a` | Hardcoded, không hiển thị trên UI |
| **Concurrent devices** | `10` | Số luồng flash song song |
| **Show all** | Tắt | Ẩn gateway và IP máy host |
| **MAX_FLASH_RETRIES** | `3` | Số lần retry nạp FW (tự động) |
| **MAX_SCAN_ROUNDS** | `15` | Số lần scan tối đa (tự động) |
---
## 🛡️ Xử lý lỗi tổng hợp
| Tình huống | Hành vi |
| -------------------------------- | --------------------------------------------------- |
| Scan exception (network error) | Log lỗi, chờ 3s, scan lại (đếm vào MAX_SCAN_ROUNDS) |
| Scan 15 lần chưa đủ thiết bị | Hiện popup cảnh báo, dừng tự động |
| Flash thất bại lần 12 (tự động) | Log cảnh báo, chờ 2s, retry tự động |
| Flash thất bại sau 3 lần retry | Log lỗi, đánh dấu ❌, tiếp tục device tiếp theo |
| User nhấn DỪNG | Set stop flag, dừng scan/flash an toàn |
| IP 192.168.11.102 + New Flash | Chặn ngay, hiện cảnh báo |
| Chưa chọn firmware | Hiện popup cảnh báo, không cho bắt đầu |
| Mạng không hợp lệ | Hiện popup cảnh báo, không cho bắt đầu |
--- ---
## 🔒 Lưu ý bảo mật & Quyền ## 🔒 Lưu ý bảo mật & Quyền
- **Quyền Scanner:** Ở chế độ quét Scapy (Sâu), cần khởi chạy bằng quyền Admin (Windows) hoặc Sudo (macOS/Linux), tuy nhiên App có thể chạy kể cả không có quyền Admin nhờ chiến lược Ping Sweep + `arp -a`. - **Quyền hệ thống:** Quá trình quét của `scanner.py` chỉ chạy lệnh ping từ hệ điều hành, vì vậy ứng dụng có thể chạy hoàn toàn **không cần** quyền Admin (trên Windows) / Root (trên macOS/Linux).
- **Ẩn Console (Window):** Khi gọi subproces (`ping`, `arp`), ứng dụng có tích hợp thuộc tính platform `CREATE_NO_WINDOW` để ngăn chặn các terminal đen giật nháy trên màn hình Windows. - **CREATE_NO_WINDOW:** Khi gọi subprocess lệnh `ping`, ứng dụng dùng flag `CREATE_NO_WINDOW` trên Windows để ngăn cửa sổ console command prompt (cmd) liên tục hiện lên màn hình.
- **Bảo mật mạng:** Quá trình tải firmware lên thiết bị thông qua HTTP thuần, chỉ nên sử dụng ở mạng LAN nội bộ, tránh những mạng mở hoặc public để tránh lộ file firmware. - **HTTP thuần:** Firmware được upload qua HTTP (không HTTPS). Chỉ dùng trong mạng LAN nội bộ, không nên dùng trên mạng mở.
- **LuCI Login:** API này nhắm thẳng vào quá trình đăng nhập qua form, nên sẽ tự động handle các router đang dùng OpenWrt Barrier Breaker mà không cần phải can thiệp tay. - **Credentials cố định:** SSH credentials (`root`/`admin123a`) được hardcode trong `flash_update_worker.py` và truyền từ `main.py`, không hiển thị trên UI.

200
core/auto_flash_worker.py Normal file
View File

@@ -0,0 +1,200 @@
"""
Worker thread cho chế độ "Tự động hóa nạp FW".
Flow:
1. Scan mạng LAN liên tục (tối đa max_scan_rounds lần)
2. Khi phát hiện đủ số thiết bị yêu cầu → bắt đầu nạp FW
3. Nạp FW theo phương thức đã chọn (API / SSH), tự động retry nếu lỗi
4. Thông báo khi hoàn thành
"""
import time
import ipaddress
from PyQt6.QtCore import QThread, pyqtSignal
from concurrent.futures import ThreadPoolExecutor
from core.scanner import scan_network
from core.api_flash import flash_device_api
from core.ssh_new_flash import flash_device_new_ssh
MAX_FLASH_RETRIES = 3 # Số lần retry nạp FW khi thất bại
MAX_SCAN_ROUNDS = 10 # Số lần scan tối đa trước khi báo không đủ thiết bị
class AutoFlashWorker(QThread):
"""Tự động scan → flash khi đủ số lượng thiết bị."""
# Signals
log_message = pyqtSignal(str) # log message cho UI
scan_found = pyqtSignal(int) # số device tìm thấy trong lần scan hiện tại
devices_ready = pyqtSignal(list) # danh sách devices sẵn sàng flash [{ip, mac}, ...]
device_status = pyqtSignal(str, str) # ip, status message
device_done = pyqtSignal(str, str, str) # ip, mac, result ("DONE"/"FAIL:...")
flash_progress = pyqtSignal(int, int) # done_count, total
all_done = pyqtSignal(int, int) # success_count, fail_count
scan_timeout = pyqtSignal(int, int) # found_count, target_count — scan hết lần mà chưa đủ
stopped = pyqtSignal() # khi dừng bởi user
def __init__(self, network, target_count, method, max_workers,
firmware_path, local_ip="", gateway_ip="",
ssh_user="root", ssh_password="admin123a",
ssh_backup_password="admin123a", set_passwd=True):
super().__init__()
self.network = network
self.target_count = target_count
self.method = method
self.max_workers = max_workers
self.firmware_path = firmware_path
self.local_ip = local_ip
self.gateway_ip = gateway_ip
self.ssh_user = ssh_user
self.ssh_password = ssh_password
self.ssh_backup_password = ssh_backup_password
self.set_passwd = set_passwd
self._stop_flag = False
def stop(self):
self._stop_flag = True
def run(self):
self.log_message.emit("🚀 Bắt đầu chế độ tự động hóa nạp FW...")
self.log_message.emit(f" Mục tiêu: {self.target_count} thiết bị | Phương thức: {self.method.upper()} | Song song: {self.max_workers}")
self.log_message.emit(f" Mạng: {self.network}")
self.log_message.emit("")
# ── Phase 1: Scan liên tục cho đến khi đủ thiết bị (tối đa MAX_SCAN_ROUNDS lần) ──
devices = []
scan_round = 0
excluded = {self.local_ip, self.gateway_ip}
best_found = 0
while not self._stop_flag:
scan_round += 1
self.log_message.emit(f"🔍 Scan lần {scan_round}/{MAX_SCAN_ROUNDS}...")
try:
results = scan_network(str(self.network))
except Exception as e:
self.log_message.emit(f"❌ Scan thất bại: {e}")
if self._stop_flag:
break
time.sleep(3)
continue
# Lọc bỏ gateway, local IP, và 192.168.11.102 (chỉ update mới được nạp)
filtered = [d for d in results if d["ip"] not in excluded and d["ip"] != "192.168.11.102"]
found_count = len(filtered)
best_found = max(best_found, found_count)
self.scan_found.emit(found_count)
self.log_message.emit(f" Tìm thấy {found_count}/{self.target_count} thiết bị")
if found_count >= self.target_count:
# Chỉ lấy đúng số lượng yêu cầu
devices = filtered[:self.target_count]
self.log_message.emit(f"✅ Đủ {self.target_count} thiết bị! Bắt đầu nạp FW...")
self.log_message.emit("")
break
if self._stop_flag:
break
# Kiểm tra đã scan quá số lần tối đa
if scan_round >= MAX_SCAN_ROUNDS:
self.log_message.emit(f"⚠️ Đã scan {MAX_SCAN_ROUNDS} lần mà chỉ tìm thấy {best_found}/{self.target_count} thiết bị.")
self.scan_timeout.emit(best_found, self.target_count)
self.stopped.emit()
return
self.log_message.emit(f" Chưa đủ, chờ 5 giây rồi scan lại...")
# Chờ 5 giây nhưng check stop flag mỗi 0.5s
for _ in range(10):
if self._stop_flag:
break
time.sleep(0.5)
if self._stop_flag:
self.log_message.emit("⛔ Đã dừng bởi người dùng.")
self.stopped.emit()
return
# ── Phase 2: Flash ──
total = len(devices)
success_count = 0
fail_count = 0
done_count = 0
self.flash_progress.emit(0, total)
# Gửi danh sách devices cho UI để populate bảng trước khi flash
self.devices_ready.emit(devices)
# Log danh sách thiết bị
for d in devices:
self.log_message.emit(f" 📱 {d['ip']} ({d['mac']})")
self.log_message.emit("")
def _flash_one(dev):
nonlocal success_count, fail_count, done_count
ip = dev["ip"]
mac = dev.get("mac", "N/A")
result = ""
for attempt in range(1, MAX_FLASH_RETRIES + 1):
if self._stop_flag:
result = "FAIL: Dừng bởi người dùng"
break
if attempt > 1:
self.log_message.emit(f"🔄 [{ip}] Retry lần {attempt}/{MAX_FLASH_RETRIES}...")
self.device_status.emit(ip, f"Retry lần {attempt}/{MAX_FLASH_RETRIES}...")
time.sleep(2) # chờ thiết bị ổn định trước khi retry
try:
def on_status(msg):
self.device_status.emit(ip, msg)
if self.method == "ssh":
result = flash_device_new_ssh(
ip, self.firmware_path,
user=self.ssh_user,
password=self.ssh_password,
backup_password=self.ssh_backup_password,
set_passwd=self.set_passwd,
status_cb=on_status,
)
else:
result = flash_device_api(
ip, self.firmware_path,
status_cb=on_status,
)
except Exception as e:
result = f"FAIL: {e}"
if result.startswith("DONE"):
break
else:
if attempt < MAX_FLASH_RETRIES:
self.log_message.emit(f"⚠️ [{ip}] Lần {attempt} thất bại: {result}")
else:
self.log_message.emit(f"❌ [{ip}] Thất bại sau {MAX_FLASH_RETRIES} lần thử: {result}")
self.device_done.emit(ip, mac, result)
if result.startswith("DONE"):
success_count += 1
else:
fail_count += 1
done_count += 1
self.flash_progress.emit(done_count, total)
workers = self.max_workers if self.max_workers > 0 else total
with ThreadPoolExecutor(max_workers=max(workers, 1)) as executor:
futures = [executor.submit(_flash_one, dev) for dev in devices]
for f in futures:
f.result()
if self._stop_flag:
break
self.log_message.emit("")
self.log_message.emit(f"🏁 Hoàn thành! Thành công: {success_count} | Thất bại: {fail_count}")
self.all_done.emit(success_count, fail_count)

View File

@@ -1,224 +1,172 @@
import subprocess
import re import re
import subprocess
import sys import sys
import time
import ipaddress import ipaddress
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed from concurrent.futures import ThreadPoolExecutor, as_completed
# Windows: prevent subprocess from opening visible console windows # Windows: prevent subprocess from opening visible console windows
_NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0 _NO_WINDOW = subprocess.CREATE_NO_WINDOW if sys.platform == "win32" else 0
# Concurrent ping workers per platform
def _scan_with_scapy(network): _MAX_WORKERS_WIN = 50
"""Scan using scapy (requires root/sudo, and Npcap on Windows).""" _MAX_WORKERS_OTHER = 64
from scapy.all import ARP, Ether, srp
arp = ARP(pdst=str(network))
ether = Ether(dst="ff:ff:ff:ff:ff:ff")
packet = ether / arp
result = srp(packet, timeout=3, verbose=0)[0]
devices = []
for sent, received in result:
devices.append({
"ip": received.psrc,
"mac": received.hwsrc
})
return devices
def _ping_one(ip, is_win): def _ping_one(ip, is_win):
"""Ping a single IP to populate ARP table.""" """Ping a single IP. Returns True if host responds."""
try: try:
if is_win: if is_win:
subprocess.run( # Capture stdout to check for TTL= — more reliable than returncode
["ping", "-n", "1", "-w", "600", str(ip)], # on Windows (returncode can be 0 even for "Destination unreachable")
stdout=subprocess.DEVNULL, r = subprocess.run(
["ping", "-n", "1", "-w", "500", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
timeout=3, timeout=3,
creationflags=_NO_WINDOW creationflags=_NO_WINDOW
) )
return r.returncode == 0 and b"TTL=" in r.stdout
elif sys.platform == "darwin":
r = subprocess.run(
["ping", "-c", "1", "-W", "300", str(ip)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
timeout=2
)
else: else:
subprocess.run( r = subprocess.run(
["ping", "-c", "1", "-W", "1", str(ip)], ["ping", "-c", "1", "-W", "1", str(ip)],
stdout=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
timeout=2
)
return r.returncode == 0
except Exception:
return False
def _get_mac_from_arp(ip):
"""
Lấy MAC address của một IP qua ARP table.
Chỉ được gọi SAU KHI IP đã phản hồi ping thành công — đảm bảo ARP
cache vừa được OS cập nhật với thông tin mới nhất (không bị stale).
Trả về chuỗi MAC dạng 'AA:BB:CC:DD:EE:FF' hoặc 'N/A' nếu không tra được.
"""
try:
if sys.platform == "win32":
# arp -a <ip> → " 192.168.4.5 aa-bb-cc-dd-ee-ff dynamic"
r = subprocess.run(
["arp", "-a", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3,
creationflags=_NO_WINDOW
)
output = r.stdout.decode(errors="ignore")
# Dạng Windows: aa-bb-cc-dd-ee-ff
match = re.search(
r"([0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}"
r"[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2})",
output
)
else:
# macOS / Linux: arp -n <ip>
# macOS output: "? (192.168.4.5) at aa:bb:cc:dd:ee:ff on en0 ..."
# Linux output: "192.168.4.5 ether aa:bb:cc:dd:ee:ff C eth0"
r = subprocess.run(
["arp", "-n", str(ip)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=3 timeout=3
) )
output = r.stdout.decode(errors="ignore")
match = re.search(
r"([0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}"
r"[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2})",
output
)
if match:
# Chuẩn hoá sang dấu ':' và chữ hoa
mac = match.group(1).replace("-", ":").upper()
return mac
except Exception: except Exception:
pass pass
return "N/A"
def _ping_sweep(network, progress_cb=None): def _ping_sweep(network, progress_cb=None):
"""Ping all IPs in network concurrently to populate ARP table. """Ping tất cả host trong network đồng thời.
Calls progress_cb(done, total) after each ping completes if provided. Trả về list IP đã phản hồi. Gọi progress_cb(done, total) sau mỗi ping.
""" """
net = ipaddress.ip_network(network, strict=False) net = ipaddress.ip_network(network, strict=False)
# Only ping sweep for /24 or smaller to avoid flooding
if net.num_addresses > 256: if net.num_addresses > 256:
return return []
is_win = sys.platform == "win32" is_win = sys.platform == "win32"
hosts = list(net.hosts()) hosts = list(net.hosts())
total = len(hosts) total = len(hosts)
workers = min(_MAX_WORKERS_WIN if is_win else _MAX_WORKERS_OTHER, len(hosts))
done_count = [0] done_count = [0]
lock = threading.Lock()
alive = []
def _ping_and_track(ip): def _ping_and_track(ip):
_ping_one(ip, is_win) ok = _ping_one(ip, is_win)
done_count[0] += 1 with lock:
done_count[0] += 1
current = done_count[0]
if progress_cb: if progress_cb:
progress_cb(done_count[0], total) progress_cb(current, total)
return (str(ip), ok)
with ThreadPoolExecutor(max_workers=100) as executor: with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(_ping_and_track, ip) for ip in hosts] futures = [executor.submit(_ping_and_track, ip) for ip in hosts]
for f in as_completed(futures): for f in as_completed(futures):
pass ip_str, ok = f.result()
if ok:
alive.append(ip_str)
return alive
def _scan_with_arp_table(network):
"""Fallback: read ARP table using system 'arp -a' (no root needed).
Supports both macOS and Windows output formats.
"""
# Ping sweep first to populate ARP table with active devices
_ping_sweep(network)
# Brief pause to let the OS finalize ARP cache entries
time.sleep(1)
try:
output = subprocess.check_output(
["arp", "-a"], text=True, creationflags=_NO_WINDOW
)
except Exception:
return []
devices = []
net = ipaddress.ip_network(network, strict=False)
if sys.platform == "win32":
# Windows format:
# 192.168.4.1 cc-2d-21-a5-85-b0 dynamic
pattern = re.compile(
r"(\d+\.\d+\.\d+\.\d+)\s+"
r"([0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-"
r"[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2})\s+"
r"(dynamic|static)",
re.IGNORECASE
)
for line in output.splitlines():
m = pattern.search(line)
if m:
ip_str = m.group(1)
# Convert Windows MAC format (cc-2d-21-a5-85-b0) to standard (cc:2d:21:a5:85:b0)
mac = m.group(2).replace("-", ":")
if mac.upper() != "FF:FF:FF:FF:FF:FF":
try:
if ipaddress.ip_address(ip_str) in net:
devices.append({"ip": ip_str, "mac": mac})
except ValueError:
pass
else:
# macOS/Linux format:
# ? (192.168.1.1) at aa:bb:cc:dd:ee:ff on en0
pattern = re.compile(
r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]+)"
)
for line in output.splitlines():
m = pattern.search(line)
if m:
ip_str, mac = m.group(1), m.group(2)
if mac.lower() != "(incomplete)" and mac != "ff:ff:ff:ff:ff:ff":
try:
if ipaddress.ip_address(ip_str) in net:
devices.append({"ip": ip_str, "mac": mac})
except ValueError:
pass
return devices
def scan_network(network, progress_cb=None, stage_cb=None): def scan_network(network, progress_cb=None, stage_cb=None):
"""Scan network: ping sweep first, then merge scapy ARP + arp table.""" """Scan network: ping → lấy MAC từ ARP (chỉ cho IP đang online).
# Phase 1: Ping sweep — wake up devices and populate ARP cache
Flow:
1. Ping sweep — xác định thiết bị online
2. MAC lookup — query ARP cho từng IP vừa phản hồi ping (song song)
=> ARP cache vừa được OS cập nhật sau ping, không bị stale.
"""
# ── Stage 1: Ping ──
if stage_cb: if stage_cb:
stage_cb("ping") stage_cb("ping")
_ping_sweep(network, progress_cb) alive_ips = _ping_sweep(network, progress_cb)
time.sleep(1)
# Collect results from both methods and merge by IP # ── Stage 2: MAC lookup ──
seen = {} # ip -> device dict
# Phase 2: ARP table (populated by ping sweep above)
if stage_cb: if stage_cb:
stage_cb("arp") stage_cb("mac")
try:
output = subprocess.check_output(
["arp", "-a"], text=True, creationflags=_NO_WINDOW
)
net = ipaddress.ip_network(network, strict=False)
if sys.platform == "win32":
pattern = re.compile(
r"(\d+\.\d+\.\d+\.\d+)\s+"
r"([0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-"
r"[0-9a-fA-F]{2}-[0-9a-fA-F]{2}-[0-9a-fA-F]{2})\s+"
r"(dynamic|static)",
re.IGNORECASE
)
for line in output.splitlines():
m = pattern.search(line)
if m:
ip_str = m.group(1)
mac = m.group(2).replace("-", ":")
if mac.upper() != "FF:FF:FF:FF:FF:FF":
try:
if ipaddress.ip_address(ip_str) in net:
seen[ip_str] = {"ip": ip_str, "mac": mac}
except ValueError:
pass
else:
pattern = re.compile(
r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]+)"
)
for line in output.splitlines():
m = pattern.search(line)
if m:
ip_str, mac = m.group(1), m.group(2)
if mac.lower() not in ("(incomplete)", "ff:ff:ff:ff:ff:ff"):
try:
if ipaddress.ip_address(ip_str) in net:
seen[ip_str] = {"ip": ip_str, "mac": mac}
except ValueError:
pass
except Exception:
pass
# Phase 3: scapy ARP scan (if Npcap available) — fills in any gaps results = []
if stage_cb: if alive_ips:
stage_cb("scapy") # Chạy song song để tra MAC nhanh hơn
try: mac_workers = min(32, len(alive_ips))
import io, os macs = {}
_stderr = sys.stderr with ThreadPoolExecutor(max_workers=mac_workers) as executor:
sys.stderr = io.StringIO() future_to_ip = {executor.submit(_get_mac_from_arp, ip): ip for ip in alive_ips}
try: for future in as_completed(future_to_ip):
from scapy.all import ARP, Ether, srp ip = future_to_ip[future]
finally: try:
sys.stderr = _stderr macs[ip] = future.result()
except Exception:
macs[ip] = "N/A"
arp = ARP(pdst=str(network)) for ip_str in alive_ips:
ether = Ether(dst="ff:ff:ff:ff:ff:ff") results.append({"ip": ip_str, "mac": macs.get(ip_str, "N/A")})
result = srp(ether / arp, timeout=2, verbose=0)[0]
for sent, received in result:
ip = received.psrc
if ip not in seen:
seen[ip] = {"ip": ip, "mac": received.hwsrc}
except Exception:
pass
return sorted(seen.values(), key=lambda d: ipaddress.ip_address(d["ip"])) return sorted(results, key=lambda d: ipaddress.ip_address(d["ip"]))

271
docs/auto_flash_docs.md Normal file
View File

@@ -0,0 +1,271 @@
# Tài liệu Kỹ thuật: Tự động hóa nạp FW (`core/auto_flash_worker.py`)
Module **Tự động hóa nạp FW** tự động quét mạng LAN, phát hiện thiết bị đích, và nạp firmware hàng loạt mà không cần thao tác thủ công. Được thiết kế cho môi trường sản xuất cần nạp FW nhanh cho nhiều thiết bị OpenWrt cùng lúc.
---
## 1. Kiến Trúc — Vai Trò File
| File | Vai trò |
| --------------------------- | ------------------------------------------------------------------------ |
| `core/auto_flash_worker.py` | `AutoFlashWorker` — QThread xử lý toàn bộ quy trình scan → flash tự động |
| `core/scanner.py` | `scan_network()` — quét mạng LAN (ping sweep + ARP table + Scapy) |
| `core/api_flash.py` | `flash_device_api()` — nạp FW qua LuCI HTTP API |
| `core/ssh_new_flash.py` | `flash_device_new_ssh()` — nạp FW qua SSH (paramiko/scp) |
| `main.py` | `AutoFlashWindow` — UI cửa sổ tự động hóa (PyQt6) |
| `ui/styles.py` | `AUTO_STYLE` — stylesheet riêng cho cửa sổ tự động hóa (tím) |
---
## 2. Sơ Đồ Luồng Tổng Quan
```mermaid
flowchart TD
A[👤 Người dùng nhấn XÁC NHẬN & BẮT ĐẦU] --> B[AutoFlashWorker.run]
B --> P1["── Phase 1: Scan LAN ──"]
P1 --> S1["scan_network(network)"]
S1 --> F1{"Lọc bỏ:\n• Local IP\n• Gateway IP\n• 192.168.11.102"}
F1 --> C1{Đủ số lượng\nthiết bị?}
C1 -->|Có| P2["── Phase 2: Flash ──"]
C1 -->|Chưa đủ| C2{Đã scan\n≥ 15 lần?}
C2 -->|Chưa| W1["Chờ 5 giây\n→ scan lại"]
W1 --> S1
C2 -->|Rồi| T1["⚠️ scan_timeout\nThông báo lỗi"]
P2 --> D1["ThreadPoolExecutor\nNạp FW song song"]
D1 --> D2["_flash_one(device)"]
D2 --> D3{Kết quả?}
D3 -->|DONE| D4["✅ Thành công"]
D3 -->|FAIL| D5{Retry < 3?}
D5 -->|Có| D6["🔄 Chờ 2s → Retry"]
D6 --> D2
D5 -->|Không| D7["❌ Thất bại\nsau 3 lần"]
D4 --> D8["Tổng hợp kết quả"]
D7 --> D8
D8 --> D9["🏁 all_done(success, fail)"]
```
---
## 3. Cấu Hình & Hằng Số
| Hằng số | Giá trị | Mô tả |
| ------------------- | ------- | ------------------------------------------------------ |
| `MAX_FLASH_RETRIES` | 3 | Số lần retry tối đa khi nạp FW thất bại cho 1 thiết bị |
| `MAX_SCAN_ROUNDS` | 15 | Số lần scan LAN tối đa trước khi báo timeout |
| Scan interval | 5 giây | Khoảng cách giữa các lần scan (check stop mỗi 0.5s) |
| Retry delay | 2 giây | Thời gian chờ trước khi retry nạp FW |
---
## 4. Tham Số Khởi Tạo Worker
```python
AutoFlashWorker(
network="192.168.11.0/24", # Dải mạng cần scan
target_count=5, # Số lượng thiết bị cần nạp
method="api", # "api" (LuCI) hoặc "ssh"
max_workers=10, # Số luồng nạp song song (0 = không giới hạn)
firmware_path="/path/fw.bin", # Đường dẫn file firmware
local_ip="192.168.11.50", # IP máy host (sẽ bị loại khỏi scan)
gateway_ip="192.168.11.1", # IP gateway (sẽ bị loại khỏi scan)
ssh_user="root", # SSH username (mặc định: root)
ssh_password="admin123a", # SSH password chính
ssh_backup_password="admin123a", # SSH password dự phòng
set_passwd=True, # Có đặt lại mật khẩu sau flash không
)
```
---
## 5. Signal — Giao Tiếp Worker ↔ UI
| Signal | Kiểu dữ liệu | Khi nào emit |
| ---------------- | --------------- | --------------------------------------------------- |
| `log_message` | `str` | Mỗi dòng log (scan, flash, retry, kết quả) |
| `scan_found` | `int` | Sau mỗi lần scan — số thiết bị tìm thấy |
| `devices_ready` | `list[dict]` | Khi đủ thiết bị — danh sách `{ip, mac}` trước flash |
| `device_status` | `str, str` | Cập nhật trạng thái real-time: `(ip, message)` |
| `device_done` | `str, str, str` | Mỗi thiết bị xong: `(ip, mac, result)` |
| `flash_progress` | `int, int` | Tiến trình: `(done_count, total)` |
| `all_done` | `int, int` | Kết thúc: `(success_count, fail_count)` |
| `scan_timeout` | `int, int` | Scan hết lần: `(best_found, target_count)` |
| `stopped` | — | Khi worker dừng (bởi user hoặc timeout) |
---
## 6. Chi Tiết Quy Trình
### 6.1. Phase 1 — Scan Mạng LAN
```
Lần 1/15 → scan_network("192.168.11.0/24")
→ Lọc bỏ: local_ip, gateway_ip, 192.168.11.102
→ Tìm thấy 3/5 thiết bị → chưa đủ
→ Chờ 5 giây...
Lần 2/15 → scan_network(...)
→ Tìm thấy 5/5 thiết bị → ĐỦ!
→ Chuyển sang Phase 2
```
**Quy tắc lọc IP:**
- `local_ip` — IP của máy host (tránh nạp FW vào chính máy mình)
- `gateway_ip` — IP của router/gateway
- `192.168.11.102` — IP thiết bị đã cài FW sẵn, **chỉ được nạp ở chế độ Update FW**
**Timeout:**
- Sau **15 lần scan** mà chưa đủ thiết bị → emit `scan_timeout`
- UI hiện popup cảnh báo kèm gợi ý kiểm tra:
- Thiết bị đã bật và kết nối mạng chưa
- Dải mạng có đúng không
- Thử lại sau khi kiểm tra
### 6.2. Phase 2 — Nạp FW (có Auto-Retry)
Mỗi thiết bị được nạp trong `ThreadPoolExecutor` (chạy song song):
```
[192.168.11.103] Lần 1 → flash_device_api(...) → FAIL: Connection timeout
⚠️ Lần 1 thất bại
Chờ 2 giây...
[192.168.11.103] Lần 2 → flash_device_api(...) → FAIL: Upload error
⚠️ Lần 2 thất bại
Chờ 2 giây...
[192.168.11.103] Lần 3 → flash_device_api(...) → DONE
✅ Thành công (lần thứ 3)
```
**Quy trình retry:**
1. Thực hiện nạp FW (API hoặc SSH)
2. Nếu kết quả bắt đầu bằng `"DONE"` → thành công, dừng retry
3. Nếu thất bại và còn lần retry → log cảnh báo, chờ 2 giây, thử lại
4. Nếu thất bại sau 3 lần → log lỗi, báo kết quả `FAIL`
---
## 7. Giao Diện Cửa Sổ Tự Động (AutoFlashWindow)
### 7.1. Bố Cục
```
┌──────────────────────────────────────────┐
│ 🤖 Tự động hóa nạp FW │
├──────────────────────────────────────────┤
│ ⚙️ Cấu hình nạp (thu gọn được) │
│ FW: V3.0.6p5.bin │ Mạng: .11.0/24 │
│ Số lượng: 5 │ API (LuCI) │ Song song:10│
├──────────────────────────────────────────┤
│ [▶ XÁC NHẬN & BẮT ĐẦU] [■ DỪNG] │
│ 🔍 Đang scan: 3/5 thiết bị... ████░░ │
├──────────────────────────────────────────┤
│ 📋 Danh sách thiết bị │
│ ┌───┬──────────────┬────────────┬──────┐ │
│ │ # │ IP │ MAC │Kết quả│ │
│ ├───┼──────────────┼────────────┼──────┤ │
│ │ 1 │192.168.11.103│ AA:BB:CC.. │✅DONE│ │
│ │ 2 │192.168.11.104│ DD:EE:FF.. │⏳... │ │
│ │ 3 │192.168.11.105│ 11:22:33.. │🔄 R2 │ │
│ └───┴──────────────┴────────────┴──────┘ │
│ Tổng: 5 | Xong: 3 | ✅ 2 | ❌ 1 [📋Lịch sử]│
├──────────────────────────────────────────┤
│ 📝 Log (thu gọn được) │
│ 🚀 Bắt đầu chế độ tự động hóa nạp FW...│
│ 🔍 Scan lần 1/15... │
│ Tìm thấy 5/5 thiết bị │
│ ✅ Đủ 5 thiết bị! Bắt đầu nạp FW... │
│ ⚠️ [192.168.11.105] Lần 1 thất bại... │
│ 🔄 [192.168.11.105] Retry lần 2/3... │
└──────────────────────────────────────────┘
```
### 7.2. Các Thành Phần UI
| Thành phần | Mô tả |
| ------------------------- | ---------------------------------------------------------- |
| **Cấu hình nạp** | CollapsibleGroupBox — chọn FW, mạng, số lượng, phương thức |
| **Nút điều khiển** | XÁC NHẬN & BẮT ĐẦU / DỪNG — enable/disable theo trạng thái |
| **Trạng thái + Progress** | Hiển thị inline trạng thái scan/flash + progress bar |
| **Bảng thiết bị** | 4 cột: #, IP, MAC, Kết quả — cập nhật real-time |
| **Tổng hợp + Lịch sử** | Bộ đếm ✅/❌ + nút "📋 Lịch sử nạp" xem chi tiết |
| **Log** | CollapsibleGroupBox — log chi tiết toàn bộ quá trình |
---
## 8. Lịch Sử Nạp (Flash History)
Kết quả nạp được lưu ở **2 nơi** với cùng format:
| Nơi lưu | Phạm vi | Dữ liệu |
| ------------------------------- | ---------------------- | ------------------------------------ |
| `AutoFlashWindow._auto_history` | Phiên tự động hiện tại | `list[(ip, mac, result, timestamp)]` |
| `App.flashed_macs` | Toàn bộ session app | `dict{MAC: (ip, mac, result, ts)}` |
- Cả thành công ✅ lẫn thất bại ❌ đều được ghi lại
- Nút "📋 Lịch sử nạp" hiển thị danh sách với format: `[HH:MM:SS] ✅/❌ IP (MAC) — result`
- Lịch sử `_auto_history` reset mỗi khi nhấn "XÁC NHẬN & BẮT ĐẦU" lần mới
- Lịch sử `flashed_macs` tồn tại suốt phiên chạy app (cả manual và auto)
---
## 9. Quy Tắc Bảo Vệ IP `192.168.11.102`
| Chế độ | Được nạp 192.168.11.102? | Cơ chế |
| ------------------------ | :----------------------: | --------------------------------------- |
| **New Flash (thủ công)** | ❌ Không | Kiểm tra trước khi flash, hiện cảnh báo |
| **Update FW (thủ công)** | ✅ Có | Cho phép bình thường |
| **Tự động hóa** | ❌ Không | Lọc khỏi kết quả scan tự động |
---
## 10. Xử Lý Lỗi Tổng Hợp
| Tình huống | Hành vi |
| ------------------------------ | --------------------------------------------------- |
| Scan exception (network error) | Log lỗi, chờ 3s, scan lại (đếm vào MAX_SCAN_ROUNDS) |
| Scan 15 lần chưa đủ thiết bị | Emit `scan_timeout`, hiện popup cảnh báo, dừng |
| Flash thất bại lần 1-2 | Log cảnh báo, chờ 2s, retry tự động |
| Flash thất bại sau 3 lần retry | Log lỗi, đánh dấu ❌, tiếp tục device tiếp theo |
| User nhấn DỪNG | Set `_stop_flag`, dừng scan/flash, emit `stopped` |
| Chưa chọn firmware | Hiện popup cảnh báo, không cho bắt đầu |
| Mạng không hợp lệ | Hiện popup cảnh báo, không cho bắt đầu |
---
## 11. Hướng Dẫn Sử Dụng
### Bước 1: Mở tính năng
Nhấn nút **"🤖 Tự động hóa nạp FW"** ở cuối cửa sổ chính.
### Bước 2: Cấu hình
1. **Chọn firmware** — nhấn 📁 hoặc tự động lấy từ cửa sổ chính
2. **Dải mạng** — mặc định lấy từ IP máy host (ví dụ: `192.168.11.0/24`)
3. **Số lượng** — số thiết bị cần nạp (1500)
4. **Phương thức** — API (LuCI) hoặc SSH
5. **Song song** — số thiết bị nạp cùng lúc (0 = tất cả cùng lúc)
### Bước 3: Bắt đầu
Nhấn **"▶ XÁC NHẬN & BẮT ĐẦU"** → xác nhận popup → hệ thống tự động:
1. Scan mạng liên tục cho đến khi đủ thiết bị (tối đa 15 lần)
2. Nạp FW song song cho tất cả thiết bị tìm được
3. Tự động retry nếu thiết bị nào bị lỗi (tối đa 3 lần)
4. Hiện thông báo tổng hợp khi hoàn thành
### Bước 4: Theo dõi
- **Bảng thiết bị** — trạng thái real-time từng thiết bị
- **Log** — chi tiết quá trình scan, flash, retry
- **Lịch sử nạp** — nhấn 📋 để xem danh sách đã nạp
### Dừng giữa chừng
Nhấn **"■ DỪNG"** — worker sẽ dừng an toàn sau khi hoàn thành device đang xử lý.

View File

@@ -1,59 +1,120 @@
# Tài liệu Kỹ thuật: Cơ chế Quét IP trên Mạng (Network Scanner) # Tài liệu Kỹ thuật: Cơ chế Quét IP trên Mạng (Network Scanner)
## 1. Tổng quan ## 1. Tổng quan
Thành phần quét IP (IP Scanner) trong file `scanner.py` được thiết kế để dò tìm và liệt kê tất cả các thiết bị đang hoạt động trên một dải mạng (ví dụ: `192.168.1.0/24`). Nó theo dõi và trả về danh sách các đối tượng chứa địa chỉ **IP****MAC Address** của từng thiết bị.
Để đảm bảo tỷ lệ phát hiện cao nhất trên các hệ điều hành khác nhau (Windows, macOS, Linux), script sử dụng kết hợp hai phương pháp dò tìm: Thành phần quét IP trong `scanner.py` dò tìm và liệt kê tất cả thiết bị **đang thực sự online** trên một dải mạng (ví dụ: `192.168.1.0/24`), trả về danh sách chứa **IP****MAC Address** của từng thiết bị.
1. **Quét mồi bằng Ping (Ping Sweep) kết hợp đọc bảng ARP tĩnh của hệ điều hành.**
2. **Quét ARP trực tiếp ở Tầng 2 (Layer 2) bằng thư viện `scapy`.** Scanner hoạt động theo cơ chế 2 bước (Ping trước → ARP sau):
1. **ICMP Ping Sweep:** Chỉ những IP có phản hồi ping mới được ghi nhận là online.
2. **ARP Lookup:** Ngay sau khi IP phản hồi ping, OS tự động cập nhật ARP cache cho IP đó. Scanner lập tức query ARP table để lấy MAC chính xác nhất.
> **Giải quyết vấn đề ARP cache cũ (stale entries):** Khác với phương pháp ARP scan truyền thống (`arp -a` liệt kê toàn mạng) thường dính thiết bị đã ngắt kết nối do bị cache, cách làm này **chỉ query ARP cho những IP vừa pass qua bài test ping**. Thiết bị offline sẽ rớt ở bước ping và không bao giờ được đưa vào danh sách.
--- ---
## 2. Luồng hoạt động chính (Hàm `scan_network`) ## 2. Luồng hoạt động chính (Hàm `scan_network`)
Đây là hàm được gọi trực tiếp khi muốn quét một mạng. Trình tự rà quét diễn ra qua các bước sau: ```
scan_network(network)
├── stage_cb("ping") ← Thông báo UI bắt đầu quét
├── _ping_sweep(network) ← Ping đồng thời toàn bộ host
│ │
│ ├── _ping_one(ip) × N ← Mỗi host 1 thread
│ │
│ └── return [alive IPs]
├── stage_cb("mac") ← Thông báo UI bắt đầu lấy MAC
├── _get_mac_from_arp(ip) × K ← Tra MAC song song cho K IP alive
└── return [{"ip": ..., "mac": "AA:BB:CC:..."}, ...] ← Sorted by IP
```
**Bước 1: "Đánh thức" các thiết bị (Ping Sweep)** **Bước 1Ping Sweep**
- Hệ thống gọi hàm `_ping_sweep(network)` để gửi gói Ping (ICMP Echo Request) đồng loạt tới tất cả các IP có thể có trong mạng.
- Mục đích của bước này là buộc các thiết bị phản hồi, từ đó hệ điều hành của máy đang chạy lệnh sẽ **tự động ghi nhận địa chỉ MAC** của các thiết bị đó vào bộ nhớ đệm ARP (ARP Cache).
- Hệ thống tạm dừng 1 giây để đảm bảo hệ điều hành kịp lưu thông tin vào ARP Cache.
**Bước 2: Lấy dữ liệu từ bảng ARP của Hệ điều hành (Method 1)** - Gọi `_ping_sweep(network)`: gửi ICMP Echo Request đồng thời tới **toàn bộ host** trong dải mạng.
- Thực thi lệnh hệ thống `arp -a` để đọc ARP Cache. - Chỉ các IP có `returncode == 0` (phản hồi thành công) mới được đưa vào danh sách `alive_ips`.
- Kết quả được phân tích cú pháp (Regex) để trích xuất IP và MAC tương ứng, tương thích linh hoạt với cả đầu ra của Windows lẫn macOS/Linux.
- Các thiết bị đọc được lưu vào danh sách nháp (biến `seen`).
**Bước 3: Quét sâu với Scapy (Method 2 - Dự phòng/Bổ sung)** **Bước 2 — Địa chỉ MAC (ARP Lookup)**
- Script gọi thêm thư viện `scapy` để phát một thông điệp "ARP Who-has" tới địa chỉ MAC Broadcast (`ff:ff:ff:ff:ff:ff`).
- Phương pháp này giúp phát hiện ra các thiết bị chặn Ping (chặn ICMP) ở Bước 1 nhưng buộc phải phản hồi gói ARP ở tầng Data Link.
- Những thiết bị mới tìm thấy (nếu chưa có trong danh sách `seen` ở Bước 2) sẽ được bổ sung vào danh sách.
**Bước 4: Trả về kết quả** - Từ danh sách `alive_ips`, tạo ThreadPoolExecutor (max_workers=32) để gọi `_get_mac_from_arp(ip)` đồng thời.
- Danh sách các thiết bị cuối cùng được sắp xếp từ nhỏ đến lớn dựa trên địa chỉ IP và trả về dưới dạng: - Trích xuất MAC address bằng pattern matching chéo nền tảng.
`[{"ip": "192.168.1.2", "mac": "aa:bb:cc:dd:ee:ff"}, ...]`
**Bước 3 — Trả về kết quả**
- Danh sách sort tăng dần theo IP:
`[{"ip": "192.168.1.2", "mac": "AA:BB:CC:DD:EE:FF"}, ...]`
--- ---
## 3. Phân tích chi tiết các hàm hỗ trợ ## 3. Phân tích chi tiết các hàm
### `_ping_sweep(network)` \& `_ping_one(ip, is_win)` ### `_ping_one(ip, is_win)`
- **Nhiệm vụ:** Quét Ping hàng loạt (Ping Sweep).
- **Cách thức hoạt động:** Sử dụng `ThreadPoolExecutor` để chạy **tối đa 100 luồng (threads) song song**. Điều này giúp việc gửi Ping hàng loạt diễn ra cực kì nhanh chóng tính bằng giây thay vì phải đợi ping từng IP một.
- **Biện pháp an toàn:** Script có cơ chế tự bảo vệ chặn flood mạng: nó sẽ **từ chối chạy Ping Sweep** nếu dải mạng (subnet) lớn hơn 256 địa chỉ IP (tức là chỉ chạy cho dải mạng từ `/24` trở xuống).
### `_scan_with_arp_table(network)` Ping một IP đơn lẻ, trả về `True` nếu host phản hồi, `False` nếu không.
- **Nhiệm vụ:** Hàm chạy độc lập để quét tìm thiết bị mà **không cần thông qua đặc quyền Root/Administrator** (fallback method).
- **Hỗ trợ Đa nền tảng:**
- **Trên Windows (`win32`):** Chuẩn hóa định dạng chuẩn MAC từ gạch ngang sang dấu hai chấm (vd: từ `cc-2d-21...` sang `cc:2d:21...`). Nó dựa vào Regex nhận diện các dòng chuẩn xuất ra có chữ `dynamic` hoặc `static`.
- **Trên macOS / Linux:** Dùng Regex đọc định dạng chuẩn riêng biệt ví dụ: `? (192.168.1.1) at aa:bb:cc:dd:ee:ff on en0`. Loại bỏ những địa chỉ lỗi bị đánh dấu là `(incomplete)`.
### `_scan_with_scapy(network)` Timeout tối ưu theo nền tảng:
- **Nhiệm vụ:** Công cụ quét cực mạnh ở Tầng 2 (Layer 2) của mô hình mạng OSI.
- **Đặc thù:** Đòi hỏi người dùng phải cấp quyền Sudo/Root trên Linux/macOS hoặc phải cài đặt thư viện phần mềm **Npcap/WinPcap** trên Windows thì mới có thể sử dụng. Gửi gói `srp` bằng `scapy` với timeout là 3 giây để lấy về thông tin phần cứng đích thực. | OS | Lệnh | Timeout wait | Timeout process |
| ------- | ------------------ | ------------ | --------------- |
| Windows | `ping -n 1 -w 300` | 300ms | 2s |
| macOS | `ping -c 1 -W 300` | 300ms | 2s |
| Linux | `ping -c 1 -W 1` | 1s | 2s |
> macOS và Linux dùng cùng flag `-W` nhưng đơn vị khác nhau (macOS: ms, Linux: giây) — được xử lý tách biệt theo `sys.platform`.
Windows sử dụng `CREATE_NO_WINDOW` flag để tránh mở cửa sổ console cho mỗi subprocess.
### `_ping_sweep(network, progress_cb)`
- Tạo `ThreadPoolExecutor(max_workers=len(hosts))`**toàn bộ host ping đồng thời**, không batching.
- Giới hạn an toàn: chỉ chạy với subnet `/24` trở xuống (`num_addresses <= 256`).
- Trả về danh sách IP (string) đã phản hồi thành công.
- Gọi `progress_cb(done, total)` sau mỗi ping để UI cập nhật thanh tiến độ.
### `_get_mac_from_arp(ip)`
- Gọi lệnh hệ điều hành để đọc ARP cache cho IP cụ thể:
- **Windows**: `arp -a <ip>`
- **macOS / Linux**: `arp -n <ip>`
- Sử dụng Regex để parse MAC address và trả về dưới format chuẩn `AA:BB:CC:DD:EE:FF`.
- Nếu không tìm thấy, fallback thành `"N/A"`.
### `scan_network(network, progress_cb, stage_cb)`
- Entry point chính.
- `stage_cb("ping")``stage_cb("mac")` thông báo UI giai đoạn hiện tại.
- Trả về `list[dict]` với format `{"ip": str, "mac": str}`, sorted theo IP tăng dần.
--- ---
## 4. Tóm tắt Ưu & Nhược điểm của thiết kế này ## 4. Hiệu năng
- **Ưu điểm:** Khả năng dò tìm diện rộng rất cao vì sự kết hợp song song của hai phương pháp. Nếu thiết bị không có quyền Root/Sudo để chạy `scapy`, nó vẫn có thể tìm được ít nhất ~90% thiết bị trong mạng nhờ tính năng Ping Sweeping mạnh mẽ. Tính tương thích chéo OS (Windows/Mac/Linux) được xử lý rất tốt và gọn gàng qua Regex. | Thông số | Giá trị |
- **Nhược điểm:** Tốn thêm 1 giây (hàm `time.sleep(1)`) và thêm vài giây timeout tổng cộng, do cần chờ để làm đầy bộ nhớ đệm ARP trước khi quét sâu. Nếu thiết bị đích cố tình tắt phản hồi ARP, thì vẫn có khả năng bị lọt dán mạng. | --------------------------- | ------------------------------------ |
| Ping workers | `len(hosts)` (~254, tất cả cùng lúc) |
| Ping timeout — Windows | 300ms |
| Ping timeout — macOS | 300ms |
| Ping timeout — Linux | 1s |
| Process timeout | 2s |
| **Tổng thời gian scan /24** | **~12s** |
---
## 5. Ưu & Nhược điểm
**Ưu điểm:**
- **Có đầy đủ IP và MAC**, rất hữu ích cho log và tracking lịch sử thiết bị nạp FW.
- **Không bị dính ARP cache cũ**: Do chỉ lấy MAC của các IP vừa ping thành công.
- Không cần quyền Admin/Root.
- Không phụ thuộc thư viện bên ngoài (không cần Npcap / Scapy phức tạp).
- Tương thích đa nền tảng (Windows/macOS/Linux).
- Cực nhanh nhờ cơ chế full-parallel.
**Nhược điểm:**
- Thiết bị cố tình chặn gói tin ICMP (tắt ping) sẽ không bị phát hiện.
- Ping 254 thiết bị cùng lúc bằng tiến trình con (`subprocess`) trên Windows có overhead hệ thống cao hơn Unix.

1151
main.py

File diff suppressed because it is too large Load Diff

View File

@@ -278,3 +278,202 @@ QCheckBox::indicator:hover {
border-color: #7eb8f7; border-color: #7eb8f7;
} }
""" """
AUTO_STYLE = """
QWidget {
background-color: #1a1b2e;
color: #e2e8f0;
font-family: 'Segoe UI', 'SF Pro Display', sans-serif;
font-size: 12px;
}
QGroupBox {
border: 1px solid #2d3748;
border-radius: 8px;
margin-top: 10px;
padding: 20px 8px 6px 8px;
font-weight: bold;
color: #c4b5fd;
background-color: #1e2035;
}
QGroupBox::title {
subcontrol-origin: margin;
subcontrol-position: top left;
left: 14px;
top: 5px;
padding: 0px 8px;
background-color: transparent;
}
QLabel {
background-color: transparent;
}
QLabel#title {
font-size: 16px;
font-weight: bold;
color: #c4b5fd;
letter-spacing: 1px;
}
QPushButton {
background-color: #2d3352;
border: 1px solid #3d4a6b;
border-radius: 6px;
padding: 4px 12px;
color: #e2e8f0;
font-weight: 600;
min-height: 24px;
}
QPushButton:hover {
background-color: #3d4a6b;
border-color: #c4b5fd;
color: #ffffff;
}
QPushButton#start_btn {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #7c3aed, stop:1 #a78bfa);
border-color: #7c3aed;
color: #ffffff;
font-size: 14px;
font-weight: bold;
letter-spacing: 1px;
}
QPushButton#start_btn:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #8b5cf6, stop:1 #c4b5fd);
}
QPushButton#start_btn:disabled {
background: #3d3d5c;
color: #6b7280;
}
QPushButton#stop_btn {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #dc2626, stop:1 #ef4444);
border-color: #dc2626;
color: #ffffff;
font-size: 14px;
font-weight: bold;
}
QPushButton#stop_btn:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #ef4444, stop:1 #f87171);
}
QPushButton#stop_btn:disabled {
background: #3d3d5c;
color: #6b7280;
}
QLineEdit {
background-color: #13141f;
border: 1px solid #2d3748;
border-radius: 8px;
padding: 7px 12px;
color: #e2e8f0;
}
QLineEdit:focus {
border-color: #7c3aed;
background-color: #161727;
}
QComboBox {
background-color: #1e1e2e;
border: 1px solid #3d4a6b;
border-radius: 4px;
padding: 4px 10px;
color: #ffffff;
font-size: 13px;
font-weight: bold;
}
QSpinBox {
background-color: #13141f;
border: 1px solid #2d3748;
border-radius: 4px;
padding: 4px 8px;
color: #e2e8f0;
}
QTableWidget {
background-color: #13141f;
alternate-background-color: #1a1b2e;
border: 1px solid #2d3748;
border-radius: 8px;
gridline-color: #2d3748;
selection-background-color: #2d3a5a;
selection-color: #e2e8f0;
}
QTableWidget::item {
padding: 2px 6px;
border: none;
}
QHeaderView::section {
background-color: #1e2035;
color: #c4b5fd;
border: none;
border-bottom: 2px solid #7c3aed;
border-right: 1px solid #2d3748;
padding: 4px 6px;
font-weight: bold;
font-size: 11px;
}
QHeaderView::section:last {
border-right: none;
}
QProgressBar {
border: 1px solid #2d3748;
border-radius: 6px;
text-align: center;
background-color: #13141f;
color: #e2e8f0;
height: 20px;
font-size: 11px;
font-weight: 600;
}
QProgressBar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #7c3aed, stop:1 #a78bfa);
border-radius: 5px;
}
QScrollBar:vertical {
background: #13141f;
width: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:vertical {
background: #3d4a6b;
border-radius: 5px;
min-height: 30px;
}
QScrollBar::handle:vertical:hover {
background: #c4b5fd;
}
QScrollBar::add-line:vertical,
QScrollBar::sub-line:vertical {
height: 0px;
}
QScrollBar::add-page:vertical,
QScrollBar::sub-page:vertical {
background: transparent;
}
"""

View File

@@ -1 +1 @@
1.1.2 1.2.3