"""Main window and entry point of the Mira Firmware Loader GUI.

The window lets the operator pick a firmware file, scan a network for
devices, select devices in a table and flash them through one of two
background workers (``NewFlashThread`` or ``UpdateFlashThread``).
"""

import sys
import os
import socket
import ipaddress
import platform
import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

from PyQt6.QtWidgets import (
    QApplication,
    QWidget,
    QPushButton,
    QVBoxLayout,
    QHBoxLayout,
    QFileDialog,
    QTableWidget,
    QTableWidgetItem,
    QLabel,
    QProgressBar,
    QMessageBox,
    QGroupBox,
    QHeaderView,
    QLineEdit,
    QFrame,
    QCheckBox,
    QSpinBox,
    QScrollArea,
    QSizePolicy,
    QComboBox,
)
from PyQt6.QtCore import (
    Qt,
    QThread,
    pyqtSignal,
    QPropertyAnimation,
    QAbstractAnimation,
    QParallelAnimationGroup,
    QRect,
)
from PyQt6.QtGui import QFont, QColor, QIcon, QAction

from core.scanner import scan_network
from core.workers import ScanThread
from core.flash_new_worker import NewFlashThread
from core.flash_update_worker import UpdateFlashThread
from utils.network import _resolve_hostname, get_default_network
from utils.system import resource_path, get_machine_info, get_version
from ui.components import CollapsibleGroupBox
from ui.styles import STYLE


