Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1c1fbb7f92 | |||
| ef363ac61d | |||
| f8ce6f5831 | |||
| 2c2a78d27c | |||
| ada3440ebc |
BIN
2023-11-03.webp
BIN
2023-11-03.webp
Binary file not shown.
|
Before Width: | Height: | Size: 7.4 KiB |
Binary file not shown.
34
README.md
34
README.md
@@ -8,11 +8,20 @@ Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng lo
|
||||
|
||||
## 📁 Cấu trúc dự án
|
||||
|
||||
```
|
||||
```text
|
||||
iot_fw_loader/
|
||||
├── main.py # UI chính (PyQt6) + Điều phối luồng và xử lý đa luồng
|
||||
├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy)
|
||||
├── flasher.py # Upload firmware và tự động hóa qua giao diện OpenWrt LuCI
|
||||
├── main.py # UI chính (PyQt6)
|
||||
├── core/ # Các thành phần cốt lõi và xử lý đa luồng
|
||||
│ ├── workers.py # Quản lý luồng dùng chung (ScanThread, FlashThread)
|
||||
│ ├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy)
|
||||
│ ├── flasher.py # Flash firmware và tự động hóa qua OpenWrt LuCI bằng API
|
||||
│ └── ssh_flasher.py # Load/Update firmware qua đường dẫn SSH
|
||||
├── ui/ # Các component thiết kế giao diện
|
||||
│ ├── components.py # Custom Qt Widgets (CollapsibleGroupBox, etc.)
|
||||
│ └── styles.py # Các cấu hình Stylesheet
|
||||
├── utils/ # Các hàm helper tiện ích
|
||||
│ ├── network.py # Các hàm xử lý IP, Hostname
|
||||
│ └── system.py # Các hàm lấy thông tin máy và resources
|
||||
├── run.sh # Script khởi chạy (macOS/Linux)
|
||||
├── run.bat # Script khởi chạy (Windows)
|
||||
├── build_windows.bat # Script đóng gói thành file .exe độc lập (Windows)
|
||||
@@ -27,7 +36,7 @@ iot_fw_loader/
|
||||
|
||||
- Python 3.9+
|
||||
- Các thư viện: `PyQt6`, `scapy`, `requests`, `pyinstaller` (để build).
|
||||
- *Trên Windows:* Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng).
|
||||
- _Trên Windows:_ Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng).
|
||||
|
||||
### Khởi chạy nhanh (Môi trường Dev)
|
||||
|
||||
@@ -46,6 +55,7 @@ run.bat
|
||||
> Script tự tạo `venv` và cài dependencies nếu chưa có.
|
||||
|
||||
### 📦 Build ra file chạy độc lập (.exe) cho Windows
|
||||
|
||||
Chạy script sau để tự động đóng gói ứng dụng thành 1 file `.exe` duy nhất (không cần cài Python trên máy đích):
|
||||
|
||||
```bat
|
||||
@@ -58,7 +68,7 @@ File nhận được sẽ nằm ở: `dist\IoT_Firmware_Loader.exe`.
|
||||
|
||||
## 🏗 Kiến trúc hệ thống
|
||||
|
||||
```
|
||||
```text
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ main.py (UI) │
|
||||
│ ┌───────────┐ ┌───────────┐ ┌────────────────────────────┐ │
|
||||
@@ -99,7 +109,7 @@ Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát
|
||||
1. **Ping Sweep:** Gửi gói tin Ping song song (tối đa 50 threads) để đánh thức thiết bị và điền IP/MAC vào bảng ARP Cache của hệ điều hành.
|
||||
2. **ARP Table Fallback:** Đọc bảng ARP nội bộ của OS (`arp -a`) bằng Regex. Hoạt động đa nền tảng (Windows/macOS/Linux) mà không cần quyền Admin/Root.
|
||||
3. **Scapy ARP (Tính năng nâng cao):** Gửi các gói tin ARP Broadcast trực tiếp để đảm bảo bao phủ gap nếu có. Yêu cầu quyền Root trên Linux/macOS hoặc Npcap trên Windows.
|
||||
*Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị.*
|
||||
_Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị._
|
||||
|
||||
### 2. Giao diện thiết bị (Device Table)
|
||||
|
||||
@@ -110,11 +120,11 @@ Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát
|
||||
|
||||
Từ phiên bản hiện tại, phương thức ESP32 OTA không còn được áp dụng, thay vào đó module `flasher.py` tự động hóa quá trình update của router **OpenWrt (Barrier Breaker 14.07)**:
|
||||
|
||||
* **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`.
|
||||
* **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`.
|
||||
* **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có).
|
||||
* **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application.
|
||||
* 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh.
|
||||
- **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`.
|
||||
- **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`.
|
||||
- **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có).
|
||||
- **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application.
|
||||
- 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh.
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -22,12 +22,12 @@ call venv\Scripts\activate.bat
|
||||
|
||||
REM 3. Cai dependencies + PyInstaller
|
||||
echo [2/4] Installing dependencies...
|
||||
pip install PyQt6 scapy requests pyinstaller --quiet
|
||||
pip install -r requirements.txt pyinstaller --quiet
|
||||
|
||||
REM 4. Build .exe
|
||||
echo [3/4] Building executable...
|
||||
pyinstaller ^
|
||||
--name "MiraV3_Firmware_Loader" ^
|
||||
--name "Mira_Firmware_Loader_v1.1.0" ^
|
||||
--icon "icon.ico" ^
|
||||
--add-data "icon.ico;." ^
|
||||
--onefile ^
|
||||
@@ -47,11 +47,11 @@ echo.
|
||||
echo [4/4] Build complete!
|
||||
echo.
|
||||
|
||||
if exist "dist\MiraV3_Firmware_Loader.exe" (
|
||||
echo ✅ SUCCESS: dist\MiraV3_Firmware_Loader.exe
|
||||
if exist "dist\Mira_Firmware_Loader_v1.1.0.exe" (
|
||||
echo SUCCESS: dist\Mira_Firmware_Loader_v1.1.0.exe
|
||||
echo.
|
||||
echo File size:
|
||||
for %%A in ("dist\MiraV3_Firmware_Loader.exe") do echo %%~zA bytes
|
||||
for %%A in ("dist\Mira_Firmware_Loader_v1.1.0.exe") do echo %%~zA bytes
|
||||
echo.
|
||||
echo Ban co the copy file .exe nay sang may khac va chay truc tiep.
|
||||
) else (
|
||||
|
||||
@@ -74,7 +74,7 @@ def _ping_sweep(network, progress_cb=None):
|
||||
if progress_cb:
|
||||
progress_cb(done_count[0], total)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=50) as executor:
|
||||
with ThreadPoolExecutor(max_workers=70) as executor:
|
||||
futures = [executor.submit(_ping_and_track, ip) for ip in hosts]
|
||||
for f in as_completed(futures):
|
||||
pass
|
||||
313
core/ssh_flasher.py
Normal file
313
core/ssh_flasher.py
Normal file
@@ -0,0 +1,313 @@
|
||||
"""
|
||||
SSH-based Firmware Flasher for OpenWrt Devices
|
||||
|
||||
Replicates the flash.ps1 logic using Python paramiko/scp:
|
||||
1. Auto-accept SSH host key
|
||||
2. Set device password via `passwd` command
|
||||
3. Upload firmware via SCP to /tmp/
|
||||
4. Verify, sync, and flash via sysupgrade
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import paramiko
|
||||
from scp import SCPClient
|
||||
import random
|
||||
|
||||
|
||||
def _create_ssh_client(ip, user, password, timeout=15):
|
||||
"""Create an SSH client with auto-accept host key policy, including retries."""
|
||||
for attempt in range(3):
|
||||
try:
|
||||
client = paramiko.SSHClient()
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
client.connect(ip, username=user, password=password, timeout=timeout,
|
||||
look_for_keys=False, allow_agent=False)
|
||||
return client
|
||||
except Exception as e:
|
||||
if attempt == 2:
|
||||
raise e
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
import socket
|
||||
|
||||
|
||||
class _SimpleTelnet:
|
||||
"""Minimal Telnet client using raw sockets (replaces telnetlib removed in Python 3.13+)."""
|
||||
def __init__(self, host, port=23, timeout=10):
|
||||
self._sock = socket.create_connection((host, port), timeout=timeout)
|
||||
|
||||
def read_very_eager(self):
|
||||
try:
|
||||
self._sock.setblocking(False)
|
||||
data = b""
|
||||
while True:
|
||||
try:
|
||||
chunk = self._sock.recv(4096)
|
||||
if not chunk:
|
||||
break
|
||||
data += chunk
|
||||
except (BlockingIOError, OSError):
|
||||
break
|
||||
self._sock.setblocking(True)
|
||||
return data
|
||||
except Exception:
|
||||
return b""
|
||||
|
||||
def write(self, data):
|
||||
self._sock.sendall(data)
|
||||
|
||||
def close(self):
|
||||
try:
|
||||
self._sock.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def set_device_password(ip, user="root", old_password="", new_password="admin123a",
|
||||
status_cb=None):
|
||||
"""
|
||||
Set device password via Telnet (if raw/reset) or SSH.
|
||||
"""
|
||||
# Jitter to avoid hammering the network with many concurrent connections
|
||||
time.sleep(random.uniform(0.1, 1.5))
|
||||
|
||||
if status_cb:
|
||||
status_cb("Checking Telnet port for raw device...")
|
||||
|
||||
# 1. Thử Telnet trước (OpenWrt mặc định mở Telnet 23 và cấm SSH Root khi chưa có Pass)
|
||||
try:
|
||||
tn = _SimpleTelnet(ip, timeout=5)
|
||||
# Nếu vô được Telnet tức là thiết bị vừa Reset cứng chưa có pass
|
||||
if status_cb:
|
||||
status_cb("Telnet connected! Setting password...")
|
||||
|
||||
# Đợi logo OpenWrt và prompt "root@OpenWrt:/# "
|
||||
time.sleep(1)
|
||||
tn.read_very_eager()
|
||||
|
||||
# Gửi lệnh đổi pass
|
||||
tn.write(b"passwd\n")
|
||||
time.sleep(1)
|
||||
tn.write(new_password.encode('ascii') + b"\n")
|
||||
time.sleep(1)
|
||||
tn.write(new_password.encode('ascii') + b"\n")
|
||||
time.sleep(1)
|
||||
|
||||
# Thoát telnet
|
||||
tn.write(b"exit\n")
|
||||
time.sleep(0.5)
|
||||
tn.close()
|
||||
|
||||
# Chờ 3 giây để OpenWrt kịp đóng Telnet và nổ tiến trình Dropbear (SSH Server)
|
||||
if status_cb:
|
||||
status_cb("Password set via Telnet. Waiting for SSH to start...")
|
||||
time.sleep(3)
|
||||
return "DONE"
|
||||
|
||||
except Exception as e:
|
||||
# Bất kỳ lỗi Telnet nào (ConnectionRefused, Timeout, BrokenPipe...)
|
||||
# đều có nghĩa là Telnet không truy cập được hoặc bị đóng ngang.
|
||||
# Chuyển qua luồng kết nối SSH.
|
||||
pass
|
||||
|
||||
# 2. Rơi xuống luồng SSH nếu thiết bị cũ (cổng Telnet đóng hoặc lỗi)
|
||||
if status_cb:
|
||||
status_cb("Connecting SSH for password update...")
|
||||
|
||||
last_error = None
|
||||
for attempt in range(3):
|
||||
try:
|
||||
client = _create_ssh_client(ip, user, old_password, timeout=10)
|
||||
|
||||
if status_cb:
|
||||
status_cb("Setting password via SSH...")
|
||||
|
||||
shell = client.invoke_shell()
|
||||
time.sleep(1)
|
||||
if shell.recv_ready():
|
||||
shell.recv(65535)
|
||||
|
||||
shell.send("passwd\n")
|
||||
time.sleep(2)
|
||||
shell.send(f"{new_password}\n")
|
||||
time.sleep(1)
|
||||
shell.send(f"{new_password}\n")
|
||||
time.sleep(2)
|
||||
|
||||
if shell.recv_ready():
|
||||
shell.recv(65535)
|
||||
|
||||
shell.send("exit\n")
|
||||
time.sleep(0.5)
|
||||
shell.close()
|
||||
client.close()
|
||||
|
||||
if status_cb:
|
||||
status_cb("Password set ✓")
|
||||
return "DONE"
|
||||
|
||||
except paramiko.AuthenticationException as e:
|
||||
# Authentication means wrong password, retrying won't help here
|
||||
if 'client' in locals() and client:
|
||||
client.close()
|
||||
return f"FAIL: Cannot connect SSH (Old pass incorrect?) — {e}"
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
if 'client' in locals() and client:
|
||||
try: client.close()
|
||||
except: pass
|
||||
# Errno 64 or 113 usually means 'Host is down' or 'No route to host'.
|
||||
# It can be a transient switch issue. Wait a bit and retry.
|
||||
time.sleep(2)
|
||||
|
||||
return f"FAIL: Cannot connect SSH after 3 attempts — {last_error}"
|
||||
|
||||
|
||||
def flash_device_ssh(ip, firmware_path, user="root", password="admin123a",
|
||||
backup_password="", set_passwd=False, status_cb=None):
|
||||
"""
|
||||
Flash firmware to an OpenWrt device via SSH/SCP.
|
||||
|
||||
Steps (mirroring flash.ps1):
|
||||
1. (Optional) Set device password via passwd
|
||||
2. Upload firmware via SCP to /tmp/
|
||||
3. Verify uploaded file
|
||||
4. Sync filesystem
|
||||
5. Execute sysupgrade
|
||||
|
||||
Returns:
|
||||
"DONE" on success, "FAIL: reason" on error
|
||||
"""
|
||||
|
||||
# ═══════════════════════════════════════════
|
||||
# STEP 0: Set password (optional)
|
||||
# ═══════════════════════════════════════════
|
||||
if not set_passwd:
|
||||
# Jitter here if we skip set_passwd
|
||||
time.sleep(random.uniform(0.1, 1.5))
|
||||
|
||||
if set_passwd:
|
||||
result = set_device_password(ip, user, "", password, status_cb)
|
||||
if result.startswith("FAIL"):
|
||||
# Try with backup password if set
|
||||
if backup_password:
|
||||
result = set_device_password(ip, user, backup_password, password, status_cb)
|
||||
|
||||
# If still failing, try with current intended password just in case it was already set
|
||||
if result.startswith("FAIL"):
|
||||
result = set_device_password(ip, user, password, password, status_cb)
|
||||
|
||||
if result.startswith("FAIL"):
|
||||
return result
|
||||
|
||||
# ═══════════════════════════════════════════
|
||||
# STEP 1: Connect SSH
|
||||
# ═══════════════════════════════════════════
|
||||
if status_cb:
|
||||
status_cb("Connecting SSH...")
|
||||
|
||||
try:
|
||||
client = _create_ssh_client(ip, user, password)
|
||||
except paramiko.AuthenticationException:
|
||||
return "FAIL: SSH authentication failed"
|
||||
except paramiko.SSHException as e:
|
||||
return f"FAIL: SSH error — {e}"
|
||||
except Exception as e:
|
||||
return f"FAIL: Cannot connect — {e}"
|
||||
|
||||
try:
|
||||
# ═══════════════════════════════════════════
|
||||
# STEP 2: Upload firmware via SCP
|
||||
# ═══════════════════════════════════════════
|
||||
if status_cb:
|
||||
status_cb("Uploading firmware via SCP...")
|
||||
|
||||
filename = os.path.basename(firmware_path)
|
||||
remote_path = f"/tmp/{filename}"
|
||||
|
||||
upload_success = False
|
||||
last_error = None
|
||||
for attempt in range(3):
|
||||
try:
|
||||
scp_client = SCPClient(client.get_transport(), socket_timeout=350)
|
||||
scp_client.put(firmware_path, remote_path)
|
||||
scp_client.close()
|
||||
upload_success = True
|
||||
break
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
time.sleep(1.5)
|
||||
|
||||
if not upload_success:
|
||||
return f"FAIL: SCP upload failed (Check Network) — {last_error}"
|
||||
|
||||
time.sleep(2)
|
||||
|
||||
# ═══════════════════════════════════════════
|
||||
# STEP 3: Verify firmware uploaded
|
||||
# ═══════════════════════════════════════════
|
||||
if status_cb:
|
||||
status_cb("Verifying firmware...")
|
||||
|
||||
stdin, stdout, stderr = client.exec_command(
|
||||
f"test -f {remote_path} && ls -lh {remote_path}",
|
||||
timeout=10
|
||||
)
|
||||
verify_output = stdout.read().decode("utf-8", errors="ignore").strip()
|
||||
verify_err = stderr.read().decode("utf-8", errors="ignore").strip()
|
||||
|
||||
if not verify_output:
|
||||
return f"FAIL: Firmware file not found on device after upload"
|
||||
|
||||
# ═══════════════════════════════════════════
|
||||
# STEP 4: Sync filesystem
|
||||
# ═══════════════════════════════════════════
|
||||
if status_cb:
|
||||
status_cb("Syncing filesystem...")
|
||||
|
||||
client.exec_command("sync", timeout=10)
|
||||
time.sleep(2)
|
||||
|
||||
# ═══════════════════════════════════════════
|
||||
# STEP 5: Flash firmware (sysupgrade)
|
||||
# ═══════════════════════════════════════════
|
||||
if status_cb:
|
||||
status_cb("Flashing firmware (sysupgrade)...")
|
||||
|
||||
try:
|
||||
# Capture output by redirecting to a file in /tmp first, or read from stdout
|
||||
# Use -F to force upgrade and bypass "Image metadata not present" error for uImage files
|
||||
stdin, stdout, stderr = client.exec_command(f"sysupgrade -F -v -n {remote_path} > /tmp/sysup.log 2>&1")
|
||||
|
||||
# Wait up to 4 seconds to see if it immediately fails
|
||||
# Sysupgrade takes time and normally drops the connection, so if it finishes in < 4s, it's an error.
|
||||
time.sleep(4)
|
||||
|
||||
if stdout.channel.exit_status_ready():
|
||||
exit_code = stdout.channel.recv_exit_status()
|
||||
|
||||
# Fetch the error log
|
||||
_, log_out, _ = client.exec_command("cat /tmp/sysup.log")
|
||||
err_msg = log_out.read().decode("utf-8", errors="ignore").strip()
|
||||
|
||||
return f"FAIL: sysupgrade terminated early (Code {exit_code}). Details:\n{err_msg}"
|
||||
|
||||
except Exception:
|
||||
# Connection drop during sysupgrade is exactly what we expect if it succeeds
|
||||
pass
|
||||
|
||||
if status_cb:
|
||||
status_cb("Rebooting...")
|
||||
time.sleep(3)
|
||||
|
||||
return "DONE"
|
||||
|
||||
except Exception as e:
|
||||
return f"FAIL: {e}"
|
||||
finally:
|
||||
try:
|
||||
client.close()
|
||||
except Exception:
|
||||
pass
|
||||
102
core/workers.py
Normal file
102
core/workers.py
Normal file
@@ -0,0 +1,102 @@
|
||||
from PyQt6.QtCore import QThread, pyqtSignal
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
# We need these imports inside workers since they use them
|
||||
from core.scanner import scan_network
|
||||
from core.flasher import flash_device
|
||||
from core.ssh_flasher import flash_device_ssh
|
||||
from utils.network import _resolve_hostname
|
||||
|
||||
class ScanThread(QThread):
|
||||
"""Run network scan in a background thread so UI doesn't freeze."""
|
||||
finished = pyqtSignal(list)
|
||||
error = pyqtSignal(str)
|
||||
scan_progress = pyqtSignal(int, int) # done, total (ping sweep)
|
||||
stage = pyqtSignal(str) # current scan phase
|
||||
|
||||
def __init__(self, network):
|
||||
super().__init__()
|
||||
self.network = network
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
def on_ping_progress(done, total):
|
||||
self.scan_progress.emit(done, total)
|
||||
|
||||
def on_stage(s):
|
||||
self.stage.emit(s)
|
||||
|
||||
results = scan_network(self.network,
|
||||
progress_cb=on_ping_progress,
|
||||
stage_cb=on_stage)
|
||||
# Resolve hostnames in parallel
|
||||
self.stage.emit("hostname")
|
||||
with ThreadPoolExecutor(max_workers=50) as executor:
|
||||
future_to_dev = {
|
||||
executor.submit(_resolve_hostname, d["ip"]): d
|
||||
for d in results
|
||||
}
|
||||
for future in as_completed(future_to_dev):
|
||||
dev = future_to_dev[future]
|
||||
try:
|
||||
dev["name"] = future.result(timeout=3)
|
||||
except Exception:
|
||||
dev["name"] = ""
|
||||
self.finished.emit(results)
|
||||
except Exception as e:
|
||||
self.error.emit(str(e))
|
||||
|
||||
|
||||
class FlashThread(QThread):
|
||||
"""Run firmware flash in background so UI stays responsive."""
|
||||
device_status = pyqtSignal(int, str) # index, status message
|
||||
device_done = pyqtSignal(int, str) # index, result
|
||||
all_done = pyqtSignal()
|
||||
|
||||
def __init__(self, devices, firmware_path, max_workers=10,
|
||||
method="api", ssh_user="root", ssh_password="admin123a",
|
||||
ssh_backup_password="", set_passwd=False):
|
||||
super().__init__()
|
||||
self.devices = devices
|
||||
self.firmware_path = firmware_path
|
||||
self.max_workers = max_workers
|
||||
self.method = method
|
||||
self.ssh_user = ssh_user
|
||||
self.ssh_password = ssh_password
|
||||
self.ssh_backup_password = ssh_backup_password
|
||||
self.set_passwd = set_passwd
|
||||
|
||||
def run(self):
|
||||
def _flash_one(i, dev):
|
||||
try:
|
||||
def on_status(msg):
|
||||
self.device_status.emit(i, msg)
|
||||
|
||||
if self.method == "ssh":
|
||||
result = flash_device_ssh(
|
||||
dev["ip"], self.firmware_path,
|
||||
user=self.ssh_user,
|
||||
password=self.ssh_password,
|
||||
backup_password=self.ssh_backup_password,
|
||||
set_passwd=self.set_passwd,
|
||||
status_cb=on_status
|
||||
)
|
||||
else:
|
||||
result = flash_device(
|
||||
dev["ip"], self.firmware_path,
|
||||
status_cb=on_status
|
||||
)
|
||||
self.device_done.emit(i, result)
|
||||
except Exception as e:
|
||||
self.device_done.emit(i, f"FAIL: {e}")
|
||||
|
||||
# Use configured max_workers (0 = unlimited = one per device)
|
||||
workers = self.max_workers if self.max_workers > 0 else len(self.devices)
|
||||
with ThreadPoolExecutor(max_workers=workers) as executor:
|
||||
futures = []
|
||||
for i, dev in enumerate(self.devices):
|
||||
futures.append(executor.submit(_flash_one, i, dev))
|
||||
for f in futures:
|
||||
f.result()
|
||||
|
||||
self.all_done.emit()
|
||||
205
debug_full.py
205
debug_full.py
@@ -1,205 +0,0 @@
|
||||
"""
|
||||
Debug Flash — chạy đầy đủ 3 bước flash và log chi tiết từng bước.
|
||||
|
||||
Usage:
|
||||
python debug_full.py <IP> <firmware.bin>
|
||||
python debug_full.py 192.168.11.17 V3.0.6p5.bin
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import requests
|
||||
import re
|
||||
|
||||
|
||||
def debug_flash(ip, firmware_path, username="root", password=""):
|
||||
base_url = f"http://{ip}"
|
||||
session = requests.Session()
|
||||
|
||||
print(f"{'='*65}")
|
||||
print(f" Full Flash Debug — {ip}")
|
||||
print(f" Firmware: {os.path.basename(firmware_path)}")
|
||||
print(f" Size: {os.path.getsize(firmware_path) / 1024 / 1024:.2f} MB")
|
||||
print(f"{'='*65}")
|
||||
|
||||
# ═══════════════════════════════════
|
||||
# STEP 1: Login
|
||||
# ═══════════════════════════════════
|
||||
print(f"\n{'─'*65}")
|
||||
print(f" STEP 1: Login")
|
||||
print(f"{'─'*65}")
|
||||
|
||||
login_url = f"{base_url}/cgi-bin/luci"
|
||||
|
||||
print(f" → GET {login_url}")
|
||||
try:
|
||||
r = session.get(login_url, timeout=10)
|
||||
print(f" Status: {r.status_code}")
|
||||
except Exception as e:
|
||||
print(f" ❌ Cannot connect: {e}")
|
||||
return
|
||||
|
||||
# Detect form fields
|
||||
if 'name="luci_username"' in r.text:
|
||||
login_data = {"luci_username": username, "luci_password": password}
|
||||
print(f" Fields: luci_username / luci_password")
|
||||
else:
|
||||
login_data = {"username": username, "password": password}
|
||||
print(f" Fields: username / password")
|
||||
|
||||
print(f"\n → POST {login_url}")
|
||||
print(f" Data: {login_data}")
|
||||
resp = session.post(login_url, data=login_data,
|
||||
timeout=10, allow_redirects=True)
|
||||
|
||||
print(f" Status: {resp.status_code}")
|
||||
print(f" Cookies: {dict(session.cookies)}")
|
||||
|
||||
# Extract stok
|
||||
stok = None
|
||||
for source in [resp.url, resp.text]:
|
||||
match = re.search(r";stok=([a-f0-9]+)", source)
|
||||
if match:
|
||||
stok = match.group(1)
|
||||
break
|
||||
if not stok:
|
||||
for hist in resp.history:
|
||||
match = re.search(r";stok=([a-f0-9]+)",
|
||||
hist.headers.get("Location", ""))
|
||||
if match:
|
||||
stok = match.group(1)
|
||||
break
|
||||
|
||||
print(f" stok: {stok}")
|
||||
has_cookie = "sysauth" in str(session.cookies)
|
||||
print(f" sysauth: {'✅ YES' if has_cookie else '❌ NO'}")
|
||||
|
||||
if not stok and not has_cookie:
|
||||
print(f"\n ❌ LOGIN FAILED — no stok, no sysauth cookie")
|
||||
return
|
||||
|
||||
print(f"\n ✅ LOGIN OK")
|
||||
|
||||
# Build flash URL
|
||||
if stok:
|
||||
flash_url = f"{base_url}/cgi-bin/luci/;stok={stok}/admin/system/flashops"
|
||||
else:
|
||||
flash_url = f"{base_url}/cgi-bin/luci/admin/system/flashops"
|
||||
|
||||
# ═══════════════════════════════════
|
||||
# STEP 2: Upload firmware
|
||||
# ═══════════════════════════════════
|
||||
print(f"\n{'─'*65}")
|
||||
print(f" STEP 2: Upload firmware")
|
||||
print(f"{'─'*65}")
|
||||
|
||||
print(f" → POST {flash_url}")
|
||||
print(f" File field: image")
|
||||
print(f" File name: {os.path.basename(firmware_path)}")
|
||||
print(f" keep: NOT sent (unchecked)")
|
||||
print(f" Uploading...")
|
||||
|
||||
filename = os.path.basename(firmware_path)
|
||||
with open(firmware_path, "rb") as f:
|
||||
resp = session.post(
|
||||
flash_url,
|
||||
data={}, # No keep = uncheck Keep Settings
|
||||
files={"image": (filename, f, "application/octet-stream")},
|
||||
timeout=300,
|
||||
)
|
||||
|
||||
print(f" Status: {resp.status_code}")
|
||||
print(f" URL: {resp.url}")
|
||||
|
||||
# Check response content
|
||||
resp_lower = resp.text.lower()
|
||||
has_verify = "verify" in resp_lower
|
||||
has_proceed = "proceed" in resp_lower
|
||||
has_checksum = "checksum" in resp_lower
|
||||
has_image_form = 'name="image"' in resp.text and 'type="file"' in resp.text
|
||||
|
||||
print(f"\n Response analysis:")
|
||||
print(f" Has 'verify': {'✅' if has_verify else '❌'}")
|
||||
print(f" Has 'proceed': {'✅' if has_proceed else '❌'}")
|
||||
print(f" Has 'checksum': {'✅' if has_checksum else '❌'}")
|
||||
print(f" Has flash form: {'⚠️ (upload ignored!)' if has_image_form else '✅ (not flash form)'}")
|
||||
|
||||
# Extract checksum from verification page
|
||||
checksum = re.search(r'Checksum:\s*<code>([a-f0-9]+)</code>', resp.text)
|
||||
size_info = re.search(r'Size:\s*([\d.]+\s*MB)', resp.text)
|
||||
if checksum:
|
||||
print(f" Checksum: {checksum.group(1)}")
|
||||
if size_info:
|
||||
print(f" Size: {size_info.group(1)}")
|
||||
|
||||
# Check for keep settings status
|
||||
if "will be erased" in resp.text:
|
||||
print(f" Config: Will be ERASED ✅ (keep=off)")
|
||||
elif "will be kept" in resp.text:
|
||||
print(f" Config: Will be KEPT ⚠️ (keep=on)")
|
||||
|
||||
if not has_verify or not has_proceed:
|
||||
print(f"\n ❌ Did NOT get verification page!")
|
||||
# Show cleaned response
|
||||
text = re.sub(r'<[^>]+>', ' ', resp.text)
|
||||
text = re.sub(r'\s+', ' ', text).strip()
|
||||
print(f" Response: {text[:500]}")
|
||||
return
|
||||
|
||||
# Show the Proceed form
|
||||
print(f"\n Proceed form (from HTML):")
|
||||
forms = re.findall(r'<form[^>]*>(.*?)</form>', resp.text, re.DOTALL)
|
||||
for i, form_body in enumerate(forms):
|
||||
if 'value="Proceed"' in form_body or 'step' in form_body:
|
||||
inputs = re.findall(r'<input[^>]*/?\s*>', form_body)
|
||||
for inp in inputs:
|
||||
print(f" {inp.strip()}")
|
||||
|
||||
print(f"\n ✅ UPLOAD OK — Verification page received")
|
||||
|
||||
# ═══════════════════════════════════
|
||||
# STEP 3: Proceed (confirm flash)
|
||||
# ═══════════════════════════════════
|
||||
print(f"\n{'─'*65}")
|
||||
print(f" STEP 3: Proceed (confirm flash)")
|
||||
print(f"{'─'*65}")
|
||||
|
||||
confirm_data = {
|
||||
"step": "2",
|
||||
"keep": "",
|
||||
}
|
||||
|
||||
print(f" → POST {flash_url}")
|
||||
print(f" Data: {confirm_data}")
|
||||
|
||||
try:
|
||||
resp = session.post(flash_url, data=confirm_data, timeout=300)
|
||||
print(f" Status: {resp.status_code}")
|
||||
# Show cleaned response
|
||||
text = re.sub(r'<[^>]+>', ' ', resp.text)
|
||||
text = re.sub(r'\s+', ' ', text).strip()
|
||||
print(f" Response: {text[:300]}")
|
||||
except requests.ConnectionError:
|
||||
print(f" ✅ Connection dropped — device is REBOOTING!")
|
||||
except requests.ReadTimeout:
|
||||
print(f" ✅ Timeout — device is REBOOTING!")
|
||||
|
||||
print(f"\n{'='*65}")
|
||||
print(f" 🎉 FLASH COMPLETE")
|
||||
print(f"{'='*65}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: python debug_full.py <IP> <firmware.bin>")
|
||||
print("Example: python debug_full.py 192.168.11.17 V3.0.6p5.bin")
|
||||
sys.exit(1)
|
||||
|
||||
ip = sys.argv[1]
|
||||
fw = sys.argv[2]
|
||||
|
||||
if not os.path.exists(fw):
|
||||
print(f"❌ File not found: {fw}")
|
||||
sys.exit(1)
|
||||
|
||||
debug_flash(ip, fw)
|
||||
60
docs/load_fw_ssh_docs.md
Normal file
60
docs/load_fw_ssh_docs.md
Normal file
@@ -0,0 +1,60 @@
|
||||
# Tài liệu Kỹ thuật: Nạp Firmware qua SSH (`core/ssh_flasher.py`)
|
||||
|
||||
Module `ssh_flasher.py` chịu trách nhiệm nạp Firmware lên các thiết bị OpenWrt nạp mới hoặc nạp lại thông qua hai giao thức Telnet và SSH. Nó được thiết kế với độ an toàn cao để xử lý đa luồng (ví dụ: quét và nạp hàng loạt thiết bị cùng lúc), có cơ chế tự động thử lại (Retry) và cơ chế dự phòng mật khẩu khi thiết bị bị kẹt mật khẩu cũ.
|
||||
|
||||
## 1. Sơ đồ Luồng Hoạt Động (Operational Flow)
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
A[Bắt đầu Flash SSH] -->|Jitter Delay 0.1s - 1.5s| B{Có yêu cầu Set Password?}
|
||||
B -- Không --> F[Bước 1: Kết nối SSH]
|
||||
B -- Có --> C[Thử kết nối Telnet port 23]
|
||||
|
||||
C -- Thành công --> C1[Gửi lệnh `passwd` > admin123a] --> F
|
||||
C -- Lỗi / Timeout / Bị từ chối --> D[Thử SSH với mật khẩu rỗng]
|
||||
|
||||
D -- Thành công --> D1[Gửi lệnh `passwd` > admin123a] --> F
|
||||
D -- Lỗi 'Authentication Failed' --> E[Thử SSH với Backup Password]
|
||||
|
||||
E -- Thành công --> E1[Gửi lệnh `passwd` > admin123a] --> F
|
||||
E -- Thất bại --> E2[Thử SSH với Password cấu hình]
|
||||
|
||||
E2 -- Thành công --> F
|
||||
E2 -- Thất bại --> Z[DỪNG LẠI - Báo Lỗi]
|
||||
|
||||
F -->|Kết nối SSH thành công sau Retry 3 lần| G[Bước 2: Push Firmware SCP vào /tmp/]
|
||||
G -->|Tối đa 3 lần Retry| H[Bước 3: Xác minh File `test -f`]
|
||||
H --> I[Bước 4: Đồng bộ bộ nhớ `sync`]
|
||||
I --> J[Bước 5: Gọi `sysupgrade -F -v -n /tmp/...`]
|
||||
J --> K{Kết nối bị đứt?}
|
||||
K -- Có --> L[Khởi động lại thành công - Báo DONE]
|
||||
K -- Không --> M[Báo lỗi Sysupgrade]
|
||||
```
|
||||
|
||||
## 2. Phân Tích Logic Cốt Lõi
|
||||
|
||||
### 2.1 Cơ Chế Dự Phòng Mật Khẩu Đa Tầng (Fallback Mechanism)
|
||||
|
||||
Điểm mạnh của thuật toán là khả năng "lì lợm" lấy được quyền Root để đặt mật khẩu cho thiết bị. Các lớp dự phòng bao gồm:
|
||||
|
||||
1. **Telnet (Port 23):** Router OpenWrt khi mới xuất xưởng hoặc ngay sau khi Factory Reset sẽ chặn kết nối SSH bằng tài khoản `root` mất gốc, nhưng lại mở Telnet không có mật khẩu. Script sẽ luôn thử bẻ khóa qua đường này đầu tiên. Bất kỳ ngoại lệ nào phát sinh ở luồng này (Refused, Timeout, Broken Pipe) cũng sẽ kích hoạt fallback.
|
||||
2. **SSH (Mật khẩu rỗng):** Nếu Telnet bị đóng cửa (chứng tỏ thiết bị đã được kích hoạt sơ), phần mềm thử mở cổng 22 với User `root` và mật khẩu rỗng `""`.
|
||||
3. **SSH (Backup Password):** Nếu mật khẩu rỗng bị từ chối bằng `AuthenticationException`, script định nhận diện máy đã bị kẹt một pass được cài đặt trước đó và chèn `backup_password` ra thử nghiệm.
|
||||
4. **SSH (Target Password):** Nếu trượt tới lớp thứ 3, script thử nốt với chính mật khẩu mục tiêu của dự án (mặc định: `admin123a`) để chặn trường hợp thiết bị đã đổi Pass thành công từ lần trước nhưng quá trình nạp Firmware chưa diễn ra.
|
||||
|
||||
### 2.2 Xử Lý Đa Luồng Chống Nghẽn (Concurrency Limits)
|
||||
|
||||
Thư viện `paramiko` thường dễ đổ vỡ cấu trúc và báo lỗi `[Errno 64] Host is down` hoặc `[Errno 32] Broken Pipe` khi có vài chục luồng (threads) cùng mở port đập vào Network Stack OS (hoặc Switch/Router) ở đúng một chớp mắt. Lỗi này là dạng Transient (có tính tạm thời). Do đó hệ thống được nâng cấp:
|
||||
|
||||
- **Jitter (Độ trễ ngẫu nhiên):** Sử dụng `time.sleep(random.uniform(0.1, 1.5))` trước khi chọc vào thiết bị. Sự sai lệch mili-giây này phân tán chùm yêu cầu dội vào mạng, tránh tự gây ra một cuộc tấn công từ chối dịch vụ (DDoS) nội bộ.
|
||||
- **Vòng lặp (Retry Hooks):** Quá trình đăng nhập (`_create_ssh_client`) cũng như upload payload (`SCPClient.put`) đều được bọc trong bộ đếm thử lại cực đỉnh (thử lại 3 lần sau mỗi 1-2 giây nghỉ). Việc này triệt hạ 99% các lỗi truyền TCP/IP và Timeout.
|
||||
|
||||
### 2.3 Cơ Chế Khôi Phục File (RAM Disk SCP)
|
||||
|
||||
- Firmware được tống thẳng tới phân vùng ảo RAM cục bộ đích (ví dụ: `/tmp/<tên_file>.bin`) thông qua giao thức SCP. Ghi file trực tiếp vào RAM cho phép đạt tốc độ tuyệt đối nhanh và tránh gây hao mòn sinh học lên FlashNOR của router.
|
||||
- Gửi lệnh `sync` qua kênh Shell để ép thiết bị lưu transaction xuống vùng nhớ cứng, chuẩn bị môi trường ổn định nhất.
|
||||
|
||||
### 2.4 Thủ Thuật Gọi `sysupgrade -F`
|
||||
|
||||
- OpenWrt đời mới khá kén phần Metadata của Firmware Image. Cờ `-F` cởi trói kiểm duyệt đó và ép hệ điều hành nạp đè File bất chấp sự vắng mặt của metadata uImage. Cờ `-n` khiến hệ thống mất sách cấu hình cũ (Clean Flash).
|
||||
- Lệnh `sysupgrade` sẽ cắn xén cả phần OS dưới lõi máy sau đó Tắt mạng đột ngột. Do đó, script được lập trình để xử lý Exception: Sự kiện **MẤT KẾT NỐI CHỦ ĐỘNG** trong khu vực này là dấu hiệu ăn mừng của việc cài đặt thành công thay vì báo hỏng kết nối.
|
||||
59
docs/update_fw_docs.md
Normal file
59
docs/update_fw_docs.md
Normal file
@@ -0,0 +1,59 @@
|
||||
# Tài liệu Kỹ thuật: Cập Nhật Firmware (Update FW)
|
||||
|
||||
Quy trình `Update FW` là phương thức tối ưu hóa để nâng cấp Firmware mới vào các thiết bị OpenWrt Live (đã và đang chạy sẵn Firmware của hệ sinh thái MiraV3). Cơ chế này dùng chính giao thức SSH thông qua `core/ssh_flasher.py` nhưng lược bỏ hoàn toàn các bước dò tìm và thiết lập mật khẩu thừa thãi.
|
||||
|
||||
## 1. Sơ đồ Luồng Hoạt Động (Operational Flow)
|
||||
|
||||
```mermaid
|
||||
graph TD
|
||||
A[Bắt đầu Update FW] --> B{Kiểm tra danh sách IP}
|
||||
B -- Có IP lạ != 192.168.11.102 --> C[Bật MessageBox cảnh báo]
|
||||
C -- Người dùng No --> Z[HỦY BỎ]
|
||||
C -- Người dùng Yes --> D
|
||||
B -- Chỉ có 192.168.11.102 --> D
|
||||
|
||||
D[Truyền tham số cấu hình tĩnh] --> E[ssh_user: root<br>ssh_password: admin123a<br>set_passwd: False]
|
||||
|
||||
E -->|Jitter Delay 0.1s - 1.5s| F[Kết nối thẳng SSH Port 22]
|
||||
|
||||
F -- Thành công --> G[Push Firmware SCP vào /tmp/]
|
||||
F -- Lỗi / Sai Pass --> Y[DỪNG LẠI - Báo Lỗi]
|
||||
|
||||
G -->|Tối đa 3 lần Retry| H[Xác minh File `test -f`]
|
||||
H --> I[Đồng bộ bộ nhớ `sync`]
|
||||
I --> J[Gọi Sysupgrade: `sysupgrade -F -v -n /tmp/...`]
|
||||
J --> K{Kết nối bị đứt?}
|
||||
K -- Có --> L[Khởi động lại thành công - Báo DONE]
|
||||
K -- Không --> M[Báo lỗi Sysupgrade]
|
||||
```
|
||||
|
||||
## 2. Phân Tích Logic Cốt Lõi
|
||||
|
||||
### 2.1 Cơ Chế Bỏ Qua Telnet (Skip Set Password)
|
||||
|
||||
Điểm khác biệt lớn nhất của luồng Update FW so với Nạp Mới:
|
||||
|
||||
- Thông qua UI `Chế độ Flash > Update Firmware`, hệ thống hiểu rằng thiết bị đích đã từng được cài đặt hệ sinh thái MiraV3 và chắc chắn đã sở hữu lớp bảo mật SSH cơ bản (root/admin123a).
|
||||
- Module `main.py` tự động kích hoạt cờ tuỳ chọn ngầm `set_passwd = False` khi gọi `FlashThread`.
|
||||
- **Bỏ Bầu Trời Khởi Động:** Hệ thống ngay lập tức rẽ ngang vào luồng SSH trên Port 22. Tool bỏ qua hoàn toàn quy trình truy vấn cổng Telnet (23) chậm chạp cũng như các lệnh Set Password (vốn mất thêm từ 3-5 giây chờ đợi). Tốc độ tổng thể tăng tốc cực đáng kể, phục vụ trực tiếp cho quá trình Mass-Update.
|
||||
|
||||
### 2.2 Ràng Buộc An Toàn (Safety Checks)
|
||||
|
||||
Được kiểm soát từ tầng UI (`main.py`) để chống Flash nhầm hàng hoạt:
|
||||
|
||||
- Khi người dùng chọn chế độ Update, script quét qua danh sách MAC/IP đang tích chọn. Chỉ duy nhất thiết bị có IP cấu hình cứng `192.168.11.102` mới được âm thầm qua cửa (được xem là IP mặc định an toàn cho việc test Firmware).
|
||||
- Nếu phát hiện **Bất kỳ IP lạ nào** (ví dụ: 192.168.1.5, 192.168.11.9), hệ thống lập tức chặn quy trình Flasher lại và bật MessageBox cảnh báo màu đỏ hỏi kỹ thuật viên xác nhận lại.
|
||||
|
||||
### 2.3 Quá Trình Sysupgrade (Hủy diệt và Tái sinh)
|
||||
|
||||
Cũng giống như luồng nạp mới, quá trình xả file dùng chung tệp lệnh: `sysupgrade -F -v -n /tmp/<tên_file>`
|
||||
|
||||
- **Cờ `-n`**: Clean Flash sạch trơn, không giữ Configuration rác của bản cũ.
|
||||
- **Cờ `-F` (Force)**: Ép hệ điều hành OpenWrt bỏ qua bước kiểm tra Image Metadata tại Local. Tuỳ chọn này sống còn đối với các tệp tin Firmware trần trụi (như build Raw `tim_uImage`) để tránh việc sysupgrade từ chối file, văng lỗi "Image metadata not present" và ngắt ngang tiến trình.
|
||||
- Kịch bản hoàn hảo nhất của quá trình này lại chính là **bị đứt kết nối mạng đột ngột**. Script đã bẫy một khoảng thời gian chờ (time.sleep) để canh việc Server SSH sập nguồn. Đứt mạng nghĩa là Kernel chuẩn bị Reboot nạp File, và hệ thống sẽ bắt Catch đánh giá là Thành Công (`DONE`). Ngược lại, nếu Sysupgrade thất bại, nó sẽ phun ra Log và giữ nguyên kết nối, lúc đó Script sẽ chụp lại Error Code và in ra UI.
|
||||
|
||||
---
|
||||
|
||||
## 3. Trình Gỡ Lỗi Nhanh (Debugging Update flow)
|
||||
|
||||
Lưu ý: Bạn đã xóa các file script debug đơn lẻ khỏi thư mục gốc (`debug_ssh.py`). Hiện tại luồng Update FW nên được gỡ lỗi trực tiếp bằng cách in Print/Log ra terminal của lệnh `./run.sh` lúc chạy App UI. Mọi tiến độ như "Đang kết nối SSH", "Đồng bộ file" được hiển thị minh bạch tại cột **Status** trên giao diện chính.
|
||||
768
main.py
768
main.py
@@ -11,7 +11,8 @@ from PyQt6.QtWidgets import (
|
||||
QVBoxLayout, QHBoxLayout, QFileDialog, QTableWidget,
|
||||
QTableWidgetItem, QLabel, QProgressBar,
|
||||
QMessageBox, QGroupBox, QHeaderView, QLineEdit,
|
||||
QFrame, QCheckBox, QSpinBox, QScrollArea, QSizePolicy
|
||||
QFrame, QCheckBox, QSpinBox, QScrollArea, QSizePolicy,
|
||||
QComboBox
|
||||
)
|
||||
from PyQt6.QtCore import (
|
||||
Qt, QThread, pyqtSignal, QPropertyAnimation, QAbstractAnimation,
|
||||
@@ -21,501 +22,40 @@ from PyQt6.QtGui import QFont, QColor, QIcon, QAction
|
||||
|
||||
import datetime
|
||||
|
||||
from scanner import scan_network
|
||||
from flasher import flash_device
|
||||
from core.scanner import scan_network
|
||||
from core.flasher import flash_device
|
||||
from core.ssh_flasher import flash_device_ssh
|
||||
from core.workers import ScanThread, FlashThread
|
||||
|
||||
from utils.network import _resolve_hostname, get_default_network
|
||||
from utils.system import resource_path, get_machine_info
|
||||
|
||||
class CollapsibleGroupBox(QGroupBox):
|
||||
def __init__(self, title="", parent=None):
|
||||
super().__init__(title, parent)
|
||||
self.setCheckable(True)
|
||||
self.setChecked(True)
|
||||
|
||||
self.animation = QPropertyAnimation(self, b"maximumHeight")
|
||||
self.animation.setDuration(200)
|
||||
|
||||
# Connect the toggled signal to our animation function
|
||||
self.toggled.connect(self._toggle_animation)
|
||||
from ui.components import CollapsibleGroupBox
|
||||
from ui.styles import STYLE
|
||||
|
||||
self._full_height = 0
|
||||
|
||||
def set_content_layout(self, layout):
|
||||
# We need a wrapper widget to hold the layout
|
||||
self.content_widget = QWidget()
|
||||
self.content_widget.setStyleSheet("background-color: transparent;")
|
||||
self.content_widget.setLayout(layout)
|
||||
|
||||
main_layout = QVBoxLayout()
|
||||
main_layout.setContentsMargins(0, 0, 0, 0)
|
||||
main_layout.addWidget(self.content_widget)
|
||||
self.setLayout(main_layout)
|
||||
|
||||
def _toggle_animation(self, checked):
|
||||
if not hasattr(self, 'content_widget'):
|
||||
return
|
||||
|
||||
if checked:
|
||||
# Expand: show content first, then animate
|
||||
self.content_widget.setVisible(True)
|
||||
target_height = self.sizeHint().height()
|
||||
self.animation.stop()
|
||||
self.animation.setStartValue(self.height())
|
||||
self.animation.setEndValue(target_height)
|
||||
self.animation.finished.connect(self._on_expand_finished)
|
||||
self.animation.start()
|
||||
else:
|
||||
# Collapse
|
||||
self.animation.stop()
|
||||
self.animation.setStartValue(self.height())
|
||||
self.animation.setEndValue(32)
|
||||
self.animation.finished.connect(self._on_collapse_finished)
|
||||
self.animation.start()
|
||||
|
||||
def _on_expand_finished(self):
|
||||
# Remove height constraint so content can grow dynamically
|
||||
self.setMaximumHeight(16777215)
|
||||
try:
|
||||
self.animation.finished.disconnect(self._on_expand_finished)
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
def _on_collapse_finished(self):
|
||||
if not self.isChecked():
|
||||
self.content_widget.setVisible(False)
|
||||
try:
|
||||
self.animation.finished.disconnect(self._on_collapse_finished)
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
|
||||
def _resolve_hostname(ip):
|
||||
"""Reverse DNS lookup for a single IP."""
|
||||
try:
|
||||
return socket.gethostbyaddr(ip)[0]
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
|
||||
def get_local_ip():
|
||||
"""Get the local IP address of this machine."""
|
||||
try:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.connect(("8.8.8.8", 80))
|
||||
ip = s.getsockname()[0]
|
||||
s.close()
|
||||
return ip
|
||||
except Exception:
|
||||
return "N/A"
|
||||
|
||||
|
||||
def get_default_network(ip):
|
||||
"""Guess the /24 network from local IP."""
|
||||
try:
|
||||
parts = ip.split(".")
|
||||
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
|
||||
except Exception:
|
||||
return "192.168.1.0/24"
|
||||
|
||||
|
||||
def resource_path(relative_path):
|
||||
""" Get absolute path to resource, works for dev and for PyInstaller """
|
||||
try:
|
||||
base_path = sys._MEIPASS
|
||||
except Exception:
|
||||
base_path = os.path.abspath(".")
|
||||
return os.path.join(base_path, relative_path)
|
||||
|
||||
|
||||
def get_machine_info():
|
||||
"""Collect machine info."""
|
||||
hostname = socket.gethostname()
|
||||
local_ip = get_local_ip()
|
||||
os_info = f"{platform.system()} {platform.release()}"
|
||||
mac_addr = "N/A"
|
||||
|
||||
try:
|
||||
import uuid
|
||||
mac = uuid.getnode()
|
||||
mac_addr = ":".join(
|
||||
f"{(mac >> (8 * i)) & 0xFF:02X}" for i in reversed(range(6))
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"hostname": hostname,
|
||||
"ip": local_ip,
|
||||
"os": os_info,
|
||||
"mac": mac_addr,
|
||||
}
|
||||
|
||||
|
||||
# ── Stylesheet ──────────────────────────────────────────────
|
||||
|
||||
STYLE = """
|
||||
QWidget {
|
||||
background-color: #1a1b2e;
|
||||
color: #e2e8f0;
|
||||
font-family: 'Segoe UI', 'SF Pro Display', sans-serif;
|
||||
font-size: 12px;
|
||||
}
|
||||
|
||||
QGroupBox {
|
||||
border: 1px solid #2d3748;
|
||||
border-radius: 8px;
|
||||
margin-top: 10px;
|
||||
padding: 20px 8px 6px 8px;
|
||||
font-weight: bold;
|
||||
color: #7eb8f7;
|
||||
background-color: #1e2035;
|
||||
}
|
||||
|
||||
QGroupBox::title {
|
||||
subcontrol-origin: margin;
|
||||
subcontrol-position: top left;
|
||||
left: 14px;
|
||||
top: 5px;
|
||||
padding: 0px 8px;
|
||||
background-color: transparent;
|
||||
}
|
||||
|
||||
QGroupBox::indicator {
|
||||
width: 14px;
|
||||
height: 14px;
|
||||
border-radius: 4px;
|
||||
border: 1px solid #3d4a6b;
|
||||
background-color: #13141f;
|
||||
margin-top: 5px;
|
||||
}
|
||||
|
||||
QGroupBox::indicator:unchecked {
|
||||
background-color: #13141f;
|
||||
}
|
||||
|
||||
QGroupBox::indicator:checked {
|
||||
background-color: #3b82f6;
|
||||
border-color: #3b82f6;
|
||||
}
|
||||
|
||||
QLabel#title {
|
||||
font-size: 16px;
|
||||
font-weight: bold;
|
||||
color: #7eb8f7;
|
||||
letter-spacing: 1px;
|
||||
}
|
||||
|
||||
QLabel#info {
|
||||
color: #94a3b8;
|
||||
font-size: 11px;
|
||||
}
|
||||
|
||||
QPushButton {
|
||||
background-color: #2d3352;
|
||||
border: 1px solid #3d4a6b;
|
||||
border-radius: 6px;
|
||||
padding: 4px 12px;
|
||||
color: #e2e8f0;
|
||||
font-weight: 600;
|
||||
min-height: 24px;
|
||||
}
|
||||
|
||||
QPushButton:hover {
|
||||
background-color: #3d4a6b;
|
||||
border-color: #7eb8f7;
|
||||
color: #ffffff;
|
||||
}
|
||||
|
||||
QPushButton:pressed {
|
||||
background-color: #1a2040;
|
||||
}
|
||||
|
||||
QPushButton#scan {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #1a56db, stop:1 #1e66f5);
|
||||
border-color: #1a56db;
|
||||
color: #ffffff;
|
||||
}
|
||||
|
||||
QPushButton#scan:hover {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #2563eb, stop:1 #3b82f6);
|
||||
}
|
||||
|
||||
QPushButton#flash {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #15803d, stop:1 #16a34a);
|
||||
border-color: #15803d;
|
||||
color: #ffffff;
|
||||
font-size: 13px;
|
||||
min-height: 30px;
|
||||
}
|
||||
|
||||
QPushButton#flash:hover {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #16a34a, stop:1 #22c55e);
|
||||
}
|
||||
|
||||
QTableWidget {
|
||||
background-color: #13141f;
|
||||
alternate-background-color: #1a1b2e;
|
||||
border: 1px solid #2d3748;
|
||||
border-radius: 8px;
|
||||
gridline-color: #2d3748;
|
||||
selection-background-color: #2d3a5a;
|
||||
selection-color: #e2e8f0;
|
||||
}
|
||||
|
||||
QTableWidget::item {
|
||||
padding: 2px 6px;
|
||||
border: none;
|
||||
}
|
||||
|
||||
QTableWidget::item:selected {
|
||||
background-color: #2d3a5a;
|
||||
color: #7eb8f7;
|
||||
}
|
||||
|
||||
QHeaderView::section {
|
||||
background-color: #1e2035;
|
||||
color: #7eb8f7;
|
||||
border: none;
|
||||
border-bottom: 2px solid #3b82f6;
|
||||
border-right: 1px solid #2d3748;
|
||||
padding: 4px 6px;
|
||||
font-weight: bold;
|
||||
font-size: 11px;
|
||||
letter-spacing: 0.5px;
|
||||
text-transform: uppercase;
|
||||
}
|
||||
|
||||
QHeaderView::section:last {
|
||||
border-right: none;
|
||||
}
|
||||
|
||||
QProgressBar {
|
||||
border: 1px solid #2d3748;
|
||||
border-radius: 6px;
|
||||
text-align: center;
|
||||
background-color: #13141f;
|
||||
color: #e2e8f0;
|
||||
height: 20px;
|
||||
font-size: 11px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
QProgressBar::chunk {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #3b82f6, stop:1 #7eb8f7);
|
||||
border-radius: 7px;
|
||||
}
|
||||
|
||||
QProgressBar#scan_bar {
|
||||
border: 1px solid #374151;
|
||||
border-radius: 5px;
|
||||
text-align: center;
|
||||
background-color: #13141f;
|
||||
color: #fbbf24;
|
||||
height: 16px;
|
||||
font-size: 10px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
QProgressBar#scan_bar::chunk {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #d97706, stop:1 #fbbf24);
|
||||
border-radius: 4px;
|
||||
}
|
||||
|
||||
QLineEdit {
|
||||
background-color: #13141f;
|
||||
border: 1px solid #2d3748;
|
||||
border-radius: 8px;
|
||||
padding: 7px 12px;
|
||||
color: #e2e8f0;
|
||||
selection-background-color: #2d3a5a;
|
||||
}
|
||||
|
||||
QLineEdit:focus {
|
||||
border-color: #3b82f6;
|
||||
background-color: #161727;
|
||||
}
|
||||
|
||||
QScrollBar:vertical {
|
||||
background: #13141f;
|
||||
width: 10px;
|
||||
border-radius: 5px;
|
||||
margin: 2px;
|
||||
}
|
||||
|
||||
QScrollBar::handle:vertical {
|
||||
background: #3d4a6b;
|
||||
border-radius: 5px;
|
||||
min-height: 30px;
|
||||
}
|
||||
|
||||
QScrollBar::handle:vertical:hover {
|
||||
background: #7eb8f7;
|
||||
}
|
||||
|
||||
QScrollBar::handle:vertical:pressed {
|
||||
background: #3b82f6;
|
||||
}
|
||||
|
||||
QScrollBar::add-line:vertical,
|
||||
QScrollBar::sub-line:vertical {
|
||||
height: 0px;
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
QScrollBar::add-page:vertical,
|
||||
QScrollBar::sub-page:vertical {
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
QScrollBar:horizontal {
|
||||
background: #13141f;
|
||||
height: 10px;
|
||||
border-radius: 5px;
|
||||
margin: 2px;
|
||||
}
|
||||
|
||||
QScrollBar::handle:horizontal {
|
||||
background: #3d4a6b;
|
||||
border-radius: 5px;
|
||||
min-width: 30px;
|
||||
}
|
||||
|
||||
QScrollBar::handle:horizontal:hover {
|
||||
background: #7eb8f7;
|
||||
}
|
||||
|
||||
QScrollBar::handle:horizontal:pressed {
|
||||
background: #3b82f6;
|
||||
}
|
||||
|
||||
QScrollBar::add-line:horizontal,
|
||||
QScrollBar::sub-line:horizontal {
|
||||
width: 0px;
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
QScrollBar::add-page:horizontal,
|
||||
QScrollBar::sub-page:horizontal {
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
QCheckBox {
|
||||
spacing: 6px;
|
||||
color: #94a3b8;
|
||||
font-size: 12px;
|
||||
}
|
||||
|
||||
QCheckBox::indicator {
|
||||
width: 16px;
|
||||
height: 16px;
|
||||
border-radius: 4px;
|
||||
border: 1px solid #3d4a6b;
|
||||
background-color: #13141f;
|
||||
}
|
||||
|
||||
QCheckBox::indicator:checked {
|
||||
background-color: #3b82f6;
|
||||
border-color: #3b82f6;
|
||||
}
|
||||
|
||||
QCheckBox::indicator:hover {
|
||||
border-color: #7eb8f7;
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
class ScanThread(QThread):
|
||||
"""Run network scan in a background thread so UI doesn't freeze."""
|
||||
finished = pyqtSignal(list)
|
||||
error = pyqtSignal(str)
|
||||
scan_progress = pyqtSignal(int, int) # done, total (ping sweep)
|
||||
stage = pyqtSignal(str) # current scan phase
|
||||
|
||||
def __init__(self, network):
|
||||
super().__init__()
|
||||
self.network = network
|
||||
|
||||
def run(self):
|
||||
try:
|
||||
def on_ping_progress(done, total):
|
||||
self.scan_progress.emit(done, total)
|
||||
|
||||
def on_stage(s):
|
||||
self.stage.emit(s)
|
||||
|
||||
results = scan_network(self.network,
|
||||
progress_cb=on_ping_progress,
|
||||
stage_cb=on_stage)
|
||||
# Resolve hostnames in parallel
|
||||
self.stage.emit("hostname")
|
||||
with ThreadPoolExecutor(max_workers=50) as executor:
|
||||
future_to_dev = {
|
||||
executor.submit(_resolve_hostname, d["ip"]): d
|
||||
for d in results
|
||||
}
|
||||
for future in as_completed(future_to_dev):
|
||||
dev = future_to_dev[future]
|
||||
try:
|
||||
dev["name"] = future.result(timeout=3)
|
||||
except Exception:
|
||||
dev["name"] = ""
|
||||
self.finished.emit(results)
|
||||
except Exception as e:
|
||||
self.error.emit(str(e))
|
||||
|
||||
|
||||
class FlashThread(QThread):
|
||||
"""Run firmware flash in background so UI stays responsive."""
|
||||
device_status = pyqtSignal(int, str) # index, status message
|
||||
device_done = pyqtSignal(int, str) # index, result
|
||||
all_done = pyqtSignal()
|
||||
|
||||
def __init__(self, devices, firmware_path, max_workers=10):
|
||||
super().__init__()
|
||||
self.devices = devices
|
||||
self.firmware_path = firmware_path
|
||||
self.max_workers = max_workers
|
||||
|
||||
def run(self):
|
||||
def _flash_one(i, dev):
|
||||
try:
|
||||
def on_status(msg):
|
||||
self.device_status.emit(i, msg)
|
||||
|
||||
result = flash_device(
|
||||
dev["ip"], self.firmware_path,
|
||||
status_cb=on_status
|
||||
)
|
||||
self.device_done.emit(i, result)
|
||||
except Exception as e:
|
||||
self.device_done.emit(i, f"FAIL: {e}")
|
||||
|
||||
# Use configured max_workers (0 = unlimited = one per device)
|
||||
workers = self.max_workers if self.max_workers > 0 else len(self.devices)
|
||||
with ThreadPoolExecutor(max_workers=workers) as executor:
|
||||
futures = []
|
||||
for i, dev in enumerate(self.devices):
|
||||
futures.append(executor.submit(_flash_one, i, dev))
|
||||
for f in futures:
|
||||
f.result()
|
||||
|
||||
self.all_done.emit()
|
||||
|
||||
|
||||
class App(QWidget):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.setWindowTitle("MiraV3 Firmware Loader")
|
||||
self.setWindowTitle("Mira Firmware Loader")
|
||||
self.setWindowIcon(QIcon(resource_path("icon.ico")))
|
||||
# Data state
|
||||
self.local_ip = ""
|
||||
self.gateway_ip = ""
|
||||
self.firmware = None
|
||||
self.all_devices = [] # all scan results (unfiltered)
|
||||
self.devices = [] # currently displayed (filtered)
|
||||
self.all_devices = [] # raw list from scanner
|
||||
self.devices = [] # filtered list for table
|
||||
self.flashed_macs = {} # MAC addresses flashed successfully in session (MAC -> timestamp)
|
||||
self.scan_thread = None
|
||||
|
||||
info = get_machine_info()
|
||||
@@ -527,12 +67,12 @@ class App(QWidget):
|
||||
layout.setContentsMargins(8, 6, 8, 6)
|
||||
|
||||
# ── Title ──
|
||||
title = QLabel("⚡ MiraV3 Firmware Loader")
|
||||
title = QLabel("⚡ Mira Firmware Loader")
|
||||
title.setObjectName("title")
|
||||
title.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
layout.addWidget(title)
|
||||
|
||||
copyright_label = QLabel(f"© {datetime.datetime.now().year} Smatec — R&D Software Team. v1.0.0")
|
||||
copyright_label = QLabel(f"© {datetime.datetime.now().year} Smatec — R&D Software Team. v1.1.0")
|
||||
copyright_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
copyright_label.setStyleSheet(
|
||||
"color: #9399b2; font-size: 11px; font-weight: 500; font-family: -apple-system, BlinkMacSystemFont, Segoe UI, Roboto;"
|
||||
@@ -593,11 +133,11 @@ class App(QWidget):
|
||||
self.net_input.setPlaceholderText("e.g. 192.168.4.0/24")
|
||||
net_row.addWidget(self.net_input, 1)
|
||||
|
||||
btn_scan = QPushButton("🔍 Scan LAN")
|
||||
btn_scan.setObjectName("scan")
|
||||
btn_scan.clicked.connect(self.scan)
|
||||
btn_scan.setFixedWidth(110)
|
||||
net_row.addWidget(btn_scan)
|
||||
self.btn_scan = QPushButton("🔍 Scan LAN")
|
||||
self.btn_scan.setObjectName("scan")
|
||||
self.btn_scan.clicked.connect(self.scan)
|
||||
self.btn_scan.setFixedWidth(110)
|
||||
net_row.addWidget(self.btn_scan)
|
||||
scan_layout.addLayout(net_row)
|
||||
|
||||
self.scan_progress_bar = QProgressBar()
|
||||
@@ -621,14 +161,20 @@ class App(QWidget):
|
||||
dev_layout.setContentsMargins(4, 4, 4, 4)
|
||||
|
||||
self.table = QTableWidget()
|
||||
self.table.setColumnCount(5)
|
||||
self.table.setHorizontalHeaderLabels(["", "IP", "Name", "MAC", "Status"])
|
||||
self.table.setColumnCount(4)
|
||||
self.table.setHorizontalHeaderLabels(["", "IP", "MAC", "Status"])
|
||||
self.table.setAlternatingRowColors(True)
|
||||
header = self.table.horizontalHeader()
|
||||
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
|
||||
header.resizeSection(0, 40)
|
||||
for col in range(1, 5):
|
||||
header.setSectionResizeMode(col, QHeaderView.ResizeMode.Stretch)
|
||||
|
||||
# IP and MAC can be fixed/stretch, Status to stretch to cover full info
|
||||
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Interactive)
|
||||
header.resizeSection(1, 120)
|
||||
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Interactive)
|
||||
header.resizeSection(2, 140)
|
||||
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
|
||||
|
||||
self.table.verticalHeader().setVisible(False)
|
||||
self.table.setSelectionBehavior(
|
||||
QTableWidget.SelectionBehavior.SelectRows
|
||||
@@ -641,6 +187,11 @@ class App(QWidget):
|
||||
filter_row.addWidget(self.device_count_label)
|
||||
filter_row.addStretch()
|
||||
|
||||
btn_history = QPushButton("📋 Flash History")
|
||||
btn_history.clicked.connect(self._show_history)
|
||||
btn_history.setStyleSheet("background-color: #313244; color: #cdd6f4;")
|
||||
filter_row.addWidget(btn_history)
|
||||
|
||||
btn_select_all = QPushButton("☑ Select All")
|
||||
btn_select_all.clicked.connect(self._select_all_devices)
|
||||
filter_row.addWidget(btn_select_all)
|
||||
@@ -659,30 +210,137 @@ class App(QWidget):
|
||||
layout.addWidget(dev_group, stretch=1)
|
||||
|
||||
# ── Flash Controls ──
|
||||
flash_group = QGroupBox("🚀 Flash")
|
||||
flash_group = QGroupBox("🚀 Flash Controls")
|
||||
flash_layout = QVBoxLayout()
|
||||
flash_layout.setSpacing(4)
|
||||
flash_layout.setContentsMargins(4, 4, 4, 4)
|
||||
flash_layout.setSpacing(12)
|
||||
flash_layout.setContentsMargins(10, 15, 10, 12)
|
||||
|
||||
self.progress = QProgressBar()
|
||||
self.progress.setFormat("%v / %m devices (%p%)")
|
||||
self.progress.setMinimumHeight(22)
|
||||
flash_layout.addWidget(self.progress)
|
||||
|
||||
# Mode selector row (Nạp mới vs Update)
|
||||
mode_row = QHBoxLayout()
|
||||
mode_lbl = QLabel("Flash Mode:")
|
||||
mode_lbl.setStyleSheet("font-size: 14px; font-weight: bold;")
|
||||
mode_row.addWidget(mode_lbl)
|
||||
|
||||
self.mode_combo = QComboBox()
|
||||
self.mode_combo.addItem("⚡ New Flash (Raw / Factory Reset)", "new")
|
||||
self.mode_combo.addItem("🔄 Update Firmware (Pre-installed)", "update")
|
||||
self.mode_combo.setMinimumWidth(380)
|
||||
self.mode_combo.setMinimumHeight(35)
|
||||
self.mode_combo.setStyleSheet("""
|
||||
QComboBox {
|
||||
background-color: #2d3352; border: 1px solid #3d4a6b; border-radius: 4px;
|
||||
padding: 4px 10px; color: #ffffff; font-size: 14px; font-weight: bold;
|
||||
}
|
||||
""")
|
||||
self.mode_combo.currentIndexChanged.connect(self._on_mode_changed)
|
||||
mode_row.addWidget(self.mode_combo)
|
||||
mode_row.addStretch()
|
||||
flash_layout.addLayout(mode_row)
|
||||
|
||||
# Container cho cấu hình tuỳ chọn "Nạp Mới FW"
|
||||
self.method_container = QWidget()
|
||||
method_layout = QVBoxLayout(self.method_container)
|
||||
method_layout.setContentsMargins(0, 0, 0, 0)
|
||||
method_layout.setSpacing(10)
|
||||
|
||||
# Method selector row
|
||||
method_row = QHBoxLayout()
|
||||
method_lbl = QLabel("Method:")
|
||||
method_lbl.setStyleSheet("font-size: 13px; font-weight: bold;")
|
||||
method_row.addWidget(method_lbl)
|
||||
|
||||
self.method_combo = QComboBox()
|
||||
self.method_combo.addItem("🌐 API (LuCI - Recommended)", "api")
|
||||
self.method_combo.addItem("💻 SSH (paramiko/scp)", "ssh")
|
||||
self.method_combo.setMinimumWidth(220)
|
||||
self.method_combo.setMinimumHeight(32)
|
||||
self.method_combo.setStyleSheet("""
|
||||
QComboBox {
|
||||
background-color: #1e1e2e; border: 1px solid #3d4a6b; border-radius: 4px;
|
||||
padding: 4px 10px; color: #ffffff; font-size: 13px; font-weight: bold;
|
||||
}
|
||||
""")
|
||||
self.method_combo.currentIndexChanged.connect(self._on_method_changed)
|
||||
method_row.addWidget(self.method_combo)
|
||||
method_row.addStretch()
|
||||
method_layout.addLayout(method_row)
|
||||
|
||||
# SSH credentials (hidden by default)
|
||||
self.ssh_creds_widget = QWidget()
|
||||
self.ssh_creds_widget.setStyleSheet("background-color: #11121d; border-radius: 6px; border: 1px dashed #3d4a6b;")
|
||||
ssh_creds_layout = QVBoxLayout(self.ssh_creds_widget)
|
||||
ssh_creds_layout.setContentsMargins(15, 12, 15, 12)
|
||||
ssh_creds_layout.setSpacing(10)
|
||||
|
||||
ssh_row1 = QHBoxLayout()
|
||||
ssh_lbl1 = QLabel("SSH User:")
|
||||
ssh_lbl1.setStyleSheet("font-size: 12px; font-weight: bold;")
|
||||
ssh_row1.addWidget(ssh_lbl1)
|
||||
self.ssh_user_input = QLineEdit("root")
|
||||
self.ssh_user_input.setFixedWidth(130)
|
||||
str_qlineedit = "QLineEdit { background-color: #1a1b2e; font-size: 13px; padding: 4px; border: 1px solid #3d4a6b; color: #ffffff; }"
|
||||
self.ssh_user_input.setStyleSheet(str_qlineedit)
|
||||
ssh_row1.addWidget(self.ssh_user_input)
|
||||
|
||||
ssh_row1.addSpacing(25)
|
||||
|
||||
ssh_lbl2 = QLabel("SSH Password:")
|
||||
ssh_lbl2.setStyleSheet("font-size: 12px; font-weight: bold;")
|
||||
ssh_row1.addWidget(ssh_lbl2)
|
||||
self.ssh_pass_input = QLineEdit("admin123a")
|
||||
self.ssh_pass_input.setEchoMode(QLineEdit.EchoMode.Password)
|
||||
self.ssh_pass_input.setFixedWidth(130)
|
||||
self.ssh_pass_input.setStyleSheet(str_qlineedit)
|
||||
ssh_row1.addWidget(self.ssh_pass_input)
|
||||
ssh_row1.addStretch()
|
||||
ssh_creds_layout.addLayout(ssh_row1)
|
||||
|
||||
self.set_passwd_cb = QCheckBox("Set password before flash (passwd → admin123a)")
|
||||
self.set_passwd_cb.setChecked(True)
|
||||
self.set_passwd_cb.setStyleSheet("color: #94a3b8; font-size: 12px; font-weight: bold;")
|
||||
ssh_creds_layout.addWidget(self.set_passwd_cb)
|
||||
|
||||
self.ssh_creds_widget.setVisible(False)
|
||||
method_layout.addWidget(self.ssh_creds_widget)
|
||||
|
||||
flash_layout.addWidget(self.method_container)
|
||||
|
||||
# Warning UI for Update Mode
|
||||
self.update_warning_lbl = QLabel("⚠️ FW Update Mode: Forced to SSH (root/admin123a). Target IP [192.168.11.102]. Other IPs require confirmation.")
|
||||
self.update_warning_lbl.setStyleSheet("color: #f38ba8; font-size: 13px; font-weight: bold; padding: 8px; border: 1px dotted #f38ba8;")
|
||||
self.update_warning_lbl.setWordWrap(True)
|
||||
self.update_warning_lbl.setVisible(False)
|
||||
flash_layout.addWidget(self.update_warning_lbl)
|
||||
|
||||
# Parallel count row
|
||||
parallel_row = QHBoxLayout()
|
||||
parallel_row.addWidget(QLabel("Concurrent devices:"))
|
||||
parallel_lbl = QLabel("Concurrent devices:")
|
||||
parallel_lbl.setStyleSheet("font-size: 14px; font-weight: bold;")
|
||||
parallel_row.addWidget(parallel_lbl)
|
||||
|
||||
self.parallel_spin = QSpinBox()
|
||||
self.parallel_spin.setRange(0, 100)
|
||||
self.parallel_spin.setValue(10)
|
||||
self.parallel_spin.setSpecialValueText("Unlimited")
|
||||
self.parallel_spin.setSpecialValueText("0 (Unlimited)")
|
||||
self.parallel_spin.setToolTip("0 = unlimited (all devices at once)")
|
||||
self.parallel_spin.setFixedWidth(80)
|
||||
self.parallel_spin.setMinimumWidth(160)
|
||||
self.parallel_spin.setMinimumHeight(35)
|
||||
self.parallel_spin.setStyleSheet("""
|
||||
QSpinBox { font-size: 15px; font-weight: bold; text-align: center; }
|
||||
""")
|
||||
parallel_row.addWidget(self.parallel_spin)
|
||||
parallel_row.addStretch()
|
||||
flash_layout.addLayout(parallel_row)
|
||||
|
||||
btn_flash = QPushButton("⚡ Flash Selected Devices")
|
||||
btn_flash = QPushButton("⚡ FLASH SELECTED DEVICES")
|
||||
btn_flash.setObjectName("flash")
|
||||
btn_flash.setMinimumHeight(45)
|
||||
btn_flash.setStyleSheet("QPushButton#flash { font-size: 16px; font-weight: bold; letter-spacing: 1px; }")
|
||||
btn_flash.clicked.connect(self.flash_all)
|
||||
flash_layout.addWidget(btn_flash)
|
||||
|
||||
@@ -724,27 +382,42 @@ class App(QWidget):
|
||||
self.table.setRowCount(len(self.devices))
|
||||
|
||||
for i, d in enumerate(self.devices):
|
||||
mac_str = d["mac"].upper()
|
||||
|
||||
# Checkbox column
|
||||
cb_item = QTableWidgetItem()
|
||||
cb_item.setCheckState(Qt.CheckState.Checked)
|
||||
|
||||
# If device MAC is already flashed, mark it
|
||||
is_already_flashed = mac_str in self.flashed_macs
|
||||
if is_already_flashed and d["status"] in ["READY", ""]:
|
||||
d["status"] = "Already Flashed"
|
||||
|
||||
# Checkbox logic
|
||||
if is_already_flashed:
|
||||
cb_item.setCheckState(Qt.CheckState.Unchecked)
|
||||
else:
|
||||
cb_item.setCheckState(Qt.CheckState.Checked)
|
||||
|
||||
cb_item.setFlags(cb_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
|
||||
self.table.setItem(i, 0, cb_item)
|
||||
|
||||
ip_item = QTableWidgetItem(d["ip"])
|
||||
name_item = QTableWidgetItem(d.get("name", ""))
|
||||
mac_item = QTableWidgetItem(d["mac"].upper())
|
||||
mac_item = QTableWidgetItem(mac_str)
|
||||
status_item = QTableWidgetItem(d["status"])
|
||||
|
||||
if is_already_flashed:
|
||||
status_item.setForeground(QColor("#a6e3a1"))
|
||||
ip_item.setForeground(QColor("#a6e3a1"))
|
||||
|
||||
# Dim gateway & self if showing all
|
||||
if d["ip"] in {self.local_ip, self.gateway_ip}:
|
||||
for item in [ip_item, name_item, mac_item, status_item]:
|
||||
for item in [ip_item, mac_item, status_item]:
|
||||
item.setForeground(QColor("#4a5568"))
|
||||
cb_item.setCheckState(Qt.CheckState.Unchecked)
|
||||
|
||||
self.table.setItem(i, 1, ip_item)
|
||||
self.table.setItem(i, 2, name_item)
|
||||
self.table.setItem(i, 3, mac_item)
|
||||
self.table.setItem(i, 4, status_item)
|
||||
self.table.setItem(i, 2, mac_item)
|
||||
self.table.setItem(i, 3, status_item)
|
||||
|
||||
total = len(self.devices)
|
||||
hidden = len(self.all_devices) - total
|
||||
@@ -756,9 +429,12 @@ class App(QWidget):
|
||||
def _select_all_devices(self):
|
||||
"""Check all device checkboxes."""
|
||||
for i in range(self.table.rowCount()):
|
||||
item = self.table.item(i, 0)
|
||||
if item:
|
||||
item.setCheckState(Qt.CheckState.Checked)
|
||||
mac = self.table.item(i, 2).text().strip()
|
||||
# Prevent checking if already flashed or is a gateway (optional but good UI logic)
|
||||
if mac not in self.flashed_macs:
|
||||
item = self.table.item(i, 0)
|
||||
if item:
|
||||
item.setCheckState(Qt.CheckState.Checked)
|
||||
|
||||
def _deselect_all_devices(self):
|
||||
"""Uncheck all device checkboxes."""
|
||||
@@ -766,6 +442,21 @@ class App(QWidget):
|
||||
item = self.table.item(i, 0)
|
||||
if item:
|
||||
item.setCheckState(Qt.CheckState.Unchecked)
|
||||
|
||||
def _show_history(self):
|
||||
"""Show list of successfully flashed MACs this session."""
|
||||
if not self.flashed_macs:
|
||||
QMessageBox.information(self, "Flash History", "No successful flashes during this session.")
|
||||
else:
|
||||
# Sort by MAC and format with timestamp
|
||||
history_lines = []
|
||||
for mac in sorted(self.flashed_macs.keys()):
|
||||
time_str = self.flashed_macs[mac]
|
||||
history_lines.append(f"[{time_str}] {mac}")
|
||||
|
||||
macs_str = "\n".join(history_lines)
|
||||
msg = f"Successfully flashed devices in this session ({len(self.flashed_macs)}):\n\n{macs_str}"
|
||||
QMessageBox.information(self, "Flash History", msg)
|
||||
|
||||
# ── Actions ──
|
||||
|
||||
@@ -801,6 +492,7 @@ class App(QWidget):
|
||||
self.scan_progress_bar.setRange(0, 0)
|
||||
self.scan_progress_bar.setFormat(" Starting...")
|
||||
self.scan_progress_bar.setVisible(True)
|
||||
self.btn_scan.setEnabled(False)
|
||||
|
||||
self.scan_thread = ScanThread(network)
|
||||
self.scan_thread.finished.connect(self._on_scan_done)
|
||||
@@ -811,6 +503,7 @@ class App(QWidget):
|
||||
|
||||
def _on_scan_done(self, results):
|
||||
self.scan_progress_bar.setVisible(False)
|
||||
self.btn_scan.setEnabled(True)
|
||||
self.all_devices = []
|
||||
|
||||
for dev in results:
|
||||
@@ -853,12 +546,29 @@ class App(QWidget):
|
||||
|
||||
def _on_scan_error(self, error_msg):
|
||||
self.scan_progress_bar.setVisible(False)
|
||||
self.btn_scan.setEnabled(True)
|
||||
self.scan_status.setText("❌ Scan failed")
|
||||
self.scan_status.setStyleSheet("color: #f38ba8;")
|
||||
QMessageBox.critical(
|
||||
self, "Scan Error", f"Failed to scan network:\n{error_msg}"
|
||||
)
|
||||
|
||||
def _on_mode_changed(self, index):
|
||||
"""Show/hide method based on selected mode."""
|
||||
mode = self.mode_combo.currentData()
|
||||
if mode == "update":
|
||||
self.method_container.setVisible(False)
|
||||
self.update_warning_lbl.setVisible(True)
|
||||
else:
|
||||
self.method_container.setVisible(True)
|
||||
self.update_warning_lbl.setVisible(False)
|
||||
self._on_method_changed(self.method_combo.currentIndex())
|
||||
|
||||
def _on_method_changed(self, index):
|
||||
"""Show/hide SSH credentials based on selected method."""
|
||||
method = self.method_combo.currentData()
|
||||
self.ssh_creds_widget.setVisible(method == "ssh")
|
||||
|
||||
def _get_selected_devices(self):
|
||||
"""Return list of (table_row_index, device_dict) for checked devices."""
|
||||
selected = []
|
||||
@@ -905,10 +615,44 @@ class App(QWidget):
|
||||
self._flash_row_map[idx] = row
|
||||
flash_devices.append(dev)
|
||||
|
||||
# Determine flash method and SSH credentials
|
||||
mode = self.mode_combo.currentData()
|
||||
|
||||
if mode == "update":
|
||||
# Hỏi xác nhận nếu có device nào khác 192.168.11.102
|
||||
strange_ips = [dev["ip"] for _, dev in selected if dev["ip"] != "192.168.11.102"]
|
||||
if strange_ips:
|
||||
msg = (f"⚠️ Detected IP(s) other than [192.168.11.102] in your selection:\n"
|
||||
f"{', '.join(strange_ips[:3])}{'...' if len(strange_ips) > 3 else ''}\n\n"
|
||||
f"Are you SURE you want to update firmware on these devices?")
|
||||
reply = QMessageBox.question(
|
||||
self, "Confirm Update Target", msg,
|
||||
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
|
||||
QMessageBox.StandardButton.No
|
||||
)
|
||||
if reply == QMessageBox.StandardButton.No:
|
||||
return
|
||||
method = "ssh"
|
||||
ssh_user = "root"
|
||||
ssh_password = "admin123a"
|
||||
ssh_backup_password = "admin"
|
||||
set_passwd = False
|
||||
else:
|
||||
method = self.method_combo.currentData()
|
||||
ssh_user = self.ssh_user_input.text().strip() or "root"
|
||||
ssh_password = self.ssh_pass_input.text() or "admin123a"
|
||||
ssh_backup_password = "admin123a"
|
||||
set_passwd = self.set_passwd_cb.isChecked() if method == "ssh" else False
|
||||
|
||||
# Run flashing in background thread so UI doesn't freeze
|
||||
self.flash_thread = FlashThread(
|
||||
flash_devices, self.firmware,
|
||||
max_workers=self.parallel_spin.value()
|
||||
max_workers=self.parallel_spin.value(),
|
||||
method=method,
|
||||
ssh_user=ssh_user,
|
||||
ssh_password=ssh_password,
|
||||
ssh_backup_password=ssh_backup_password,
|
||||
set_passwd=set_passwd
|
||||
)
|
||||
self.flash_thread.device_status.connect(self._on_flash_status)
|
||||
self.flash_thread.device_done.connect(self._on_flash_done)
|
||||
@@ -918,7 +662,7 @@ class App(QWidget):
|
||||
def _on_flash_status(self, index, msg):
|
||||
"""Update status column while flashing."""
|
||||
row = self._flash_row_map.get(index, index)
|
||||
self.table.setItem(row, 4, QTableWidgetItem(f"⏳ {msg}"))
|
||||
self.table.setItem(row, 3, QTableWidgetItem(f"⏳ {msg}"))
|
||||
|
||||
def _on_flash_done(self, index, result):
|
||||
"""One device finished flashing."""
|
||||
@@ -926,6 +670,14 @@ class App(QWidget):
|
||||
if result.startswith("DONE"):
|
||||
item = QTableWidgetItem(f"✅ {result}")
|
||||
item.setForeground(QColor("#a6e3a1"))
|
||||
|
||||
# Save MAC to history with current timestamp
|
||||
mac_item = self.table.item(row, 2)
|
||||
if mac_item:
|
||||
import datetime
|
||||
now_str = datetime.datetime.now().strftime("%H:%M:%S")
|
||||
self.flashed_macs[mac_item.text().strip()] = now_str
|
||||
|
||||
# Auto-uncheck so it won't be flashed again
|
||||
cb = self.table.item(row, 0)
|
||||
if cb:
|
||||
@@ -933,7 +685,7 @@ class App(QWidget):
|
||||
else:
|
||||
item = QTableWidgetItem(f"❌ {result}")
|
||||
item.setForeground(QColor("#f38ba8"))
|
||||
self.table.setItem(row, 4, item)
|
||||
self.table.setItem(row, 3, item)
|
||||
self.progress.setValue(self.progress.value() + 1)
|
||||
|
||||
def _on_flash_all_done(self):
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
PyQt6
|
||||
scapy
|
||||
requests
|
||||
paramiko
|
||||
scp
|
||||
|
||||
11
run.bat
11
run.bat
@@ -3,14 +3,15 @@ REM IoT Firmware Loader - Windows launcher
|
||||
|
||||
cd /d "%~dp0"
|
||||
|
||||
if exist "venv" (
|
||||
call venv\Scripts\activate.bat
|
||||
) else (
|
||||
if not exist "venv" (
|
||||
echo Creating virtual environment...
|
||||
python -m venv venv
|
||||
call venv\Scripts\activate.bat
|
||||
pip install PyQt6 scapy requests
|
||||
)
|
||||
|
||||
call venv\Scripts\activate.bat
|
||||
|
||||
echo Checking and installing required packages...
|
||||
pip install -r requirements.txt --quiet
|
||||
|
||||
echo Starting IoT Firmware Loader...
|
||||
python main.py
|
||||
|
||||
64
ui/components.py
Normal file
64
ui/components.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from PyQt6.QtWidgets import QGroupBox, QWidget, QVBoxLayout
|
||||
from PyQt6.QtCore import QPropertyAnimation
|
||||
|
||||
class CollapsibleGroupBox(QGroupBox):
|
||||
def __init__(self, title="", parent=None):
|
||||
super().__init__(title, parent)
|
||||
self.setCheckable(True)
|
||||
self.setChecked(True)
|
||||
|
||||
self.animation = QPropertyAnimation(self, b"maximumHeight")
|
||||
self.animation.setDuration(200)
|
||||
|
||||
# Connect the toggled signal to our animation function
|
||||
self.toggled.connect(self._toggle_animation)
|
||||
|
||||
self._full_height = 0
|
||||
|
||||
def set_content_layout(self, layout):
|
||||
# We need a wrapper widget to hold the layout
|
||||
self.content_widget = QWidget()
|
||||
self.content_widget.setStyleSheet("background-color: transparent;")
|
||||
self.content_widget.setLayout(layout)
|
||||
|
||||
main_layout = QVBoxLayout()
|
||||
main_layout.setContentsMargins(0, 0, 0, 0)
|
||||
main_layout.addWidget(self.content_widget)
|
||||
self.setLayout(main_layout)
|
||||
|
||||
def _toggle_animation(self, checked):
|
||||
if not hasattr(self, 'content_widget'):
|
||||
return
|
||||
|
||||
if checked:
|
||||
# Expand: show content first, then animate
|
||||
self.content_widget.setVisible(True)
|
||||
target_height = self.sizeHint().height()
|
||||
self.animation.stop()
|
||||
self.animation.setStartValue(self.height())
|
||||
self.animation.setEndValue(target_height)
|
||||
self.animation.finished.connect(self._on_expand_finished)
|
||||
self.animation.start()
|
||||
else:
|
||||
# Collapse
|
||||
self.animation.stop()
|
||||
self.animation.setStartValue(self.height())
|
||||
self.animation.setEndValue(32)
|
||||
self.animation.finished.connect(self._on_collapse_finished)
|
||||
self.animation.start()
|
||||
|
||||
def _on_expand_finished(self):
|
||||
# Remove height constraint so content can grow dynamically
|
||||
self.setMaximumHeight(16777215)
|
||||
try:
|
||||
self.animation.finished.disconnect(self._on_expand_finished)
|
||||
except TypeError:
|
||||
pass
|
||||
|
||||
def _on_collapse_finished(self):
|
||||
if not self.isChecked():
|
||||
self.content_widget.setVisible(False)
|
||||
try:
|
||||
self.animation.finished.disconnect(self._on_collapse_finished)
|
||||
except TypeError:
|
||||
pass
|
||||
275
ui/styles.py
Normal file
275
ui/styles.py
Normal file
@@ -0,0 +1,275 @@
|
||||
STYLE = """
|
||||
QWidget {
|
||||
background-color: #1a1b2e;
|
||||
color: #e2e8f0;
|
||||
font-family: 'Segoe UI', 'SF Pro Display', sans-serif;
|
||||
font-size: 12px;
|
||||
}
|
||||
|
||||
QGroupBox {
|
||||
border: 1px solid #2d3748;
|
||||
border-radius: 8px;
|
||||
margin-top: 10px;
|
||||
padding: 20px 8px 6px 8px;
|
||||
font-weight: bold;
|
||||
color: #7eb8f7;
|
||||
background-color: #1e2035;
|
||||
}
|
||||
|
||||
QGroupBox::title {
|
||||
subcontrol-origin: margin;
|
||||
subcontrol-position: top left;
|
||||
left: 14px;
|
||||
top: 5px;
|
||||
padding: 0px 8px;
|
||||
background-color: transparent;
|
||||
}
|
||||
|
||||
QGroupBox::indicator {
|
||||
width: 14px;
|
||||
height: 14px;
|
||||
border-radius: 4px;
|
||||
border: 1px solid #3d4a6b;
|
||||
background-color: #13141f;
|
||||
margin-top: 5px;
|
||||
}
|
||||
|
||||
QGroupBox::indicator:unchecked {
|
||||
background-color: #13141f;
|
||||
}
|
||||
|
||||
QGroupBox::indicator:checked {
|
||||
background-color: #3b82f6;
|
||||
border-color: #3b82f6;
|
||||
}
|
||||
|
||||
QLabel#title {
|
||||
font-size: 16px;
|
||||
font-weight: bold;
|
||||
color: #7eb8f7;
|
||||
letter-spacing: 1px;
|
||||
}
|
||||
|
||||
QLabel#info {
|
||||
color: #94a3b8;
|
||||
font-size: 11px;
|
||||
}
|
||||
|
||||
QPushButton {
|
||||
background-color: #2d3352;
|
||||
border: 1px solid #3d4a6b;
|
||||
border-radius: 6px;
|
||||
padding: 4px 12px;
|
||||
color: #e2e8f0;
|
||||
font-weight: 600;
|
||||
min-height: 24px;
|
||||
}
|
||||
|
||||
QPushButton:hover {
|
||||
background-color: #3d4a6b;
|
||||
border-color: #7eb8f7;
|
||||
color: #ffffff;
|
||||
}
|
||||
|
||||
QPushButton:pressed {
|
||||
background-color: #1a2040;
|
||||
}
|
||||
|
||||
QPushButton#scan {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #1a56db, stop:1 #1e66f5);
|
||||
border-color: #1a56db;
|
||||
color: #ffffff;
|
||||
}
|
||||
|
||||
QPushButton#scan:hover {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #2563eb, stop:1 #3b82f6);
|
||||
}
|
||||
|
||||
QPushButton#flash {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #15803d, stop:1 #16a34a);
|
||||
border-color: #15803d;
|
||||
color: #ffffff;
|
||||
font-size: 13px;
|
||||
min-height: 30px;
|
||||
}
|
||||
|
||||
QPushButton#flash:hover {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #16a34a, stop:1 #22c55e);
|
||||
}
|
||||
|
||||
QTableWidget {
|
||||
background-color: #13141f;
|
||||
alternate-background-color: #1a1b2e;
|
||||
border: 1px solid #2d3748;
|
||||
border-radius: 8px;
|
||||
gridline-color: #2d3748;
|
||||
selection-background-color: #2d3a5a;
|
||||
selection-color: #e2e8f0;
|
||||
}
|
||||
|
||||
QTableWidget::item {
|
||||
padding: 2px 6px;
|
||||
border: none;
|
||||
}
|
||||
|
||||
QTableWidget::item:selected {
|
||||
background-color: #2d3a5a;
|
||||
color: #7eb8f7;
|
||||
}
|
||||
|
||||
QHeaderView::section {
|
||||
background-color: #1e2035;
|
||||
color: #7eb8f7;
|
||||
border: none;
|
||||
border-bottom: 2px solid #3b82f6;
|
||||
border-right: 1px solid #2d3748;
|
||||
padding: 4px 6px;
|
||||
font-weight: bold;
|
||||
font-size: 11px;
|
||||
letter-spacing: 0.5px;
|
||||
text-transform: uppercase;
|
||||
}
|
||||
|
||||
QHeaderView::section:last {
|
||||
border-right: none;
|
||||
}
|
||||
|
||||
QProgressBar {
|
||||
border: 1px solid #2d3748;
|
||||
border-radius: 6px;
|
||||
text-align: center;
|
||||
background-color: #13141f;
|
||||
color: #e2e8f0;
|
||||
height: 20px;
|
||||
font-size: 11px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
QProgressBar::chunk {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #3b82f6, stop:1 #7eb8f7);
|
||||
border-radius: 7px;
|
||||
}
|
||||
|
||||
QProgressBar#scan_bar {
|
||||
border: 1px solid #374151;
|
||||
border-radius: 5px;
|
||||
text-align: center;
|
||||
background-color: #13141f;
|
||||
color: #fbbf24;
|
||||
height: 16px;
|
||||
font-size: 10px;
|
||||
font-weight: 600;
|
||||
}
|
||||
|
||||
QProgressBar#scan_bar::chunk {
|
||||
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
|
||||
stop:0 #d97706, stop:1 #fbbf24);
|
||||
border-radius: 4px;
|
||||
}
|
||||
|
||||
QLineEdit {
|
||||
background-color: #13141f;
|
||||
border: 1px solid #2d3748;
|
||||
border-radius: 8px;
|
||||
padding: 7px 12px;
|
||||
color: #e2e8f0;
|
||||
selection-background-color: #2d3a5a;
|
||||
}
|
||||
|
||||
QLineEdit:focus {
|
||||
border-color: #3b82f6;
|
||||
background-color: #161727;
|
||||
}
|
||||
|
||||
QScrollBar:vertical {
|
||||
background: #13141f;
|
||||
width: 10px;
|
||||
border-radius: 5px;
|
||||
margin: 2px;
|
||||
}
|
||||
|
||||
QScrollBar::handle:vertical {
|
||||
background: #3d4a6b;
|
||||
border-radius: 5px;
|
||||
min-height: 30px;
|
||||
}
|
||||
|
||||
QScrollBar::handle:vertical:hover {
|
||||
background: #7eb8f7;
|
||||
}
|
||||
|
||||
QScrollBar::handle:vertical:pressed {
|
||||
background: #3b82f6;
|
||||
}
|
||||
|
||||
QScrollBar::add-line:vertical,
|
||||
QScrollBar::sub-line:vertical {
|
||||
height: 0px;
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
QScrollBar::add-page:vertical,
|
||||
QScrollBar::sub-page:vertical {
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
QScrollBar:horizontal {
|
||||
background: #13141f;
|
||||
height: 10px;
|
||||
border-radius: 5px;
|
||||
margin: 2px;
|
||||
}
|
||||
|
||||
QScrollBar::handle:horizontal {
|
||||
background: #3d4a6b;
|
||||
border-radius: 5px;
|
||||
min-width: 30px;
|
||||
}
|
||||
|
||||
QScrollBar::handle:horizontal:hover {
|
||||
background: #7eb8f7;
|
||||
}
|
||||
|
||||
QScrollBar::handle:horizontal:pressed {
|
||||
background: #3b82f6;
|
||||
}
|
||||
|
||||
QScrollBar::add-line:horizontal,
|
||||
QScrollBar::sub-line:horizontal {
|
||||
width: 0px;
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
QScrollBar::add-page:horizontal,
|
||||
QScrollBar::sub-page:horizontal {
|
||||
background: transparent;
|
||||
}
|
||||
|
||||
QCheckBox {
|
||||
spacing: 6px;
|
||||
color: #94a3b8;
|
||||
font-size: 12px;
|
||||
}
|
||||
|
||||
QCheckBox::indicator {
|
||||
width: 16px;
|
||||
height: 16px;
|
||||
border-radius: 4px;
|
||||
border: 1px solid #3d4a6b;
|
||||
background-color: #13141f;
|
||||
}
|
||||
|
||||
QCheckBox::indicator:checked {
|
||||
background-color: #3b82f6;
|
||||
border-color: #3b82f6;
|
||||
}
|
||||
|
||||
QCheckBox::indicator:hover {
|
||||
border-color: #7eb8f7;
|
||||
}
|
||||
"""
|
||||
27
utils/network.py
Normal file
27
utils/network.py
Normal file
@@ -0,0 +1,27 @@
|
||||
import socket
|
||||
|
||||
def _resolve_hostname(ip):
|
||||
"""Reverse DNS lookup for a single IP."""
|
||||
try:
|
||||
return socket.gethostbyaddr(ip)[0]
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def get_local_ip():
|
||||
"""Get the local IP address of this machine."""
|
||||
try:
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
s.connect(("8.8.8.8", 80))
|
||||
ip = s.getsockname()[0]
|
||||
s.close()
|
||||
return ip
|
||||
except Exception:
|
||||
return "N/A"
|
||||
|
||||
def get_default_network(ip):
|
||||
"""Guess the /24 network from local IP."""
|
||||
try:
|
||||
parts = ip.split(".")
|
||||
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
|
||||
except Exception:
|
||||
return "192.168.1.0/24"
|
||||
36
utils/system.py
Normal file
36
utils/system.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import os
|
||||
import sys
|
||||
import platform
|
||||
import socket
|
||||
from utils.network import get_local_ip
|
||||
|
||||
def resource_path(relative_path):
|
||||
""" Get absolute path to resource, works for dev and for PyInstaller """
|
||||
try:
|
||||
base_path = sys._MEIPASS
|
||||
except Exception:
|
||||
base_path = os.path.abspath(".")
|
||||
return os.path.join(base_path, relative_path)
|
||||
|
||||
def get_machine_info():
|
||||
"""Collect machine info."""
|
||||
hostname = socket.gethostname()
|
||||
local_ip = get_local_ip()
|
||||
os_info = f"{platform.system()} {platform.release()}"
|
||||
mac_addr = "N/A"
|
||||
|
||||
try:
|
||||
import uuid
|
||||
mac = uuid.getnode()
|
||||
mac_addr = ":".join(
|
||||
f"{(mac >> (8 * i)) & 0xFF:02X}" for i in reversed(range(6))
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return {
|
||||
"hostname": hostname,
|
||||
"ip": local_ip,
|
||||
"os": os_info,
|
||||
"mac": mac_addr,
|
||||
}
|
||||
Reference in New Issue
Block a user