Load FW bằng SSH, Thêm tính năng update FW (SSH)

This commit is contained in:
2026-03-08 14:29:35 +07:00
parent 12c65c1948
commit ada3440ebc
8 changed files with 667 additions and 38 deletions

67
debug_ssh.py Normal file
View File

@@ -0,0 +1,67 @@
# python debug_ssh.py 192.168.11.xxx "3.0.6p5.bin" admin123a
import sys
import logging
import paramiko
from ssh_flasher import flash_device_ssh
def main():
print("="*50)
print("🔧 CÔNG CỤ DEBUG SSH FLASH CHO MIRAV3")
print("="*50)
if len(sys.argv) < 3:
print("\nSử dụng: source venv/bin/activate && python debug_ssh.py <IP_THIET_BI> <DUONG_DAN_FIRMWARE> [MAT_KHAU_SSH] [--update]")
print("Ví dụ: python debug_ssh.py 192.168.11.102 ./3.0.6p5.bin admin123a --update")
sys.exit(1)
args = sys.argv[1:]
is_update_mode = "--update" in args
if is_update_mode:
args.remove("--update")
test_ip = args[0]
test_fw = args[1]
test_pass = args[2] if len(args) > 2 else "admin123a"
print(f"\n[+] Thông số đầu vào:")
print(f" - IP Thiết bị : {test_ip}")
print(f" - Firmware : {test_fw}")
print(f" - Pass SSH : {test_pass}")
print(f" - Phân luồng : {'UPDATE FW (set_passwd=False)' if is_update_mode else 'NẠP MỚI FW (set_passwd=True)'}\n")
print("[*] BẬT CHẾ ĐỘ LOG CHI TIẾT CỦA PARAMIKO/SSH...")
# Bật log xuất trực tiếp ra màn hình console để dễ nhìn lỗi
logging.getLogger("paramiko").setLevel(logging.DEBUG)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter('%(asctime)s [SSH_LOG] %(message)s'))
logging.getLogger("paramiko").addHandler(console_handler)
print("\n" + "-"*50)
print("BẮT ĐẦU QUÁ TRÌNH THỰC THI...")
print("-"*50 + "\n")
def print_status(msg):
print(f"\n---> TRẠNG THÁI APP: {msg}")
# Gọi hàm flash (có bật tính năng tự đổi passwd về admin123a trước nếu cần)
try:
result = flash_device_ssh(
ip=test_ip,
firmware_path=test_fw,
password=test_pass,
set_passwd=not is_update_mode,
status_cb=print_status
)
print("\n" + "="*50)
if result == "DONE":
print("🟢 KẾT QUẢ: THÀNH CÔNG (DONE)")
else:
print(f"🔴 KẾT QUẢ: THẤT BẠI - {result}")
print("="*50)
except Exception as e:
print(f"\n🔴 ERROR UNHANDLED EXCEPTION: {e}")
if __name__ == "__main__":
main()

58
docs/load_fw_ssh_docs.md Normal file
View File

