11 Commits

27 changed files with 2105 additions and 1066 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.4 KiB

Binary file not shown.

View File

@@ -8,11 +8,20 @@ Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng lo
## 📁 Cấu trúc dự án
```
```text
iot_fw_loader/
├── main.py # UI chính (PyQt6) + Điều phối luồng và xử lý đa luồng
├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy)
├── flasher.py # Upload firmware và tự động hóa qua giao diện OpenWrt LuCI
├── main.py # UI chính (PyQt6)
├── core/ # Các thành phần cốt lõi và xử lý đa luồng
│ ├── workers.py # Quản lý luồng dùng chung (ScanThread, FlashThread)
│ ├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy)
│ ├── flasher.py # Flash firmware và tự động hóa qua OpenWrt LuCI bằng API
│ └── ssh_flasher.py # Load/Update firmware qua đường dẫn SSH
├── ui/ # Các component thiết kế giao diện
│ ├── components.py # Custom Qt Widgets (CollapsibleGroupBox, etc.)
│ └── styles.py # Các cấu hình Stylesheet
├── utils/ # Các hàm helper tiện ích
│ ├── network.py # Các hàm xử lý IP, Hostname
│ └── system.py # Các hàm lấy thông tin máy và resources
├── run.sh # Script khởi chạy (macOS/Linux)
├── run.bat # Script khởi chạy (Windows)
├── build_windows.bat # Script đóng gói thành file .exe độc lập (Windows)
@@ -27,7 +36,7 @@ iot_fw_loader/
- Python 3.9+
- Các thư viện: `PyQt6`, `scapy`, `requests`, `pyinstaller` (để build).
- *Trên Windows:* Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng).
- _Trên Windows:_ Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng).
### Khởi chạy nhanh (Môi trường Dev)
@@ -46,6 +55,7 @@ run.bat
> Script tự tạo `venv` và cài dependencies nếu chưa có.
### 📦 Build ra file chạy độc lập (.exe) cho Windows
Chạy script sau để tự động đóng gói ứng dụng thành 1 file `.exe` duy nhất (không cần cài Python trên máy đích):
```bat
@@ -58,7 +68,7 @@ File nhận được sẽ nằm ở: `dist\IoT_Firmware_Loader.exe`.
## 🏗 Kiến trúc hệ thống
```
```text
┌─────────────────────────────────────────────────────────────┐
│ main.py (UI) │
│ ┌───────────┐ ┌───────────┐ ┌────────────────────────────┐ │
@@ -99,7 +109,7 @@ Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát
1. **Ping Sweep:** Gửi gói tin Ping song song (tối đa 50 threads) để đánh thức thiết bị và điền IP/MAC vào bảng ARP Cache của hệ điều hành.
2. **ARP Table Fallback:** Đọc bảng ARP nội bộ của OS (`arp -a`) bằng Regex. Hoạt động đa nền tảng (Windows/macOS/Linux) mà không cần quyền Admin/Root.
3. **Scapy ARP (Tính năng nâng cao):** Gửi các gói tin ARP Broadcast trực tiếp để đảm bảo bao phủ gap nếu có. Yêu cầu quyền Root trên Linux/macOS hoặc Npcap trên Windows.
*Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị.*
_Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị._
### 2. Giao diện thiết bị (Device Table)
@@ -110,11 +120,11 @@ Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát
Từ phiên bản hiện tại, phương thức ESP32 OTA không còn được áp dụng, thay vào đó module `flasher.py` tự động hóa quá trình update của router **OpenWrt (Barrier Breaker 14.07)**:
* **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`.
* **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`.
* **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có).
* **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application.
* 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh.
- **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`.
- **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`.
- **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có).
- **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application.
- 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh.
---

View File

@@ -11,6 +11,10 @@ echo.
cd /d "%~dp0"
REM Doc version tu file
set /p APP_VERSION=<version.txt
echo Building version: v%APP_VERSION%
REM 1. Tao venv neu chua co
if not exist "venv" (
echo [1/4] Creating virtual environment...
@@ -22,14 +26,15 @@ call venv\Scripts\activate.bat
REM 3. Cai dependencies + PyInstaller
echo [2/4] Installing dependencies...
pip install PyQt6 scapy requests pyinstaller --quiet
pip install -r requirements.txt pyinstaller --quiet
REM 4. Build .exe
echo [3/4] Building executable...
pyinstaller ^
--name "MiraV3_Firmware_Loader" ^
--name "Mira_Firmware_Loader_v%APP_VERSION%" ^
--icon "icon.ico" ^
--add-data "icon.ico;." ^
--add-data "version.txt;." ^
--onefile ^
--windowed ^
--noconfirm ^
@@ -47,11 +52,11 @@ echo.
echo [4/4] Build complete!
echo.
if exist "dist\MiraV3_Firmware_Loader.exe" (
echo SUCCESS: dist\MiraV3_Firmware_Loader.exe
if exist "dist\Mira_Firmware_Loader_v%APP_VERSION%.exe" (
echo SUCCESS: dist\Mira_Firmware_Loader_v%APP_VERSION%.exe
echo.
echo File size:
for %%A in ("dist\MiraV3_Firmware_Loader.exe") do echo %%~zA bytes
for %%A in ("dist\Mira_Firmware_Loader_v%APP_VERSION%.exe") do echo %%~zA bytes
echo.
echo Ban co the copy file .exe nay sang may khac va chay truc tiep.
) else (

141
core/api_flash.py Normal file
View File

@@ -0,0 +1,141 @@
"""
LuCI HTTP Firmware Flasher for OpenWrt devices.
Tự động hoá 3 bước flash qua web interface LuCI:
1. Login → lấy sysauth cookie + stok token
2. Upload firmware.bin (multipart)
3. Confirm (Proceed) → thiết bị flash và reboot
Tương thích:
- Barrier Breaker 14.07 (field: username/password)
- OpenWrt mới hơn (field: luci_username/luci_password)
"""
import re
import time
import os
from typing import Optional
import requests
def _extract_stok(text: str) -> Optional[str]:
"""Tách stok token từ URL hoặc body HTML. Trả về None nếu không tìm thấy."""
m = re.search(r";stok=([a-f0-9]+)", text)
return m.group(1) if m else None
def flash_device_api(ip, firmware_path, username="root", password="",
keep_settings=False, status_cb=None):
"""
Flash firmware lên thiết bị OpenWrt qua LuCI HTTP.
Args:
ip : địa chỉ IP thiết bị
firmware_path : đường dẫn file .bin trên máy tính
username : LuCI username (mặc định "root")
password : LuCI password (mặc định rỗng — Barrier Breaker không có pass)
keep_settings : True = giữ cấu hình cũ sau flash
status_cb : callback(str) cập nhật tiến độ lên UI
Returns:
"DONE" — flash thành công
"FAIL: …" — thất bại, kèm lý do
"""
base_url = f"http://{ip}"
login_url = f"{base_url}/cgi-bin/luci"
session = requests.Session()
try:
# ── STEP 1: Login ────────────────────────────────────────────
if status_cb:
status_cb("Logging in...")
# Phát hiện field name tương thích Barrier Breaker vs OpenWrt mới
try:
page_html = session.get(login_url, timeout=10).text
except Exception:
page_html = ""
if 'name="luci_username"' in page_html:
login_data = {"luci_username": username, "luci_password": password}
else:
login_data = {"username": username, "password": password}
resp = session.post(login_url, data=login_data,
timeout=10, allow_redirects=True)
if resp.status_code == 403:
return "FAIL: Login denied (403)"
# Lấy stok từ URL → body → redirect history
stok = (_extract_stok(resp.url)
or _extract_stok(resp.text)
or next(
(_extract_stok(h.headers.get("Location", ""))
for h in resp.history
if _extract_stok(h.headers.get("Location", ""))),
None
))
if not stok and "sysauth" not in str(session.cookies):
return "FAIL: Login failed — no session"
flash_url = (
f"{base_url}/cgi-bin/luci/;stok={stok}/admin/system/flashops"
if stok else
f"{base_url}/cgi-bin/luci/admin/system/flashops"
)
# ── STEP 2: Upload Firmware ──────────────────────────────────
if status_cb:
status_cb("Uploading firmware...")
filename = os.path.basename(firmware_path)
extra = {"keep": "on"} if keep_settings else {}
with open(firmware_path, "rb") as f:
resp = session.post(
flash_url,
data=extra,
files={"image": (filename, f, "application/octet-stream")},
timeout=300,
)
if resp.status_code != 200:
return f"FAIL: Upload HTTP {resp.status_code}"
body = resp.text.lower()
if "invalid image" in body or "bad image" in body:
return "FAIL: Invalid firmware image"
if "unsupported" in body or "not compatible" in body:
return "FAIL: Firmware not compatible"
if "verify" not in body or "proceed" not in body:
return ("FAIL: Upload ignored by server"
if 'name="image"' in resp.text
else "FAIL: Unexpected response after upload")
# ── STEP 3: Confirm (Proceed) ────────────────────────────────
if status_cb:
status_cb("Confirming (Proceed)...")
confirm = {"step": "2", "keep": "on" if keep_settings else ""}
try:
session.post(flash_url, data=confirm, timeout=300)
except (requests.exceptions.ConnectionError,
requests.exceptions.ReadTimeout):
pass # Đứt kết nối khi reboot = bình thường
if status_cb:
status_cb("Rebooting...")
time.sleep(3)
return "DONE"
except requests.ConnectionError:
return "FAIL: Cannot connect"
except requests.Timeout:
return "DONE (rebooting)"
except Exception as e:
return f"FAIL: {e}"
finally:
session.close()

79
core/flash_new_worker.py Normal file
View File

@@ -0,0 +1,79 @@
"""
Worker thread cho chế độ "Nạp Mới FW" (New Flash / Factory Reset).
Hỗ trợ 2 method:
- "api" : Flash qua LuCI HTTP (core/api_flash.py)
- "ssh" : Flash qua SSH/SCP (core/ssh_new_flash.py), có tuỳ chọn set_passwd
"""
from PyQt6.QtCore import QThread, pyqtSignal
from concurrent.futures import ThreadPoolExecutor
from core.api_flash import flash_device_api
from core.ssh_new_flash import flash_device_new_ssh
class NewFlashThread(QThread):
"""Flash firmware lên thiết bị OpenWrt vừa reset / chưa có pass."""
device_status = pyqtSignal(int, str) # index, status message
device_done = pyqtSignal(int, str) # index, result ("DONE" / "FAIL: ...")
all_done = pyqtSignal()
def __init__(self, devices, firmware_path, max_workers=10,
method="api",
ssh_user="root", ssh_password="admin123a",
ssh_backup_password="", set_passwd=True):
"""
Args:
devices : list[dict] — [{ip, mac, ...}, ...]
firmware_path : str — đường dẫn file firmware
max_workers : int — số thiết bị flash song song (0 = unlimited)
method : "api"|"ssh"
ssh_user : str — SSH username (chỉ dùng với method="ssh")
ssh_password : str — SSH password mới sẽ đặt / đăng nhập
ssh_backup_password : str — password dự phòng nếu login lần đầu thất bại
set_passwd : bool — True = gọi passwd trước khi flash (thiết bị vừa reset)
"""
super().__init__()
self.devices = devices
self.firmware_path = firmware_path
self.max_workers = max_workers
self.method = method
self.ssh_user = ssh_user
self.ssh_password = ssh_password
self.ssh_backup_password = ssh_backup_password
self.set_passwd = set_passwd
def run(self):
def _flash_one(i, dev):
try:
def on_status(msg):
self.device_status.emit(i, msg)
if self.method == "ssh":
result = flash_device_new_ssh(
dev["ip"], self.firmware_path,
user=self.ssh_user,
password=self.ssh_password,
backup_password=self.ssh_backup_password,
set_passwd=self.set_passwd,
status_cb=on_status,
)
else:
result = flash_device_api(
dev["ip"], self.firmware_path,
status_cb=on_status,
)
self.device_done.emit(i, result)
except Exception as e:
self.device_done.emit(i, f"FAIL: {e}")
workers = self.max_workers if self.max_workers > 0 else len(self.devices)
with ThreadPoolExecutor(max_workers=max(workers, 1)) as executor:
futures = [executor.submit(_flash_one, i, dev)
for i, dev in enumerate(self.devices)]
for f in futures:
f.result()
self.all_done.emit()

View File

@@ -0,0 +1,65 @@
"""
Worker thread cho chế độ "Update Firmware" (thiết bị đã cài sẵn OpenWrt).
Đặc điểm:
- Chỉ dùng SSH (không có LuCI API).
- Credentials cố định: root / admin123a (backup: admin).
- Không thực hiện bước đặt lại mật khẩu (set_passwd=False) vì thiết bị
đã có SSH đang chạy với pass đã biết.
"""
from PyQt6.QtCore import QThread, pyqtSignal
from concurrent.futures import ThreadPoolExecutor
from core.ssh_update_flash import flash_device_update_ssh
# Credentials mặc định cho Update Mode
_UPDATE_SSH_USER = "root"
_UPDATE_SSH_PASSWORD = "admin123a"
_UPDATE_SSH_BACKUP = "admin"
class UpdateFlashThread(QThread):
"""Cập nhật firmware lên thiết bị OpenWrt đang chạy qua SSH / sysupgrade."""
device_status = pyqtSignal(int, str) # index, status message
device_done = pyqtSignal(int, str) # index, result ("DONE" / "FAIL: ...")
all_done = pyqtSignal()
def __init__(self, devices, firmware_path, max_workers=10):
"""
Args:
devices : list[dict] — [{ip, mac, ...}, ...]
firmware_path : str — đường dẫn file firmware
max_workers : int — số thiết bị update song song (0 = unlimited)
"""
super().__init__()
self.devices = devices
self.firmware_path = firmware_path
self.max_workers = max_workers
def run(self):
def _update_one(i, dev):
try:
def on_status(msg):
self.device_status.emit(i, msg)
result = flash_device_update_ssh(
dev["ip"], self.firmware_path,
user=_UPDATE_SSH_USER,
password=_UPDATE_SSH_PASSWORD,
backup_password=_UPDATE_SSH_BACKUP,
status_cb=on_status,
)
self.device_done.emit(i, result)
except Exception as e:
self.device_done.emit(i, f"FAIL: {e}")
workers = self.max_workers if self.max_workers > 0 else len(self.devices)
with ThreadPoolExecutor(max_workers=max(workers, 1)) as executor:
futures = [executor.submit(_update_one, i, dev)
for i, dev in enumerate(self.devices)]
for f in futures:
f.result()
self.all_done.emit()

View File

@@ -74,7 +74,7 @@ def _ping_sweep(network, progress_cb=None):
if progress_cb:
progress_cb(done_count[0], total)
with ThreadPoolExecutor(max_workers=50) as executor:
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(_ping_and_track, ip) for ip in hosts]
for f in as_completed(futures):
pass

313
core/ssh_flasher.py Normal file
View File

@@ -0,0 +1,313 @@
"""
SSH-based Firmware Flasher for OpenWrt Devices
Replicates the flash.ps1 logic using Python paramiko/scp:
1. Auto-accept SSH host key
2. Set device password via `passwd` command
3. Upload firmware via SCP to /tmp/
4. Verify, sync, and flash via sysupgrade
"""
import os
import time
import paramiko
from scp import SCPClient
import random
def _create_ssh_client(ip, user, password, timeout=15):
"""Create an SSH client with auto-accept host key policy, including retries."""
for attempt in range(3):
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=user, password=password, timeout=timeout,
look_for_keys=False, allow_agent=False)
return client
except Exception as e:
if attempt == 2:
raise e
time.sleep(1)
import socket
class _SimpleTelnet:
"""Minimal Telnet client using raw sockets (replaces telnetlib removed in Python 3.13+)."""
def __init__(self, host, port=23, timeout=10):
self._sock = socket.create_connection((host, port), timeout=timeout)
def read_very_eager(self):
try:
self._sock.setblocking(False)
data = b""
while True:
try:
chunk = self._sock.recv(4096)
if not chunk:
break
data += chunk
except (BlockingIOError, OSError):
break
self._sock.setblocking(True)
return data
except Exception:
return b""
def write(self, data):
self._sock.sendall(data)
def close(self):
try:
self._sock.close()
except Exception:
pass
def set_device_password(ip, user="root", old_password="", new_password="admin123a",
status_cb=None):
"""
Set device password via Telnet (if raw/reset) or SSH.
"""
# Jitter to avoid hammering the network with many concurrent connections
time.sleep(random.uniform(0.1, 1.5))
if status_cb:
status_cb("Checking Telnet port for raw device...")
# 1. Thử Telnet trước (OpenWrt mặc định mở Telnet 23 và cấm SSH Root khi chưa có Pass)
try:
tn = _SimpleTelnet(ip, timeout=5)
# Nếu vô được Telnet tức là thiết bị vừa Reset cứng chưa có pass
if status_cb:
status_cb("Telnet connected! Setting password...")
# Đợi logo OpenWrt và prompt "root@OpenWrt:/# "
time.sleep(1)
tn.read_very_eager()
# Gửi lệnh đổi pass
tn.write(b"passwd\n")
time.sleep(1)
tn.write(new_password.encode('ascii') + b"\n")
time.sleep(1)
tn.write(new_password.encode('ascii') + b"\n")
time.sleep(1)
# Thoát telnet
tn.write(b"exit\n")
time.sleep(0.5)
tn.close()
# Chờ 3 giây để OpenWrt kịp đóng Telnet và nổ tiến trình Dropbear (SSH Server)
if status_cb:
status_cb("Password set via Telnet. Waiting for SSH to start...")
time.sleep(3)
return "DONE"
except Exception as e:
# Bất kỳ lỗi Telnet nào (ConnectionRefused, Timeout, BrokenPipe...)
# đều có nghĩa là Telnet không truy cập được hoặc bị đóng ngang.
# Chuyển qua luồng kết nối SSH.
pass
# 2. Rơi xuống luồng SSH nếu thiết bị cũ (cổng Telnet đóng hoặc lỗi)
if status_cb:
status_cb("Connecting SSH for password update...")
last_error = None
for attempt in range(3):
try:
client = _create_ssh_client(ip, user, old_password, timeout=10)
if status_cb:
status_cb("Setting password via SSH...")
shell = client.invoke_shell()
time.sleep(1)
if shell.recv_ready():
shell.recv(65535)
shell.send("passwd\n")
time.sleep(2)
shell.send(f"{new_password}\n")
time.sleep(1)
shell.send(f"{new_password}\n")
time.sleep(2)
if shell.recv_ready():
shell.recv(65535)
shell.send("exit\n")
time.sleep(0.5)
shell.close()
client.close()
if status_cb:
status_cb("Password set ✓")
return "DONE"
except paramiko.AuthenticationException as e:
# Authentication means wrong password, retrying won't help here
if 'client' in locals() and client:
client.close()
return f"FAIL: Cannot connect SSH (Old pass incorrect?) — {e}"
except Exception as e:
last_error = e
if 'client' in locals() and client:
try: client.close()
except: pass
# Errno 64 or 113 usually means 'Host is down' or 'No route to host'.
# It can be a transient switch issue. Wait a bit and retry.
time.sleep(2)
return f"FAIL: Cannot connect SSH after 3 attempts — {last_error}"
def flash_device_ssh(ip, firmware_path, user="root", password="admin123a",
backup_password="", set_passwd=False, status_cb=None):
"""
Flash firmware to an OpenWrt device via SSH/SCP.
Steps (mirroring flash.ps1):
1. (Optional) Set device password via passwd
2. Upload firmware via SCP to /tmp/
3. Verify uploaded file
4. Sync filesystem
5. Execute sysupgrade
Returns:
"DONE" on success, "FAIL: reason" on error
"""
# ═══════════════════════════════════════════
# STEP 0: Set password (optional)
# ═══════════════════════════════════════════
if not set_passwd:
# Jitter here if we skip set_passwd
time.sleep(random.uniform(0.1, 1.5))
if set_passwd:
result = set_device_password(ip, user, "", password, status_cb)
if result.startswith("FAIL"):
# Try with backup password if set
if backup_password:
result = set_device_password(ip, user, backup_password, password, status_cb)
# If still failing, try with current intended password just in case it was already set
if result.startswith("FAIL"):
result = set_device_password(ip, user, password, password, status_cb)
if result.startswith("FAIL"):
return result
# ═══════════════════════════════════════════
# STEP 1: Connect SSH
# ═══════════════════════════════════════════
if status_cb:
status_cb("Connecting SSH...")
try:
client = _create_ssh_client(ip, user, password)
except paramiko.AuthenticationException:
return "FAIL: SSH authentication failed"
except paramiko.SSHException as e:
return f"FAIL: SSH error — {e}"
except Exception as e:
return f"FAIL: Cannot connect — {e}"
try:
# ═══════════════════════════════════════════
# STEP 2: Upload firmware via SCP
# ═══════════════════════════════════════════
if status_cb:
status_cb("Uploading firmware via SCP...")
filename = os.path.basename(firmware_path)
remote_path = f"/tmp/{filename}"
upload_success = False
last_error = None
for attempt in range(3):
try:
scp_client = SCPClient(client.get_transport(), socket_timeout=350)
scp_client.put(firmware_path, remote_path)
scp_client.close()
upload_success = True
break
except Exception as e:
last_error = e
time.sleep(1.5)
if not upload_success:
return f"FAIL: SCP upload failed (Check Network) — {last_error}"
time.sleep(2)
# ═══════════════════════════════════════════
# STEP 3: Verify firmware uploaded
# ═══════════════════════════════════════════
if status_cb:
status_cb("Verifying firmware...")
stdin, stdout, stderr = client.exec_command(
f"test -f {remote_path} && ls -lh {remote_path}",
timeout=10
)
verify_output = stdout.read().decode("utf-8", errors="ignore").strip()
verify_err = stderr.read().decode("utf-8", errors="ignore").strip()
if not verify_output:
return f"FAIL: Firmware file not found on device after upload"
# ═══════════════════════════════════════════
# STEP 4: Sync filesystem
# ═══════════════════════════════════════════
if status_cb:
status_cb("Syncing filesystem...")
client.exec_command("sync", timeout=10)
time.sleep(2)
# ═══════════════════════════════════════════
# STEP 5: Flash firmware (sysupgrade)
# ═══════════════════════════════════════════
if status_cb:
status_cb("Flashing firmware (sysupgrade)...")
try:
# Capture output by redirecting to a file in /tmp first, or read from stdout
# Use -F to force upgrade and bypass "Image metadata not present" error for uImage files
stdin, stdout, stderr = client.exec_command(f"sysupgrade -F -v -n {remote_path} > /tmp/sysup.log 2>&1")
# Wait up to 4 seconds to see if it immediately fails
# Sysupgrade takes time and normally drops the connection, so if it finishes in < 4s, it's an error.
time.sleep(4)
if stdout.channel.exit_status_ready():
exit_code = stdout.channel.recv_exit_status()
# Fetch the error log
_, log_out, _ = client.exec_command("cat /tmp/sysup.log")
err_msg = log_out.read().decode("utf-8", errors="ignore").strip()
return f"FAIL: sysupgrade terminated early (Code {exit_code}). Details:\n{err_msg}"
except Exception:
# Connection drop during sysupgrade is exactly what we expect if it succeeds
pass
if status_cb:
status_cb("Rebooting...")
time.sleep(3)
return "DONE"
except Exception as e:
return f"FAIL: {e}"
finally:
try:
client.close()
except Exception:
pass

228
core/ssh_new_flash.py Normal file
View File

@@ -0,0 +1,228 @@
"""
Luồng SSH cho chế độ "Nạp Mới FW" (Factory Reset / Raw device).
Đặc điểm:
- Thiết bị vừa reset cứng → Telnet port 23 mở, SSH chưa có pass.
- Bước 0 (tuỳ chọn): đặt password qua Telnet → SSH (set_passwd=True).
- Bước 14: kết nối SSH, upload SCP, verify, sync + sysupgrade.
Public API:
set_device_password(ip, user, old_password, new_password, status_cb)
flash_device_new_ssh(ip, firmware_path, user, password,
backup_password, set_passwd, status_cb)
"""
import socket
import time
import random
import paramiko
from core.ssh_utils import (
_create_ssh_client,
_upload_firmware,
_verify_firmware,
_sync_and_sysupgrade,
)
# ──────────────────────────────────────────────────
# Telnet client thủ công (telnetlib bị xoá Python 3.13+)
# ──────────────────────────────────────────────────
class _SimpleTelnet:
"""Minimal Telnet client dùng raw socket."""
def __init__(self, host, port=23, timeout=10):
self._sock = socket.create_connection((host, port), timeout=timeout)
def read_very_eager(self):
try:
self._sock.setblocking(False)
data = b""
while True:
try:
chunk = self._sock.recv(4096)
if not chunk:
break
data += chunk
except (BlockingIOError, OSError):
break
self._sock.setblocking(True)
return data
except Exception:
return b""
def write(self, data: bytes):
self._sock.sendall(data)
def close(self):
try:
self._sock.close()
except Exception:
pass
# ──────────────────────────────────────────────────
# Đặt mật khẩu thiết bị (Telnet → SSH fallback)
# ──────────────────────────────────────────────────
def set_device_password(ip, user="root", old_password="",
new_password="admin123a", status_cb=None):
"""
Đặt mật khẩu thiết bị OpenWrt.
Thứ tự thử:
1. Telnet port 23 — thiết bị vừa reset (chưa có pass, SSH chưa mở)
2. SSH — thiết bị cũ có SSH nhưng cần đổi pass
Returns:
"DONE" — thành công
"FAIL: …" — thất bại sau tất cả các retry
"""
# Jitter để không hammering khi chạy song song nhiều thiết bị
time.sleep(random.uniform(0.1, 1.5))
# ── 1. Thử Telnet ──────────────────────────────────────────────
if status_cb:
status_cb("Checking Telnet port for raw device...")
try:
tn = _SimpleTelnet(ip, timeout=5)
if status_cb:
status_cb("Telnet connected! Setting password...")
time.sleep(1)
tn.read_very_eager() # Flush banner OpenWrt
tn.write(b"passwd\n"); time.sleep(1)
tn.write(new_password.encode("ascii") + b"\n"); time.sleep(1)
tn.write(new_password.encode("ascii") + b"\n"); time.sleep(1)
tn.write(b"exit\n"); time.sleep(0.5)
tn.close()
if status_cb:
status_cb("Password set via Telnet. Waiting for SSH to start...")
time.sleep(3) # Chờ Dropbear (SSH daemon) khởi động
return "DONE"
except Exception:
# Telnet không truy cập được → thử SSH bên dưới
pass
# ── 2. Fallback SSH ─────────────────────────────────────────────
if status_cb:
status_cb("Connecting SSH for password update...")
last_err = None
for attempt in range(3):
try:
client = _create_ssh_client(ip, user, old_password, timeout=10)
if status_cb:
status_cb("Setting password via SSH...")
shell = client.invoke_shell()
time.sleep(1)
if shell.recv_ready():
shell.recv(65535)
shell.send("passwd\n"); time.sleep(2)
shell.send(f"{new_password}\n"); time.sleep(1)
shell.send(f"{new_password}\n"); time.sleep(2)
if shell.recv_ready():
shell.recv(65535)
shell.send("exit\n"); time.sleep(0.5)
shell.close()
client.close()
if status_cb:
status_cb("Password set ✓")
return "DONE"
except paramiko.AuthenticationException as e:
return f"FAIL: Cannot connect SSH (Old pass incorrect?) — {e}"
except Exception as e:
last_err = e
try:
client.close()
except Exception:
pass
time.sleep(2)
return f"FAIL: Cannot connect SSH after 3 attempts — {last_err}"
# ──────────────────────────────────────────────────
# Flash firmware — Nạp Mới (Factory Reset)
# ──────────────────────────────────────────────────
def flash_device_new_ssh(ip, firmware_path, user="root", password="admin123a",
backup_password="", set_passwd=False, status_cb=None):
"""
Flash firmware lên thiết bị OpenWrt vừa reset / chưa có mật khẩu.
Luồng:
0. (Tuỳ chọn) Đặt mật khẩu qua Telnet / SSH
1. Kết nối SSH
2. Upload firmware qua SCP lên /tmp/
3. Verify file tồn tại
4. sync + sysupgrade
Args:
ip : địa chỉ IP thiết bị
firmware_path : đường dẫn file .bin trên máy tính
user : SSH username (mặc định "root")
password : mật khẩu SSH (hoặc mật khẩu mới sẽ đặt)
backup_password : mật khẩu dự phòng nếu password chính không vào được
set_passwd : True = chạy bước đặt mật khẩu trước khi flash
status_cb : callback(str) để cập nhật trạng thái lên UI
Returns:
"DONE" — flash thành công
"FAIL: …" — thất bại, kèm lý do
"""
# ── STEP 0: Đặt mật khẩu (tuỳ chọn) ──────────────────────────
if set_passwd:
result = set_device_password(ip, user, "", password, status_cb)
if result.startswith("FAIL"):
# Thử với backup_password nếu có
if backup_password:
result = set_device_password(ip, user, backup_password, password, status_cb)
# Thử xem password đã được đặt chưa (idempotent)
if result.startswith("FAIL"):
result = set_device_password(ip, user, password, password, status_cb)
if result.startswith("FAIL"):
return result
else:
time.sleep(random.uniform(0.1, 1.5)) # Jitter khi không set_passwd
# ── STEP 1: Kết nối SSH ─────────────────────────────────────────
if status_cb:
status_cb("Connecting SSH...")
try:
client = _create_ssh_client(ip, user, password)
except paramiko.AuthenticationException:
return "FAIL: SSH authentication failed"
except paramiko.SSHException as e:
return f"FAIL: SSH error — {e}"
except Exception as e:
return f"FAIL: Cannot connect — {e}"
# ── STEP 24: Upload → Verify → sysupgrade ─────────────────────
try:
remote_path = _upload_firmware(client, firmware_path, status_cb)
_verify_firmware(client, remote_path, status_cb)
return _sync_and_sysupgrade(client, remote_path, status_cb)
except RuntimeError as e:
return f"FAIL: {e}"
except Exception as e:
return f"FAIL: {e}"
finally:
try:
client.close()
except Exception:
pass

92
core/ssh_update_flash.py Normal file
View File

@@ -0,0 +1,92 @@
"""
Luồng SSH cho chế độ "Update Firmware" (thiết bị đã cài sẵn OpenWrt).
Đặc điểm:
- Thiết bị đang chạy bình thường, SSH đã mở sẵn với pass đã biết.
- Không có bước Telnet / đặt lại mật khẩu.
- Kết nối trực tiếp → upload SCP → verify → sync + sysupgrade.
Public API:
flash_device_update_ssh(ip, firmware_path, user, password,
backup_password, status_cb)
"""
import time
import paramiko
from core.ssh_utils import (
_create_ssh_client,
_upload_firmware,
_verify_firmware,
_sync_and_sysupgrade,
)
def flash_device_update_ssh(ip, firmware_path, user="root",
password="admin123a", backup_password="",
status_cb=None):
"""
Cập nhật firmware lên thiết bị OpenWrt đang chạy qua SSH / sysupgrade.
Luồng:
1. Kết nối SSH (thử password, nếu lỗi auth thử backup_password)
2. Upload firmware qua SCP lên /tmp/
3. Verify file tồn tại
4. sync + sysupgrade
Args:
ip : địa chỉ IP thiết bị
firmware_path : đường dẫn file .bin trên máy tính
user : SSH username (mặc định "root")
password : SSH password chính
backup_password : SSH password dự phòng nếu password chính sai
status_cb : callback(str) để cập nhật trạng thái lên UI
Returns:
"DONE" — update thành công
"FAIL: …" — thất bại, kèm lý do
"""
# ── STEP 1: Kết nối SSH ─────────────────────────────────────────
if status_cb:
status_cb("Connecting SSH...")
client = None
# Thử password chính trước
try:
client = _create_ssh_client(ip, user, password)
except paramiko.AuthenticationException:
# Thử backup_password nếu có
if backup_password:
try:
client = _create_ssh_client(ip, user, backup_password)
except paramiko.AuthenticationException:
return "FAIL: SSH authentication failed (wrong password)"
except paramiko.SSHException as e:
return f"FAIL: SSH error — {e}"
except Exception as e:
return f"FAIL: Cannot connect — {e}"
else:
return "FAIL: SSH authentication failed"
except paramiko.SSHException as e:
return f"FAIL: SSH error — {e}"
except Exception as e:
return f"FAIL: Cannot connect — {e}"
# ── STEP 24: Upload → Verify → sysupgrade ─────────────────────
try:
remote_path = _upload_firmware(client, firmware_path, status_cb)
_verify_firmware(client, remote_path, status_cb)
return _sync_and_sysupgrade(client, remote_path, status_cb)
except RuntimeError as e:
return f"FAIL: {e}"
except Exception as e:
return f"FAIL: {e}"
finally:
try:
client.close()
except Exception:
pass

120
core/ssh_utils.py Normal file
View File

@@ -0,0 +1,120 @@
"""
SSH/SCP helper functions dùng chung cho cả 2 luồng flash.
Không chứa logic nghiệp vụ — chỉ là transport layer:
_create_ssh_client() — kết nối SSH với retry
_upload_firmware() — upload file qua SCP lên /tmp/
_verify_firmware() — kiểm tra file tồn tại trên device
_sync_and_sysupgrade() — sync + sysupgrade, trả về "DONE" / "FAIL: ..."
"""
import os
import time
import paramiko
from scp import SCPClient
def _create_ssh_client(ip, user, password, timeout=15):
"""Tạo SSH client với AutoAddPolicy, có retry 3 lần."""
last_err = None
for attempt in range(3):
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(
ip, username=user, password=password,
timeout=timeout, look_for_keys=False, allow_agent=False,
)
return client
except paramiko.AuthenticationException:
# Sai password — retry không giúp ích gì
raise
except Exception as e:
last_err = e
if attempt < 2:
time.sleep(1)
raise last_err
def _upload_firmware(client, firmware_path, status_cb=None):
"""
Upload firmware qua SCP lên /tmp/<filename>.
Trả về remote_path khi thành công, raise RuntimeError nếu thất bại.
"""
if status_cb:
status_cb("Uploading firmware via SCP...")
filename = os.path.basename(firmware_path)
remote_path = f"/tmp/{filename}"
last_err = None
for attempt in range(3):
try:
scp = SCPClient(client.get_transport(), socket_timeout=350)
scp.put(firmware_path, remote_path)
scp.close()
time.sleep(2)
return remote_path
except Exception as e:
last_err = e
time.sleep(1.5)
raise RuntimeError(f"SCP upload failed (Check Network) — {last_err}")
def _verify_firmware(client, remote_path, status_cb=None):
"""
Xác nhận file firmware tồn tại trên thiết bị.
Raise RuntimeError nếu file không có.
"""
if status_cb:
status_cb("Verifying firmware...")
_, stdout, _ = client.exec_command(
f"test -f {remote_path} && ls -lh {remote_path}", timeout=10
)
output = stdout.read().decode("utf-8", errors="ignore").strip()
if not output:
raise RuntimeError("Firmware file not found on device after upload")
def _sync_and_sysupgrade(client, remote_path, status_cb=None):
"""
Sync filesystem rồi chạy sysupgrade.
Trả về "DONE" khi thành công, "FAIL: ..." khi sysupgrade lỗi sớm.
Connection drop trong lúc sysupgrade = thành công (thiết bị đang reboot).
"""
if status_cb:
status_cb("Syncing filesystem...")
client.exec_command("sync", timeout=10)
time.sleep(2)
if status_cb:
status_cb("Flashing firmware (sysupgrade)...")
try:
# -F: bỏ qua kiểm tra metadata (cần cho uImage)
# -v: verbose -n: không giữ cấu hình cũ
_, stdout, _ = client.exec_command(
f"sysupgrade -F -v -n {remote_path} > /tmp/sysup.log 2>&1"
)
# Chờ tối đa 4 giây — nếu exit quá sớm thì coi như lỗi
time.sleep(4)
if stdout.channel.exit_status_ready():
exit_code = stdout.channel.recv_exit_status()
_, log_out, _ = client.exec_command("cat /tmp/sysup.log")
err_msg = log_out.read().decode("utf-8", errors="ignore").strip()
return (
f"FAIL: sysupgrade terminated early "
f"(Code {exit_code}). Details:\n{err_msg}"
)
except Exception:
# Connection drop = device đang reboot = thành công
pass
if status_cb:
status_cb("Rebooting...")
time.sleep(3)
return "DONE"

43
core/workers.py Normal file
View File

@@ -0,0 +1,43 @@
"""
Workers module — chứa các QThread dùng cho tác vụ nền.
Scan:
ScanThread — quét mạng LAN tìm thiết bị OpenWrt.
Flash (tách thành 2 module riêng):
core.flash_new_worker → NewFlashThread (Nạp Mới FW)
core.flash_update_worker → UpdateFlashThread (Update FW)
"""
from PyQt6.QtCore import QThread, pyqtSignal
from core.scanner import scan_network
class ScanThread(QThread):
"""Quét mạng LAN trong background thread để không đóng băng UI."""
finished = pyqtSignal(list)
error = pyqtSignal(str)
scan_progress = pyqtSignal(int, int) # done, total (ping sweep)
stage = pyqtSignal(str) # tên giai đoạn hiện tại
def __init__(self, network):
super().__init__()
self.network = network
def run(self):
try:
def on_ping_progress(done, total):
self.scan_progress.emit(done, total)
def on_stage(s):
self.stage.emit(s)
results = scan_network(self.network,
progress_cb=on_ping_progress,
stage_cb=on_stage)
self.finished.emit(results)
except Exception as e:
self.error.emit(str(e))

View File

@@ -1,205 +0,0 @@
"""
Debug Flash — chạy đầy đủ 3 bước flash và log chi tiết từng bước.
Usage:
python debug_full.py <IP> <firmware.bin>
python debug_full.py 192.168.11.17 V3.0.6p5.bin
"""
import sys
import os
import requests
import re
def debug_flash(ip, firmware_path, username="root", password=""):
base_url = f"http://{ip}"
session = requests.Session()
print(f"{'='*65}")
print(f" Full Flash Debug — {ip}")
print(f" Firmware: {os.path.basename(firmware_path)}")
print(f" Size: {os.path.getsize(firmware_path) / 1024 / 1024:.2f} MB")
print(f"{'='*65}")
# ═══════════════════════════════════
# STEP 1: Login
# ═══════════════════════════════════
print(f"\n{''*65}")
print(f" STEP 1: Login")
print(f"{''*65}")
login_url = f"{base_url}/cgi-bin/luci"
print(f" → GET {login_url}")
try:
r = session.get(login_url, timeout=10)
print(f" Status: {r.status_code}")
except Exception as e:
print(f" ❌ Cannot connect: {e}")
return
# Detect form fields
if 'name="luci_username"' in r.text:
login_data = {"luci_username": username, "luci_password": password}
print(f" Fields: luci_username / luci_password")
else:
login_data = {"username": username, "password": password}
print(f" Fields: username / password")
print(f"\n → POST {login_url}")
print(f" Data: {login_data}")
resp = session.post(login_url, data=login_data,
timeout=10, allow_redirects=True)
print(f" Status: {resp.status_code}")
print(f" Cookies: {dict(session.cookies)}")
# Extract stok
stok = None
for source in [resp.url, resp.text]:
match = re.search(r";stok=([a-f0-9]+)", source)
if match:
stok = match.group(1)
break
if not stok:
for hist in resp.history:
match = re.search(r";stok=([a-f0-9]+)",
hist.headers.get("Location", ""))
if match:
stok = match.group(1)
break
print(f" stok: {stok}")
has_cookie = "sysauth" in str(session.cookies)
print(f" sysauth: {'✅ YES' if has_cookie else '❌ NO'}")
if not stok and not has_cookie:
print(f"\n ❌ LOGIN FAILED — no stok, no sysauth cookie")
return
print(f"\n ✅ LOGIN OK")
# Build flash URL
if stok:
flash_url = f"{base_url}/cgi-bin/luci/;stok={stok}/admin/system/flashops"
else:
flash_url = f"{base_url}/cgi-bin/luci/admin/system/flashops"
# ═══════════════════════════════════
# STEP 2: Upload firmware
# ═══════════════════════════════════
print(f"\n{''*65}")
print(f" STEP 2: Upload firmware")
print(f"{''*65}")
print(f" → POST {flash_url}")
print(f" File field: image")
print(f" File name: {os.path.basename(firmware_path)}")
print(f" keep: NOT sent (unchecked)")
print(f" Uploading...")
filename = os.path.basename(firmware_path)
with open(firmware_path, "rb") as f:
resp = session.post(
flash_url,
data={}, # No keep = uncheck Keep Settings
files={"image": (filename, f, "application/octet-stream")},
timeout=300,
)
print(f" Status: {resp.status_code}")
print(f" URL: {resp.url}")
# Check response content
resp_lower = resp.text.lower()
has_verify = "verify" in resp_lower
has_proceed = "proceed" in resp_lower
has_checksum = "checksum" in resp_lower
has_image_form = 'name="image"' in resp.text and 'type="file"' in resp.text
print(f"\n Response analysis:")
print(f" Has 'verify': {'' if has_verify else ''}")
print(f" Has 'proceed': {'' if has_proceed else ''}")
print(f" Has 'checksum': {'' if has_checksum else ''}")
print(f" Has flash form: {'⚠️ (upload ignored!)' if has_image_form else '✅ (not flash form)'}")
# Extract checksum from verification page
checksum = re.search(r'Checksum:\s*<code>([a-f0-9]+)</code>', resp.text)
size_info = re.search(r'Size:\s*([\d.]+\s*MB)', resp.text)
if checksum:
print(f" Checksum: {checksum.group(1)}")
if size_info:
print(f" Size: {size_info.group(1)}")
# Check for keep settings status
if "will be erased" in resp.text:
print(f" Config: Will be ERASED ✅ (keep=off)")
elif "will be kept" in resp.text:
print(f" Config: Will be KEPT ⚠️ (keep=on)")
if not has_verify or not has_proceed:
print(f"\n ❌ Did NOT get verification page!")
# Show cleaned response
text = re.sub(r'<[^>]+>', ' ', resp.text)
text = re.sub(r'\s+', ' ', text).strip()
print(f" Response: {text[:500]}")
return
# Show the Proceed form
print(f"\n Proceed form (from HTML):")
forms = re.findall(r'<form[^>]*>(.*?)</form>', resp.text, re.DOTALL)
for i, form_body in enumerate(forms):
if 'value="Proceed"' in form_body or 'step' in form_body:
inputs = re.findall(r'<input[^>]*/?\s*>', form_body)
for inp in inputs:
print(f" {inp.strip()}")
print(f"\n ✅ UPLOAD OK — Verification page received")
# ═══════════════════════════════════
# STEP 3: Proceed (confirm flash)
# ═══════════════════════════════════
print(f"\n{''*65}")
print(f" STEP 3: Proceed (confirm flash)")
print(f"{''*65}")
confirm_data = {
"step": "2",
"keep": "",
}
print(f" → POST {flash_url}")
print(f" Data: {confirm_data}")
try:
resp = session.post(flash_url, data=confirm_data, timeout=300)
print(f" Status: {resp.status_code}")
# Show cleaned response
text = re.sub(r'<[^>]+>', ' ', resp.text)
text = re.sub(r'\s+', ' ', text).strip()
print(f" Response: {text[:300]}")
except requests.ConnectionError:
print(f" ✅ Connection dropped — device is REBOOTING!")
except requests.ReadTimeout:
print(f" ✅ Timeout — device is REBOOTING!")
print(f"\n{'='*65}")
print(f" 🎉 FLASH COMPLETE")
print(f"{'='*65}")
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: python debug_full.py <IP> <firmware.bin>")
print("Example: python debug_full.py 192.168.11.17 V3.0.6p5.bin")
sys.exit(1)
ip = sys.argv[1]
fw = sys.argv[2]
if not os.path.exists(fw):
print(f"❌ File not found: {fw}")
sys.exit(1)
debug_flash(ip, fw)

View File

@@ -1,155 +0,0 @@
# Tài liệu Flash Firmware — IoT Firmware Loader
## Tổng quan
Ứng dụng tự động hóa quá trình nạp firmware cho thiết bị **OpenWrt Barrier Breaker 14.07** thông qua giao diện web **LuCI**. Thay vì thao tác thủ công trên trình duyệt, ứng dụng thực hiện 3 bước HTTP tự động cho mỗi thiết bị.
---
## Luồng hoạt động
```mermaid
flowchart TD
A[Chọn Firmware .bin] --> B[Scan LAN]
B --> C[Hiển thị danh sách thiết bị]
C --> D{Chọn thiết bị ☑}
D --> E[Nhấn Flash Selected Devices]
E --> F[FlashThread chạy background]
F --> G["ThreadPoolExecutor\n(max_workers = N)"]
G --> H1[Device 1 → flash_device]
G --> H2[Device 2 → flash_device]
G --> H3[Device N → flash_device]
H1 & H2 & H3 --> I[All Done → Thông báo]
```
### Chi tiết `flash_device()` cho mỗi thiết bị
```mermaid
flowchart TD
S1["STEP 1: Login\nGET /cgi-bin/luci\nPOST username=root, password=empty"] --> C1{Thành công?}
C1 -->|Có cookie sysauth + stok| S2
C1 -->|403 Denied| F1["FAIL: Login denied (403)"]
C1 -->|Không có session| F2["FAIL: Login failed — no session"]
S2["STEP 2: Upload Firmware\nPOST /flashops\nField: image=firmware.bin\nkeep=KHÔNG gửi (bỏ tích)"] --> C2{Response?}
C2 -->|Trang Verify + Proceed| S3
C2 -->|HTTP ≠ 200| F3["FAIL: Upload HTTP xxx"]
C2 -->|invalid image| F4["FAIL: Invalid firmware image"]
C2 -->|unsupported| F5["FAIL: Firmware not compatible"]
C2 -->|Vẫn hiện form upload| F6["FAIL: Upload ignored by server"]
S3["STEP 3: Proceed\nPOST step=2, keep=empty\nXác nhận flash"] --> C3{Response?}
C3 -->|200 Flashing...| R["DONE ✅\nThiết bị đang reboot"]
C3 -->|Connection dropped| R
C3 -->|Read timeout| R
```
---
## Bảng Status
### Status trên cột "Status" trong bảng thiết bị
| Icon | Status | Điều kiện hiển thị |
| ---- | ---------------------------------------- | ------------------------------------------------------------- |
| — | `READY` | Sau khi scan, thiết bị chưa được flash |
| ⏳ | `Logging in...` | Đang POST login vào LuCI |
| ⏳ | `Uploading firmware...` | Đang upload file .bin (~30MB) lên thiết bị |
| ⏳ | `Confirming (Proceed)...` | Đã upload xong, đang gửi lệnh xác nhận flash |
| ⏳ | `Rebooting...` | Thiết bị đang reboot sau khi flash |
| ✅ | `DONE` | Flash thành công, thiết bị đang khởi động lại |
| ✅ | `DONE (rebooting)` | Flash thành công nhưng timeout khi chờ response (bình thường) |
| ❌ | `FAIL: Cannot connect` | Không kết nối được tới thiết bị (sai IP, khác mạng) |
| ❌ | `FAIL: Login denied (403)` | Thiết bị từ chối đăng nhập (sai mật khẩu) |
| ❌ | `FAIL: Login failed — no session` | Login không trả về cookie hoặc token |
| ❌ | `FAIL: Upload HTTP xxx` | Server trả mã lỗi HTTP khi upload |
| ❌ | `FAIL: Invalid firmware image` | File firmware không hợp lệ |
| ❌ | `FAIL: Firmware not compatible` | Firmware không tương thích với thiết bị |
| ❌ | `FAIL: Upload ignored by server` | Server nhận file nhưng không xử lý (sai form field) |
| ❌ | `FAIL: Unexpected response after upload` | Không nhận được trang xác nhận Verify |
---
## Chi tiết kỹ thuật HTTP
### Step 1: Login
```
GET http://{IP}/cgi-bin/luci → Lấy trang login, phát hiện field name
POST http://{IP}/cgi-bin/luci → Gửi username=root&password=
```
| Kết quả | Điều kiện |
| -------------- | ------------------------------------------------------------------ |
| **Thành công** | Response chứa `sysauth` cookie VÀ/HOẶC `stok` token trong URL/body |
| **Thất bại** | HTTP 403, hoặc không có cookie/token |
**Field names tự động phát hiện:**
- OpenWrt Barrier Breaker 14.07: `username` / `password`
- OpenWrt mới hơn: `luci_username` / `luci_password`
### Step 2: Upload Firmware
```
POST http://{IP}/cgi-bin/luci/;stok={TOKEN}/admin/system/flashops
Content-Type: multipart/form-data
Fields:
image = [firmware.bin] (file upload)
keep = (KHÔNG gửi) (bỏ tích Keep Settings)
```
| Kết quả | Điều kiện |
| -------------- | ----------------------------------------------------------------------- |
| **Thành công** | Response chứa "Flash Firmware - Verify" + "Proceed" + checksum |
| **Thất bại** | Response chứa "invalid image", "unsupported", hoặc vẫn hiện form upload |
### Step 3: Proceed (Xác nhận)
```
POST http://{IP}/cgi-bin/luci/;stok={TOKEN}/admin/system/flashops
Fields:
step = 2
keep = (empty string)
```
| Kết quả | Điều kiện |
| -------------- | ------------------------------------------------------------------------------- |
| **Thành công** | Response "The system is flashing now" HOẶC connection bị ngắt (thiết bị reboot) |
| **Thất bại** | Hiếm khi xảy ra — nếu đã qua Step 2 thì Step 3 gần như luôn thành công |
---
## Xử lý song song
```
FlashThread (QThread - background)
└── ThreadPoolExecutor (max_workers = N)
├── Thread 1 → flash_device(IP_1)
├── Thread 2 → flash_device(IP_2)
├── ...
└── Thread N → flash_device(IP_N)
```
| Config | Giá trị | Ý nghĩa |
| ------------------------- | --------- | ------------------------------ |
| Concurrent devices = `10` | Mặc định | Flash 10 thiết bị song song |
| Concurrent devices = `0` | Unlimited | Flash tất cả thiết bị cùng lúc |
| Concurrent devices = `1` | Tuần tự | Flash từng thiết bị một |
**Mỗi thiết bị có session HTTP riêng** → không bị lẫn cookie/token giữa các thiết bị.
---
## Files liên quan
| File | Chức năng |
| --------------- | --------------------------------------------- |
| `flasher.py` | Logic flash 3 bước (login → upload → proceed) |
| `main.py` | UI PyQt6, FlashThread, quản lý song song |
| `debug_full.py` | Script debug — chạy 3 bước với log chi tiết |
| `scanner.py` | Scan mạng LAN tìm thiết bị |
""", "Complexity": 3, "Description": "Created flash documentation file with workflow diagrams, status conditions, HTTP details, and parallel processing explanation.", "EmptyFile": false, "IsArtifact": false, "Overwrite": false, "TargetFile": "/Users/nguyennhatminh/Documents/file code/Smatec/iot_fw_loader/FLASH_DOC.md

123
docs/api_flash_docs.md Normal file
View File

@@ -0,0 +1,123 @@
# Tài liệu Kỹ thuật: Flash Firmware qua LuCI API (`core/api_flash.py`)
Module `api_flash.py` tự động hoá quá trình nạp firmware cho thiết bị **OpenWrt** thông qua giao diện web **LuCI HTTP**. Được dùng trong chế độ **Nạp Mới FW** với method `"api"`.
---
## 1. Kiến Trúc — Vai Trò File
| File | Vai trò |
| -------------------------- | ----------------------------------------------------------------------- |
| `core/api_flash.py` | Logic flash 3 bước (login → upload → proceed) qua LuCI |
| `core/flash_new_worker.py` | `NewFlashThread` — dispatch tới `flash_device_api()` khi `method="api"` |
| `main.py` | UI PyQt6, chọn mode/method, truyền tham số vào worker |
---
## 2. Sơ Đồ Luồng
```mermaid
flowchart TD
A[NewFlashThread - method=api] --> B[flash_device_api]
B --> S1["STEP 1: Login\nGET /cgi-bin/luci — phát hiện field name\nPOST username=root & password="]
S1 --> C1{Thành công?}
C1 -->|sysauth cookie + stok| S2
C1 -->|HTTP 403| F1["FAIL: Login denied (403)"]
C1 -->|Không có session| F2["FAIL: Login failed — no session"]
S2["STEP 2: Upload Firmware\nPOST /flashops\nmultipart: image=firmware.bin"] --> C2{Response?}
C2 -->|Trang Verify + Proceed| S3
C2 -->|HTTP ≠ 200| F3["FAIL: Upload HTTP xxx"]
C2 -->|invalid image| F4["FAIL: Invalid firmware image"]
C2 -->|unsupported| F5["FAIL: Firmware not compatible"]
C2 -->|Vẫn hiện form upload| F6["FAIL: Upload ignored by server"]
S3["STEP 3: Proceed\nPOST step=2 & keep=empty"] --> C3{Response?}
C3 -->|Connection dropped / Timeout| R["DONE ✅ — Device đang reboot"]
C3 -->|200 OK| R
```
---
## 3. Chi Tiết Kỹ Thuật HTTP
### Step 1 — Login
```
GET http://{IP}/cgi-bin/luci → Lấy HTML login, phát hiện field name
POST http://{IP}/cgi-bin/luci → Đăng nhập
```
**Tự động phát hiện field name tương thích:**
| Phiên bản | Field |
| --------------------- | --------------------------------- |
| Barrier Breaker 14.07 | `username` / `password` |
| OpenWrt mới hơn | `luci_username` / `luci_password` |
**Lấy session:** `stok` token được tìm tuần tự trong URL → body HTML → redirect history. Cookie `sysauth` là fallback nếu không có `stok`.
### Step 2 — Upload Firmware
```
POST http://{IP}/cgi-bin/luci/;stok={TOKEN}/admin/system/flashops
Content-Type: multipart/form-data
image = firmware.bin (file upload)
keep = (không gửi) → bỏ tích "Keep Settings" = Clean Flash
```
Thành công khi response trả về trang **"Flash Firmware - Verify"** chứa từ khoá `verify` + `proceed`.
### Step 3 — Confirm (Proceed)
```
POST http://{IP}/cgi-bin/luci/;stok={TOKEN}/admin/system/flashops
step = 2
keep = (empty)
```
**Đứt kết nối = Thành công:** Device bắt đầu flash → SSH/HTTP request bị drop là hành vi mong muốn. `ConnectionError``ReadTimeout` đều được bắt và trả về `"DONE"`.
---
## 4. Bảng Status UI
| Icon | Status | Điều kiện |
| ---- | ---------------------------------------- | ------------------------------------------ |
| ⏳ | `Logging in...` | Đang POST login vào LuCI |
| ⏳ | `Uploading firmware...` | Đang upload file .bin |
| ⏳ | `Confirming (Proceed)...` | Đang gửi lệnh xác nhận flash |
| ⏳ | `Rebooting...` | Chờ device khởi động lại |
| ✅ | `DONE` | Flash thành công |
| ✅ | `DONE (rebooting)` | Flash thành công, timeout khi chờ response |
| ❌ | `FAIL: Cannot connect` | Không kết nối được (sai IP / khác mạng) |
| ❌ | `FAIL: Login denied (403)` | Sai mật khẩu LuCI |
| ❌ | `FAIL: Login failed — no session` | Không có cookie/token sau login |
| ❌ | `FAIL: Upload HTTP xxx` | Server trả lỗi HTTP khi upload |
| ❌ | `FAIL: Invalid firmware image` | File firmware không hợp lệ |
| ❌ | `FAIL: Firmware not compatible` | Firmware không tương thích thiết bị |
| ❌ | `FAIL: Upload ignored by server` | Server không xử lý file (sai form field) |
| ❌ | `FAIL: Unexpected response after upload` | Không nhận được trang Verify |
---
## 5. Xử Lý Song Song
```
NewFlashThread (QThread)
└── ThreadPoolExecutor (max_workers = N)
├── Thread 1 → flash_device_api(IP_1)
├── Thread 2 → flash_device_api(IP_2)
└── Thread N → flash_device_api(IP_N)
```
| Concurrent devices | Ý nghĩa |
| ------------------ | --------------------------------- |
| `10` (mặc định) | Flash 10 thiết bị song song |
| `0` | Unlimited — flash tất cả cùng lúc |
| `1` | Tuần tự — từng thiết bị một |
Mỗi thiết bị có `requests.Session()` riêng — không bị lẫn cookie/token.

166
docs/load_fw_ssh_docs.md Normal file
View File

@@ -0,0 +1,166 @@
# Tài liệu Kỹ thuật: Flash Firmware qua SSH
Tài liệu này gộp toàn bộ logic SSH cho cả hai quy trình: **Nạp Mới FW** (Factory Reset) và **Update FW** (thiết bị đang chạy). Luồng SSH được tách thành nhiều file với trách nhiệm rõ ràng thay vì gói gọn trong một module duy nhất.
---
## 1. Kiến Trúc Tổng Quan — Vai Trò Từng File
```
core/
├── ssh_utils.py ← Transport layer dùng chung (không có business logic)
├── ssh_new_flash.py ← Luồng SSH Nạp Mới FW (Telnet → SSH → sysupgrade)
├── ssh_update_flash.py ← Luồng SSH Update FW (SSH trực tiếp → sysupgrade)
├── flash_new_worker.py ← QThread điều phối Nạp Mới (API LuCI hoặc SSH)
└── flash_update_worker.py← QThread điều phối Update FW (SSH only)
```
### `core/ssh_utils.py` — Transport Layer Dùng Chung
Chứa các helper function cấp thấp, **không mang logic nghiệp vụ**, được import bởi cả 2 luồng SSH:
| Hàm | Mô tả |
| ------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------- |
| `_create_ssh_client(ip, user, password, timeout)` | Tạo SSH client với `AutoAddPolicy`, retry 3 lần. Nếu lỗi `AuthenticationException` thì raise ngay (không retry vô nghĩa). |
| `_upload_firmware(client, firmware_path, status_cb)` | Upload file `.bin` qua SCP lên `/tmp/<filename>`. Retry 3 lần, timeout SCP là 350 giây. Trả về `remote_path`. |
| `_verify_firmware(client, remote_path, status_cb)` | Chạy `test -f && ls -lh` để xác nhận file tồn tại trên device sau khi upload. |
| `_sync_and_sysupgrade(client, remote_path, status_cb)` | Chạy `sync` rồi `sysupgrade -F -v -n`. Đứt kết nối = thành công. Trả về `"DONE"` hoặc `"FAIL: ..."`. |
### `core/ssh_new_flash.py` — Luồng Nạp Mới FW
Xử lý thiết bị **vừa Factory Reset** hoặc **chưa có mật khẩu**. Bao gồm:
- **`_SimpleTelnet`**: Telnet client thủ công bằng raw socket — thay thế `telnetlib` bị xoá khỏi Python 3.13+.
- **`set_device_password()`**: Đặt mật khẩu thiết bị, thử Telnet port 23 trước, fallback sang SSH nếu Telnet đóng.
- **`flash_device_new_ssh()`**: Hàm flash chính — gọi `set_device_password` (nếu `set_passwd=True`) rồi kết nối SSH và gọi pipeline `_upload → _verify → _sync_and_sysupgrade` từ `ssh_utils`.
### `core/ssh_update_flash.py` — Luồng Update FW
Xử lý thiết bị **đang chạy MiraV3**, SSH đã mở sẵn với pass đã biết. Không có Telnet, không có set_passwd:
- **`flash_device_update_ssh()`**: Kết nối SSH (thử `password`, fallback `backup_password` nếu auth fail), rồi gọi thẳng pipeline `_upload → _verify → _sync_and_sysupgrade` từ `ssh_utils`.
### `core/flash_new_worker.py` — QThread Nạp Mới
`NewFlashThread` — điều phối flash song song cho chế độ **Nạp Mới**:
- Nếu `method="ssh"` → gọi `flash_device_new_ssh()` từ `ssh_new_flash`.
- Nếu `method="api"` → gọi `flash_device()` từ `flasher` (LuCI HTTP).
- Nhận đủ credentials từ UI: `ssh_user`, `ssh_password`, `ssh_backup_password`, `set_passwd`.
### `core/flash_update_worker.py` — QThread Update FW
`UpdateFlashThread` — điều phối flash song song cho chế độ **Update FW**:
- Luôn dùng SSH, credentials hardcode an toàn trong module (`root` / `admin123a`, backup `admin`).
- Không nhận credentials từ UI — tránh nhập sai gây flash nhầm.
---
## 2. Sơ Đồ Luồng — Nạp Mới FW (SSH)
```mermaid
graph TD
A[NewFlashThread.run] --> B{method = ssh?}
B -- api --> API[flash_device - LuCI HTTP]
B -- ssh --> C[flash_device_new_ssh]
C -->|set_passwd=True| D[set_device_password]
C -->|set_passwd=False| Jitter[Jitter 0.1s - 1.5s]
D --> D1[Thử Telnet port 23]
D1 -- Thành công --> D2[passwd > new_password via Telnet]
D2 --> D3[Chờ 3s để Dropbear khởi động]
D1 -- Lỗi / Timeout --> D4[Fallback SSH với old_password rỗng]
D4 -- Auth Fail --> D5[Thử backup_password]
D5 -- Auth Fail --> D6[Thử chính new_password - idempotent]
D6 -- Fail --> Z[DỪNG - Báo FAIL]
D3 --> F
D4 -- OK --> F
D5 -- OK --> F
D6 -- OK --> F
Jitter --> F[_create_ssh_client - retry 3 lần]
F --> G[_upload_firmware via SCP vào /tmp/]
G -->|retry 3 lần| H[_verify_firmware - test -f]
H --> I[_sync_and_sysupgrade]
I --> J[sync rồi sysupgrade -F -v -n]
J --> K{Kết nối đứt?}
K -- Có --> L[DONE - Device đang Reboot]
K -- Không trong 4s --> M[FAIL - Lấy log /tmp/sysup.log]
```
## 3. Sơ Đồ Luồng — Update FW (SSH)
```mermaid
graph TD
A[UpdateFlashThread.run] --> B[flash_device_update_ssh]
B --> C[_create_ssh_client với password chính]
C -- Auth OK --> F
C -- AuthenticationException --> D[Thử backup_password]
D -- Auth OK --> F
D -- Fail --> Z[DỪNG - Báo FAIL]
F[_upload_firmware via SCP vào /tmp/] -->|retry 3 lần| G[_verify_firmware - test -f]
G --> H[_sync_and_sysupgrade]
H --> I[sync rồi sysupgrade -F -v -n]
I --> J{Kết nối đứt?}
J -- Có --> K[DONE - Device đang Reboot]
J -- Không trong 4s --> L[FAIL - Lấy log /tmp/sysup.log]
```
---
## 4. Phân Tích Logic Cốt Lõi
### 4.1 Cơ Chế Dự Phòng Mật Khẩu Đa Tầng (Nạp Mới)
Thuật toán `set_device_password` có 4 lớp dự phòng để chiếm được quyền Root trên thiết bị dù ở trạng thái nào:
1. **Telnet (Port 23):** OpenWrt ngay sau Factory Reset mở Telnet không cần mật khẩu nhưng chặn SSH. Script luôn thử cổng này đầu tiên. Mọi exception (Refused, Timeout, Broken Pipe) đều kích hoạt fallback.
2. **SSH mật khẩu rỗng `""`:** Nếu Telnet đóng, thử SSH với pass rỗng — trường hợp thiết bị đang ở trạng thái semi-configured.
3. **SSH Backup Password:** Nếu pass rỗng bị `AuthenticationException`, thiết bị đang kẹt một pass cũ — thử `backup_password`.
4. **SSH Target Password (Idempotent):** Thử lại với chính `new_password` để chặn trường hợp device đã đổi pass thành công từ lần flash trước nhưng chưa được flash firmware.
### 4.2 Cơ Chế Dự Phòng Mật Khẩu (Update FW)
Đơn giản hơn: thử `password` chính → nếu `AuthenticationException` → thử `backup_password`. Không có Telnet, không có set_passwd. Nếu cả hai đều fail thì trả về `FAIL` ngay.
### 4.3 Xử Lý Đa Luồng Chống Nghẽn
`paramiko` dễ báo lỗi `[Errno 64] Host is down` hoặc `[Errno 32] Broken Pipe` khi hàng chục thread cùng hit network stack cùng một lúc. Hệ thống chống nghẽn bằng 2 cơ chế:
- **Jitter ngẫu nhiên:** `time.sleep(random.uniform(0.1, 1.5))` phân tán chùm request theo thời gian, tránh tự gây DDoS nội bộ.
- **Retry Hooks:** `_create_ssh_client` retry 3 lần (nghỉ 1s giữa mỗi lần). `_upload_firmware` retry 3 lần (nghỉ 1.5s). Triệt 99% lỗi TCP transient.
### 4.4 RAM Disk SCP
Firmware được upload thẳng vào `/tmp/<tên_file>.bin` — phân vùng RAM ảo của OpenWrt. Lợi ích:
- Tốc độ ghi nhanh nhất (RAM vs Flash).
- Không gây hao mòn FlashNOR của router.
- File biến mất sau reboot, không để lại rác.
### 4.5 Thủ Thuật `sysupgrade -F -v -n`
| Cờ | Tác dụng |
| -------------- | ------------------------------------------------------------------------------------------------------------------------ |
| `-F` (Force) | Bỏ qua kiểm tra Image Metadata — bắt buộc với file build Raw / `tim_uImage` để tránh lỗi _"Image metadata not present"_. |
| `-v` (Verbose) | Log chi tiết vào `/tmp/sysup.log` để debug khi cần. |
| `-n` (No-keep) | Clean Flash — không giữ cấu hình cũ, tránh xung đột config giữa các phiên bản. |
**Đứt kết nối = Thành công:** `sysupgrade` phá kernel hiện tại rồi reboot nạp firmware mới → SSH session bị đứt là kết quả tất yếu và mong muốn. Script bẫy exception này và trả về `"DONE"`. Ngược lại, nếu sysupgrade fail sớm (< 4 giây, còn giữ kết nối), script đọc `/tmp/sysup.log` trả về error code đầy đủ.
---
## 5. Trình Gỡ Lỗi Nhanh
Cả hai luồng đều gọi `status_cb(msg)` tại mỗi bước toàn bộ tiến độ như _"Connecting SSH"_, _"Uploading firmware"_, _"Syncing filesystem"_ hiển thị trực tiếp tại cột **Status** trên giao diện chính.
Để debug chi tiết hơn, chạy app qua terminal:
```bash
./run.sh
```
Mọi exception đều bị bắt trả về chuỗi `"FAIL: <lý do>"` hiển thị lên UI không silent failure.

View File

@@ -1,164 +0,0 @@
"""
OpenWrt LuCI Firmware Flasher (Barrier Breaker 14.07)
Automates the 3-step firmware flash process via LuCI web interface:
Step 1: POST /cgi-bin/luci → username=root&password= → get sysauth cookie + stok
Step 2: POST /cgi-bin/luci/;stok=XXX/admin/system/flashops
→ multipart: image=firmware.bin → get verification page
Step 3: POST /cgi-bin/luci/;stok=XXX/admin/system/flashops
→ step=2&keep= → device flashes and reboots
"""
import requests
import re
import time
import os
def flash_device(ip, firmware_path, username="root", password="",
keep_settings=False, status_cb=None):
"""
Flash firmware to an OpenWrt device via LuCI.
Returns:
"DONE" on success, "FAIL: reason" on error
"""
base_url = f"http://{ip}"
session = requests.Session()
try:
# ═══════════════════════════════════════════
# STEP 1: Login
# ═══════════════════════════════════════════
if status_cb:
status_cb("Logging in...")
# GET login page to detect form field names
login_url = f"{base_url}/cgi-bin/luci"
try:
get_resp = session.get(login_url, timeout=10)
page_html = get_resp.text
except Exception:
page_html = ""
# Barrier Breaker uses "username"/"password"
# Newer LuCI uses "luci_username"/"luci_password"
if 'name="luci_username"' in page_html:
login_data = {"luci_username": username, "luci_password": password}
else:
login_data = {"username": username, "password": password}
resp = session.post(login_url, data=login_data,
timeout=10, allow_redirects=True)
if resp.status_code == 403:
return "FAIL: Login denied (403)"
# Extract stok from response body or URL
stok = None
for source in [resp.url, resp.text]:
match = re.search(r";stok=([a-f0-9]+)", source)
if match:
stok = match.group(1)
break
# Also check redirect history
if not stok:
for hist in resp.history:
match = re.search(r";stok=([a-f0-9]+)",
hist.headers.get("Location", ""))
if match:
stok = match.group(1)
break
# Verify login succeeded
has_cookie = "sysauth" in str(session.cookies)
if not stok and not has_cookie:
return "FAIL: Login failed — no session"
# Build flash URL with stok
if stok:
flash_url = f"{base_url}/cgi-bin/luci/;stok={stok}/admin/system/flashops"
else:
flash_url = f"{base_url}/cgi-bin/luci/admin/system/flashops"
# ═══════════════════════════════════════════
# STEP 2: Upload firmware image
# ═══════════════════════════════════════════
if status_cb:
status_cb("Uploading firmware...")
filename = os.path.basename(firmware_path)
with open(firmware_path, "rb") as f:
# Don't send "keep" field = uncheck "Keep settings"
# Send "keep": "on" only if keep_settings is True
data = {}
if keep_settings:
data["keep"] = "on"
resp = session.post(
flash_url,
data=data,
files={"image": (filename, f, "application/octet-stream")},
timeout=300,
)
if resp.status_code != 200:
return f"FAIL: Upload HTTP {resp.status_code}"
resp_lower = resp.text.lower()
# Check for errors
if "invalid image" in resp_lower or "bad image" in resp_lower:
return "FAIL: Invalid firmware image"
if "unsupported" in resp_lower or "not compatible" in resp_lower:
return "FAIL: Firmware not compatible"
# Verify we got the "Flash Firmware - Verify" page
if "verify" not in resp_lower or "proceed" not in resp_lower:
# Still on flash form = upload was ignored
if 'name="image"' in resp.text:
return "FAIL: Upload ignored by server"
return "FAIL: Unexpected response after upload"
# ═══════════════════════════════════════════
# STEP 3: Click "Proceed" to start flash
# ═══════════════════════════════════════════
if status_cb:
status_cb("Confirming (Proceed)...")
# The Proceed form from the verification page:
# <input type="hidden" name="step" value="2" />
# <input type="hidden" name="keep" value="" />
confirm_data = {
"step": "2",
"keep": "",
}
if keep_settings:
confirm_data["keep"] = "on"
try:
session.post(flash_url, data=confirm_data, timeout=300)
except requests.exceptions.ConnectionError:
# Device rebooting — this is expected and means SUCCESS
pass
except requests.exceptions.ReadTimeout:
# Also expected during reboot
pass
if status_cb:
status_cb("Rebooting...")
time.sleep(3)
return "DONE"
except requests.ConnectionError:
return "FAIL: Cannot connect"
except requests.Timeout:
return "DONE (rebooting)"
except Exception as e:
return f"FAIL: {e}"
finally:
session.close()

797
main.py
View File

@@ -11,7 +11,8 @@ from PyQt6.QtWidgets import (
QVBoxLayout, QHBoxLayout, QFileDialog, QTableWidget,
QTableWidgetItem, QLabel, QProgressBar,
QMessageBox, QGroupBox, QHeaderView, QLineEdit,
QFrame, QCheckBox, QSpinBox, QScrollArea, QSizePolicy
QFrame, QCheckBox, QSpinBox, QScrollArea, QSizePolicy,
QComboBox
)
from PyQt6.QtCore import (
Qt, QThread, pyqtSignal, QPropertyAnimation, QAbstractAnimation,
@@ -21,501 +22,40 @@ from PyQt6.QtGui import QFont, QColor, QIcon, QAction
import datetime
from scanner import scan_network
from flasher import flash_device
from core.scanner import scan_network
from core.workers import ScanThread
from core.flash_new_worker import NewFlashThread
from core.flash_update_worker import UpdateFlashThread
from utils.network import _resolve_hostname, get_default_network
from utils.system import resource_path, get_machine_info, get_version
class CollapsibleGroupBox(QGroupBox):
def __init__(self, title="", parent=None):
super().__init__(title, parent)
self.setCheckable(True)
self.setChecked(True)
self.animation = QPropertyAnimation(self, b"maximumHeight")
self.animation.setDuration(200)
# Connect the toggled signal to our animation function
self.toggled.connect(self._toggle_animation)
from ui.components import CollapsibleGroupBox
from ui.styles import STYLE
self._full_height = 0
def set_content_layout(self, layout):
# We need a wrapper widget to hold the layout
self.content_widget = QWidget()
self.content_widget.setStyleSheet("background-color: transparent;")
self.content_widget.setLayout(layout)
main_layout = QVBoxLayout()
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(self.content_widget)
self.setLayout(main_layout)
def _toggle_animation(self, checked):
if not hasattr(self, 'content_widget'):
return
if checked:
# Expand: show content first, then animate
self.content_widget.setVisible(True)
target_height = self.sizeHint().height()
self.animation.stop()
self.animation.setStartValue(self.height())
self.animation.setEndValue(target_height)
self.animation.finished.connect(self._on_expand_finished)
self.animation.start()
else:
# Collapse
self.animation.stop()
self.animation.setStartValue(self.height())
self.animation.setEndValue(32)
self.animation.finished.connect(self._on_collapse_finished)
self.animation.start()
def _on_expand_finished(self):
# Remove height constraint so content can grow dynamically
self.setMaximumHeight(16777215)
try:
self.animation.finished.disconnect(self._on_expand_finished)
except TypeError:
pass
def _on_collapse_finished(self):
if not self.isChecked():
self.content_widget.setVisible(False)
try:
self.animation.finished.disconnect(self._on_collapse_finished)
except TypeError:
pass
def _resolve_hostname(ip):
"""Reverse DNS lookup for a single IP."""
try:
return socket.gethostbyaddr(ip)[0]
except Exception:
return ""
def get_local_ip():
"""Get the local IP address of this machine."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "N/A"
def get_default_network(ip):
"""Guess the /24 network from local IP."""
try:
parts = ip.split(".")
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
except Exception:
return "192.168.1.0/24"
def resource_path(relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def get_machine_info():
"""Collect machine info."""
hostname = socket.gethostname()
local_ip = get_local_ip()
os_info = f"{platform.system()} {platform.release()}"
mac_addr = "N/A"
try:
import uuid
mac = uuid.getnode()
mac_addr = ":".join(
f"{(mac >> (8 * i)) & 0xFF:02X}" for i in reversed(range(6))
)
except Exception:
pass
return {
"hostname": hostname,
"ip": local_ip,
"os": os_info,
"mac": mac_addr,
}
# ── Stylesheet ──────────────────────────────────────────────
STYLE = """
QWidget {
background-color: #1a1b2e;
color: #e2e8f0;
font-family: 'Segoe UI', 'SF Pro Display', sans-serif;
font-size: 12px;
}
QGroupBox {
border: 1px solid #2d3748;
border-radius: 8px;
margin-top: 10px;
padding: 20px 8px 6px 8px;
font-weight: bold;
color: #7eb8f7;
background-color: #1e2035;
}
QGroupBox::title {
subcontrol-origin: margin;
subcontrol-position: top left;
left: 14px;
top: 5px;
padding: 0px 8px;
background-color: transparent;
}
QGroupBox::indicator {
width: 14px;
height: 14px;
border-radius: 4px;
border: 1px solid #3d4a6b;
background-color: #13141f;
margin-top: 5px;
}
QGroupBox::indicator:unchecked {
background-color: #13141f;
}
QGroupBox::indicator:checked {
background-color: #3b82f6;
border-color: #3b82f6;
}
QLabel#title {
font-size: 16px;
font-weight: bold;
color: #7eb8f7;
letter-spacing: 1px;
}
QLabel#info {
color: #94a3b8;
font-size: 11px;
}
QPushButton {
background-color: #2d3352;
border: 1px solid #3d4a6b;
border-radius: 6px;
padding: 4px 12px;
color: #e2e8f0;
font-weight: 600;
min-height: 24px;
}
QPushButton:hover {
background-color: #3d4a6b;
border-color: #7eb8f7;
color: #ffffff;
}
QPushButton:pressed {
background-color: #1a2040;
}
QPushButton#scan {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #1a56db, stop:1 #1e66f5);
border-color: #1a56db;
color: #ffffff;
}
QPushButton#scan:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #2563eb, stop:1 #3b82f6);
}
QPushButton#flash {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #15803d, stop:1 #16a34a);
border-color: #15803d;
color: #ffffff;
font-size: 13px;
min-height: 30px;
}
QPushButton#flash:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #16a34a, stop:1 #22c55e);
}
QTableWidget {
background-color: #13141f;
alternate-background-color: #1a1b2e;
border: 1px solid #2d3748;
border-radius: 8px;
gridline-color: #2d3748;
selection-background-color: #2d3a5a;
selection-color: #e2e8f0;
}
QTableWidget::item {
padding: 2px 6px;
border: none;
}
QTableWidget::item:selected {
background-color: #2d3a5a;
color: #7eb8f7;
}
QHeaderView::section {
background-color: #1e2035;
color: #7eb8f7;
border: none;
border-bottom: 2px solid #3b82f6;
border-right: 1px solid #2d3748;
padding: 4px 6px;
font-weight: bold;
font-size: 11px;
letter-spacing: 0.5px;
text-transform: uppercase;
}
QHeaderView::section:last {
border-right: none;
}
QProgressBar {
border: 1px solid #2d3748;
border-radius: 6px;
text-align: center;
background-color: #13141f;
color: #e2e8f0;
height: 20px;
font-size: 11px;
font-weight: 600;
}
QProgressBar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #3b82f6, stop:1 #7eb8f7);
border-radius: 7px;
}
QProgressBar#scan_bar {
border: 1px solid #374151;
border-radius: 5px;
text-align: center;
background-color: #13141f;
color: #fbbf24;
height: 16px;
font-size: 10px;
font-weight: 600;
}
QProgressBar#scan_bar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #d97706, stop:1 #fbbf24);
border-radius: 4px;
}
QLineEdit {
background-color: #13141f;
border: 1px solid #2d3748;
border-radius: 8px;
padding: 7px 12px;
color: #e2e8f0;
selection-background-color: #2d3a5a;
}
QLineEdit:focus {
border-color: #3b82f6;
background-color: #161727;
}
QScrollBar:vertical {
background: #13141f;
width: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:vertical {
background: #3d4a6b;
border-radius: 5px;
min-height: 30px;
}
QScrollBar::handle:vertical:hover {
background: #7eb8f7;
}
QScrollBar::handle:vertical:pressed {
background: #3b82f6;
}
QScrollBar::add-line:vertical,
QScrollBar::sub-line:vertical {
height: 0px;
background: transparent;
}
QScrollBar::add-page:vertical,
QScrollBar::sub-page:vertical {
background: transparent;
}
QScrollBar:horizontal {
background: #13141f;
height: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:horizontal {
background: #3d4a6b;
border-radius: 5px;
min-width: 30px;
}
QScrollBar::handle:horizontal:hover {
background: #7eb8f7;
}
QScrollBar::handle:horizontal:pressed {
background: #3b82f6;
}
QScrollBar::add-line:horizontal,
QScrollBar::sub-line:horizontal {
width: 0px;
background: transparent;
}
QScrollBar::add-page:horizontal,
QScrollBar::sub-page:horizontal {
background: transparent;
}
QCheckBox {
spacing: 6px;
color: #94a3b8;
font-size: 12px;
}
QCheckBox::indicator {
width: 16px;
height: 16px;
border-radius: 4px;
border: 1px solid #3d4a6b;
background-color: #13141f;
}
QCheckBox::indicator:checked {
background-color: #3b82f6;
border-color: #3b82f6;
}
QCheckBox::indicator:hover {
border-color: #7eb8f7;
}
"""
class ScanThread(QThread):
"""Run network scan in a background thread so UI doesn't freeze."""
finished = pyqtSignal(list)
error = pyqtSignal(str)
scan_progress = pyqtSignal(int, int) # done, total (ping sweep)
stage = pyqtSignal(str) # current scan phase
def __init__(self, network):
super().__init__()
self.network = network
def run(self):
try:
def on_ping_progress(done, total):
self.scan_progress.emit(done, total)
def on_stage(s):
self.stage.emit(s)
results = scan_network(self.network,
progress_cb=on_ping_progress,
stage_cb=on_stage)
# Resolve hostnames in parallel
self.stage.emit("hostname")
with ThreadPoolExecutor(max_workers=50) as executor:
future_to_dev = {
executor.submit(_resolve_hostname, d["ip"]): d
for d in results
}
for future in as_completed(future_to_dev):
dev = future_to_dev[future]
try:
dev["name"] = future.result(timeout=3)
except Exception:
dev["name"] = ""
self.finished.emit(results)
except Exception as e:
self.error.emit(str(e))
class FlashThread(QThread):
"""Run firmware flash in background so UI stays responsive."""
device_status = pyqtSignal(int, str) # index, status message
device_done = pyqtSignal(int, str) # index, result
all_done = pyqtSignal()
def __init__(self, devices, firmware_path, max_workers=10):
super().__init__()
self.devices = devices
self.firmware_path = firmware_path
self.max_workers = max_workers
def run(self):
def _flash_one(i, dev):
try:
def on_status(msg):
self.device_status.emit(i, msg)
result = flash_device(
dev["ip"], self.firmware_path,
status_cb=on_status
)
self.device_done.emit(i, result)
except Exception as e:
self.device_done.emit(i, f"FAIL: {e}")
# Use configured max_workers (0 = unlimited = one per device)
workers = self.max_workers if self.max_workers > 0 else len(self.devices)
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = []
for i, dev in enumerate(self.devices):
futures.append(executor.submit(_flash_one, i, dev))
for f in futures:
f.result()
self.all_done.emit()
class App(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("MiraV3 Firmware Loader")
self.setWindowTitle(f"Mira Firmware Loader v{get_version()}")
self.setWindowIcon(QIcon(resource_path("icon.ico")))
# Data state
self.local_ip = ""
self.gateway_ip = ""
self.firmware = None
self.all_devices = [] # all scan results (unfiltered)
self.devices = [] # currently displayed (filtered)
self.all_devices = [] # raw list from scanner
self.devices = [] # filtered list for table
self.flashed_macs = {} # MAC addresses flashed successfully in session (MAC -> timestamp)
self.scan_thread = None
info = get_machine_info()
@@ -527,12 +67,12 @@ class App(QWidget):
layout.setContentsMargins(8, 6, 8, 6)
# ── Title ──
title = QLabel("⚡ MiraV3 Firmware Loader")
title = QLabel("⚡ Mira Firmware Loader")
title.setObjectName("title")
title.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(title)
copyright_label = QLabel(f"© {datetime.datetime.now().year} Smatec — R&D Software Team. v1.0.0")
copyright_label = QLabel(f"© {datetime.datetime.now().year} Smatec — R&D Software Team. v{get_version()}")
copyright_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
copyright_label.setStyleSheet(
"color: #9399b2; font-size: 11px; font-weight: 500; font-family: -apple-system, BlinkMacSystemFont, Segoe UI, Roboto;"
@@ -593,11 +133,11 @@ class App(QWidget):
self.net_input.setPlaceholderText("e.g. 192.168.4.0/24")
net_row.addWidget(self.net_input, 1)
btn_scan = QPushButton("🔍 Scan LAN")
btn_scan.setObjectName("scan")
btn_scan.clicked.connect(self.scan)
btn_scan.setFixedWidth(110)
net_row.addWidget(btn_scan)
self.btn_scan = QPushButton("🔍 Scan LAN")
self.btn_scan.setObjectName("scan")
self.btn_scan.clicked.connect(self.scan)
self.btn_scan.setFixedWidth(110)
net_row.addWidget(self.btn_scan)
scan_layout.addLayout(net_row)
self.scan_progress_bar = QProgressBar()
@@ -621,14 +161,20 @@ class App(QWidget):
dev_layout.setContentsMargins(4, 4, 4, 4)
self.table = QTableWidget()
self.table.setColumnCount(5)
self.table.setHorizontalHeaderLabels(["", "IP", "Name", "MAC", "Status"])
self.table.setColumnCount(4)
self.table.setHorizontalHeaderLabels(["", "IP", "MAC", "Status"])
self.table.setAlternatingRowColors(True)
header = self.table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
header.resizeSection(0, 40)
for col in range(1, 5):
header.setSectionResizeMode(col, QHeaderView.ResizeMode.Stretch)
# IP and MAC can be fixed/stretch, Status to stretch to cover full info
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Interactive)
header.resizeSection(1, 120)
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Interactive)
header.resizeSection(2, 140)
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
self.table.verticalHeader().setVisible(False)
self.table.setSelectionBehavior(
QTableWidget.SelectionBehavior.SelectRows
@@ -641,6 +187,11 @@ class App(QWidget):
filter_row.addWidget(self.device_count_label)
filter_row.addStretch()
btn_history = QPushButton("📋 Flash History")
btn_history.clicked.connect(self._show_history)
btn_history.setStyleSheet("background-color: #313244; color: #cdd6f4;")
filter_row.addWidget(btn_history)
btn_select_all = QPushButton("☑ Select All")
btn_select_all.clicked.connect(self._select_all_devices)
filter_row.addWidget(btn_select_all)
@@ -659,32 +210,139 @@ class App(QWidget):
layout.addWidget(dev_group, stretch=1)
# ── Flash Controls ──
flash_group = QGroupBox("🚀 Flash")
flash_group = QGroupBox("🚀 Flash Controls")
flash_layout = QVBoxLayout()
flash_layout.setSpacing(4)
flash_layout.setContentsMargins(4, 4, 4, 4)
flash_layout.setSpacing(12)
flash_layout.setContentsMargins(10, 15, 10, 12)
self.progress = QProgressBar()
self.progress.setFormat("%v / %m devices (%p%)")
self.progress.setMinimumHeight(22)
flash_layout.addWidget(self.progress)
# Mode selector row (Nạp mới vs Update)
mode_row = QHBoxLayout()
mode_lbl = QLabel("Flash Mode:")
mode_lbl.setStyleSheet("font-size: 14px; font-weight: bold;")
mode_row.addWidget(mode_lbl)
self.mode_combo = QComboBox()
self.mode_combo.addItem("⚡ New Flash (Raw / Factory Reset)", "new")
self.mode_combo.addItem("🔄 Update Firmware (Pre-installed)", "update")
self.mode_combo.setMinimumWidth(380)
self.mode_combo.setMinimumHeight(35)
self.mode_combo.setStyleSheet("""
QComboBox {
background-color: #2d3352; border: 1px solid #3d4a6b; border-radius: 4px;
padding: 4px 10px; color: #ffffff; font-size: 14px; font-weight: bold;
}
""")
self.mode_combo.currentIndexChanged.connect(self._on_mode_changed)
mode_row.addWidget(self.mode_combo)
mode_row.addStretch()
flash_layout.addLayout(mode_row)
# Container cho cấu hình tuỳ chọn "Nạp Mới FW"
self.method_container = QWidget()
method_layout = QVBoxLayout(self.method_container)
method_layout.setContentsMargins(0, 0, 0, 0)
method_layout.setSpacing(10)
# Method selector row
method_row = QHBoxLayout()
method_lbl = QLabel("Method:")
method_lbl.setStyleSheet("font-size: 13px; font-weight: bold;")
method_row.addWidget(method_lbl)
self.method_combo = QComboBox()
self.method_combo.addItem("🌐 API (LuCI - Recommended)", "api")
self.method_combo.addItem("💻 SSH (paramiko/scp)", "ssh")
self.method_combo.setMinimumWidth(220)
self.method_combo.setMinimumHeight(32)
self.method_combo.setStyleSheet("""
QComboBox {
background-color: #1e1e2e; border: 1px solid #3d4a6b; border-radius: 4px;
padding: 4px 10px; color: #ffffff; font-size: 13px; font-weight: bold;
}
""")
self.method_combo.currentIndexChanged.connect(self._on_method_changed)
method_row.addWidget(self.method_combo)
method_row.addStretch()
method_layout.addLayout(method_row)
# SSH credentials (hidden by default)
self.ssh_creds_widget = QWidget()
self.ssh_creds_widget.setStyleSheet("background-color: #11121d; border-radius: 6px; border: 1px dashed #3d4a6b;")
ssh_creds_layout = QVBoxLayout(self.ssh_creds_widget)
ssh_creds_layout.setContentsMargins(15, 12, 15, 12)
ssh_creds_layout.setSpacing(10)
ssh_row1 = QHBoxLayout()
ssh_lbl1 = QLabel("SSH User:")
ssh_lbl1.setStyleSheet("font-size: 12px; font-weight: bold;")
ssh_row1.addWidget(ssh_lbl1)
self.ssh_user_input = QLineEdit("root")
self.ssh_user_input.setFixedWidth(130)
str_qlineedit = "QLineEdit { background-color: #1a1b2e; font-size: 13px; padding: 4px; border: 1px solid #3d4a6b; color: #ffffff; }"
self.ssh_user_input.setStyleSheet(str_qlineedit)
ssh_row1.addWidget(self.ssh_user_input)
ssh_row1.addSpacing(25)
ssh_lbl2 = QLabel("SSH Password:")
ssh_lbl2.setStyleSheet("font-size: 12px; font-weight: bold;")
ssh_row1.addWidget(ssh_lbl2)
self.ssh_pass_input = QLineEdit("admin123a")
self.ssh_pass_input.setEchoMode(QLineEdit.EchoMode.Password)
self.ssh_pass_input.setFixedWidth(130)
self.ssh_pass_input.setStyleSheet(str_qlineedit)
ssh_row1.addWidget(self.ssh_pass_input)
ssh_row1.addStretch()
ssh_creds_layout.addLayout(ssh_row1)
self.set_passwd_cb = QCheckBox("Set password before flash (passwd → admin123a)")
self.set_passwd_cb.setChecked(True)
self.set_passwd_cb.setStyleSheet("color: #94a3b8; font-size: 12px; font-weight: bold;")
ssh_creds_layout.addWidget(self.set_passwd_cb)
self.ssh_creds_widget.setVisible(False)
method_layout.addWidget(self.ssh_creds_widget)
flash_layout.addWidget(self.method_container)
# Warning UI for Update Mode
self.update_warning_lbl = QLabel("⚠️ FW Update Mode: Forced to SSH. Target IP [192.168.11.102]. Other IPs require confirmation.")
self.update_warning_lbl.setStyleSheet("color: #f38ba8; font-size: 13px; font-weight: bold; padding: 8px; border: 1px dotted #f38ba8;")
self.update_warning_lbl.setWordWrap(True)
self.update_warning_lbl.setVisible(False)
flash_layout.addWidget(self.update_warning_lbl)
# Parallel count row
parallel_row = QHBoxLayout()
parallel_row.addWidget(QLabel("Concurrent devices:"))
parallel_lbl = QLabel("Concurrent devices:")
parallel_lbl.setStyleSheet("font-size: 14px; font-weight: bold;")
parallel_row.addWidget(parallel_lbl)
self.parallel_spin = QSpinBox()
self.parallel_spin.setRange(0, 100)
self.parallel_spin.setValue(10)
self.parallel_spin.setSpecialValueText("Unlimited")
self.parallel_spin.setSpecialValueText("0 (Unlimited)")
self.parallel_spin.setToolTip("0 = unlimited (all devices at once)")
self.parallel_spin.setFixedWidth(80)
self.parallel_spin.setMinimumWidth(160)
self.parallel_spin.setMinimumHeight(35)
self.parallel_spin.setStyleSheet("""
QSpinBox { font-size: 15px; font-weight: bold; text-align: center; }
""")
parallel_row.addWidget(self.parallel_spin)
parallel_row.addStretch()
flash_layout.addLayout(parallel_row)
btn_flash = QPushButton(" Flash Selected Devices")
btn_flash.setObjectName("flash")
btn_flash.clicked.connect(self.flash_all)
flash_layout.addWidget(btn_flash)
self.btn_flash = QPushButton("FLASH SELECTED DEVICES")
self.btn_flash.setObjectName("flash")
self.btn_flash.setMinimumHeight(45)
self.btn_flash.setStyleSheet("QPushButton#flash { font-size: 16px; font-weight: bold; letter-spacing: 1px; }")
self.btn_flash.clicked.connect(self.flash_all)
flash_layout.addWidget(self.btn_flash)
flash_group.setLayout(flash_layout)
layout.addWidget(flash_group)
@@ -724,27 +382,42 @@ class App(QWidget):
self.table.setRowCount(len(self.devices))
for i, d in enumerate(self.devices):
mac_str = d["mac"].upper()
# Checkbox column
cb_item = QTableWidgetItem()
cb_item.setCheckState(Qt.CheckState.Checked)
# If device MAC is already flashed, mark it
is_already_flashed = mac_str in self.flashed_macs
if is_already_flashed and d["status"] in ["READY", ""]:
d["status"] = "Already Flashed"
# Checkbox logic
if is_already_flashed:
cb_item.setCheckState(Qt.CheckState.Unchecked)
else:
cb_item.setCheckState(Qt.CheckState.Checked)
cb_item.setFlags(cb_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
self.table.setItem(i, 0, cb_item)
ip_item = QTableWidgetItem(d["ip"])
name_item = QTableWidgetItem(d.get("name", ""))
mac_item = QTableWidgetItem(d["mac"].upper())
mac_item = QTableWidgetItem(mac_str)
status_item = QTableWidgetItem(d["status"])
if is_already_flashed:
status_item.setForeground(QColor("#a6e3a1"))
ip_item.setForeground(QColor("#a6e3a1"))
# Dim gateway & self if showing all
if d["ip"] in {self.local_ip, self.gateway_ip}:
for item in [ip_item, name_item, mac_item, status_item]:
for item in [ip_item, mac_item, status_item]:
item.setForeground(QColor("#4a5568"))
cb_item.setCheckState(Qt.CheckState.Unchecked)
self.table.setItem(i, 1, ip_item)
self.table.setItem(i, 2, name_item)
self.table.setItem(i, 3, mac_item)
self.table.setItem(i, 4, status_item)
self.table.setItem(i, 2, mac_item)
self.table.setItem(i, 3, status_item)
total = len(self.devices)
hidden = len(self.all_devices) - total
@@ -756,9 +429,12 @@ class App(QWidget):
def _select_all_devices(self):
"""Check all device checkboxes."""
for i in range(self.table.rowCount()):
item = self.table.item(i, 0)
if item:
item.setCheckState(Qt.CheckState.Checked)
mac = self.table.item(i, 2).text().strip()
# Prevent checking if already flashed or is a gateway (optional but good UI logic)
if mac not in self.flashed_macs:
item = self.table.item(i, 0)
if item:
item.setCheckState(Qt.CheckState.Checked)
def _deselect_all_devices(self):
"""Uncheck all device checkboxes."""
@@ -766,6 +442,21 @@ class App(QWidget):
item = self.table.item(i, 0)
if item:
item.setCheckState(Qt.CheckState.Unchecked)
def _show_history(self):
"""Show list of successfully flashed MACs this session."""
if not self.flashed_macs:
QMessageBox.information(self, "Flash History", "No successful flashes during this session.")
else:
# Sort by MAC and format with timestamp
history_lines = []
for mac in sorted(self.flashed_macs.keys()):
time_str = self.flashed_macs[mac]
history_lines.append(f"[{time_str}] {mac}")
macs_str = "\n".join(history_lines)
msg = f"Successfully flashed devices in this session ({len(self.flashed_macs)}):\n\n{macs_str}"
QMessageBox.information(self, "Flash History", msg)
# ── Actions ──
@@ -801,6 +492,7 @@ class App(QWidget):
self.scan_progress_bar.setRange(0, 0)
self.scan_progress_bar.setFormat(" Starting...")
self.scan_progress_bar.setVisible(True)
self.btn_scan.setEnabled(False)
self.scan_thread = ScanThread(network)
self.scan_thread.finished.connect(self._on_scan_done)
@@ -811,6 +503,7 @@ class App(QWidget):
def _on_scan_done(self, results):
self.scan_progress_bar.setVisible(False)
self.btn_scan.setEnabled(True)
self.all_devices = []
for dev in results:
@@ -840,10 +533,7 @@ class App(QWidget):
self.scan_progress_bar.setRange(0, 0)
self.scan_progress_bar.setFormat(" ARP broadcast scan...")
self.scan_status.setText("⏳ Running ARP broadcast scan...")
elif stage == "hostname":
self.scan_progress_bar.setRange(0, 0)
self.scan_progress_bar.setFormat(" Resolving hostnames...")
self.scan_status.setText("⏳ Resolving hostnames...")
def _on_scan_progress(self, done, total):
self.scan_progress_bar.setRange(0, total)
@@ -853,12 +543,28 @@ class App(QWidget):
def _on_scan_error(self, error_msg):
self.scan_progress_bar.setVisible(False)
self.btn_scan.setEnabled(True)
self.scan_status.setText("❌ Scan failed")
self.scan_status.setStyleSheet("color: #f38ba8;")
QMessageBox.critical(
self, "Scan Error", f"Failed to scan network:\n{error_msg}"
)
def _on_mode_changed(self, index):
"""Show/hide method based on selected mode."""
mode = self.mode_combo.currentData()
if mode == "update":
self.method_container.setVisible(False)
self.update_warning_lbl.setVisible(True)
else:
self.method_container.setVisible(True)
self.update_warning_lbl.setVisible(False)
self._on_method_changed(self.method_combo.currentIndex())
def _on_method_changed(self, index):
"""SSH credentials are always hidden; credentials are hardcoded."""
self.ssh_creds_widget.setVisible(False)
def _get_selected_devices(self):
"""Return list of (table_row_index, device_dict) for checked devices."""
selected = []
@@ -905,20 +611,64 @@ class App(QWidget):
self._flash_row_map[idx] = row
flash_devices.append(dev)
# Run flashing in background thread so UI doesn't freeze
self.flash_thread = FlashThread(
flash_devices, self.firmware,
max_workers=self.parallel_spin.value()
)
# Determine flash method and SSH credentials
mode = self.mode_combo.currentData()
if mode == "update":
# Hỏi xác nhận nếu có device nào khác 192.168.11.102
strange_ips = [dev["ip"] for _, dev in selected if dev["ip"] != "192.168.11.102"]
if strange_ips:
msg = (f"⚠️ Detected IP(s) other than [192.168.11.102] in your selection:\n"
f"{', '.join(strange_ips[:3])}{'...' if len(strange_ips) > 3 else ''}\n\n"
f"Are you SURE you want to update firmware on these devices?")
reply = QMessageBox.question(
self, "Confirm Update Target", msg,
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
QMessageBox.StandardButton.No
)
if reply == QMessageBox.StandardButton.No:
return
method = "ssh"
ssh_user = "root"
ssh_password = "admin123a"
ssh_backup_password = "admin"
set_passwd = False
else:
method = self.method_combo.currentData()
ssh_user = "root"
ssh_password = "admin123a"
ssh_backup_password = "admin123a"
set_passwd = True if method == "ssh" else False
# Chọn đúng worker theo mode và chạy trong background thread
max_w = self.parallel_spin.value()
if mode == "update":
self.flash_thread = UpdateFlashThread(
flash_devices, self.firmware,
max_workers=max_w,
)
else:
self.flash_thread = NewFlashThread(
flash_devices, self.firmware,
max_workers=max_w,
method=method,
ssh_user=ssh_user,
ssh_password=ssh_password,
ssh_backup_password=ssh_backup_password,
set_passwd=set_passwd,
)
self.flash_thread.device_status.connect(self._on_flash_status)
self.flash_thread.device_done.connect(self._on_flash_done)
self.flash_thread.all_done.connect(self._on_flash_all_done)
# Disable flash button trong khi đang flash
self.btn_flash.setEnabled(False)
self.flash_thread.start()
def _on_flash_status(self, index, msg):
"""Update status column while flashing."""
row = self._flash_row_map.get(index, index)
self.table.setItem(row, 4, QTableWidgetItem(f"{msg}"))
self.table.setItem(row, 3, QTableWidgetItem(f"{msg}"))
def _on_flash_done(self, index, result):
"""One device finished flashing."""
@@ -926,6 +676,14 @@ class App(QWidget):
if result.startswith("DONE"):
item = QTableWidgetItem(f"{result}")
item.setForeground(QColor("#a6e3a1"))
# Save MAC to history with current timestamp
mac_item = self.table.item(row, 2)
if mac_item:
import datetime
now_str = datetime.datetime.now().strftime("%H:%M:%S")
self.flashed_macs[mac_item.text().strip()] = now_str
# Auto-uncheck so it won't be flashed again
cb = self.table.item(row, 0)
if cb:
@@ -933,11 +691,12 @@ class App(QWidget):
else:
item = QTableWidgetItem(f"{result}")
item.setForeground(QColor("#f38ba8"))
self.table.setItem(row, 4, item)
self.table.setItem(row, 3, item)
self.progress.setValue(self.progress.value() + 1)
def _on_flash_all_done(self):
"""All flashing complete."""
self.btn_flash.setEnabled(True)
QMessageBox.information(self, "Flash Complete", "All devices have been processed.")

View File

@@ -1,3 +1,5 @@
PyQt6
scapy
requests
paramiko
scp

11
run.bat
View File

@@ -3,14 +3,15 @@ REM IoT Firmware Loader - Windows launcher
cd /d "%~dp0"
if exist "venv" (
call venv\Scripts\activate.bat
) else (
if not exist "venv" (
echo Creating virtual environment...
python -m venv venv
call venv\Scripts\activate.bat
pip install PyQt6 scapy requests
)
call venv\Scripts\activate.bat
echo Checking and installing required packages...
pip install -r requirements.txt --quiet
echo Starting IoT Firmware Loader...
python main.py

64
ui/components.py Normal file
View File

@@ -0,0 +1,64 @@
from PyQt6.QtWidgets import QGroupBox, QWidget, QVBoxLayout
from PyQt6.QtCore import QPropertyAnimation
class CollapsibleGroupBox(QGroupBox):
def __init__(self, title="", parent=None):
super().__init__(title, parent)
self.setCheckable(True)
self.setChecked(True)
self.animation = QPropertyAnimation(self, b"maximumHeight")
self.animation.setDuration(200)
# Connect the toggled signal to our animation function
self.toggled.connect(self._toggle_animation)
self._full_height = 0
def set_content_layout(self, layout):
# We need a wrapper widget to hold the layout
self.content_widget = QWidget()
self.content_widget.setStyleSheet("background-color: transparent;")
self.content_widget.setLayout(layout)
main_layout = QVBoxLayout()
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(self.content_widget)
self.setLayout(main_layout)
def _toggle_animation(self, checked):
if not hasattr(self, 'content_widget'):
return
if checked:
# Expand: show content first, then animate
self.content_widget.setVisible(True)
target_height = self.sizeHint().height()
self.animation.stop()
self.animation.setStartValue(self.height())
self.animation.setEndValue(target_height)
self.animation.finished.connect(self._on_expand_finished)
self.animation.start()
else:
# Collapse
self.animation.stop()
self.animation.setStartValue(self.height())
self.animation.setEndValue(32)
self.animation.finished.connect(self._on_collapse_finished)
self.animation.start()
def _on_expand_finished(self):
# Remove height constraint so content can grow dynamically
self.setMaximumHeight(16777215)
try:
self.animation.finished.disconnect(self._on_expand_finished)
except TypeError:
pass
def _on_collapse_finished(self):
if not self.isChecked():
self.content_widget.setVisible(False)
try:
self.animation.finished.disconnect(self._on_collapse_finished)
except TypeError:
pass

280
ui/styles.py Normal file
View File

@@ -0,0 +1,280 @@
STYLE = """
QWidget {
background-color: #1a1b2e;
color: #e2e8f0;
font-family: 'Segoe UI', 'SF Pro Display', sans-serif;
font-size: 12px;
}
QGroupBox {
border: 1px solid #2d3748;
border-radius: 8px;
margin-top: 10px;
padding: 20px 8px 6px 8px;
font-weight: bold;
color: #7eb8f7;
background-color: #1e2035;
}
QGroupBox::title {
subcontrol-origin: margin;
subcontrol-position: top left;
left: 14px;
top: 5px;
padding: 0px 8px;
background-color: transparent;
}
QGroupBox::indicator {
width: 14px;
height: 14px;
border-radius: 4px;
border: 1px solid #3d4a6b;
background-color: #13141f;
margin-top: 5px;
}
QGroupBox::indicator:unchecked {
background-color: #13141f;
}
QGroupBox::indicator:checked {
background-color: #3b82f6;
border-color: #3b82f6;
}
QLabel {
background-color: transparent;
}
QLabel#title {
font-size: 16px;
font-weight: bold;
color: #7eb8f7;
letter-spacing: 1px;
}
QLabel#info {
color: #94a3b8;
font-size: 11px;
}
QPushButton {
background-color: #2d3352;
border: 1px solid #3d4a6b;
border-radius: 6px;
padding: 4px 12px;
color: #e2e8f0;
font-weight: 600;
min-height: 24px;
}
QPushButton:hover {
background-color: #3d4a6b;
border-color: #7eb8f7;
color: #ffffff;
}
QPushButton:pressed {
background-color: #1a2040;
}
QPushButton#scan {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #1a56db, stop:1 #1e66f5);
border-color: #1a56db;
color: #ffffff;
}
QPushButton#scan:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #2563eb, stop:1 #3b82f6);
}
QPushButton#flash {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #15803d, stop:1 #16a34a);
border-color: #15803d;
color: #ffffff;
font-size: 13px;
min-height: 30px;
}
QPushButton#flash:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #16a34a, stop:1 #22c55e);
}
QTableWidget {
background-color: #13141f;
alternate-background-color: #1a1b2e;
border: 1px solid #2d3748;
border-radius: 8px;
gridline-color: #2d3748;
selection-background-color: #2d3a5a;
selection-color: #e2e8f0;
}
QTableWidget::item {
padding: 2px 6px;
border: none;
}
QTableWidget::item:selected {
background-color: #2d3a5a;
color: #7eb8f7;
}
QHeaderView::section {
background-color: #1e2035;
color: #7eb8f7;
border: none;
border-bottom: 2px solid #3b82f6;
border-right: 1px solid #2d3748;
padding: 4px 6px;
font-weight: bold;
font-size: 11px;
letter-spacing: 0.5px;
text-transform: uppercase;
}
QHeaderView::section:last {
border-right: none;
}
QProgressBar {
border: 1px solid #2d3748;
border-radius: 6px;
text-align: center;
background-color: #13141f;
color: #e2e8f0;
height: 20px;
font-size: 11px;
font-weight: 600;
}
QProgressBar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #3b82f6, stop:1 #7eb8f7);
border-radius: 7px;
}
QProgressBar#scan_bar {
border: 1px solid #374151;
border-radius: 5px;
text-align: center;
background-color: #13141f;
color: #fbbf24;
height: 16px;
font-size: 10px;
font-weight: 600;
}
QProgressBar#scan_bar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #d97706, stop:1 #fbbf24);
border-radius: 4px;
}
QLineEdit {
background-color: #13141f;
border: 1px solid #2d3748;
border-radius: 8px;
padding: 7px 12px;
color: #e2e8f0;
selection-background-color: #2d3a5a;
}
QLineEdit:focus {
border-color: #3b82f6;
background-color: #161727;
}
QScrollBar:vertical {
background: #13141f;
width: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:vertical {
background: #3d4a6b;
border-radius: 5px;
min-height: 30px;
}
QScrollBar::handle:vertical:hover {
background: #7eb8f7;
}
QScrollBar::handle:vertical:pressed {
background: #3b82f6;
}
QScrollBar::add-line:vertical,
QScrollBar::sub-line:vertical {
height: 0px;
background: transparent;
}
QScrollBar::add-page:vertical,
QScrollBar::sub-page:vertical {
background: transparent;
}
QScrollBar:horizontal {
background: #13141f;
height: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:horizontal {
background: #3d4a6b;
border-radius: 5px;
min-width: 30px;
}
QScrollBar::handle:horizontal:hover {
background: #7eb8f7;
}
QScrollBar::handle:horizontal:pressed {
background: #3b82f6;
}
QScrollBar::add-line:horizontal,
QScrollBar::sub-line:horizontal {
width: 0px;
background: transparent;
}
QScrollBar::add-page:horizontal,
QScrollBar::sub-page:horizontal {
background: transparent;
}
QCheckBox {
background-color: transparent;
spacing: 6px;
color: #94a3b8;
font-size: 12px;
}
QCheckBox::indicator {
width: 16px;
height: 16px;
border-radius: 4px;
border: 1px solid #3d4a6b;
background-color: #13141f;
}
QCheckBox::indicator:checked {
background-color: #3b82f6;
border-color: #3b82f6;
}
QCheckBox::indicator:hover {
border-color: #7eb8f7;
}
"""

27
utils/network.py Normal file
View File

@@ -0,0 +1,27 @@
import socket
def _resolve_hostname(ip):
"""Reverse DNS lookup for a single IP."""
try:
return socket.gethostbyaddr(ip)[0]
except Exception:
return ""
def get_local_ip():
"""Get the local IP address of this machine."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "N/A"
def get_default_network(ip):
"""Guess the /24 network from local IP."""
try:
parts = ip.split(".")
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
except Exception:
return "192.168.1.0/24"

44
utils/system.py Normal file
View File

@@ -0,0 +1,44 @@
import os
import sys
import platform
import socket
from utils.network import get_local_ip
def resource_path(relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def get_machine_info():
"""Collect machine info."""
hostname = socket.gethostname()
local_ip = get_local_ip()
os_info = f"{platform.system()} {platform.release()}"
mac_addr = "N/A"
try:
import uuid
mac = uuid.getnode()
mac_addr = ":".join(
f"{(mac >> (8 * i)) & 0xFF:02X}" for i in reversed(range(6))
)
except Exception:
pass
return {
"hostname": hostname,
"ip": local_ip,
"os": os_info,
"mac": mac_addr,
}
def get_version():
"""Read version from version.txt"""
try:
with open(resource_path('version.txt'), 'r', encoding='utf-8') as f:
return f.read().strip()
except Exception:
return "Unknown"

1
version.txt Normal file
View File

@@ -0,0 +1 @@
1.1.2