Refactor: Chia nho file main thao thu muc va don dep theo yeu cau

This commit is contained in:
2026-03-08 14:37:27 +07:00
parent ada3440ebc
commit 2c2a78d27c
13 changed files with 532 additions and 773 deletions

Binary file not shown.

View File

@@ -8,11 +8,20 @@ Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng lo
## 📁 Cấu trúc dự án
```
```text
iot_fw_loader/
├── main.py # UI chính (PyQt6) + Điều phối luồng và xử lý đa luồng
├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy)
├── flasher.py # Upload firmware và tự động hóa qua giao diện OpenWrt LuCI
├── main.py # UI chính (PyQt6)
├── core/ # Các thành phần cốt lõi và xử lý đa luồng
│ ├── workers.py # Quản lý luồng dùng chung (ScanThread, FlashThread)
│ ├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy)
│ ├── flasher.py # Flash firmware và tự động hóa qua OpenWrt LuCI bằng API
│ └── ssh_flasher.py # Load/Update firmware qua đường dẫn SSH
├── ui/ # Các component thiết kế giao diện
│ ├── components.py # Custom Qt Widgets (CollapsibleGroupBox, etc.)
│ └── styles.py # Các cấu hình Stylesheet
├── utils/ # Các hàm helper tiện ích
│ ├── network.py # Các hàm xử lý IP, Hostname
│ └── system.py # Các hàm lấy thông tin máy và resources
├── run.sh # Script khởi chạy (macOS/Linux)
├── run.bat # Script khởi chạy (Windows)
├── build_windows.bat # Script đóng gói thành file .exe độc lập (Windows)
@@ -27,7 +36,7 @@ iot_fw_loader/
- Python 3.9+
- Các thư viện: `PyQt6`, `scapy`, `requests`, `pyinstaller` (để build).
- *Trên Windows:* Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng).
- _Trên Windows:_ Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng).
### Khởi chạy nhanh (Môi trường Dev)
@@ -46,6 +55,7 @@ run.bat
> Script tự tạo `venv` và cài dependencies nếu chưa có.
### 📦 Build ra file chạy độc lập (.exe) cho Windows
Chạy script sau để tự động đóng gói ứng dụng thành 1 file `.exe` duy nhất (không cần cài Python trên máy đích):
```bat
@@ -58,7 +68,7 @@ File nhận được sẽ nằm ở: `dist\IoT_Firmware_Loader.exe`.
## 🏗 Kiến trúc hệ thống
```
```text
┌─────────────────────────────────────────────────────────────┐
│ main.py (UI) │
│ ┌───────────┐ ┌───────────┐ ┌────────────────────────────┐ │
@@ -99,7 +109,7 @@ Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát
1. **Ping Sweep:** Gửi gói tin Ping song song (tối đa 50 threads) để đánh thức thiết bị và điền IP/MAC vào bảng ARP Cache của hệ điều hành.
2. **ARP Table Fallback:** Đọc bảng ARP nội bộ của OS (`arp -a`) bằng Regex. Hoạt động đa nền tảng (Windows/macOS/Linux) mà không cần quyền Admin/Root.
3. **Scapy ARP (Tính năng nâng cao):** Gửi các gói tin ARP Broadcast trực tiếp để đảm bảo bao phủ gap nếu có. Yêu cầu quyền Root trên Linux/macOS hoặc Npcap trên Windows.
*Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị.*
_Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị._
### 2. Giao diện thiết bị (Device Table)
@@ -110,11 +120,11 @@ Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát
Từ phiên bản hiện tại, phương thức ESP32 OTA không còn được áp dụng, thay vào đó module `flasher.py` tự động hóa quá trình update của router **OpenWrt (Barrier Breaker 14.07)**:
* **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`.
* **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`.
* **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có).
* **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application.
* 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh.
- **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`.
- **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`.
- **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có).
- **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application.
- 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh.
---

100
core/workers.py Normal file
View File

@@ -0,0 +1,100 @@
from PyQt6.QtCore import QThread, pyqtSignal
from concurrent.futures import ThreadPoolExecutor, as_completed
# We need these imports inside workers since they use them
from core.scanner import scan_network
from core.flasher import flash_device
from core.ssh_flasher import flash_device_ssh
from utils.network import _resolve_hostname
class ScanThread(QThread):
"""Run network scan in a background thread so UI doesn't freeze."""
finished = pyqtSignal(list)
error = pyqtSignal(str)
scan_progress = pyqtSignal(int, int) # done, total (ping sweep)
stage = pyqtSignal(str) # current scan phase
def __init__(self, network):
super().__init__()
self.network = network
def run(self):
try:
def on_ping_progress(done, total):
self.scan_progress.emit(done, total)
def on_stage(s):
self.stage.emit(s)
results = scan_network(self.network,
progress_cb=on_ping_progress,
stage_cb=on_stage)
# Resolve hostnames in parallel
self.stage.emit("hostname")
with ThreadPoolExecutor(max_workers=50) as executor:
future_to_dev = {
executor.submit(_resolve_hostname, d["ip"]): d
for d in results
}
for future in as_completed(future_to_dev):
dev = future_to_dev[future]
try:
dev["name"] = future.result(timeout=3)
except Exception:
dev["name"] = ""
self.finished.emit(results)
except Exception as e:
self.error.emit(str(e))
class FlashThread(QThread):
"""Run firmware flash in background so UI stays responsive."""
device_status = pyqtSignal(int, str) # index, status message
device_done = pyqtSignal(int, str) # index, result
all_done = pyqtSignal()
def __init__(self, devices, firmware_path, max_workers=10,
method="api", ssh_user="root", ssh_password="admin123a",
set_passwd=False):
super().__init__()
self.devices = devices
self.firmware_path = firmware_path
self.max_workers = max_workers
self.method = method
self.ssh_user = ssh_user
self.ssh_password = ssh_password
self.set_passwd = set_passwd
def run(self):
def _flash_one(i, dev):
try:
def on_status(msg):
self.device_status.emit(i, msg)
if self.method == "ssh":
result = flash_device_ssh(
dev["ip"], self.firmware_path,
user=self.ssh_user,
password=self.ssh_password,
set_passwd=self.set_passwd,
status_cb=on_status
)
else:
result = flash_device(
dev["ip"], self.firmware_path,
status_cb=on_status
)
self.device_done.emit(i, result)
except Exception as e:
self.device_done.emit(i, f"FAIL: {e}")
# Use configured max_workers (0 = unlimited = one per device)
workers = self.max_workers if self.max_workers > 0 else len(self.devices)
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = []
for i, dev in enumerate(self.devices):
futures.append(executor.submit(_flash_one, i, dev))
for f in futures:
f.result()
self.all_done.emit()

View File

@@ -1,205 +0,0 @@
"""
Debug Flash — chạy đầy đủ 3 bước flash và log chi tiết từng bước.
Usage:
python debug_full.py <IP> <firmware.bin>
python debug_full.py 192.168.11.17 V3.0.6p5.bin
"""
import sys
import os
import requests
import re
def debug_flash(ip, firmware_path, username="root", password=""):
base_url = f"http://{ip}"
session = requests.Session()
print(f"{'='*65}")
print(f" Full Flash Debug — {ip}")
print(f" Firmware: {os.path.basename(firmware_path)}")
print(f" Size: {os.path.getsize(firmware_path) / 1024 / 1024:.2f} MB")
print(f"{'='*65}")
# ═══════════════════════════════════
# STEP 1: Login
# ═══════════════════════════════════
print(f"\n{''*65}")
print(f" STEP 1: Login")
print(f"{''*65}")
login_url = f"{base_url}/cgi-bin/luci"
print(f" → GET {login_url}")
try:
r = session.get(login_url, timeout=10)
print(f" Status: {r.status_code}")
except Exception as e:
print(f" ❌ Cannot connect: {e}")
return
# Detect form fields
if 'name="luci_username"' in r.text:
login_data = {"luci_username": username, "luci_password": password}
print(f" Fields: luci_username / luci_password")
else:
login_data = {"username": username, "password": password}
print(f" Fields: username / password")
print(f"\n → POST {login_url}")
print(f" Data: {login_data}")
resp = session.post(login_url, data=login_data,
timeout=10, allow_redirects=True)
print(f" Status: {resp.status_code}")
print(f" Cookies: {dict(session.cookies)}")
# Extract stok
stok = None
for source in [resp.url, resp.text]:
match = re.search(r";stok=([a-f0-9]+)", source)
if match:
stok = match.group(1)
break
if not stok:
for hist in resp.history:
match = re.search(r";stok=([a-f0-9]+)",
hist.headers.get("Location", ""))
if match:
stok = match.group(1)
break
print(f" stok: {stok}")
has_cookie = "sysauth" in str(session.cookies)
print(f" sysauth: {'✅ YES' if has_cookie else '❌ NO'}")
if not stok and not has_cookie:
print(f"\n ❌ LOGIN FAILED — no stok, no sysauth cookie")
return
print(f"\n ✅ LOGIN OK")
# Build flash URL
if stok:
flash_url = f"{base_url}/cgi-bin/luci/;stok={stok}/admin/system/flashops"
else:
flash_url = f"{base_url}/cgi-bin/luci/admin/system/flashops"
# ═══════════════════════════════════
# STEP 2: Upload firmware
# ═══════════════════════════════════
print(f"\n{''*65}")
print(f" STEP 2: Upload firmware")
print(f"{''*65}")
print(f" → POST {flash_url}")
print(f" File field: image")
print(f" File name: {os.path.basename(firmware_path)}")
print(f" keep: NOT sent (unchecked)")
print(f" Uploading...")
filename = os.path.basename(firmware_path)
with open(firmware_path, "rb") as f:
resp = session.post(
flash_url,
data={}, # No keep = uncheck Keep Settings
files={"image": (filename, f, "application/octet-stream")},
timeout=300,
)
print(f" Status: {resp.status_code}")
print(f" URL: {resp.url}")
# Check response content
resp_lower = resp.text.lower()
has_verify = "verify" in resp_lower
has_proceed = "proceed" in resp_lower
has_checksum = "checksum" in resp_lower
has_image_form = 'name="image"' in resp.text and 'type="file"' in resp.text
print(f"\n Response analysis:")
print(f" Has 'verify': {'' if has_verify else ''}")
print(f" Has 'proceed': {'' if has_proceed else ''}")
print(f" Has 'checksum': {'' if has_checksum else ''}")
print(f" Has flash form: {'⚠️ (upload ignored!)' if has_image_form else '✅ (not flash form)'}")
# Extract checksum from verification page
checksum = re.search(r'Checksum:\s*<code>([a-f0-9]+)</code>', resp.text)
size_info = re.search(r'Size:\s*([\d.]+\s*MB)', resp.text)
if checksum:
print(f" Checksum: {checksum.group(1)}")
if size_info:
print(f" Size: {size_info.group(1)}")
# Check for keep settings status
if "will be erased" in resp.text:
print(f" Config: Will be ERASED ✅ (keep=off)")
elif "will be kept" in resp.text:
print(f" Config: Will be KEPT ⚠️ (keep=on)")
if not has_verify or not has_proceed:
print(f"\n ❌ Did NOT get verification page!")
# Show cleaned response
text = re.sub(r'<[^>]+>', ' ', resp.text)
text = re.sub(r'\s+', ' ', text).strip()
print(f" Response: {text[:500]}")
return
# Show the Proceed form
print(f"\n Proceed form (from HTML):")
forms = re.findall(r'<form[^>]*>(.*?)</form>', resp.text, re.DOTALL)
for i, form_body in enumerate(forms):
if 'value="Proceed"' in form_body or 'step' in form_body:
inputs = re.findall(r'<input[^>]*/?\s*>', form_body)
for inp in inputs:
print(f" {inp.strip()}")
print(f"\n ✅ UPLOAD OK — Verification page received")
# ═══════════════════════════════════
# STEP 3: Proceed (confirm flash)
# ═══════════════════════════════════
print(f"\n{''*65}")
print(f" STEP 3: Proceed (confirm flash)")
print(f"{''*65}")
confirm_data = {
"step": "2",
"keep": "",
}
print(f" → POST {flash_url}")
print(f" Data: {confirm_data}")
try:
resp = session.post(flash_url, data=confirm_data, timeout=300)
print(f" Status: {resp.status_code}")
# Show cleaned response
text = re.sub(r'<[^>]+>', ' ', resp.text)
text = re.sub(r'\s+', ' ', text).strip()
print(f" Response: {text[:300]}")
except requests.ConnectionError:
print(f" ✅ Connection dropped — device is REBOOTING!")
except requests.ReadTimeout:
print(f" ✅ Timeout — device is REBOOTING!")
print(f"\n{'='*65}")
print(f" 🎉 FLASH COMPLETE")
print(f"{'='*65}")
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: python debug_full.py <IP> <firmware.bin>")
print("Example: python debug_full.py 192.168.11.17 V3.0.6p5.bin")
sys.exit(1)
ip = sys.argv[1]
fw = sys.argv[2]
if not os.path.exists(fw):
print(f"❌ File not found: {fw}")
sys.exit(1)
debug_flash(ip, fw)

View File

@@ -1,67 +0,0 @@
# python debug_ssh.py 192.168.11.xxx "3.0.6p5.bin" admin123a
import sys
import logging
import paramiko
from ssh_flasher import flash_device_ssh
def main():
print("="*50)
print("🔧 CÔNG CỤ DEBUG SSH FLASH CHO MIRAV3")
print("="*50)
if len(sys.argv) < 3:
print("\nSử dụng: source venv/bin/activate && python debug_ssh.py <IP_THIET_BI> <DUONG_DAN_FIRMWARE> [MAT_KHAU_SSH] [--update]")
print("Ví dụ: python debug_ssh.py 192.168.11.102 ./3.0.6p5.bin admin123a --update")
sys.exit(1)
args = sys.argv[1:]
is_update_mode = "--update" in args
if is_update_mode:
args.remove("--update")
test_ip = args[0]
test_fw = args[1]
test_pass = args[2] if len(args) > 2 else "admin123a"
print(f"\n[+] Thông số đầu vào:")
print(f" - IP Thiết bị : {test_ip}")
print(f" - Firmware : {test_fw}")
print(f" - Pass SSH : {test_pass}")
print(f" - Phân luồng : {'UPDATE FW (set_passwd=False)' if is_update_mode else 'NẠP MỚI FW (set_passwd=True)'}\n")
print("[*] BẬT CHẾ ĐỘ LOG CHI TIẾT CỦA PARAMIKO/SSH...")
# Bật log xuất trực tiếp ra màn hình console để dễ nhìn lỗi
logging.getLogger("paramiko").setLevel(logging.DEBUG)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter('%(asctime)s [SSH_LOG] %(message)s'))
logging.getLogger("paramiko").addHandler(console_handler)
print("\n" + "-"*50)
print("BẮT ĐẦU QUÁ TRÌNH THỰC THI...")
print("-"*50 + "\n")
def print_status(msg):
print(f"\n---> TRẠNG THÁI APP: {msg}")
# Gọi hàm flash (có bật tính năng tự đổi passwd về admin123a trước nếu cần)
try:
result = flash_device_ssh(
ip=test_ip,
firmware_path=test_fw,
password=test_pass,
set_passwd=not is_update_mode,
status_cb=print_status
)
print("\n" + "="*50)
if result == "DONE":
print("🟢 KẾT QUẢ: THÀNH CÔNG (DONE)")
else:
print(f"🔴 KẾT QUẢ: THẤT BẠI - {result}")
print("="*50)
except Exception as e:
print(f"\n🔴 ERROR UNHANDLED EXCEPTION: {e}")
if __name__ == "__main__":
main()

497
main.py
View File

@@ -22,506 +22,25 @@ from PyQt6.QtGui import QFont, QColor, QIcon, QAction
import datetime
from scanner import scan_network
from flasher import flash_device
from ssh_flasher import flash_device_ssh
from core.scanner import scan_network
from core.flasher import flash_device
from core.ssh_flasher import flash_device_ssh
from core.workers import ScanThread, FlashThread
from utils.network import _resolve_hostname, get_default_network
from utils.system import resource_path, get_machine_info
class CollapsibleGroupBox(QGroupBox):
def __init__(self, title="", parent=None):
super().__init__(title, parent)
self.setCheckable(True)
self.setChecked(True)
self.animation = QPropertyAnimation(self, b"maximumHeight")
self.animation.setDuration(200)
# Connect the toggled signal to our animation function
self.toggled.connect(self._toggle_animation)
from ui.components import CollapsibleGroupBox
from ui.styles import STYLE
self._full_height = 0
def set_content_layout(self, layout):
# We need a wrapper widget to hold the layout
self.content_widget = QWidget()
self.content_widget.setStyleSheet("background-color: transparent;")
self.content_widget.setLayout(layout)
main_layout = QVBoxLayout()
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(self.content_widget)
self.setLayout(main_layout)
def _toggle_animation(self, checked):
if not hasattr(self, 'content_widget'):
return
if checked:
# Expand: show content first, then animate
self.content_widget.setVisible(True)
target_height = self.sizeHint().height()
self.animation.stop()
self.animation.setStartValue(self.height())
self.animation.setEndValue(target_height)
self.animation.finished.connect(self._on_expand_finished)
self.animation.start()
else:
# Collapse
self.animation.stop()
self.animation.setStartValue(self.height())
self.animation.setEndValue(32)
self.animation.finished.connect(self._on_collapse_finished)
self.animation.start()
def _on_expand_finished(self):
# Remove height constraint so content can grow dynamically
self.setMaximumHeight(16777215)
try:
self.animation.finished.disconnect(self._on_expand_finished)
except TypeError:
pass
def _on_collapse_finished(self):
if not self.isChecked():
self.content_widget.setVisible(False)
try:
self.animation.finished.disconnect(self._on_collapse_finished)
except TypeError:
pass
def _resolve_hostname(ip):
"""Reverse DNS lookup for a single IP."""
try:
return socket.gethostbyaddr(ip)[0]
except Exception:
return ""
def get_local_ip():
"""Get the local IP address of this machine."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "N/A"
def get_default_network(ip):
"""Guess the /24 network from local IP."""
try:
parts = ip.split(".")
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
except Exception:
return "192.168.1.0/24"
def resource_path(relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def get_machine_info():
"""Collect machine info."""
hostname = socket.gethostname()
local_ip = get_local_ip()
os_info = f"{platform.system()} {platform.release()}"
mac_addr = "N/A"
try:
import uuid
mac = uuid.getnode()
mac_addr = ":".join(
f"{(mac >> (8 * i)) & 0xFF:02X}" for i in reversed(range(6))
)
except Exception:
pass
return {
"hostname": hostname,
"ip": local_ip,
"os": os_info,
"mac": mac_addr,
}
# ── Stylesheet ──────────────────────────────────────────────
STYLE = """
QWidget {
background-color: #1a1b2e;
color: #e2e8f0;
font-family: 'Segoe UI', 'SF Pro Display', sans-serif;
font-size: 12px;
}
QGroupBox {
border: 1px solid #2d3748;
border-radius: 8px;
margin-top: 10px;
padding: 20px 8px 6px 8px;
font-weight: bold;
color: #7eb8f7;
background-color: #1e2035;
}
QGroupBox::title {
subcontrol-origin: margin;
subcontrol-position: top left;
left: 14px;
top: 5px;
padding: 0px 8px;
background-color: transparent;
}
QGroupBox::indicator {
width: 14px;
height: 14px;
border-radius: 4px;
border: 1px solid #3d4a6b;
background-color: #13141f;
margin-top: 5px;
}
QGroupBox::indicator:unchecked {
background-color: #13141f;
}
QGroupBox::indicator:checked {
background-color: #3b82f6;
border-color: #3b82f6;
}
QLabel#title {
font-size: 16px;
font-weight: bold;
color: #7eb8f7;
letter-spacing: 1px;
}
QLabel#info {
color: #94a3b8;
font-size: 11px;
}
QPushButton {
background-color: #2d3352;
border: 1px solid #3d4a6b;
border-radius: 6px;
padding: 4px 12px;
color: #e2e8f0;
font-weight: 600;
min-height: 24px;
}
QPushButton:hover {
background-color: #3d4a6b;
border-color: #7eb8f7;
color: #ffffff;
}
QPushButton:pressed {
background-color: #1a2040;
}
QPushButton#scan {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #1a56db, stop:1 #1e66f5);
border-color: #1a56db;
color: #ffffff;
}
QPushButton#scan:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #2563eb, stop:1 #3b82f6);
}
QPushButton#flash {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #15803d, stop:1 #16a34a);
border-color: #15803d;
color: #ffffff;
font-size: 13px;
min-height: 30px;
}
QPushButton#flash:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #16a34a, stop:1 #22c55e);
}
QTableWidget {
background-color: #13141f;
alternate-background-color: #1a1b2e;
border: 1px solid #2d3748;
border-radius: 8px;
gridline-color: #2d3748;
selection-background-color: #2d3a5a;
selection-color: #e2e8f0;
}
QTableWidget::item {
padding: 2px 6px;
border: none;
}
QTableWidget::item:selected {
background-color: #2d3a5a;
color: #7eb8f7;
}
QHeaderView::section {
background-color: #1e2035;
color: #7eb8f7;
border: none;
border-bottom: 2px solid #3b82f6;
border-right: 1px solid #2d3748;
padding: 4px 6px;
font-weight: bold;
font-size: 11px;
letter-spacing: 0.5px;
text-transform: uppercase;
}
QHeaderView::section:last {
border-right: none;
}
QProgressBar {
border: 1px solid #2d3748;
border-radius: 6px;
text-align: center;
background-color: #13141f;
color: #e2e8f0;
height: 20px;
font-size: 11px;
font-weight: 600;
}
QProgressBar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #3b82f6, stop:1 #7eb8f7);
border-radius: 7px;
}
QProgressBar#scan_bar {
border: 1px solid #374151;
border-radius: 5px;
text-align: center;
background-color: #13141f;
color: #fbbf24;
height: 16px;
font-size: 10px;
font-weight: 600;
}
QProgressBar#scan_bar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #d97706, stop:1 #fbbf24);
border-radius: 4px;
}
QLineEdit {
background-color: #13141f;
border: 1px solid #2d3748;
border-radius: 8px;
padding: 7px 12px;
color: #e2e8f0;
selection-background-color: #2d3a5a;
}
QLineEdit:focus {
border-color: #3b82f6;
background-color: #161727;
}
QScrollBar:vertical {
background: #13141f;
width: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:vertical {
background: #3d4a6b;
border-radius: 5px;
min-height: 30px;
}
QScrollBar::handle:vertical:hover {
background: #7eb8f7;
}
QScrollBar::handle:vertical:pressed {
background: #3b82f6;
}
QScrollBar::add-line:vertical,
QScrollBar::sub-line:vertical {
height: 0px;
background: transparent;
}
QScrollBar::add-page:vertical,
QScrollBar::sub-page:vertical {
background: transparent;
}
QScrollBar:horizontal {
background: #13141f;
height: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:horizontal {
background: #3d4a6b;
border-radius: 5px;
min-width: 30px;
}
QScrollBar::handle:horizontal:hover {
background: #7eb8f7;
}
QScrollBar::handle:horizontal:pressed {
background: #3b82f6;
}
QScrollBar::add-line:horizontal,
QScrollBar::sub-line:horizontal {
width: 0px;
background: transparent;
}
QScrollBar::add-page:horizontal,
QScrollBar::sub-page:horizontal {
background: transparent;
}
QCheckBox {
spacing: 6px;
color: #94a3b8;
font-size: 12px;
}
QCheckBox::indicator {
width: 16px;
height: 16px;
border-radius: 4px;
border: 1px solid #3d4a6b;
background-color: #13141f;
}
QCheckBox::indicator:checked {
background-color: #3b82f6;
border-color: #3b82f6;
}
QCheckBox::indicator:hover {
border-color: #7eb8f7;
}
"""
class ScanThread(QThread):
"""Run network scan in a background thread so UI doesn't freeze."""
finished = pyqtSignal(list)
error = pyqtSignal(str)
scan_progress = pyqtSignal(int, int) # done, total (ping sweep)
stage = pyqtSignal(str) # current scan phase
def __init__(self, network):
super().__init__()
self.network = network
def run(self):
try:
def on_ping_progress(done, total):
self.scan_progress.emit(done, total)
def on_stage(s):
self.stage.emit(s)
results = scan_network(self.network,
progress_cb=on_ping_progress,
stage_cb=on_stage)
# Resolve hostnames in parallel
self.stage.emit("hostname")
with ThreadPoolExecutor(max_workers=50) as executor:
future_to_dev = {
executor.submit(_resolve_hostname, d["ip"]): d
for d in results
}
for future in as_completed(future_to_dev):
dev = future_to_dev[future]
try:
dev["name"] = future.result(timeout=3)
except Exception:
dev["name"] = ""
self.finished.emit(results)
except Exception as e:
self.error.emit(str(e))
class FlashThread(QThread):
"""Run firmware flash in background so UI stays responsive."""
device_status = pyqtSignal(int, str) # index, status message
device_done = pyqtSignal(int, str) # index, result
all_done = pyqtSignal()
def __init__(self, devices, firmware_path, max_workers=10,
method="api", ssh_user="root", ssh_password="admin123a",
set_passwd=False):
super().__init__()
self.devices = devices
self.firmware_path = firmware_path
self.max_workers = max_workers
self.method = method
self.ssh_user = ssh_user
self.ssh_password = ssh_password
self.set_passwd = set_passwd
def run(self):
def _flash_one(i, dev):
try:
def on_status(msg):
self.device_status.emit(i, msg)
if self.method == "ssh":
result = flash_device_ssh(
dev["ip"], self.firmware_path,
user=self.ssh_user,
password=self.ssh_password,
set_passwd=self.set_passwd,
status_cb=on_status
)
else:
result = flash_device(
dev["ip"], self.firmware_path,
status_cb=on_status
)
self.device_done.emit(i, result)
except Exception as e:
self.device_done.emit(i, f"FAIL: {e}")
# Use configured max_workers (0 = unlimited = one per device)
workers = self.max_workers if self.max_workers > 0 else len(self.devices)
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = []
for i, dev in enumerate(self.devices):
futures.append(executor.submit(_flash_one, i, dev))
for f in futures:
f.result()
self.all_done.emit()
class App(QWidget):

64
ui/components.py Normal file
View File

@@ -0,0 +1,64 @@
from PyQt6.QtWidgets import QGroupBox, QWidget, QVBoxLayout
from PyQt6.QtCore import QPropertyAnimation
class CollapsibleGroupBox(QGroupBox):
def __init__(self, title="", parent=None):
super().__init__(title, parent)
self.setCheckable(True)
self.setChecked(True)
self.animation = QPropertyAnimation(self, b"maximumHeight")
self.animation.setDuration(200)
# Connect the toggled signal to our animation function
self.toggled.connect(self._toggle_animation)
self._full_height = 0
def set_content_layout(self, layout):
# We need a wrapper widget to hold the layout
self.content_widget = QWidget()
self.content_widget.setStyleSheet("background-color: transparent;")
self.content_widget.setLayout(layout)
main_layout = QVBoxLayout()
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(self.content_widget)
self.setLayout(main_layout)
def _toggle_animation(self, checked):
if not hasattr(self, 'content_widget'):
return
if checked:
# Expand: show content first, then animate
self.content_widget.setVisible(True)
target_height = self.sizeHint().height()
self.animation.stop()
self.animation.setStartValue(self.height())
self.animation.setEndValue(target_height)
self.animation.finished.connect(self._on_expand_finished)
self.animation.start()
else:
# Collapse
self.animation.stop()
self.animation.setStartValue(self.height())
self.animation.setEndValue(32)
self.animation.finished.connect(self._on_collapse_finished)
self.animation.start()
def _on_expand_finished(self):
# Remove height constraint so content can grow dynamically
self.setMaximumHeight(16777215)
try:
self.animation.finished.disconnect(self._on_expand_finished)
except TypeError:
pass
def _on_collapse_finished(self):
if not self.isChecked():
self.content_widget.setVisible(False)
try:
self.animation.finished.disconnect(self._on_collapse_finished)
except TypeError:
pass

275
ui/styles.py Normal file
View File

@@ -0,0 +1,275 @@
STYLE = """
QWidget {
background-color: #1a1b2e;
color: #e2e8f0;
font-family: 'Segoe UI', 'SF Pro Display', sans-serif;
font-size: 12px;
}
QGroupBox {
border: 1px solid #2d3748;
border-radius: 8px;
margin-top: 10px;
padding: 20px 8px 6px 8px;
font-weight: bold;
color: #7eb8f7;
background-color: #1e2035;
}
QGroupBox::title {
subcontrol-origin: margin;
subcontrol-position: top left;
left: 14px;
top: 5px;
padding: 0px 8px;
background-color: transparent;
}
QGroupBox::indicator {
width: 14px;
height: 14px;
border-radius: 4px;
border: 1px solid #3d4a6b;
background-color: #13141f;
margin-top: 5px;
}
QGroupBox::indicator:unchecked {
background-color: #13141f;
}
QGroupBox::indicator:checked {
background-color: #3b82f6;
border-color: #3b82f6;
}
QLabel#title {
font-size: 16px;
font-weight: bold;
color: #7eb8f7;
letter-spacing: 1px;
}
QLabel#info {
color: #94a3b8;
font-size: 11px;
}
QPushButton {
background-color: #2d3352;
border: 1px solid #3d4a6b;
border-radius: 6px;
padding: 4px 12px;
color: #e2e8f0;
font-weight: 600;
min-height: 24px;
}
QPushButton:hover {
background-color: #3d4a6b;
border-color: #7eb8f7;
color: #ffffff;
}
QPushButton:pressed {
background-color: #1a2040;
}
QPushButton#scan {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #1a56db, stop:1 #1e66f5);
border-color: #1a56db;
color: #ffffff;
}
QPushButton#scan:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #2563eb, stop:1 #3b82f6);
}
QPushButton#flash {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #15803d, stop:1 #16a34a);
border-color: #15803d;
color: #ffffff;
font-size: 13px;
min-height: 30px;
}
QPushButton#flash:hover {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #16a34a, stop:1 #22c55e);
}
QTableWidget {
background-color: #13141f;
alternate-background-color: #1a1b2e;
border: 1px solid #2d3748;
border-radius: 8px;
gridline-color: #2d3748;
selection-background-color: #2d3a5a;
selection-color: #e2e8f0;
}
QTableWidget::item {
padding: 2px 6px;
border: none;
}
QTableWidget::item:selected {
background-color: #2d3a5a;
color: #7eb8f7;
}
QHeaderView::section {
background-color: #1e2035;
color: #7eb8f7;
border: none;
border-bottom: 2px solid #3b82f6;
border-right: 1px solid #2d3748;
padding: 4px 6px;
font-weight: bold;
font-size: 11px;
letter-spacing: 0.5px;
text-transform: uppercase;
}
QHeaderView::section:last {
border-right: none;
}
QProgressBar {
border: 1px solid #2d3748;
border-radius: 6px;
text-align: center;
background-color: #13141f;
color: #e2e8f0;
height: 20px;
font-size: 11px;
font-weight: 600;
}
QProgressBar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #3b82f6, stop:1 #7eb8f7);
border-radius: 7px;
}
QProgressBar#scan_bar {
border: 1px solid #374151;
border-radius: 5px;
text-align: center;
background-color: #13141f;
color: #fbbf24;
height: 16px;
font-size: 10px;
font-weight: 600;
}
QProgressBar#scan_bar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
stop:0 #d97706, stop:1 #fbbf24);
border-radius: 4px;
}
QLineEdit {
background-color: #13141f;
border: 1px solid #2d3748;
border-radius: 8px;
padding: 7px 12px;
color: #e2e8f0;
selection-background-color: #2d3a5a;
}
QLineEdit:focus {
border-color: #3b82f6;
background-color: #161727;
}
QScrollBar:vertical {
background: #13141f;
width: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:vertical {
background: #3d4a6b;
border-radius: 5px;
min-height: 30px;
}
QScrollBar::handle:vertical:hover {
background: #7eb8f7;
}
QScrollBar::handle:vertical:pressed {
background: #3b82f6;
}
QScrollBar::add-line:vertical,
QScrollBar::sub-line:vertical {
height: 0px;
background: transparent;
}
QScrollBar::add-page:vertical,
QScrollBar::sub-page:vertical {
background: transparent;
}
QScrollBar:horizontal {
background: #13141f;
height: 10px;
border-radius: 5px;
margin: 2px;
}
QScrollBar::handle:horizontal {
background: #3d4a6b;
border-radius: 5px;
min-width: 30px;
}
QScrollBar::handle:horizontal:hover {
background: #7eb8f7;
}
QScrollBar::handle:horizontal:pressed {
background: #3b82f6;
}
QScrollBar::add-line:horizontal,
QScrollBar::sub-line:horizontal {
width: 0px;
background: transparent;
}
QScrollBar::add-page:horizontal,
QScrollBar::sub-page:horizontal {
background: transparent;
}
QCheckBox {
spacing: 6px;
color: #94a3b8;
font-size: 12px;
}
QCheckBox::indicator {
width: 16px;
height: 16px;
border-radius: 4px;
border: 1px solid #3d4a6b;
background-color: #13141f;
}
QCheckBox::indicator:checked {
background-color: #3b82f6;
border-color: #3b82f6;
}
QCheckBox::indicator:hover {
border-color: #7eb8f7;
}
"""

27
utils/network.py Normal file
View File

@@ -0,0 +1,27 @@
import socket
def _resolve_hostname(ip):
"""Reverse DNS lookup for a single IP."""
try:
return socket.gethostbyaddr(ip)[0]
except Exception:
return ""
def get_local_ip():
"""Get the local IP address of this machine."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "N/A"
def get_default_network(ip):
"""Guess the /24 network from local IP."""
try:
parts = ip.split(".")
return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24"
except Exception:
return "192.168.1.0/24"

36
utils/system.py Normal file
View File

@@ -0,0 +1,36 @@
import os
import sys
import platform
import socket
from utils.network import get_local_ip
def resource_path(relative_path):
""" Get absolute path to resource, works for dev and for PyInstaller """
try:
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def get_machine_info():
"""Collect machine info."""
hostname = socket.gethostname()
local_ip = get_local_ip()
os_info = f"{platform.system()} {platform.release()}"
mac_addr = "N/A"
try:
import uuid
mac = uuid.getnode()
mac_addr = ":".join(
f"{(mac >> (8 * i)) & 0xFF:02X}" for i in reversed(range(6))
)
except Exception:
pass
return {
"hostname": hostname,
"ip": local_ip,
"os": os_info,
"mac": mac_addr,
}