@@ -0,0 +1,58 @@
# Tài liệu Kỹ thuật: Nạp Mới Firmware (LoadFW bằng SSH)
Tính năng **Nạp Mới FW** trong module `ssh_flasher.py` được thiết kế để cấp cứu và cài đặt lại Firmware cho các thiết bị OpenWrt trắng (mới xuất xưởng) hoặc vừa trải qua Hardware Factory Reset.
---
## 🚀 Tính năng Báo Cáo Tiến Độ Lên UI
Tính năng này được thiết kế theo chuẩn hướng sự kiện (Callback). Module liên tục báo cáo dữ liệu thời gian thực lên cột Status của bảng thiết bị:
1. `Checking Telnet port for raw device...` (Nếu thiết bị mới bị khóa SSH)
2. `Setting password via SSH...` / `Password set via Telnet...`
3. `Uploading firmware via SCP...` (Đẩy file vào RAM Disk)
4. `Verifying firmware...`
5. `Syncing filesystem...`
6. `Flashing firmware (sysupgrade)...`
7. `Rebooting...`
---
## 🛠 Flow Hoạt Động Mạch Trắng (Workflows)
Tính năng Flash qua luồng này hoạt động theo một quy trình 5 bước cực kỳ nghiêm ngặt:
### Bước 0: Thiết lập Mật khẩu Đa Kênh qua Cổng 23 (Fallback Mechanism)
- **Vấn đề của OpenWrt:** Router OpenWrt vừa xuất xưởng (chưa có pass) sẽ **CẤM đăng nhập SSH** cho tài khoản `root`. Lúc này, chỉ cổng cục bộ Telnet (Port 23) là mở.
- **Cách Tool vượt qua:**
1. Tool tiên phong chọc thẳng bằng **Giao thức Telnet (Port 23)**.
2. Dùng chuỗi lệnh `passwd` để thiết lập mật khẩu thành `admin123a` 2 lần. Đợi 3s để OpenWrt kịp đánh thức Server Dropbear (Mở cổng 22 SSH).
3. Nếu thiết bị đã có Pass, Tool sẽ mượt mà Rơi xuống nhánh SSH (`old_password` -> `admin123a`).
### Bước 1: Khởi tạo Kết nối SSH
- Sử dụng cờ `AutoAddPolicy` của Python `paramiko` ép kết nối âm thầm chấp nhận mọi Host Certificate mà không bật popup cảnh báo.
### Bước 2: Truyền File bảo mật (SCP vào RAM)
- Thực hiện SCP đẩy BIN firmware (`tim_uImage` hoặc `.bin`) vào phân vùng ảo: `/tmp/<tên_file>.bin`. Đây là khu vực RAM Disk (Ghi siêu tốc, không làm mòn chip nhớ).
### Bước 3 & 4: Xác thực và Đồng bộ (Verify & Sync)
- Gửi lệnh `test -f /tmp/...` để xác nhận file.
- Gọi lệnh `sync` ép File System đẩy transaction xuống vùng nhớ cứng.
### Bước 5: Sysupgrade (Cài Đặt Lõm)
- Chạy lệnh ngầm: `sysupgrade -F -v -n /tmp/<tên_file>`.
- Cờ `-n`: Clean Flash sạch trơn, không giữ Configuration.
- Cờ `-F` (Force): Bỏ qua bộ kiểm tra Image Metadata cho những File thiếu hụt Metadata (uImage).
- **Đứt Kết Nối Đột Ngột** khi script đang đợi (sau khoảng 4s trở đi) sẽ mặc định được coi là tín hiệu Thành Công (Server SSH sập nguồn để Reboot).
---
## 🔎 Trình Gỡ Lỗi (Debugging)
Chạy file `debug_ssh.py` độc lập trên Terminal ở cấp độ TCP Verbose để soi quá trình nạp mới:
`./venv/bin/python debug_ssh.py <IP> <DUONG_DAN_FIRMWARE> <PASS>`

39
docs/update_fw_docs.md Normal file
View File

@@ -0,0 +1,39 @@
# Tài liệu Kỹ thuật: Cập Nhật Firmware (Update FW)
Quy trình `Update FW` là phương thức tối ưu hóa để nâng cấp Firmware mới vào các thiết bị OpenWrt Live (đã và đang chạy sẵn Firmware của hệ sinh thái MiraV3).
---
## 🔁 Cơ Chế Chuyên Cấp: Update Firmware (Live Nodes)
Điểm khác biệt lớn nhất của luồng Update FW so với Nạp Mới:
Gỡ bỏ hoàn toàn sự cồng kềnh, công cụ MiraV3 hỗ trợ tách bạch quy trình tải Firmware mới thông qua UI `Chế độ Flash > Update FW`.
Module `ssh_flasher.py` tự động kích hoạt cờ tuỳ chọn ngầm `set_passwd = False`:
- **Bỏ Bầu Trời Khởi Động (Skip Telnet):** Hệ thống sẽ ngay lập tức thiết lập kết nối đích thị vào luồng SSH trên Port 22 bằng Security Credentials mặc định `root:admin123a`. Tool bỏ qua hoàn toàn quy trình truy vấn cổng Telnet (23) chậm chạp cũng như các lệnh Set Password thừa thãi.
- **Ràng Buộc Chặt Chẽ:** Được kiểm soát từ tầng UI để chống Flash nhầm. Chỉ duy nhất thiết bị có IP cấu hình cứng `192.168.11.102` mới được âm thầm qua cửa, các IP lạ sẽ bị hệ thống chặn lại và bật MessageBox cảnh báo hỏi kỹ thuật viên xác nhận.
Với cơ chế Skip này, Tốc độ tổng thể tăng tốc cực đáng kể, phục vụ trực tiếp cho quá trình Mass-Update hàng loạt các Nodes đang hoạt động trên hệ thống.
---
## 🚀 Tính năng Xử lý Khủng hoảng (Sysupgrade)
Các thiết bị đang sống khi cập nhật File thường có rủi ro từ chối tệp tin cập nhật do thiếu Metadata:
### Bước xả File: Sysupgrade (Hủy diệt và Tái sinh)
- Chạy lệnh xả Kernel ngầm: `sysupgrade -F -v -n /tmp/<tên_file>`.
- Cờ `-n`: Clean Flash sạch trơn, không giữ Configuration rác của bản cũ.
- Cờ `-F` (Force): Ép hệ điều hành OpenWrt bỏ qua bước kiểm tra Image Metadata tại Local. Tuỳ chọn này sống còn đối với các tệp tin Firmware trần trụi (như build Raw `tim_uImage`) để tránh việc sysupgrade từ chối file, văng lỗi "Image check failed" và ngắt ngang tiến trình.
- Điểm đắt giá của logic: Python (`paramiko`) sẽ phân tích kết quả trả về liên tục trong những giây đầu tiên vòng lặp. Nếu phát sinh lỗi thực thi sai định dạng file ở tầng OS Router thì sẽ báo văng Error Catching ngay lập tức ra bảng Table.
- Ngược lại, việc **Đứt Kết Nối Đột Ngột** khi script đang đợi (sau khoảng 4s trở đi) sẽ mặc định được coi là Thành Công. Vì khi Sysupgrade dội Kernel thành công, hệ thống mạng của Router sẽ tự sập để Reboot.
---
## 🔎 Trình Gỡ Lỗi Nhanh (Debugging Update flow)
Có thể Trigger giả lập tiến trình UI Update FW ngay trên cửa sổ Console Terminal để kiểm tra Handshake TCP lỗi ở đâu (nếu có).
_Lệnh (Lưu ý cờ `--update` để Tắt Telnet Fallback):_
`./venv/bin/python debug_ssh.py <IP_192.168.11.102> <DUONG_DAN_FIRMWARE> <PASS> --update`