class App(QWidget):
    """Top-level window: firmware selection, LAN scan, device table, flashing.

    State kept on the instance:

    * ``all_devices`` - every device dict of the last scan.
    * ``devices`` - the subset currently shown in the table; table row ``i``
      corresponds to ``devices[i]``.
    * ``flashed_macs`` - upper-case MAC -> ``"HH:MM:SS"`` of the successful
      flash in this session.
    * ``_flash_row_map`` - worker device index -> table row, valid for the
      flash run in progress.
    """

    def __init__(self):
        """Read machine info, initialise state and build the whole UI."""
        super().__init__()
        self.setWindowTitle(f"Mira Firmware Loader v{get_version()}")
        self.setWindowIcon(QIcon(resource_path("icon.ico")))

        # Data state
        self.local_ip = ""
        self.gateway_ip = ""
        self.firmware = None
        self.all_devices = []  # raw list from scanner
        self.devices = []  # filtered list for table
        self.flashed_macs = {}  # MACs flashed successfully in session (MAC -> timestamp)
        self.scan_thread = None
        # Created here so the flash slots never hit a missing attribute.
        self.flash_thread = None
        self._flash_row_map = {}

        info = get_machine_info()
        self.local_ip = info["ip"]
        self.gateway_ip = self._guess_gateway(self.local_ip)

        layout = QVBoxLayout()
        layout.setSpacing(4)
        layout.setContentsMargins(8, 6, 8, 6)

        self._add_header(layout)
        layout.addWidget(self._build_machine_info_group(info))
        layout.addWidget(self._build_firmware_group())
        layout.addWidget(self._build_scan_group())
        layout.addWidget(self._build_device_group(), stretch=1)
        layout.addWidget(self._build_flash_group())

        self.setLayout(layout)

    # ── UI construction ──

    def _add_header(self, layout):
        """Append the title and the copyright/version line to ``layout``."""
        title = QLabel("⚡ Mira Firmware Loader")
        title.setObjectName("title")
        title.setAlignment(Qt.AlignmentFlag.AlignCenter)
        layout.addWidget(title)

        copyright_label = QLabel(
            f"© {datetime.datetime.now().year} Smatec — R&D Software Team. v{get_version()}"
        )
        copyright_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        copyright_label.setStyleSheet(
            "color: #9399b2; font-size: 11px; font-weight: 500; font-family: -apple-system, BlinkMacSystemFont, Segoe UI, Roboto;"
        )
        layout.addWidget(copyright_label)

    def _build_machine_info_group(self, info):
        """Return the collapsible "Machine Info" group.

        ``info`` is the mapping returned by ``get_machine_info()``; the keys
        ``hostname``, ``ip``, ``os`` and ``mac`` are displayed.
        """
        info_group = CollapsibleGroupBox("🖥 Machine Info")
        info_layout = QVBoxLayout()
        info_layout.setSpacing(2)
        info_layout.setContentsMargins(0, 0, 0, 0)

        row1 = QHBoxLayout()
        row1.addWidget(self._info_label("Hostname:"))
        row1.addWidget(self._info_value(info["hostname"]))
        row1.addStretch()
        row1.addWidget(self._info_label("IP:"))
        row1.addWidget(self._info_value(info["ip"]))
        info_layout.addLayout(row1)

        row2 = QHBoxLayout()
        row2.addWidget(self._info_label("OS:"))
        row2.addWidget(self._info_value(info["os"]))
        row2.addStretch()
        row2.addWidget(self._info_label("MAC:"))
        row2.addWidget(self._info_value(info["mac"]))
        info_layout.addLayout(row2)

        info_group.set_content_layout(info_layout)
        return info_group

    def _build_firmware_group(self):
        """Return the collapsible "Firmware" group (label + file picker)."""
        fw_group = CollapsibleGroupBox("📦 Firmware")
        fw_layout = QHBoxLayout()
        fw_layout.setContentsMargins(0, 0, 0, 0)

        self.fw_label = QLabel("No firmware selected")
        self.fw_label.setObjectName("info")
        self.fw_label.setWordWrap(True)
        fw_layout.addWidget(self.fw_label, 1)

        btn_fw = QPushButton("📁 Select File")
        btn_fw.clicked.connect(self.select_fw)
        btn_fw.setFixedWidth(110)
        fw_layout.addWidget(btn_fw)

        fw_group.set_content_layout(fw_layout)
        return fw_group

    def _build_scan_group(self):
        """Return the collapsible "Network Scan" group."""
        scan_group = CollapsibleGroupBox("📡 Network Scan")
        scan_layout = QVBoxLayout()
        scan_layout.setContentsMargins(0, 0, 0, 0)

        net_row = QHBoxLayout()
        net_row.addWidget(QLabel("Network:"))
        self.net_input = QLineEdit(get_default_network(self.local_ip))
        self.net_input.setPlaceholderText("e.g. 192.168.4.0/24")
        net_row.addWidget(self.net_input, 1)

        self.btn_scan = QPushButton("🔍 Scan LAN")
        self.btn_scan.setObjectName("scan")
        self.btn_scan.clicked.connect(self.scan)
        self.btn_scan.setFixedWidth(110)
        net_row.addWidget(self.btn_scan)
        scan_layout.addLayout(net_row)

        # Range (0, 0) puts the bar in "busy" mode until real progress arrives.
        self.scan_progress_bar = QProgressBar()
        self.scan_progress_bar.setObjectName("scan_bar")
        self.scan_progress_bar.setRange(0, 0)
        self.scan_progress_bar.setFormat("")
        self.scan_progress_bar.setVisible(False)
        scan_layout.addWidget(self.scan_progress_bar)

        self.scan_status = QLabel("")
        self.scan_status.setObjectName("info")
        scan_layout.addWidget(self.scan_status)

        scan_group.set_content_layout(scan_layout)
        return scan_group

    def _build_device_group(self):
        """Return the "Devices Found" group: table plus filter/selection row."""
        dev_group = QGroupBox("📋 Devices Found")
        dev_layout = QVBoxLayout()
        dev_layout.setSpacing(4)
        dev_layout.setContentsMargins(4, 4, 4, 4)

        self.table = QTableWidget()
        self.table.setColumnCount(4)
        self.table.setHorizontalHeaderLabels(["", "IP", "MAC", "Status"])
        self.table.setAlternatingRowColors(True)

        header = self.table.horizontalHeader()
        header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
        header.resizeSection(0, 40)
        # IP and MAC are user-resizable; Status stretches to show full info.
        header.setSectionResizeMode(1, QHeaderView.ResizeMode.Interactive)
        header.resizeSection(1, 120)
        header.setSectionResizeMode(2, QHeaderView.ResizeMode.Interactive)
        header.resizeSection(2, 140)
        header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)

        self.table.verticalHeader().setVisible(False)
        self.table.setSelectionBehavior(
            QTableWidget.SelectionBehavior.SelectRows
        )
        dev_layout.addWidget(self.table)

        filter_row = QHBoxLayout()
        self.device_count_label = QLabel("Total: 0 devices")
        self.device_count_label.setObjectName("info")
        filter_row.addWidget(self.device_count_label)
        filter_row.addStretch()

        btn_history = QPushButton("📋 Flash History")
        btn_history.clicked.connect(self._show_history)
        btn_history.setStyleSheet("background-color: #313244; color: #cdd6f4;")
        filter_row.addWidget(btn_history)

        btn_select_all = QPushButton("☑ Select All")
        btn_select_all.clicked.connect(self._select_all_devices)
        filter_row.addWidget(btn_select_all)

        btn_deselect_all = QPushButton("☐ Deselect All")
        btn_deselect_all.clicked.connect(self._deselect_all_devices)
        filter_row.addWidget(btn_deselect_all)

        self.show_all_cb = QCheckBox("Show all (include gateway & self)")
        self.show_all_cb.setChecked(False)
        self.show_all_cb.stateChanged.connect(self._refresh_table)
        filter_row.addWidget(self.show_all_cb)

        dev_layout.addLayout(filter_row)
        dev_group.setLayout(dev_layout)
        return dev_group

    def _build_flash_group(self):
        """Return the "Flash Controls" group (progress, mode, method, start)."""
        flash_group = QGroupBox("🚀 Flash Controls")
        flash_layout = QVBoxLayout()
        flash_layout.setSpacing(12)
        flash_layout.setContentsMargins(10, 15, 10, 12)

        self.progress = QProgressBar()
        self.progress.setFormat("%v / %m devices (%p%)")
        self.progress.setMinimumHeight(22)
        flash_layout.addWidget(self.progress)

        # Mode selector row (new flash vs. update)
        mode_row = QHBoxLayout()
        mode_lbl = QLabel("Flash Mode:")
        mode_lbl.setStyleSheet("font-size: 14px; font-weight: bold;")
        mode_row.addWidget(mode_lbl)

        self.mode_combo = QComboBox()
        self.mode_combo.addItem("⚡ New Flash (Raw / Factory Reset)", "new")
        self.mode_combo.addItem("🔄 Update Firmware (Pre-installed)", "update")
        self.mode_combo.setMinimumWidth(380)
        self.mode_combo.setMinimumHeight(35)
        self.mode_combo.setStyleSheet("""
            QComboBox {
                background-color: #2d3352;
                border: 1px solid #3d4a6b;
                border-radius: 4px;
                padding: 4px 10px;
                color: #ffffff;
                font-size: 14px;
                font-weight: bold;
            }
        """)
        self.mode_combo.currentIndexChanged.connect(self._on_mode_changed)
        mode_row.addWidget(self.mode_combo)
        mode_row.addStretch()
        flash_layout.addLayout(mode_row)

        # Container for the options that only apply to "New Flash" mode
        self.method_container = QWidget()
        method_layout = QVBoxLayout(self.method_container)
        method_layout.setContentsMargins(0, 0, 0, 0)
        method_layout.setSpacing(10)

        # Method selector row
        method_row = QHBoxLayout()
        method_lbl = QLabel("Method:")
        method_lbl.setStyleSheet("font-size: 13px; font-weight: bold;")
        method_row.addWidget(method_lbl)

        self.method_combo = QComboBox()
        self.method_combo.addItem("🌐 API (LuCI - Recommended)", "api")
        self.method_combo.addItem("💻 SSH (paramiko/scp)", "ssh")
        self.method_combo.setMinimumWidth(220)
        self.method_combo.setMinimumHeight(32)
        self.method_combo.setStyleSheet("""
            QComboBox {
                background-color: #1e1e2e;
                border: 1px solid #3d4a6b;
                border-radius: 4px;
                padding: 4px 10px;
                color: #ffffff;
                font-size: 13px;
                font-weight: bold;
            }
        """)
        self.method_combo.currentIndexChanged.connect(self._on_method_changed)
        method_row.addWidget(self.method_combo)
        method_row.addStretch()
        method_layout.addLayout(method_row)

        # SSH credentials (hidden by default)
        self.ssh_creds_widget = QWidget()
        self.ssh_creds_widget.setStyleSheet("background-color: #11121d; border-radius: 6px; border: 1px dashed #3d4a6b;")
        ssh_creds_layout = QVBoxLayout(self.ssh_creds_widget)
        ssh_creds_layout.setContentsMargins(15, 12, 15, 12)
        ssh_creds_layout.setSpacing(10)

        ssh_row1 = QHBoxLayout()
        ssh_lbl1 = QLabel("SSH User:")
        ssh_lbl1.setStyleSheet("font-size: 12px; font-weight: bold;")
        ssh_row1.addWidget(ssh_lbl1)

        self.ssh_user_input = QLineEdit("root")
        self.ssh_user_input.setFixedWidth(130)
        str_qlineedit = "QLineEdit { background-color: #1a1b2e; font-size: 13px; padding: 4px; border: 1px solid #3d4a6b; color: #ffffff; }"
        self.ssh_user_input.setStyleSheet(str_qlineedit)
        ssh_row1.addWidget(self.ssh_user_input)
        ssh_row1.addSpacing(25)

        ssh_lbl2 = QLabel("SSH Password:")
        ssh_lbl2.setStyleSheet("font-size: 12px; font-weight: bold;")
        ssh_row1.addWidget(ssh_lbl2)

        self.ssh_pass_input = QLineEdit("admin123a")
        self.ssh_pass_input.setEchoMode(QLineEdit.EchoMode.Password)
        self.ssh_pass_input.setFixedWidth(130)
        self.ssh_pass_input.setStyleSheet(str_qlineedit)
        ssh_row1.addWidget(self.ssh_pass_input)
        ssh_row1.addStretch()
        ssh_creds_layout.addLayout(ssh_row1)

        self.set_passwd_cb = QCheckBox("Set password before flash (passwd → admin123a)")
        self.set_passwd_cb.setChecked(True)
        self.set_passwd_cb.setStyleSheet("color: #94a3b8; font-size: 12px; font-weight: bold;")
        ssh_creds_layout.addWidget(self.set_passwd_cb)

        self.ssh_creds_widget.setVisible(False)
        method_layout.addWidget(self.ssh_creds_widget)
        flash_layout.addWidget(self.method_container)

        # Warning shown only in Update mode
        self.update_warning_lbl = QLabel("⚠️ FW Update Mode: Forced to SSH. Target IP [192.168.11.102]. Other IPs require confirmation.")
        self.update_warning_lbl.setStyleSheet("color: #f38ba8; font-size: 13px; font-weight: bold; padding: 8px; border: 1px dotted #f38ba8;")
        self.update_warning_lbl.setWordWrap(True)
        self.update_warning_lbl.setVisible(False)
        flash_layout.addWidget(self.update_warning_lbl)

        # Parallel count row
        parallel_row = QHBoxLayout()
        parallel_lbl = QLabel("Concurrent devices:")
        parallel_lbl.setStyleSheet("font-size: 14px; font-weight: bold;")
        parallel_row.addWidget(parallel_lbl)

        self.parallel_spin = QSpinBox()
        self.parallel_spin.setRange(0, 100)
        self.parallel_spin.setValue(10)
        self.parallel_spin.setSpecialValueText("0 (Unlimited)")
        self.parallel_spin.setToolTip("0 = unlimited (all devices at once)")
        self.parallel_spin.setMinimumWidth(160)
        self.parallel_spin.setMinimumHeight(35)
        self.parallel_spin.setStyleSheet("""
            QSpinBox {
                font-size: 15px;
                font-weight: bold;
                text-align: center;
            }
        """)
        parallel_row.addWidget(self.parallel_spin)
        parallel_row.addStretch()
        flash_layout.addLayout(parallel_row)

        self.btn_flash = QPushButton("⚡ FLASH SELECTED DEVICES")
        self.btn_flash.setObjectName("flash")
        self.btn_flash.setMinimumHeight(45)
        self.btn_flash.setStyleSheet("QPushButton#flash { font-size: 16px; font-weight: bold; letter-spacing: 1px; }")
        self.btn_flash.clicked.connect(self.flash_all)
        flash_layout.addWidget(self.btn_flash)

        flash_group.setLayout(flash_layout)
        return flash_group

    # ── Helpers ──

    def _guess_gateway(self, ip):
        """Guess gateway IP (x.x.x.1) from local IP.

        Returns ``""`` when ``ip`` is not a string or has fewer than three
        dot-separated parts.
        """
        try:
            parts = ip.split(".")
            return f"{parts[0]}.{parts[1]}.{parts[2]}.1"
        except (AttributeError, IndexError):
            return ""

    def _info_label(self, text):
        """Return a caption label styled through the ``info`` object name."""
        lbl = QLabel(text)
        lbl.setObjectName("info")
        return lbl

    def _info_value(self, text):
        """Return a bold value label for the machine-info rows."""
        lbl = QLabel(text)
        lbl.setStyleSheet("color: #cdd6f4; font-weight: bold;")
        return lbl

    def _get_filtered_devices(self):
        """Return devices filtered based on show_all checkbox.

        With the box unchecked, entries whose IP equals the local IP or the
        guessed gateway IP are left out.
        """
        if self.show_all_cb.isChecked():
            return list(self.all_devices)
        excluded = {self.local_ip, self.gateway_ip}
        return [d for d in self.all_devices if d["ip"] not in excluded]

    def _refresh_table(self):
        """Re-populate table based on current filter state.

        Also rewrites ``self.devices`` and the device-count label. A device
        whose MAC is in ``flashed_macs`` is unchecked, tinted green and, when
        its status is still ``"READY"`` or empty, relabelled
        ``"Already Flashed"``.
        """
        self.devices = self._get_filtered_devices()
        self.table.setRowCount(len(self.devices))
        own_ips = {self.local_ip, self.gateway_ip}

        for i, d in enumerate(self.devices):
            mac_str = d["mac"].upper()

            # Checkbox column
            cb_item = QTableWidgetItem()

            # If device MAC is already flashed, mark it
            is_already_flashed = mac_str in self.flashed_macs
            if is_already_flashed and d["status"] in ["READY", ""]:
                d["status"] = "Already Flashed"

            # Checkbox logic
            if is_already_flashed:
                cb_item.setCheckState(Qt.CheckState.Unchecked)
            else:
                cb_item.setCheckState(Qt.CheckState.Checked)
            cb_item.setFlags(cb_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
            self.table.setItem(i, 0, cb_item)

            ip_item = QTableWidgetItem(d["ip"])
            mac_item = QTableWidgetItem(mac_str)
            status_item = QTableWidgetItem(d["status"])

            if is_already_flashed:
                status_item.setForeground(QColor("#a6e3a1"))
                ip_item.setForeground(QColor("#a6e3a1"))

            # Dim gateway & self if showing all
            if d["ip"] in own_ips:
                for item in [ip_item, mac_item, status_item]:
                    item.setForeground(QColor("#4a5568"))
                cb_item.setCheckState(Qt.CheckState.Unchecked)

            self.table.setItem(i, 1, ip_item)
            self.table.setItem(i, 2, mac_item)
            self.table.setItem(i, 3, status_item)

        total = len(self.devices)
        hidden = len(self.all_devices) - total
        label = f"Total: {total} devices"
        if hidden > 0:
            label += f" (hiding {hidden})"
        self.device_count_label.setText(label)

    def _select_all_devices(self):
        """Check all device checkboxes, skipping MACs already flashed."""
        for i in range(self.table.rowCount()):
            mac_item = self.table.item(i, 2)
            if mac_item is None:
                continue
            # Prevent re-checking a device that was already flashed.
            if mac_item.text().strip() not in self.flashed_macs:
                item = self.table.item(i, 0)
                if item:
                    item.setCheckState(Qt.CheckState.Checked)

    def _deselect_all_devices(self):
        """Uncheck all device checkboxes."""
        for i in range(self.table.rowCount()):
            item = self.table.item(i, 0)
            if item:
                item.setCheckState(Qt.CheckState.Unchecked)

    def _show_history(self):
        """Show list of successfully flashed MACs this session."""
        if not self.flashed_macs:
            QMessageBox.information(self, "Flash History", "No successful flashes during this session.")
        else:
            # Sort by MAC and format with timestamp
            history_lines = []
            for mac in sorted(self.flashed_macs.keys()):
                time_str = self.flashed_macs[mac]
                history_lines.append(f"[{time_str}] {mac}")
            macs_str = "\n".join(history_lines)
            msg = f"Successfully flashed devices in this session ({len(self.flashed_macs)}):\n\n{macs_str}"
            QMessageBox.information(self, "Flash History", msg)

    def _set_flashing(self, busy):
        """Lock or unlock the controls that must not be used during a flash.

        Scanning or toggling "Show all" rebuilds the table, which would make
        ``_flash_row_map`` point at the wrong rows, so both are disabled
        together with the flash button while a run is in progress.
        """
        enabled = not busy
        self.btn_flash.setEnabled(enabled)
        self.btn_scan.setEnabled(enabled)
        self.show_all_cb.setEnabled(enabled)

    def _confirm_update_targets(self, selected):
        """Ask before updating devices other than 192.168.11.102.

        ``selected`` is the list returned by ``_get_selected_devices``.
        Returns ``True`` when every target is 192.168.11.102 or the user
        answered Yes; any other answer returns ``False``.
        """
        strange_ips = [dev["ip"] for _, dev in selected if dev["ip"] != "192.168.11.102"]
        if not strange_ips:
            return True
        msg = (f"⚠️ Detected IP(s) other than [192.168.11.102] in your selection:\n"
               f"{', '.join(strange_ips[:3])}{'...' if len(strange_ips) > 3 else ''}\n\n"
               f"Are you SURE you want to update firmware on these devices?")
        reply = QMessageBox.question(
            self,
            "Confirm Update Target",
            msg,
            QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
            QMessageBox.StandardButton.No,
        )
        return reply == QMessageBox.StandardButton.Yes

    # ── Actions ──

    def select_fw(self):
        """Open a file dialog and remember the chosen firmware path."""
        file, _ = QFileDialog.getOpenFileName(
            self,
            "Select Firmware",
            "",
            "Firmware Files (*.bin *.hex *.uf2);;All Files (*)",
        )
        if file:
            self.firmware = file
            name = os.path.basename(file)
            self.fw_label.setText(f"✅ {name}")
            self.fw_label.setStyleSheet("color: #a6e3a1;")

    def scan(self):
        """Validate the network field and start a ``ScanThread`` for it.

        An invalid network shows a warning and starts nothing. Otherwise the
        previous results are discarded and the scan button stays disabled
        until the thread reports completion or an error.
        """
        network_str = self.net_input.text().strip()
        try:
            network = ipaddress.ip_network(network_str, strict=False)
        except ValueError:
            QMessageBox.warning(
                self,
                "Invalid Network",
                f"'{network_str}' is not a valid network.\n"
                "Example: 192.168.4.0/24",
            )
            return

        self.scan_status.setText("⏳ Preparing scan...")
        self.scan_status.setStyleSheet("color: #f9e2af;")
        # Drop the raw list too; otherwise toggling "Show all" during or
        # after a failed scan would bring the stale results back.
        self.all_devices = []
        self._refresh_table()
        self.scan_progress_bar.setRange(0, 0)
        self.scan_progress_bar.setFormat(" Starting...")
        self.scan_progress_bar.setVisible(True)
        self.btn_scan.setEnabled(False)

        self.scan_thread = ScanThread(network)
        self.scan_thread.finished.connect(self._on_scan_done)
        self.scan_thread.error.connect(self._on_scan_error)
        self.scan_thread.scan_progress.connect(self._on_scan_progress)
        self.scan_thread.stage.connect(self._on_scan_stage)
        self.scan_thread.start()

    def _on_scan_done(self, results):
        """Store the scan results (sorted by IP) and refresh the table."""
        self.scan_progress_bar.setVisible(False)
        self.btn_scan.setEnabled(True)

        self.all_devices = []
        for dev in results:
            dev["status"] = "READY"
            self.all_devices.append(dev)
        self.all_devices.sort(key=lambda d: ipaddress.ip_address(d["ip"]))

        # Apply filter and populate table
        self._refresh_table()

        total_all = len(self.all_devices)
        total_shown = len(self.devices)
        self.scan_status.setText(f"✅ Scan complete — {total_all} found, {total_shown} shown")
        self.scan_status.setStyleSheet("color: #a6e3a1;")

    def _on_scan_stage(self, stage):
        """Update the progress bar and status text for a scan stage.

        Handles ``"ping"``, ``"arp"``, ``"scapy"`` and ``"hostname"``; any
        other value is ignored.
        """
        if stage == "ping":
            self.scan_progress_bar.setFormat("⚡ Pinging hosts... %v / %m")
            self.scan_status.setText("⏳ Pinging all hosts...")
            self.scan_status.setStyleSheet("color: #f9e2af;")
        elif stage == "arp":
            self.scan_progress_bar.setRange(0, 0)
            self.scan_progress_bar.setFormat(" Reading ARP cache...")
            self.scan_status.setText("⏳ Reading ARP cache...")
        elif stage == "scapy":
            self.scan_progress_bar.setRange(0, 0)
            self.scan_progress_bar.setFormat(" ARP broadcast scan...")
            self.scan_status.setText("⏳ Running ARP broadcast scan...")
        elif stage == "hostname":
            self.scan_progress_bar.setRange(0, 0)
            self.scan_progress_bar.setFormat(" Resolving hostnames...")
            self.scan_status.setText("⏳ Resolving hostnames...")

    def _on_scan_progress(self, done, total):
        """Show ``done`` out of ``total`` on the scan bar and status label."""
        self.scan_progress_bar.setRange(0, total)
        self.scan_progress_bar.setValue(done)
        self.scan_status.setText(f"⏳ Pinging hosts... {done} / {total}")
        self.scan_status.setStyleSheet("color: #f9e2af;")

    def _on_scan_error(self, error_msg):
        """Reset the scan controls and report ``error_msg`` in a dialog."""
        self.scan_progress_bar.setVisible(False)
        self.btn_scan.setEnabled(True)
        self.scan_status.setText("❌ Scan failed")
        self.scan_status.setStyleSheet("color: #f38ba8;")
        QMessageBox.critical(
            self,
            "Scan Error",
            f"Failed to scan network:\n{error_msg}",
        )

    def _on_mode_changed(self, index):
        """Show/hide method based on selected mode."""
        mode = self.mode_combo.currentData()
        if mode == "update":
            self.method_container.setVisible(False)
            self.update_warning_lbl.setVisible(True)
        else:
            self.method_container.setVisible(True)
            self.update_warning_lbl.setVisible(False)
            self._on_method_changed(self.method_combo.currentIndex())

    def _on_method_changed(self, index):
        """SSH credentials are always hidden; credentials are hardcoded."""
        self.ssh_creds_widget.setVisible(False)

    def _get_selected_devices(self):
        """Return list of (table_row_index, device_dict) for checked devices."""
        selected = []
        for i in range(self.table.rowCount()):
            item = self.table.item(i, 0)
            if item and item.checkState() == Qt.CheckState.Checked:
                if i < len(self.devices):
                    selected.append((i, self.devices[i]))
        return selected

    def flash_all(self):
        """Flash the checked devices on a background worker thread.

        Does nothing except show a message when no firmware is selected, no
        devices are listed or none is checked. In update mode the user must
        confirm targets other than 192.168.11.102 before any state changes.
        """
        if not self.firmware:
            QMessageBox.warning(
                self,
                "No Firmware",
                "Please select a firmware file first.",
            )
            return
        if not self.devices:
            QMessageBox.information(
                self,
                "No Devices",
                "No devices found to flash.\n"
                "Please scan the network first.",
            )
            return

        selected = self._get_selected_devices()
        if not selected:
            QMessageBox.information(
                self,
                "No Selection",
                "No devices selected.\n"
                "Check the boxes next to the devices you want to flash.",
            )
            return

        # Confirm first so that cancelling leaves the progress bar untouched.
        mode = self.mode_combo.currentData()
        if mode == "update" and not self._confirm_update_targets(selected):
            return

        self.progress.setMaximum(len(selected))
        self.progress.setValue(0)

        # Worker index -> table row, used by the status/done slots.
        self._flash_row_map = {idx: row for idx, (row, _dev) in enumerate(selected)}
        flash_devices = [dev for _row, dev in selected]

        # Pick the worker matching the mode; it runs on a background thread.
        max_w = self.parallel_spin.value()
        if mode == "update":
            self.flash_thread = UpdateFlashThread(
                flash_devices,
                self.firmware,
                max_workers=max_w,
            )
        else:
            method = self.method_combo.currentData()
            # NOTE(review): SSH credentials are hardcoded here and the hidden
            # input fields are not read - confirm this is intended.
            self.flash_thread = NewFlashThread(
                flash_devices,
                self.firmware,
                max_workers=max_w,
                method=method,
                ssh_user="root",
                ssh_password="admin123a",
                ssh_backup_password="admin123a",
                set_passwd=(method == "ssh"),
            )

        self.flash_thread.device_status.connect(self._on_flash_status)
        self.flash_thread.device_done.connect(self._on_flash_done)
        self.flash_thread.all_done.connect(self._on_flash_all_done)

        # Lock the controls that would invalidate the row map while flashing.
        self._set_flashing(True)
        self.flash_thread.start()

    def _on_flash_status(self, index, msg):
        """Update status column while flashing."""
        row = self._flash_row_map.get(index, index)
        self.table.setItem(row, 3, QTableWidgetItem(f"⏳ {msg}"))

    def _on_flash_done(self, index, result):
        """One device finished flashing.

        A ``result`` starting with ``"DONE"`` counts as success: the MAC is
        stored in ``flashed_macs`` with the current time and the row is
        unchecked. Anything else is shown as a failure.
        """
        row = self._flash_row_map.get(index, index)
        if result.startswith("DONE"):
            item = QTableWidgetItem(f"✅ {result}")
            item.setForeground(QColor("#a6e3a1"))

            # Save MAC to history with current timestamp
            mac_item = self.table.item(row, 2)
            if mac_item:
                now_str = datetime.datetime.now().strftime("%H:%M:%S")
                self.flashed_macs[mac_item.text().strip()] = now_str

            # Auto-uncheck so it won't be flashed again
            cb = self.table.item(row, 0)
            if cb:
                cb.setCheckState(Qt.CheckState.Unchecked)
        else:
            item = QTableWidgetItem(f"❌ {result}")
            item.setForeground(QColor("#f38ba8"))

        self.table.setItem(row, 3, item)
        self.progress.setValue(self.progress.value() + 1)

    def _on_flash_all_done(self):
        """All flashing complete."""
        self._set_flashing(False)
        QMessageBox.information(self, "Flash Complete", "All devices have been processed.")


def main():
    """Create the application, centre the main window and run the event loop."""
    app = QApplication(sys.argv)
    app.setStyleSheet(STYLE)
    window = App()

    # 60% height, limited width (750px), centered
    screen = app.primaryScreen().availableGeometry()
    w = min(750, screen.width())
    h = int(screen.height() * 0.6)
    # Offset by the geometry origin on both axes: the available area does not
    # necessarily start at (0, 0).
    x = screen.x() + (screen.width() - w) // 2
    y = screen.y() + (screen.height() - h) // 2
    window.setGeometry(x, y, w, h)
    window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()