diff --git a/MiraV3_Firmware_Loader.exe b/MiraV3_Firmware_Loader.exe deleted file mode 100644 index 906cd33..0000000 Binary files a/MiraV3_Firmware_Loader.exe and /dev/null differ diff --git a/README.md b/README.md index e90a9ef..74d04ed 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,20 @@ Công cụ desktop dùng để **scan, phát hiện và flash firmware hàng lo ## 📁 Cấu trúc dự án -``` +```text iot_fw_loader/ -├── main.py # UI chính (PyQt6) + Điều phối luồng và xử lý đa luồng -├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy) -├── flasher.py # Upload firmware và tự động hóa qua giao diện OpenWrt LuCI +├── main.py # UI chính (PyQt6) +├── core/ # Các thành phần cốt lõi và xử lý đa luồng +│ ├── workers.py # Quản lý luồng dùng chung (ScanThread, FlashThread) +│ ├── scanner.py # Quét thiết bị mạng đa lớp (Ping sweep + ARP + Scapy) +│ ├── flasher.py # Flash firmware và tự động hóa qua OpenWrt LuCI bằng API +│ └── ssh_flasher.py # Load/Update firmware qua đường dẫn SSH +├── ui/ # Các component thiết kế giao diện +│ ├── components.py # Custom Qt Widgets (CollapsibleGroupBox, etc.) +│ └── styles.py # Các cấu hình Stylesheet +├── utils/ # Các hàm helper tiện ích +│ ├── network.py # Các hàm xử lý IP, Hostname +│ └── system.py # Các hàm lấy thông tin máy và resources ├── run.sh # Script khởi chạy (macOS/Linux) ├── run.bat # Script khởi chạy (Windows) ├── build_windows.bat # Script đóng gói thành file .exe độc lập (Windows) @@ -27,7 +36,7 @@ iot_fw_loader/ - Python 3.9+ - Các thư viện: `PyQt6`, `scapy`, `requests`, `pyinstaller` (để build). -- *Trên Windows:* Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng). +- _Trên Windows:_ Cần cài đặt [Npcap](https://npcap.com/) để `scapy` có thể quét ARP ở chế độ sâu (không bắt buộc, có fallback dự phòng). ### Khởi chạy nhanh (Môi trường Dev) @@ -46,6 +55,7 @@ run.bat > Script tự tạo `venv` và cài dependencies nếu chưa có. ### 📦 Build ra file chạy độc lập (.exe) cho Windows + Chạy script sau để tự động đóng gói ứng dụng thành 1 file `.exe` duy nhất (không cần cài Python trên máy đích): ```bat @@ -58,7 +68,7 @@ File nhận được sẽ nằm ở: `dist\IoT_Firmware_Loader.exe`. ## 🏗 Kiến trúc hệ thống -``` +```text ┌─────────────────────────────────────────────────────────────┐ │ main.py (UI) │ │ ┌───────────┐ ┌───────────┐ ┌────────────────────────────┐ │ @@ -99,7 +109,7 @@ Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát 1. **Ping Sweep:** Gửi gói tin Ping song song (tối đa 50 threads) để đánh thức thiết bị và điền IP/MAC vào bảng ARP Cache của hệ điều hành. 2. **ARP Table Fallback:** Đọc bảng ARP nội bộ của OS (`arp -a`) bằng Regex. Hoạt động đa nền tảng (Windows/macOS/Linux) mà không cần quyền Admin/Root. 3. **Scapy ARP (Tính năng nâng cao):** Gửi các gói tin ARP Broadcast trực tiếp để đảm bảo bao phủ gap nếu có. Yêu cầu quyền Root trên Linux/macOS hoặc Npcap trên Windows. -*Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị.* + _Ngoài ra, công cụ tự động dò Hostname (`socket.gethostbyaddr`) song song để lấy tên thiết bị._ ### 2. Giao diện thiết bị (Device Table) @@ -110,11 +120,11 @@ Module `scanner.py` sử dụng chiến lược 3 lớp để đảm bảo phát Từ phiên bản hiện tại, phương thức ESP32 OTA không còn được áp dụng, thay vào đó module `flasher.py` tự động hóa quá trình update của router **OpenWrt (Barrier Breaker 14.07)**: -* **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`. -* **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`. -* **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có). -* **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application. -* 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh. +- **Bước 1:** Gửi `POST` chứa thông tin đăng nhập (username, password mặc định là root/trống, hoặc luci_username tuỳ thuộc firmware) để lấy session cookie `stok`. +- **Bước 2:** Đẩy file firmware `*.bin` dạng `multipart/form-data` lên `/cgi-bin/luci/;stok=.../admin/system/flashops`. +- **Bước 3:** Gửi lệnh tiếp tục (Kèm tuỳ chọn giữ cấu hình `keep=on` nếu có). +- **Bước 4:** Thiết bị xác nhận và tự khởi động lại. Cập nhật Status trên Application. +- 🛠 Hỗ trợ `ThreadPoolExecutor` để Flash đồng thời nhiều thiết bị, với lựa chọn số luồng đồng thời tuỳ chỉnh. --- diff --git a/flasher.py b/core/flasher.py similarity index 100% rename from flasher.py rename to core/flasher.py diff --git a/scanner.py b/core/scanner.py similarity index 100% rename from scanner.py rename to core/scanner.py diff --git a/ssh_flasher.py b/core/ssh_flasher.py similarity index 100% rename from ssh_flasher.py rename to core/ssh_flasher.py diff --git a/core/workers.py b/core/workers.py new file mode 100644 index 0000000..b3b36f6 --- /dev/null +++ b/core/workers.py @@ -0,0 +1,100 @@ +from PyQt6.QtCore import QThread, pyqtSignal +from concurrent.futures import ThreadPoolExecutor, as_completed + +# We need these imports inside workers since they use them +from core.scanner import scan_network +from core.flasher import flash_device +from core.ssh_flasher import flash_device_ssh +from utils.network import _resolve_hostname + +class ScanThread(QThread): + """Run network scan in a background thread so UI doesn't freeze.""" + finished = pyqtSignal(list) + error = pyqtSignal(str) + scan_progress = pyqtSignal(int, int) # done, total (ping sweep) + stage = pyqtSignal(str) # current scan phase + + def __init__(self, network): + super().__init__() + self.network = network + + def run(self): + try: + def on_ping_progress(done, total): + self.scan_progress.emit(done, total) + + def on_stage(s): + self.stage.emit(s) + + results = scan_network(self.network, + progress_cb=on_ping_progress, + stage_cb=on_stage) + # Resolve hostnames in parallel + self.stage.emit("hostname") + with ThreadPoolExecutor(max_workers=50) as executor: + future_to_dev = { + executor.submit(_resolve_hostname, d["ip"]): d + for d in results + } + for future in as_completed(future_to_dev): + dev = future_to_dev[future] + try: + dev["name"] = future.result(timeout=3) + except Exception: + dev["name"] = "" + self.finished.emit(results) + except Exception as e: + self.error.emit(str(e)) + + +class FlashThread(QThread): + """Run firmware flash in background so UI stays responsive.""" + device_status = pyqtSignal(int, str) # index, status message + device_done = pyqtSignal(int, str) # index, result + all_done = pyqtSignal() + + def __init__(self, devices, firmware_path, max_workers=10, + method="api", ssh_user="root", ssh_password="admin123a", + set_passwd=False): + super().__init__() + self.devices = devices + self.firmware_path = firmware_path + self.max_workers = max_workers + self.method = method + self.ssh_user = ssh_user + self.ssh_password = ssh_password + self.set_passwd = set_passwd + + def run(self): + def _flash_one(i, dev): + try: + def on_status(msg): + self.device_status.emit(i, msg) + + if self.method == "ssh": + result = flash_device_ssh( + dev["ip"], self.firmware_path, + user=self.ssh_user, + password=self.ssh_password, + set_passwd=self.set_passwd, + status_cb=on_status + ) + else: + result = flash_device( + dev["ip"], self.firmware_path, + status_cb=on_status + ) + self.device_done.emit(i, result) + except Exception as e: + self.device_done.emit(i, f"FAIL: {e}") + + # Use configured max_workers (0 = unlimited = one per device) + workers = self.max_workers if self.max_workers > 0 else len(self.devices) + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [] + for i, dev in enumerate(self.devices): + futures.append(executor.submit(_flash_one, i, dev)) + for f in futures: + f.result() + + self.all_done.emit() diff --git a/debug_full.py b/debug_full.py deleted file mode 100644 index fede718..0000000 --- a/debug_full.py +++ /dev/null @@ -1,205 +0,0 @@ -""" -Debug Flash — chạy đầy đủ 3 bước flash và log chi tiết từng bước. - -Usage: - python debug_full.py - python debug_full.py 192.168.11.17 V3.0.6p5.bin -""" - -import sys -import os -import requests -import re - - -def debug_flash(ip, firmware_path, username="root", password=""): - base_url = f"http://{ip}" - session = requests.Session() - - print(f"{'='*65}") - print(f" Full Flash Debug — {ip}") - print(f" Firmware: {os.path.basename(firmware_path)}") - print(f" Size: {os.path.getsize(firmware_path) / 1024 / 1024:.2f} MB") - print(f"{'='*65}") - - # ═══════════════════════════════════ - # STEP 1: Login - # ═══════════════════════════════════ - print(f"\n{'─'*65}") - print(f" STEP 1: Login") - print(f"{'─'*65}") - - login_url = f"{base_url}/cgi-bin/luci" - - print(f" → GET {login_url}") - try: - r = session.get(login_url, timeout=10) - print(f" Status: {r.status_code}") - except Exception as e: - print(f" ❌ Cannot connect: {e}") - return - - # Detect form fields - if 'name="luci_username"' in r.text: - login_data = {"luci_username": username, "luci_password": password} - print(f" Fields: luci_username / luci_password") - else: - login_data = {"username": username, "password": password} - print(f" Fields: username / password") - - print(f"\n → POST {login_url}") - print(f" Data: {login_data}") - resp = session.post(login_url, data=login_data, - timeout=10, allow_redirects=True) - - print(f" Status: {resp.status_code}") - print(f" Cookies: {dict(session.cookies)}") - - # Extract stok - stok = None - for source in [resp.url, resp.text]: - match = re.search(r";stok=([a-f0-9]+)", source) - if match: - stok = match.group(1) - break - if not stok: - for hist in resp.history: - match = re.search(r";stok=([a-f0-9]+)", - hist.headers.get("Location", "")) - if match: - stok = match.group(1) - break - - print(f" stok: {stok}") - has_cookie = "sysauth" in str(session.cookies) - print(f" sysauth: {'✅ YES' if has_cookie else '❌ NO'}") - - if not stok and not has_cookie: - print(f"\n ❌ LOGIN FAILED — no stok, no sysauth cookie") - return - - print(f"\n ✅ LOGIN OK") - - # Build flash URL - if stok: - flash_url = f"{base_url}/cgi-bin/luci/;stok={stok}/admin/system/flashops" - else: - flash_url = f"{base_url}/cgi-bin/luci/admin/system/flashops" - - # ═══════════════════════════════════ - # STEP 2: Upload firmware - # ═══════════════════════════════════ - print(f"\n{'─'*65}") - print(f" STEP 2: Upload firmware") - print(f"{'─'*65}") - - print(f" → POST {flash_url}") - print(f" File field: image") - print(f" File name: {os.path.basename(firmware_path)}") - print(f" keep: NOT sent (unchecked)") - print(f" Uploading...") - - filename = os.path.basename(firmware_path) - with open(firmware_path, "rb") as f: - resp = session.post( - flash_url, - data={}, # No keep = uncheck Keep Settings - files={"image": (filename, f, "application/octet-stream")}, - timeout=300, - ) - - print(f" Status: {resp.status_code}") - print(f" URL: {resp.url}") - - # Check response content - resp_lower = resp.text.lower() - has_verify = "verify" in resp_lower - has_proceed = "proceed" in resp_lower - has_checksum = "checksum" in resp_lower - has_image_form = 'name="image"' in resp.text and 'type="file"' in resp.text - - print(f"\n Response analysis:") - print(f" Has 'verify': {'✅' if has_verify else '❌'}") - print(f" Has 'proceed': {'✅' if has_proceed else '❌'}") - print(f" Has 'checksum': {'✅' if has_checksum else '❌'}") - print(f" Has flash form: {'⚠️ (upload ignored!)' if has_image_form else '✅ (not flash form)'}") - - # Extract checksum from verification page - checksum = re.search(r'Checksum:\s*([a-f0-9]+)', resp.text) - size_info = re.search(r'Size:\s*([\d.]+\s*MB)', resp.text) - if checksum: - print(f" Checksum: {checksum.group(1)}") - if size_info: - print(f" Size: {size_info.group(1)}") - - # Check for keep settings status - if "will be erased" in resp.text: - print(f" Config: Will be ERASED ✅ (keep=off)") - elif "will be kept" in resp.text: - print(f" Config: Will be KEPT ⚠️ (keep=on)") - - if not has_verify or not has_proceed: - print(f"\n ❌ Did NOT get verification page!") - # Show cleaned response - text = re.sub(r'<[^>]+>', ' ', resp.text) - text = re.sub(r'\s+', ' ', text).strip() - print(f" Response: {text[:500]}") - return - - # Show the Proceed form - print(f"\n Proceed form (from HTML):") - forms = re.findall(r']*>(.*?)', resp.text, re.DOTALL) - for i, form_body in enumerate(forms): - if 'value="Proceed"' in form_body or 'step' in form_body: - inputs = re.findall(r']*/?\s*>', form_body) - for inp in inputs: - print(f" {inp.strip()}") - - print(f"\n ✅ UPLOAD OK — Verification page received") - - # ═══════════════════════════════════ - # STEP 3: Proceed (confirm flash) - # ═══════════════════════════════════ - print(f"\n{'─'*65}") - print(f" STEP 3: Proceed (confirm flash)") - print(f"{'─'*65}") - - confirm_data = { - "step": "2", - "keep": "", - } - - print(f" → POST {flash_url}") - print(f" Data: {confirm_data}") - - try: - resp = session.post(flash_url, data=confirm_data, timeout=300) - print(f" Status: {resp.status_code}") - # Show cleaned response - text = re.sub(r'<[^>]+>', ' ', resp.text) - text = re.sub(r'\s+', ' ', text).strip() - print(f" Response: {text[:300]}") - except requests.ConnectionError: - print(f" ✅ Connection dropped — device is REBOOTING!") - except requests.ReadTimeout: - print(f" ✅ Timeout — device is REBOOTING!") - - print(f"\n{'='*65}") - print(f" 🎉 FLASH COMPLETE") - print(f"{'='*65}") - - -if __name__ == "__main__": - if len(sys.argv) < 3: - print("Usage: python debug_full.py ") - print("Example: python debug_full.py 192.168.11.17 V3.0.6p5.bin") - sys.exit(1) - - ip = sys.argv[1] - fw = sys.argv[2] - - if not os.path.exists(fw): - print(f"❌ File not found: {fw}") - sys.exit(1) - - debug_flash(ip, fw) diff --git a/debug_ssh.py b/debug_ssh.py deleted file mode 100644 index b837442..0000000 --- a/debug_ssh.py +++ /dev/null @@ -1,67 +0,0 @@ -# python debug_ssh.py 192.168.11.xxx "3.0.6p5.bin" admin123a -import sys -import logging -import paramiko -from ssh_flasher import flash_device_ssh - -def main(): - print("="*50) - print("🔧 CÔNG CỤ DEBUG SSH FLASH CHO MIRAV3") - print("="*50) - - if len(sys.argv) < 3: - print("\nSử dụng: source venv/bin/activate && python debug_ssh.py [MAT_KHAU_SSH] [--update]") - print("Ví dụ: python debug_ssh.py 192.168.11.102 ./3.0.6p5.bin admin123a --update") - sys.exit(1) - - args = sys.argv[1:] - is_update_mode = "--update" in args - if is_update_mode: - args.remove("--update") - - test_ip = args[0] - test_fw = args[1] - test_pass = args[2] if len(args) > 2 else "admin123a" - - print(f"\n[+] Thông số đầu vào:") - print(f" - IP Thiết bị : {test_ip}") - print(f" - Firmware : {test_fw}") - print(f" - Pass SSH : {test_pass}") - print(f" - Phân luồng : {'UPDATE FW (set_passwd=False)' if is_update_mode else 'NẠP MỚI FW (set_passwd=True)'}\n") - - print("[*] BẬT CHẾ ĐỘ LOG CHI TIẾT CỦA PARAMIKO/SSH...") - # Bật log xuất trực tiếp ra màn hình console để dễ nhìn lỗi - logging.getLogger("paramiko").setLevel(logging.DEBUG) - console_handler = logging.StreamHandler(sys.stdout) - console_handler.setFormatter(logging.Formatter('%(asctime)s [SSH_LOG] %(message)s')) - logging.getLogger("paramiko").addHandler(console_handler) - - print("\n" + "-"*50) - print("BẮT ĐẦU QUÁ TRÌNH THỰC THI...") - print("-"*50 + "\n") - - def print_status(msg): - print(f"\n---> TRẠNG THÁI APP: {msg}") - - # Gọi hàm flash (có bật tính năng tự đổi passwd về admin123a trước nếu cần) - try: - result = flash_device_ssh( - ip=test_ip, - firmware_path=test_fw, - password=test_pass, - set_passwd=not is_update_mode, - status_cb=print_status - ) - - print("\n" + "="*50) - if result == "DONE": - print("🟢 KẾT QUẢ: THÀNH CÔNG (DONE)") - else: - print(f"🔴 KẾT QUẢ: THẤT BẠI - {result}") - print("="*50) - - except Exception as e: - print(f"\n🔴 ERROR UNHANDLED EXCEPTION: {e}") - -if __name__ == "__main__": - main() diff --git a/main.py b/main.py index 6ca78ee..421c7dc 100644 --- a/main.py +++ b/main.py @@ -22,506 +22,25 @@ from PyQt6.QtGui import QFont, QColor, QIcon, QAction import datetime -from scanner import scan_network -from flasher import flash_device -from ssh_flasher import flash_device_ssh +from core.scanner import scan_network +from core.flasher import flash_device +from core.ssh_flasher import flash_device_ssh +from core.workers import ScanThread, FlashThread +from utils.network import _resolve_hostname, get_default_network +from utils.system import resource_path, get_machine_info -class CollapsibleGroupBox(QGroupBox): - def __init__(self, title="", parent=None): - super().__init__(title, parent) - self.setCheckable(True) - self.setChecked(True) - self.animation = QPropertyAnimation(self, b"maximumHeight") - self.animation.setDuration(200) - # Connect the toggled signal to our animation function - self.toggled.connect(self._toggle_animation) +from ui.components import CollapsibleGroupBox +from ui.styles import STYLE - self._full_height = 0 - def set_content_layout(self, layout): - # We need a wrapper widget to hold the layout - self.content_widget = QWidget() - self.content_widget.setStyleSheet("background-color: transparent;") - self.content_widget.setLayout(layout) - - main_layout = QVBoxLayout() - main_layout.setContentsMargins(0, 0, 0, 0) - main_layout.addWidget(self.content_widget) - self.setLayout(main_layout) - - def _toggle_animation(self, checked): - if not hasattr(self, 'content_widget'): - return - if checked: - # Expand: show content first, then animate - self.content_widget.setVisible(True) - target_height = self.sizeHint().height() - self.animation.stop() - self.animation.setStartValue(self.height()) - self.animation.setEndValue(target_height) - self.animation.finished.connect(self._on_expand_finished) - self.animation.start() - else: - # Collapse - self.animation.stop() - self.animation.setStartValue(self.height()) - self.animation.setEndValue(32) - self.animation.finished.connect(self._on_collapse_finished) - self.animation.start() - def _on_expand_finished(self): - # Remove height constraint so content can grow dynamically - self.setMaximumHeight(16777215) - try: - self.animation.finished.disconnect(self._on_expand_finished) - except TypeError: - pass - def _on_collapse_finished(self): - if not self.isChecked(): - self.content_widget.setVisible(False) - try: - self.animation.finished.disconnect(self._on_collapse_finished) - except TypeError: - pass -def _resolve_hostname(ip): - """Reverse DNS lookup for a single IP.""" - try: - return socket.gethostbyaddr(ip)[0] - except Exception: - return "" - - -def get_local_ip(): - """Get the local IP address of this machine.""" - try: - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(("8.8.8.8", 80)) - ip = s.getsockname()[0] - s.close() - return ip - except Exception: - return "N/A" - - -def get_default_network(ip): - """Guess the /24 network from local IP.""" - try: - parts = ip.split(".") - return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24" - except Exception: - return "192.168.1.0/24" - - -def resource_path(relative_path): - """ Get absolute path to resource, works for dev and for PyInstaller """ - try: - base_path = sys._MEIPASS - except Exception: - base_path = os.path.abspath(".") - return os.path.join(base_path, relative_path) - - -def get_machine_info(): - """Collect machine info.""" - hostname = socket.gethostname() - local_ip = get_local_ip() - os_info = f"{platform.system()} {platform.release()}" - mac_addr = "N/A" - - try: - import uuid - mac = uuid.getnode() - mac_addr = ":".join( - f"{(mac >> (8 * i)) & 0xFF:02X}" for i in reversed(range(6)) - ) - except Exception: - pass - - return { - "hostname": hostname, - "ip": local_ip, - "os": os_info, - "mac": mac_addr, - } - - -# ── Stylesheet ────────────────────────────────────────────── - -STYLE = """ -QWidget { - background-color: #1a1b2e; - color: #e2e8f0; - font-family: 'Segoe UI', 'SF Pro Display', sans-serif; - font-size: 12px; -} - -QGroupBox { - border: 1px solid #2d3748; - border-radius: 8px; - margin-top: 10px; - padding: 20px 8px 6px 8px; - font-weight: bold; - color: #7eb8f7; - background-color: #1e2035; -} - -QGroupBox::title { - subcontrol-origin: margin; - subcontrol-position: top left; - left: 14px; - top: 5px; - padding: 0px 8px; - background-color: transparent; -} - -QGroupBox::indicator { - width: 14px; - height: 14px; - border-radius: 4px; - border: 1px solid #3d4a6b; - background-color: #13141f; - margin-top: 5px; -} - -QGroupBox::indicator:unchecked { - background-color: #13141f; -} - -QGroupBox::indicator:checked { - background-color: #3b82f6; - border-color: #3b82f6; -} - -QLabel#title { - font-size: 16px; - font-weight: bold; - color: #7eb8f7; - letter-spacing: 1px; -} - -QLabel#info { - color: #94a3b8; - font-size: 11px; -} - -QPushButton { - background-color: #2d3352; - border: 1px solid #3d4a6b; - border-radius: 6px; - padding: 4px 12px; - color: #e2e8f0; - font-weight: 600; - min-height: 24px; -} - -QPushButton:hover { - background-color: #3d4a6b; - border-color: #7eb8f7; - color: #ffffff; -} - -QPushButton:pressed { - background-color: #1a2040; -} - -QPushButton#scan { - background: qlineargradient(x1:0, y1:0, x2:1, y2:0, - stop:0 #1a56db, stop:1 #1e66f5); - border-color: #1a56db; - color: #ffffff; -} - -QPushButton#scan:hover { - background: qlineargradient(x1:0, y1:0, x2:1, y2:0, - stop:0 #2563eb, stop:1 #3b82f6); -} - -QPushButton#flash { - background: qlineargradient(x1:0, y1:0, x2:1, y2:0, - stop:0 #15803d, stop:1 #16a34a); - border-color: #15803d; - color: #ffffff; - font-size: 13px; - min-height: 30px; -} - -QPushButton#flash:hover { - background: qlineargradient(x1:0, y1:0, x2:1, y2:0, - stop:0 #16a34a, stop:1 #22c55e); -} - -QTableWidget { - background-color: #13141f; - alternate-background-color: #1a1b2e; - border: 1px solid #2d3748; - border-radius: 8px; - gridline-color: #2d3748; - selection-background-color: #2d3a5a; - selection-color: #e2e8f0; -} - -QTableWidget::item { - padding: 2px 6px; - border: none; -} - -QTableWidget::item:selected { - background-color: #2d3a5a; - color: #7eb8f7; -} - -QHeaderView::section { - background-color: #1e2035; - color: #7eb8f7; - border: none; - border-bottom: 2px solid #3b82f6; - border-right: 1px solid #2d3748; - padding: 4px 6px; - font-weight: bold; - font-size: 11px; - letter-spacing: 0.5px; - text-transform: uppercase; -} - -QHeaderView::section:last { - border-right: none; -} - -QProgressBar { - border: 1px solid #2d3748; - border-radius: 6px; - text-align: center; - background-color: #13141f; - color: #e2e8f0; - height: 20px; - font-size: 11px; - font-weight: 600; -} - -QProgressBar::chunk { - background: qlineargradient(x1:0, y1:0, x2:1, y2:0, - stop:0 #3b82f6, stop:1 #7eb8f7); - border-radius: 7px; -} - -QProgressBar#scan_bar { - border: 1px solid #374151; - border-radius: 5px; - text-align: center; - background-color: #13141f; - color: #fbbf24; - height: 16px; - font-size: 10px; - font-weight: 600; -} - -QProgressBar#scan_bar::chunk { - background: qlineargradient(x1:0, y1:0, x2:1, y2:0, - stop:0 #d97706, stop:1 #fbbf24); - border-radius: 4px; -} - -QLineEdit { - background-color: #13141f; - border: 1px solid #2d3748; - border-radius: 8px; - padding: 7px 12px; - color: #e2e8f0; - selection-background-color: #2d3a5a; -} - -QLineEdit:focus { - border-color: #3b82f6; - background-color: #161727; -} - -QScrollBar:vertical { - background: #13141f; - width: 10px; - border-radius: 5px; - margin: 2px; -} - -QScrollBar::handle:vertical { - background: #3d4a6b; - border-radius: 5px; - min-height: 30px; -} - -QScrollBar::handle:vertical:hover { - background: #7eb8f7; -} - -QScrollBar::handle:vertical:pressed { - background: #3b82f6; -} - -QScrollBar::add-line:vertical, -QScrollBar::sub-line:vertical { - height: 0px; - background: transparent; -} - -QScrollBar::add-page:vertical, -QScrollBar::sub-page:vertical { - background: transparent; -} - -QScrollBar:horizontal { - background: #13141f; - height: 10px; - border-radius: 5px; - margin: 2px; -} - -QScrollBar::handle:horizontal { - background: #3d4a6b; - border-radius: 5px; - min-width: 30px; -} - -QScrollBar::handle:horizontal:hover { - background: #7eb8f7; -} - -QScrollBar::handle:horizontal:pressed { - background: #3b82f6; -} - -QScrollBar::add-line:horizontal, -QScrollBar::sub-line:horizontal { - width: 0px; - background: transparent; -} - -QScrollBar::add-page:horizontal, -QScrollBar::sub-page:horizontal { - background: transparent; -} - -QCheckBox { - spacing: 6px; - color: #94a3b8; - font-size: 12px; -} - -QCheckBox::indicator { - width: 16px; - height: 16px; - border-radius: 4px; - border: 1px solid #3d4a6b; - background-color: #13141f; -} - -QCheckBox::indicator:checked { - background-color: #3b82f6; - border-color: #3b82f6; -} - -QCheckBox::indicator:hover { - border-color: #7eb8f7; -} -""" - - -class ScanThread(QThread): - """Run network scan in a background thread so UI doesn't freeze.""" - finished = pyqtSignal(list) - error = pyqtSignal(str) - scan_progress = pyqtSignal(int, int) # done, total (ping sweep) - stage = pyqtSignal(str) # current scan phase - - def __init__(self, network): - super().__init__() - self.network = network - - def run(self): - try: - def on_ping_progress(done, total): - self.scan_progress.emit(done, total) - - def on_stage(s): - self.stage.emit(s) - - results = scan_network(self.network, - progress_cb=on_ping_progress, - stage_cb=on_stage) - # Resolve hostnames in parallel - self.stage.emit("hostname") - with ThreadPoolExecutor(max_workers=50) as executor: - future_to_dev = { - executor.submit(_resolve_hostname, d["ip"]): d - for d in results - } - for future in as_completed(future_to_dev): - dev = future_to_dev[future] - try: - dev["name"] = future.result(timeout=3) - except Exception: - dev["name"] = "" - self.finished.emit(results) - except Exception as e: - self.error.emit(str(e)) - - -class FlashThread(QThread): - """Run firmware flash in background so UI stays responsive.""" - device_status = pyqtSignal(int, str) # index, status message - device_done = pyqtSignal(int, str) # index, result - all_done = pyqtSignal() - - def __init__(self, devices, firmware_path, max_workers=10, - method="api", ssh_user="root", ssh_password="admin123a", - set_passwd=False): - super().__init__() - self.devices = devices - self.firmware_path = firmware_path - self.max_workers = max_workers - self.method = method - self.ssh_user = ssh_user - self.ssh_password = ssh_password - self.set_passwd = set_passwd - - def run(self): - def _flash_one(i, dev): - try: - def on_status(msg): - self.device_status.emit(i, msg) - - if self.method == "ssh": - result = flash_device_ssh( - dev["ip"], self.firmware_path, - user=self.ssh_user, - password=self.ssh_password, - set_passwd=self.set_passwd, - status_cb=on_status - ) - else: - result = flash_device( - dev["ip"], self.firmware_path, - status_cb=on_status - ) - self.device_done.emit(i, result) - except Exception as e: - self.device_done.emit(i, f"FAIL: {e}") - - # Use configured max_workers (0 = unlimited = one per device) - workers = self.max_workers if self.max_workers > 0 else len(self.devices) - with ThreadPoolExecutor(max_workers=workers) as executor: - futures = [] - for i, dev in enumerate(self.devices): - futures.append(executor.submit(_flash_one, i, dev)) - for f in futures: - f.result() - - self.all_done.emit() class App(QWidget): diff --git a/ui/components.py b/ui/components.py new file mode 100644 index 0000000..b1f840f --- /dev/null +++ b/ui/components.py @@ -0,0 +1,64 @@ +from PyQt6.QtWidgets import QGroupBox, QWidget, QVBoxLayout +from PyQt6.QtCore import QPropertyAnimation + +class CollapsibleGroupBox(QGroupBox): + def __init__(self, title="", parent=None): + super().__init__(title, parent) + self.setCheckable(True) + self.setChecked(True) + + self.animation = QPropertyAnimation(self, b"maximumHeight") + self.animation.setDuration(200) + + # Connect the toggled signal to our animation function + self.toggled.connect(self._toggle_animation) + + self._full_height = 0 + + def set_content_layout(self, layout): + # We need a wrapper widget to hold the layout + self.content_widget = QWidget() + self.content_widget.setStyleSheet("background-color: transparent;") + self.content_widget.setLayout(layout) + + main_layout = QVBoxLayout() + main_layout.setContentsMargins(0, 0, 0, 0) + main_layout.addWidget(self.content_widget) + self.setLayout(main_layout) + + def _toggle_animation(self, checked): + if not hasattr(self, 'content_widget'): + return + + if checked: + # Expand: show content first, then animate + self.content_widget.setVisible(True) + target_height = self.sizeHint().height() + self.animation.stop() + self.animation.setStartValue(self.height()) + self.animation.setEndValue(target_height) + self.animation.finished.connect(self._on_expand_finished) + self.animation.start() + else: + # Collapse + self.animation.stop() + self.animation.setStartValue(self.height()) + self.animation.setEndValue(32) + self.animation.finished.connect(self._on_collapse_finished) + self.animation.start() + + def _on_expand_finished(self): + # Remove height constraint so content can grow dynamically + self.setMaximumHeight(16777215) + try: + self.animation.finished.disconnect(self._on_expand_finished) + except TypeError: + pass + + def _on_collapse_finished(self): + if not self.isChecked(): + self.content_widget.setVisible(False) + try: + self.animation.finished.disconnect(self._on_collapse_finished) + except TypeError: + pass diff --git a/ui/styles.py b/ui/styles.py new file mode 100644 index 0000000..6831fbc --- /dev/null +++ b/ui/styles.py @@ -0,0 +1,275 @@ +STYLE = """ +QWidget { + background-color: #1a1b2e; + color: #e2e8f0; + font-family: 'Segoe UI', 'SF Pro Display', sans-serif; + font-size: 12px; +} + +QGroupBox { + border: 1px solid #2d3748; + border-radius: 8px; + margin-top: 10px; + padding: 20px 8px 6px 8px; + font-weight: bold; + color: #7eb8f7; + background-color: #1e2035; +} + +QGroupBox::title { + subcontrol-origin: margin; + subcontrol-position: top left; + left: 14px; + top: 5px; + padding: 0px 8px; + background-color: transparent; +} + +QGroupBox::indicator { + width: 14px; + height: 14px; + border-radius: 4px; + border: 1px solid #3d4a6b; + background-color: #13141f; + margin-top: 5px; +} + +QGroupBox::indicator:unchecked { + background-color: #13141f; +} + +QGroupBox::indicator:checked { + background-color: #3b82f6; + border-color: #3b82f6; +} + +QLabel#title { + font-size: 16px; + font-weight: bold; + color: #7eb8f7; + letter-spacing: 1px; +} + +QLabel#info { + color: #94a3b8; + font-size: 11px; +} + +QPushButton { + background-color: #2d3352; + border: 1px solid #3d4a6b; + border-radius: 6px; + padding: 4px 12px; + color: #e2e8f0; + font-weight: 600; + min-height: 24px; +} + +QPushButton:hover { + background-color: #3d4a6b; + border-color: #7eb8f7; + color: #ffffff; +} + +QPushButton:pressed { + background-color: #1a2040; +} + +QPushButton#scan { + background: qlineargradient(x1:0, y1:0, x2:1, y2:0, + stop:0 #1a56db, stop:1 #1e66f5); + border-color: #1a56db; + color: #ffffff; +} + +QPushButton#scan:hover { + background: qlineargradient(x1:0, y1:0, x2:1, y2:0, + stop:0 #2563eb, stop:1 #3b82f6); +} + +QPushButton#flash { + background: qlineargradient(x1:0, y1:0, x2:1, y2:0, + stop:0 #15803d, stop:1 #16a34a); + border-color: #15803d; + color: #ffffff; + font-size: 13px; + min-height: 30px; +} + +QPushButton#flash:hover { + background: qlineargradient(x1:0, y1:0, x2:1, y2:0, + stop:0 #16a34a, stop:1 #22c55e); +} + +QTableWidget { + background-color: #13141f; + alternate-background-color: #1a1b2e; + border: 1px solid #2d3748; + border-radius: 8px; + gridline-color: #2d3748; + selection-background-color: #2d3a5a; + selection-color: #e2e8f0; +} + +QTableWidget::item { + padding: 2px 6px; + border: none; +} + +QTableWidget::item:selected { + background-color: #2d3a5a; + color: #7eb8f7; +} + +QHeaderView::section { + background-color: #1e2035; + color: #7eb8f7; + border: none; + border-bottom: 2px solid #3b82f6; + border-right: 1px solid #2d3748; + padding: 4px 6px; + font-weight: bold; + font-size: 11px; + letter-spacing: 0.5px; + text-transform: uppercase; +} + +QHeaderView::section:last { + border-right: none; +} + +QProgressBar { + border: 1px solid #2d3748; + border-radius: 6px; + text-align: center; + background-color: #13141f; + color: #e2e8f0; + height: 20px; + font-size: 11px; + font-weight: 600; +} + +QProgressBar::chunk { + background: qlineargradient(x1:0, y1:0, x2:1, y2:0, + stop:0 #3b82f6, stop:1 #7eb8f7); + border-radius: 7px; +} + +QProgressBar#scan_bar { + border: 1px solid #374151; + border-radius: 5px; + text-align: center; + background-color: #13141f; + color: #fbbf24; + height: 16px; + font-size: 10px; + font-weight: 600; +} + +QProgressBar#scan_bar::chunk { + background: qlineargradient(x1:0, y1:0, x2:1, y2:0, + stop:0 #d97706, stop:1 #fbbf24); + border-radius: 4px; +} + +QLineEdit { + background-color: #13141f; + border: 1px solid #2d3748; + border-radius: 8px; + padding: 7px 12px; + color: #e2e8f0; + selection-background-color: #2d3a5a; +} + +QLineEdit:focus { + border-color: #3b82f6; + background-color: #161727; +} + +QScrollBar:vertical { + background: #13141f; + width: 10px; + border-radius: 5px; + margin: 2px; +} + +QScrollBar::handle:vertical { + background: #3d4a6b; + border-radius: 5px; + min-height: 30px; +} + +QScrollBar::handle:vertical:hover { + background: #7eb8f7; +} + +QScrollBar::handle:vertical:pressed { + background: #3b82f6; +} + +QScrollBar::add-line:vertical, +QScrollBar::sub-line:vertical { + height: 0px; + background: transparent; +} + +QScrollBar::add-page:vertical, +QScrollBar::sub-page:vertical { + background: transparent; +} + +QScrollBar:horizontal { + background: #13141f; + height: 10px; + border-radius: 5px; + margin: 2px; +} + +QScrollBar::handle:horizontal { + background: #3d4a6b; + border-radius: 5px; + min-width: 30px; +} + +QScrollBar::handle:horizontal:hover { + background: #7eb8f7; +} + +QScrollBar::handle:horizontal:pressed { + background: #3b82f6; +} + +QScrollBar::add-line:horizontal, +QScrollBar::sub-line:horizontal { + width: 0px; + background: transparent; +} + +QScrollBar::add-page:horizontal, +QScrollBar::sub-page:horizontal { + background: transparent; +} + +QCheckBox { + spacing: 6px; + color: #94a3b8; + font-size: 12px; +} + +QCheckBox::indicator { + width: 16px; + height: 16px; + border-radius: 4px; + border: 1px solid #3d4a6b; + background-color: #13141f; +} + +QCheckBox::indicator:checked { + background-color: #3b82f6; + border-color: #3b82f6; +} + +QCheckBox::indicator:hover { + border-color: #7eb8f7; +} +""" diff --git a/utils/network.py b/utils/network.py new file mode 100644 index 0000000..a25d983 --- /dev/null +++ b/utils/network.py @@ -0,0 +1,27 @@ +import socket + +def _resolve_hostname(ip): + """Reverse DNS lookup for a single IP.""" + try: + return socket.gethostbyaddr(ip)[0] + except Exception: + return "" + +def get_local_ip(): + """Get the local IP address of this machine.""" + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("8.8.8.8", 80)) + ip = s.getsockname()[0] + s.close() + return ip + except Exception: + return "N/A" + +def get_default_network(ip): + """Guess the /24 network from local IP.""" + try: + parts = ip.split(".") + return f"{parts[0]}.{parts[1]}.{parts[2]}.0/24" + except Exception: + return "192.168.1.0/24" diff --git a/utils/system.py b/utils/system.py new file mode 100644 index 0000000..28a2880 --- /dev/null +++ b/utils/system.py @@ -0,0 +1,36 @@ +import os +import sys +import platform +import socket +from utils.network import get_local_ip + +def resource_path(relative_path): + """ Get absolute path to resource, works for dev and for PyInstaller """ + try: + base_path = sys._MEIPASS + except Exception: + base_path = os.path.abspath(".") + return os.path.join(base_path, relative_path) + +def get_machine_info(): + """Collect machine info.""" + hostname = socket.gethostname() + local_ip = get_local_ip() + os_info = f"{platform.system()} {platform.release()}" + mac_addr = "N/A" + + try: + import uuid + mac = uuid.getnode() + mac_addr = ":".join( + f"{(mac >> (8 * i)) & 0xFF:02X}" for i in reversed(range(6)) + ) + except Exception: + pass + + return { + "hostname": hostname, + "ip": local_ip, + "os": os_info, + "mac": mac_addr, + }