298
main.py
View File

@@ -11,7 +11,8 @@ from PyQt6.QtWidgets import (
QVBoxLayout, QHBoxLayout, QFileDialog, QTableWidget,
QTableWidgetItem, QLabel, QProgressBar,
QMessageBox, QGroupBox, QHeaderView, QLineEdit,
QFrame, QCheckBox, QSpinBox, QScrollArea, QSizePolicy
QFrame, QCheckBox, QSpinBox, QScrollArea, QSizePolicy,
QComboBox
)
from PyQt6.QtCore import (
Qt, QThread, pyqtSignal, QPropertyAnimation, QAbstractAnimation,
@@ -23,6 +24,7 @@ import datetime
from scanner import scan_network
from flasher import flash_device
from ssh_flasher import flash_device_ssh
class CollapsibleGroupBox(QGroupBox):
@@ -475,11 +477,17 @@ class FlashThread(QThread):
device_done = pyqtSignal(int, str) # index, result
all_done = pyqtSignal()
def __init__(self, devices, firmware_path, max_workers=10):
def __init__(self, devices, firmware_path, max_workers=10,
method="api", ssh_user="root", ssh_password="admin123a",
set_passwd=False):
super().__init__()
self.devices = devices
self.firmware_path = firmware_path
self.max_workers = max_workers
self.method = method
self.ssh_user = ssh_user
self.ssh_password = ssh_password
self.set_passwd = set_passwd
def run(self):
def _flash_one(i, dev):
@@ -487,10 +495,19 @@ class FlashThread(QThread):
def on_status(msg):
self.device_status.emit(i, msg)
result = flash_device(
dev["ip"], self.firmware_path,
status_cb=on_status
)
if self.method == "ssh":
result = flash_device_ssh(
dev["ip"], self.firmware_path,
user=self.ssh_user,
password=self.ssh_password,
set_passwd=self.set_passwd,
status_cb=on_status
)
else:
result = flash_device(
dev["ip"], self.firmware_path,
status_cb=on_status
)
self.device_done.emit(i, result)
except Exception as e:
self.device_done.emit(i, f"FAIL: {e}")
@@ -513,9 +530,13 @@ class App(QWidget):
super().__init__()
self.setWindowTitle("MiraV3 Firmware Loader")
self.setWindowIcon(QIcon(resource_path("icon.ico")))
# Data state
self.local_ip = ""
self.gateway_ip = ""
self.firmware = None
self.all_devices = [] # all scan results (unfiltered)
self.devices = [] # currently displayed (filtered)
self.all_devices = [] # raw list from scanner
self.devices = [] # filtered list for table
self.flashed_macs = set() # MAC addresses flashed successfully in session
self.scan_thread = None
info = get_machine_info()
@@ -532,7 +553,7 @@ class App(QWidget):
title.setAlignment(Qt.AlignmentFlag.AlignCenter)
layout.addWidget(title)
copyright_label = QLabel(f"© {datetime.datetime.now().year} Smatec — R&D Software Team. v1.0.0")
copyright_label = QLabel(f"© {datetime.datetime.now().year} Smatec — R&D Software Team. v1.1.0")
copyright_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
copyright_label.setStyleSheet(
"color: #9399b2; font-size: 11px; font-weight: 500; font-family: -apple-system, BlinkMacSystemFont, Segoe UI, Roboto;"
@@ -593,11 +614,11 @@ class App(QWidget):
self.net_input.setPlaceholderText("e.g. 192.168.4.0/24")
net_row.addWidget(self.net_input, 1)
btn_scan = QPushButton("🔍 Scan LAN")
btn_scan.setObjectName("scan")
btn_scan.clicked.connect(self.scan)
btn_scan.setFixedWidth(110)
net_row.addWidget(btn_scan)
self.btn_scan = QPushButton("🔍 Scan LAN")
self.btn_scan.setObjectName("scan")
self.btn_scan.clicked.connect(self.scan)
self.btn_scan.setFixedWidth(110)
net_row.addWidget(self.btn_scan)
scan_layout.addLayout(net_row)
self.scan_progress_bar = QProgressBar()
@@ -621,14 +642,20 @@ class App(QWidget):
dev_layout.setContentsMargins(4, 4, 4, 4)
self.table = QTableWidget()
self.table.setColumnCount(5)
self.table.setHorizontalHeaderLabels(["", "IP", "Name", "MAC", "Status"])
self.table.setColumnCount(4)
self.table.setHorizontalHeaderLabels(["", "IP", "MAC", "Status"])
self.table.setAlternatingRowColors(True)
header = self.table.horizontalHeader()
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
header.resizeSection(0, 40)
for col in range(1, 5):
header.setSectionResizeMode(col, QHeaderView.ResizeMode.Stretch)
# IP and MAC can be fixed/stretch, Status to stretch to cover full info
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Interactive)
header.resizeSection(1, 120)
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Interactive)
header.resizeSection(2, 140)
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
self.table.verticalHeader().setVisible(False)
self.table.setSelectionBehavior(
QTableWidget.SelectionBehavior.SelectRows
@@ -641,6 +668,11 @@ class App(QWidget):
filter_row.addWidget(self.device_count_label)
filter_row.addStretch()
btn_history = QPushButton("📋 Flash History")
btn_history.clicked.connect(self._show_history)
btn_history.setStyleSheet("background-color: #313244; color: #cdd6f4;")
filter_row.addWidget(btn_history)
btn_select_all = QPushButton("☑ Select All")
btn_select_all.clicked.connect(self._select_all_devices)
filter_row.addWidget(btn_select_all)
@@ -659,30 +691,137 @@ class App(QWidget):
layout.addWidget(dev_group, stretch=1)
# ── Flash Controls ──
flash_group = QGroupBox("🚀 Flash")
flash_group = QGroupBox("🚀 Flash Controls")
flash_layout = QVBoxLayout()
flash_layout.setSpacing(4)
flash_layout.setContentsMargins(4, 4, 4, 4)
flash_layout.setSpacing(12)
flash_layout.setContentsMargins(10, 15, 10, 12)
self.progress = QProgressBar()
self.progress.setFormat("%v / %m devices (%p%)")
self.progress.setMinimumHeight(22)
flash_layout.addWidget(self.progress)
# Mode selector row (Nạp mới vs Update)
mode_row = QHBoxLayout()
mode_lbl = QLabel("Flash Mode:")
mode_lbl.setStyleSheet("font-size: 14px; font-weight: bold;")
mode_row.addWidget(mode_lbl)
self.mode_combo = QComboBox()
self.mode_combo.addItem("⚡ New Flash (Raw / Factory Reset)", "new")
self.mode_combo.addItem("🔄 Update Firmware (Pre-installed)", "update")
self.mode_combo.setMinimumWidth(380)
self.mode_combo.setMinimumHeight(35)
self.mode_combo.setStyleSheet("""
QComboBox {
background-color: #2d3352; border: 1px solid #3d4a6b; border-radius: 4px;
padding: 4px 10px; color: #ffffff; font-size: 14px; font-weight: bold;
}
""")
self.mode_combo.currentIndexChanged.connect(self._on_mode_changed)
mode_row.addWidget(self.mode_combo)
mode_row.addStretch()
flash_layout.addLayout(mode_row)
# Container cho cấu hình tuỳ chọn "Nạp Mới FW"
self.method_container = QWidget()
method_layout = QVBoxLayout(self.method_container)
method_layout.setContentsMargins(0, 0, 0, 0)
method_layout.setSpacing(10)
# Method selector row
method_row = QHBoxLayout()
method_lbl = QLabel("Method:")
method_lbl.setStyleSheet("font-size: 13px; font-weight: bold;")
method_row.addWidget(method_lbl)
self.method_combo = QComboBox()
self.method_combo.addItem("🌐 API (LuCI - Recommended)", "api")
self.method_combo.addItem("💻 SSH (paramiko/scp)", "ssh")
self.method_combo.setMinimumWidth(220)
self.method_combo.setMinimumHeight(32)
self.method_combo.setStyleSheet("""
QComboBox {
background-color: #1e1e2e; border: 1px solid #3d4a6b; border-radius: 4px;
padding: 4px 10px; color: #ffffff; font-size: 13px; font-weight: bold;
}
""")
self.method_combo.currentIndexChanged.connect(self._on_method_changed)
method_row.addWidget(self.method_combo)
method_row.addStretch()
method_layout.addLayout(method_row)
# SSH credentials (hidden by default)
self.ssh_creds_widget = QWidget()
self.ssh_creds_widget.setStyleSheet("background-color: #11121d; border-radius: 6px; border: 1px dashed #3d4a6b;")
ssh_creds_layout = QVBoxLayout(self.ssh_creds_widget)
ssh_creds_layout.setContentsMargins(15, 12, 15, 12)
ssh_creds_layout.setSpacing(10)
ssh_row1 = QHBoxLayout()
ssh_lbl1 = QLabel("SSH User:")
ssh_lbl1.setStyleSheet("font-size: 12px; font-weight: bold;")
ssh_row1.addWidget(ssh_lbl1)
self.ssh_user_input = QLineEdit("root")
self.ssh_user_input.setFixedWidth(130)
str_qlineedit = "QLineEdit { background-color: #1a1b2e; font-size: 13px; padding: 4px; border: 1px solid #3d4a6b; color: #ffffff; }"
self.ssh_user_input.setStyleSheet(str_qlineedit)
ssh_row1.addWidget(self.ssh_user_input)
ssh_row1.addSpacing(25)
ssh_lbl2 = QLabel("SSH Password:")
ssh_lbl2.setStyleSheet("font-size: 12px; font-weight: bold;")
ssh_row1.addWidget(ssh_lbl2)
self.ssh_pass_input = QLineEdit("admin123a")
self.ssh_pass_input.setEchoMode(QLineEdit.EchoMode.Password)
self.ssh_pass_input.setFixedWidth(130)
self.ssh_pass_input.setStyleSheet(str_qlineedit)
ssh_row1.addWidget(self.ssh_pass_input)
ssh_row1.addStretch()
ssh_creds_layout.addLayout(ssh_row1)
self.set_passwd_cb = QCheckBox("Set password before flash (passwd → admin123a)")
self.set_passwd_cb.setChecked(True)
self.set_passwd_cb.setStyleSheet("color: #94a3b8; font-size: 12px; font-weight: bold;")
ssh_creds_layout.addWidget(self.set_passwd_cb)
self.ssh_creds_widget.setVisible(False)
method_layout.addWidget(self.ssh_creds_widget)
flash_layout.addWidget(self.method_container)
# Warning UI for Update Mode
self.update_warning_lbl = QLabel("⚠️ FW Update Mode: Forced to SSH (root/admin123a). Target IP [192.168.11.102]. Other IPs require confirmation.")
self.update_warning_lbl.setStyleSheet("color: #f38ba8; font-size: 13px; font-weight: bold; padding: 8px; border: 1px dotted #f38ba8;")
self.update_warning_lbl.setWordWrap(True)
self.update_warning_lbl.setVisible(False)
flash_layout.addWidget(self.update_warning_lbl)
# Parallel count row
parallel_row = QHBoxLayout()
parallel_row.addWidget(QLabel("Concurrent devices:"))
parallel_lbl = QLabel("Concurrent devices:")
parallel_lbl.setStyleSheet("font-size: 14px; font-weight: bold;")
parallel_row.addWidget(parallel_lbl)
self.parallel_spin = QSpinBox()
self.parallel_spin.setRange(0, 100)
self.parallel_spin.setValue(10)
self.parallel_spin.setSpecialValueText("Unlimited")
self.parallel_spin.setSpecialValueText("0 (Unlimited)")
self.parallel_spin.setToolTip("0 = unlimited (all devices at once)")
self.parallel_spin.setFixedWidth(80)
self.parallel_spin.setMinimumWidth(160)
self.parallel_spin.setMinimumHeight(35)
self.parallel_spin.setStyleSheet("""
QSpinBox { font-size: 15px; font-weight: bold; text-align: center; }
""")
parallel_row.addWidget(self.parallel_spin)
parallel_row.addStretch()
flash_layout.addLayout(parallel_row)
btn_flash = QPushButton(" Flash Selected Devices")
btn_flash = QPushButton("FLASH SELECTED DEVICES")
btn_flash.setObjectName("flash")
btn_flash.setMinimumHeight(45)
btn_flash.setStyleSheet("QPushButton#flash { font-size: 16px; font-weight: bold; letter-spacing: 1px; }")
btn_flash.clicked.connect(self.flash_all)
flash_layout.addWidget(btn_flash)
@@ -724,27 +863,42 @@ class App(QWidget):
self.table.setRowCount(len(self.devices))
for i, d in enumerate(self.devices):
mac_str = d["mac"].upper()
# Checkbox column
cb_item = QTableWidgetItem()
cb_item.setCheckState(Qt.CheckState.Checked)
# If device MAC is already flashed, mark it
is_already_flashed = mac_str in self.flashed_macs
if is_already_flashed and d["status"] in ["READY", ""]:
d["status"] = "Already Flashed"
# Checkbox logic
if is_already_flashed:
cb_item.setCheckState(Qt.CheckState.Unchecked)
else:
cb_item.setCheckState(Qt.CheckState.Checked)
cb_item.setFlags(cb_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
self.table.setItem(i, 0, cb_item)
ip_item = QTableWidgetItem(d["ip"])
name_item = QTableWidgetItem(d.get("name", ""))
mac_item = QTableWidgetItem(d["mac"].upper())
mac_item = QTableWidgetItem(mac_str)
status_item = QTableWidgetItem(d["status"])
if is_already_flashed:
status_item.setForeground(QColor("#a6e3a1"))
ip_item.setForeground(QColor("#a6e3a1"))
# Dim gateway & self if showing all
if d["ip"] in {self.local_ip, self.gateway_ip}:
for item in [ip_item, name_item, mac_item, status_item]:
for item in [ip_item, mac_item, status_item]:
item.setForeground(QColor("#4a5568"))
cb_item.setCheckState(Qt.CheckState.Unchecked)
self.table.setItem(i, 1, ip_item)
self.table.setItem(i, 2, name_item)
self.table.setItem(i, 3, mac_item)
self.table.setItem(i, 4, status_item)
self.table.setItem(i, 2, mac_item)
self.table.setItem(i, 3, status_item)
total = len(self.devices)
hidden = len(self.all_devices) - total
@@ -756,9 +910,12 @@ class App(QWidget):
def _select_all_devices(self):
"""Check all device checkboxes."""
for i in range(self.table.rowCount()):
item = self.table.item(i, 0)
if item:
item.setCheckState(Qt.CheckState.Checked)
mac = self.table.item(i, 2).text().strip()
# Prevent checking if already flashed or is a gateway (optional but good UI logic)
if mac not in self.flashed_macs:
item = self.table.item(i, 0)
if item:
item.setCheckState(Qt.CheckState.Checked)
def _deselect_all_devices(self):
"""Uncheck all device checkboxes."""
@@ -766,6 +923,15 @@ class App(QWidget):
item = self.table.item(i, 0)
if item:
item.setCheckState(Qt.CheckState.Unchecked)
def _show_history(self):
"""Show list of successfully flashed MACs this session."""
if not self.flashed_macs:
QMessageBox.information(self, "Flash History", "No successful flashes during this session.")
else:
macs = "\n".join(sorted(list(self.flashed_macs)))
msg = f"Successfully flashed devices in this session ({len(self.flashed_macs)}):\n\n{macs}"
QMessageBox.information(self, "Flash History", msg)
# ── Actions ──
@@ -801,6 +967,7 @@ class App(QWidget):
self.scan_progress_bar.setRange(0, 0)
self.scan_progress_bar.setFormat(" Starting...")
self.scan_progress_bar.setVisible(True)
self.btn_scan.setEnabled(False)
self.scan_thread = ScanThread(network)
self.scan_thread.finished.connect(self._on_scan_done)
@@ -811,6 +978,7 @@ class App(QWidget):
def _on_scan_done(self, results):
self.scan_progress_bar.setVisible(False)
self.btn_scan.setEnabled(True)
self.all_devices = []
for dev in results:
@@ -853,12 +1021,29 @@ class App(QWidget):
def _on_scan_error(self, error_msg):
self.scan_progress_bar.setVisible(False)
self.btn_scan.setEnabled(True)
self.scan_status.setText("❌ Scan failed")
self.scan_status.setStyleSheet("color: #f38ba8;")
QMessageBox.critical(
self, "Scan Error", f"Failed to scan network:\n{error_msg}"
)
def _on_mode_changed(self, index):
"""Show/hide method based on selected mode."""
mode = self.mode_combo.currentData()
if mode == "update":
self.method_container.setVisible(False)
self.update_warning_lbl.setVisible(True)
else:
self.method_container.setVisible(True)
self.update_warning_lbl.setVisible(False)
self._on_method_changed(self.method_combo.currentIndex())
def _on_method_changed(self, index):
"""Show/hide SSH credentials based on selected method."""
method = self.method_combo.currentData()
self.ssh_creds_widget.setVisible(method == "ssh")
def _get_selected_devices(self):
"""Return list of (table_row_index, device_dict) for checked devices."""
selected = []
@@ -905,10 +1090,41 @@ class App(QWidget):
self._flash_row_map[idx] = row
flash_devices.append(dev)
# Determine flash method and SSH credentials
mode = self.mode_combo.currentData()
if mode == "update":
# Hỏi xác nhận nếu có device nào khác 192.168.11.102
strange_ips = [dev["ip"] for _, dev in selected if dev["ip"] != "192.168.11.102"]
if strange_ips:
msg = (f"⚠️ Detected IP(s) other than [192.168.11.102] in your selection:\n"
f"{', '.join(strange_ips[:3])}{'...' if len(strange_ips) > 3 else ''}\n\n"
f"Are you SURE you want to update firmware on these devices?")
reply = QMessageBox.question(
self, "Confirm Update Target", msg,
QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No,
QMessageBox.StandardButton.No
)
if reply == QMessageBox.StandardButton.No:
return
method = "ssh"
ssh_user = "root"
ssh_password = "admin123a"
set_passwd = False
else:
method = self.method_combo.currentData()
ssh_user = self.ssh_user_input.text().strip() or "root"
ssh_password = self.ssh_pass_input.text() or "admin123a"
set_passwd = self.set_passwd_cb.isChecked() if method == "ssh" else False
# Run flashing in background thread so UI doesn't freeze
self.flash_thread = FlashThread(
flash_devices, self.firmware,
max_workers=self.parallel_spin.value()
max_workers=self.parallel_spin.value(),
method=method,
ssh_user=ssh_user,
ssh_password=ssh_password,
set_passwd=set_passwd
)
self.flash_thread.device_status.connect(self._on_flash_status)
self.flash_thread.device_done.connect(self._on_flash_done)
@@ -918,7 +1134,7 @@ class App(QWidget):
def _on_flash_status(self, index, msg):
"""Update status column while flashing."""
row = self._flash_row_map.get(index, index)
self.table.setItem(row, 4, QTableWidgetItem(f"{msg}"))
self.table.setItem(row, 3, QTableWidgetItem(f"{msg}"))
def _on_flash_done(self, index, result):
"""One device finished flashing."""
@@ -926,6 +1142,12 @@ class App(QWidget):
if result.startswith("DONE"):
item = QTableWidgetItem(f"{result}")
item.setForeground(QColor("#a6e3a1"))
# Save MAC to history
mac_item = self.table.item(row, 2)
if mac_item:
self.flashed_macs.add(mac_item.text().strip())
# Auto-uncheck so it won't be flashed again
cb = self.table.item(row, 0)
if cb:
@@ -933,7 +1155,7 @@ class App(QWidget):
else:
item = QTableWidgetItem(f"{result}")
item.setForeground(QColor("#f38ba8"))
self.table.setItem(row, 4, item)
self.table.setItem(row, 3, item)
self.progress.setValue(self.progress.value() + 1)
def _on_flash_all_done(self):

View File

@@ -1,3 +1,5 @@
PyQt6
scapy
requests
paramiko
scp

241
ssh_flasher.py Normal file
View File

@@ -0,0 +1,241 @@
"""
SSH-based Firmware Flasher for OpenWrt Devices
Replicates the flash.ps1 logic using Python paramiko/scp:
1. Auto-accept SSH host key
2. Set device password via `passwd` command
3. Upload firmware via SCP to /tmp/
4. Verify, sync, and flash via sysupgrade
"""
import os
import time
import paramiko
from scp import SCPClient
def _create_ssh_client(ip, user, password, timeout=10):
"""Create an SSH client with auto-accept host key policy."""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=user, password=password, timeout=timeout,
look_for_keys=False, allow_agent=False)
return client
import telnetlib
def set_device_password(ip, user="root", old_password="", new_password="admin123a",
status_cb=None):
"""
Set device password via Telnet (if raw/reset) or SSH.
"""
if status_cb:
status_cb("Checking Telnet port for raw device...")
# 1. Thử Telnet trước (OpenWrt mặc định mở Telnet 23 và cấm SSH Root khi chưa có Pass)
try:
tn = telnetlib.Telnet(ip, timeout=5)
# Nếu vô được Telnet tức là thiết bị vừa Reset cứng chưa có pass
if status_cb:
status_cb("Telnet connected! Setting password...")
# Đợi logo OpenWrt và prompt "root@OpenWrt:/# "
time.sleep(1)
tn.read_very_eager()
# Gửi lệnh đổi pass
tn.write(b"passwd\n")
time.sleep(1)
tn.write(new_password.encode('ascii') + b"\n")
time.sleep(1)
tn.write(new_password.encode('ascii') + b"\n")
time.sleep(1)
# Thoát telnet
tn.write(b"exit\n")
time.sleep(0.5)
tn.close()
# Chờ 3 giây để OpenWrt kịp đóng Telnet và nổ tiến trình Dropbear (SSH Server)
if status_cb:
status_cb("Password set via Telnet. Waiting for SSH to start...")
time.sleep(3)
return "DONE"
except ConnectionRefusedError:
# Port 23 đóng -> Tức là thiết bị đã có Pass và đã bật SSH, chuyển qua luồng mồi mật khẩu cũ
pass
except Exception as e:
# Các lỗi timeout khác có thể do ping không tới
return f"FAIL: Telnet check -> {e}"
# 2. Rơi xuống luồng SSH nếu thiết bị cũ (cổng Telnet đóng)
if status_cb:
status_cb("Connecting SSH for password update...")
try:
client = _create_ssh_client(ip, user, old_password, timeout=5)
except Exception as e:
return f"FAIL: Cannot connect SSH (Old pass incorrect?) — {e}"
try:
if status_cb:
status_cb("Setting password via SSH...")
shell = client.invoke_shell()
time.sleep(1)
if shell.recv_ready():
shell.recv(65535)
shell.send("passwd\n")
time.sleep(2)
shell.send(f"{new_password}\n")
time.sleep(1)
shell.send(f"{new_password}\n")
time.sleep(2)
if shell.recv_ready():
shell.recv(65535)
shell.send("exit\n")
time.sleep(0.5)
shell.close()
if status_cb:
status_cb("Password set ✓")
return "DONE"
except Exception as e:
return f"FAIL: {e}"
finally:
client.close()
def flash_device_ssh(ip, firmware_path, user="root", password="admin123a",
set_passwd=False, status_cb=None):
"""
Flash firmware to an OpenWrt device via SSH/SCP.
Steps (mirroring flash.ps1):
1. (Optional) Set device password via passwd
2. Upload firmware via SCP to /tmp/
3. Verify uploaded file
4. Sync filesystem
5. Execute sysupgrade
Returns:
"DONE" on success, "FAIL: reason" on error
"""
# ═══════════════════════════════════════════
# STEP 0: Set password (optional)
# ═══════════════════════════════════════════
if set_passwd:
result = set_device_password(ip, user, "", password, status_cb)
if result.startswith("FAIL"):
# Try with current password as old password
result = set_device_password(ip, user, password, password, status_cb)
if result.startswith("FAIL"):
return result
# ═══════════════════════════════════════════
# STEP 1: Connect SSH
# ═══════════════════════════════════════════
if status_cb:
status_cb("Connecting SSH...")
try:
client = _create_ssh_client(ip, user, password)
except paramiko.AuthenticationException:
return "FAIL: SSH authentication failed"
except paramiko.SSHException as e:
return f"FAIL: SSH error — {e}"
except Exception as e:
return f"FAIL: Cannot connect — {e}"
try:
# ═══════════════════════════════════════════
# STEP 2: Upload firmware via SCP
# ═══════════════════════════════════════════
if status_cb:
status_cb("Uploading firmware via SCP...")
filename = os.path.basename(firmware_path)
remote_path = f"/tmp/{filename}"
try:
scp_client = SCPClient(client.get_transport(), socket_timeout=300)
scp_client.put(firmware_path, remote_path)
scp_client.close()
except Exception as e:
return f"FAIL: SCP upload failed — {e}"
time.sleep(2)
# ═══════════════════════════════════════════
# STEP 3: Verify firmware uploaded
# ═══════════════════════════════════════════
if status_cb:
status_cb("Verifying firmware...")
stdin, stdout, stderr = client.exec_command(
f"test -f {remote_path} && ls -lh {remote_path}",
timeout=10
)
verify_output = stdout.read().decode("utf-8", errors="ignore").strip()
verify_err = stderr.read().decode("utf-8", errors="ignore").strip()
if not verify_output:
return f"FAIL: Firmware file not found on device after upload"
# ═══════════════════════════════════════════
# STEP 4: Sync filesystem
# ═══════════════════════════════════════════
if status_cb:
status_cb("Syncing filesystem...")
client.exec_command("sync", timeout=10)
time.sleep(2)
# ═══════════════════════════════════════════
# STEP 5: Flash firmware (sysupgrade)
# ═══════════════════════════════════════════
if status_cb:
status_cb("Flashing firmware (sysupgrade)...")
try:
# Capture output by redirecting to a file in /tmp first, or read from stdout
# Use -F to force upgrade and bypass "Image metadata not present" error for uImage files
stdin, stdout, stderr = client.exec_command(f"sysupgrade -F -v -n {remote_path} > /tmp/sysup.log 2>&1")
# Wait up to 4 seconds to see if it immediately fails
# Sysupgrade takes time and normally drops the connection, so if it finishes in < 4s, it's an error.
time.sleep(4)
if stdout.channel.exit_status_ready():
exit_code = stdout.channel.recv_exit_status()
# Fetch the error log
_, log_out, _ = client.exec_command("cat /tmp/sysup.log")
err_msg = log_out.read().decode("utf-8", errors="ignore").strip()
return f"FAIL: sysupgrade terminated early (Code {exit_code}). Details:\n{err_msg}"
except Exception:
# Connection drop during sysupgrade is exactly what we expect if it succeeds
pass
if status_cb:
status_cb("Rebooting...")
time.sleep(3)
return "DONE"
except Exception as e:
return f"FAIL: {e}"
finally:
try:
client.close()
except Exception:
pass