948 lines
28 KiB
Python
948 lines
28 KiB
Python
import sys
|
|
import socket
|
|
import ipaddress
|
|
import platform
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
from PyQt6.QtWidgets import (
|
|
QApplication, QWidget, QPushButton,
|
|
QVBoxLayout, QHBoxLayout, QFileDialog, QTableWidget,
|
|
QTableWidgetItem, QLabel, QProgressBar,
|
|
QMessageBox, QGroupBox, QHeaderView, QLineEdit,
|
|
QFrame, QCheckBox, QSpinBox, QScrollArea, QSizePolicy
|
|
)
|
|
from PyQt6.QtCore import (
|
|
Qt, QThread, pyqtSignal, QPropertyAnimation, QAbstractAnimation,
|
|
QParallelAnimationGroup, QRect
|
|
)
|
|
from PyQt6.QtGui import QFont, QColor, QIcon, QAction
|
|
|
|
import datetime
|
|
|
|
from scanner import scan_network
|
|
from flasher import flash_device
|
|
|
|
|
|
class CollapsibleGroupBox(QGroupBox):
    """Checkable group box whose content collapses/expands with an animation.

    The title check box is the toggle.  Checking it shows the content widget
    and animates ``maximumHeight`` up to the box's size hint; unchecking it
    animates ``maximumHeight`` down to ``COLLAPSED_HEIGHT`` and then hides the
    content widget.  Nothing is animated until ``set_content_layout`` has
    been called.
    """

    # Height in pixels the box is shrunk to when collapsed.
    COLLAPSED_HEIGHT = 32
    # Height cap restored after expanding so the content can grow freely
    # (same value as Qt's QWIDGETSIZE_MAX).
    UNCONSTRAINED_HEIGHT = 16777215

    def __init__(self, title="", parent=None):
        """Create an expanded (checked) group box titled ``title``."""
        super().__init__(title, parent)
        self.setCheckable(True)
        self.setChecked(True)

        self.animation = QPropertyAnimation(self, b"maximumHeight")
        self.animation.setDuration(200)
        # Connect the completion handler exactly once.  stop() does not emit
        # finished(), so connecting a slot on every toggle (and disconnecting
        # it only when it fires) left stale slots behind whenever a toggle
        # interrupted a running animation.
        self.animation.finished.connect(self._on_animation_finished)

        # Connect the toggled signal to our animation function
        self.toggled.connect(self._toggle_animation)

        self._full_height = 0

    def set_content_layout(self, layout):
        """Wrap ``layout`` in a transparent widget and make it the box content."""
        # We need a wrapper widget to hold the layout
        self.content_widget = QWidget()
        self.content_widget.setStyleSheet("background-color: transparent;")
        self.content_widget.setLayout(layout)

        main_layout = QVBoxLayout()
        main_layout.setContentsMargins(0, 0, 0, 0)
        main_layout.addWidget(self.content_widget)
        self.setLayout(main_layout)

    def _toggle_animation(self, checked):
        """Animate towards the expanded (``checked``) or collapsed state."""
        if not hasattr(self, "content_widget"):
            return

        if checked:
            # Expand: show content first, then animate
            self.content_widget.setVisible(True)
            end_height = self.sizeHint().height()
        else:
            # Collapse
            end_height = self.COLLAPSED_HEIGHT

        self.animation.stop()
        self.animation.setStartValue(self.height())
        self.animation.setEndValue(end_height)
        self.animation.start()

    def _on_animation_finished(self):
        """Run the completion step that matches the current checked state."""
        if self.isChecked():
            self._on_expand_finished()
        else:
            self._on_collapse_finished()

    def _on_expand_finished(self):
        """Lift the height cap once the expand animation has completed."""
        # Remove height constraint so content can grow dynamically
        self.setMaximumHeight(self.UNCONSTRAINED_HEIGHT)

    def _on_collapse_finished(self):
        """Hide the content once the collapse animation has completed."""
        if not self.isChecked():
            self.content_widget.setVisible(False)
|
|
|
|
|
|
def _resolve_hostname(ip):
    """Return the primary host name reverse DNS reports for ``ip``.

    Any failure of the lookup yields an empty string instead of raising.
    """
    try:
        hostname, _aliases, _addresses = socket.gethostbyaddr(ip)
    except Exception:
        return ""
    return hostname
|
|
|
|
|
|
def get_local_ip():
    """Return the local IPv4 address of this machine, or ``"N/A"`` on failure.

    A datagram socket is connected towards 8.8.8.8:80 and the address the
    socket got bound to is read back with ``getsockname()``.
    """
    try:
        # The context manager closes the socket even when connect() or
        # getsockname() raises, which the manual close() did not.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
    except OSError:
        return "N/A"
|
|
|
|
|
|
def get_default_network(ip):
    """Guess the /24 network that contains the IPv4 address ``ip``.

    Args:
        ip: Dotted-quad IPv4 address, e.g. ``"192.168.4.23"``.

    Returns:
        The enclosing network in CIDR form (``"192.168.4.0/24"``), or the
        fallback ``"192.168.1.0/24"`` when ``ip`` is not a valid IPv4 address
        (for example the ``"N/A"`` placeholder).
    """
    try:
        # strict=False masks off the host bits instead of rejecting them.
        # Validating through ipaddress also rejects dotted garbage that plain
        # string splitting would have turned into an invalid network.
        network = ipaddress.IPv4Network(f"{ip}/24", strict=False)
    except ValueError:
        return "192.168.1.0/24"
    return str(network)
|
|
|
|
|
|
def get_machine_info():
    """Gather basic facts about the machine running the application.

    Returns:
        A dict with the keys ``"hostname"``, ``"ip"``, ``"os"`` (system name
        and release) and ``"mac"``.  ``"mac"`` stays ``"N/A"`` when the node
        id cannot be obtained or formatted.
    """
    info = {
        "hostname": socket.gethostname(),
        "ip": get_local_ip(),
        "os": f"{platform.system()} {platform.release()}",
        "mac": "N/A",
    }

    try:
        import uuid

        node = uuid.getnode()
        # Most significant byte first: shifts 40, 32, 24, 16, 8, 0.
        octets = [(node >> shift) & 0xFF for shift in range(40, -8, -8)]
        info["mac"] = ":".join(f"{octet:02X}" for octet in octets)
    except Exception:
        pass

    return info
|
|
|
|
|
|
# ── Stylesheet ──────────────────────────────────────────────
|
|
|
|
# Application-wide Qt style sheet (dark theme), applied via
# QApplication.setStyleSheet().  The literal must contain only QSS rules:
# stray gutter characters between the declarations make it unparsable.
STYLE = """
QWidget {
    background-color: #1a1b2e;
    color: #e2e8f0;
    font-family: 'Segoe UI', 'SF Pro Display', sans-serif;
    font-size: 12px;
}

QGroupBox {
    border: 1px solid #2d3748;
    border-radius: 8px;
    margin-top: 10px;
    padding: 20px 8px 6px 8px;
    font-weight: bold;
    color: #7eb8f7;
    background-color: #1e2035;
}

QGroupBox::title {
    subcontrol-origin: margin;
    subcontrol-position: top left;
    left: 14px;
    top: 5px;
    padding: 0px 8px;
    background-color: transparent;
}

QGroupBox::indicator {
    width: 14px;
    height: 14px;
    border-radius: 4px;
    border: 1px solid #3d4a6b;
    background-color: #13141f;
    margin-top: 5px;
}

QGroupBox::indicator:unchecked {
    background-color: #13141f;
}

QGroupBox::indicator:checked {
    background-color: #3b82f6;
    border-color: #3b82f6;
}

QLabel#title {
    font-size: 16px;
    font-weight: bold;
    color: #7eb8f7;
    letter-spacing: 1px;
}

QLabel#info {
    color: #94a3b8;
    font-size: 11px;
}

QPushButton {
    background-color: #2d3352;
    border: 1px solid #3d4a6b;
    border-radius: 6px;
    padding: 4px 12px;
    color: #e2e8f0;
    font-weight: 600;
    min-height: 24px;
}

QPushButton:hover {
    background-color: #3d4a6b;
    border-color: #7eb8f7;
    color: #ffffff;
}

QPushButton:pressed {
    background-color: #1a2040;
}

QPushButton#scan {
    background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
        stop:0 #1a56db, stop:1 #1e66f5);
    border-color: #1a56db;
    color: #ffffff;
}

QPushButton#scan:hover {
    background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
        stop:0 #2563eb, stop:1 #3b82f6);
}

QPushButton#flash {
    background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
        stop:0 #15803d, stop:1 #16a34a);
    border-color: #15803d;
    color: #ffffff;
    font-size: 13px;
    min-height: 30px;
}

QPushButton#flash:hover {
    background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
        stop:0 #16a34a, stop:1 #22c55e);
}

QTableWidget {
    background-color: #13141f;
    alternate-background-color: #1a1b2e;
    border: 1px solid #2d3748;
    border-radius: 8px;
    gridline-color: #2d3748;
    selection-background-color: #2d3a5a;
    selection-color: #e2e8f0;
}

QTableWidget::item {
    padding: 2px 6px;
    border: none;
}

QTableWidget::item:selected {
    background-color: #2d3a5a;
    color: #7eb8f7;
}

QHeaderView::section {
    background-color: #1e2035;
    color: #7eb8f7;
    border: none;
    border-bottom: 2px solid #3b82f6;
    border-right: 1px solid #2d3748;
    padding: 4px 6px;
    font-weight: bold;
    font-size: 11px;
    letter-spacing: 0.5px;
    text-transform: uppercase;
}

QHeaderView::section:last {
    border-right: none;
}

QProgressBar {
    border: 1px solid #2d3748;
    border-radius: 6px;
    text-align: center;
    background-color: #13141f;
    color: #e2e8f0;
    height: 20px;
    font-size: 11px;
    font-weight: 600;
}

QProgressBar::chunk {
    background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
        stop:0 #3b82f6, stop:1 #7eb8f7);
    border-radius: 7px;
}

QProgressBar#scan_bar {
    border: 1px solid #374151;
    border-radius: 5px;
    text-align: center;
    background-color: #13141f;
    color: #fbbf24;
    height: 16px;
    font-size: 10px;
    font-weight: 600;
}

QProgressBar#scan_bar::chunk {
    background: qlineargradient(x1:0, y1:0, x2:1, y2:0,
        stop:0 #d97706, stop:1 #fbbf24);
    border-radius: 4px;
}

QLineEdit {
    background-color: #13141f;
    border: 1px solid #2d3748;
    border-radius: 8px;
    padding: 7px 12px;
    color: #e2e8f0;
    selection-background-color: #2d3a5a;
}

QLineEdit:focus {
    border-color: #3b82f6;
    background-color: #161727;
}

QScrollBar:vertical {
    background: #13141f;
    width: 10px;
    border-radius: 5px;
    margin: 2px;
}

QScrollBar::handle:vertical {
    background: #3d4a6b;
    border-radius: 5px;
    min-height: 30px;
}

QScrollBar::handle:vertical:hover {
    background: #7eb8f7;
}

QScrollBar::handle:vertical:pressed {
    background: #3b82f6;
}

QScrollBar::add-line:vertical,
QScrollBar::sub-line:vertical {
    height: 0px;
    background: transparent;
}

QScrollBar::add-page:vertical,
QScrollBar::sub-page:vertical {
    background: transparent;
}

QScrollBar:horizontal {
    background: #13141f;
    height: 10px;
    border-radius: 5px;
    margin: 2px;
}

QScrollBar::handle:horizontal {
    background: #3d4a6b;
    border-radius: 5px;
    min-width: 30px;
}

QScrollBar::handle:horizontal:hover {
    background: #7eb8f7;
}

QScrollBar::handle:horizontal:pressed {
    background: #3b82f6;
}

QScrollBar::add-line:horizontal,
QScrollBar::sub-line:horizontal {
    width: 0px;
    background: transparent;
}

QScrollBar::add-page:horizontal,
QScrollBar::sub-page:horizontal {
    background: transparent;
}

QCheckBox {
    spacing: 6px;
    color: #94a3b8;
    font-size: 12px;
}

QCheckBox::indicator {
    width: 16px;
    height: 16px;
    border-radius: 4px;
    border: 1px solid #3d4a6b;
    background-color: #13141f;
}

QCheckBox::indicator:checked {
    background-color: #3b82f6;
    border-color: #3b82f6;
}

QCheckBox::indicator:hover {
    border-color: #7eb8f7;
}
"""
|
|
|
|
|
|
class ScanThread(QThread):
    """Background worker that scans a network and resolves host names.

    Signals:
        finished(list): the device dicts returned by ``scan_network``, each
            with a ``"name"`` entry added.
        error(str): text of any exception raised during the run.
        scan_progress(int, int): done / total counters forwarded from the
            scanner's progress callback.
        stage(str): name of the scan phase currently running.
    """

    finished = pyqtSignal(list)
    error = pyqtSignal(str)
    scan_progress = pyqtSignal(int, int)  # done, total (ping sweep)
    stage = pyqtSignal(str)  # current scan phase

    def __init__(self, network):
        super().__init__()
        self.network = network

    def run(self):
        """Scan ``self.network``, attach host names, then emit the outcome."""
        try:
            found = scan_network(
                self.network,
                progress_cb=lambda done, total: self.scan_progress.emit(done, total),
                stage_cb=lambda s: self.stage.emit(s),
            )

            # Reverse-DNS every discovered address concurrently.
            self.stage.emit("hostname")
            with ThreadPoolExecutor(max_workers=50) as pool:
                pending = {}
                for entry in found:
                    lookup = pool.submit(_resolve_hostname, entry["ip"])
                    pending[lookup] = entry

                for lookup in as_completed(pending):
                    entry = pending[lookup]
                    try:
                        hostname = lookup.result(timeout=3)
                    except Exception:
                        hostname = ""
                    entry["name"] = hostname

            self.finished.emit(found)
        except Exception as exc:
            self.error.emit(str(exc))
|
|
|
|
|
|
class FlashThread(QThread):
    """Run firmware flash in background so UI stays responsive.

    Signals:
        device_status(int, str): index into ``devices`` and a progress message
            forwarded from ``flash_device``'s status callback.
        device_done(int, str): index into ``devices`` and the final result
            (``"FAIL: <error>"`` when flashing raised).
        all_done(): emitted once every device has been processed.
    """

    device_status = pyqtSignal(int, str)  # index, status message
    device_done = pyqtSignal(int, str)  # index, result
    all_done = pyqtSignal()

    def __init__(self, devices, firmware_path, max_workers=10):
        """Store the job description.

        Args:
            devices: Sequence of device dicts; each must provide ``"ip"``.
            firmware_path: Path of the firmware file handed to ``flash_device``.
            max_workers: Number of devices flashed at once; ``0`` (or less)
                means one worker per device.
        """
        super().__init__()
        self.devices = devices
        self.firmware_path = firmware_path
        self.max_workers = max_workers

    def _flash_one(self, index, dev):
        """Flash one device and report its result through ``device_done``."""
        try:
            def on_status(msg):
                self.device_status.emit(index, msg)

            result = flash_device(
                dev["ip"], self.firmware_path,
                status_cb=on_status
            )
            self.device_done.emit(index, result)
        except Exception as e:
            self.device_done.emit(index, f"FAIL: {e}")

    def run(self):
        """Flash all devices through a thread pool, then emit ``all_done``."""
        # Use configured max_workers (0 = unlimited = one per device).
        # Clamp to 1: ThreadPoolExecutor raises ValueError for max_workers=0,
        # which happened with "unlimited" and an empty device list and kept
        # all_done from ever being emitted.
        if self.max_workers > 0:
            workers = self.max_workers
        else:
            workers = max(1, len(self.devices))

        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [
                executor.submit(self._flash_one, i, dev)
                for i, dev in enumerate(self.devices)
            ]
            for f in futures:
                f.result()

        self.all_done.emit()
|
|
|
|
|
|
class App(QWidget):
    """Main window: choose a firmware file, scan the LAN, flash checked devices.

    State:
        firmware: path of the selected firmware file, or ``None``.
        all_devices: every device dict from the last scan (unfiltered).
        devices: the device dicts currently shown; row ``i`` of the table
            corresponds to ``devices[i]``.
        scan_thread / flash_thread: the most recently started workers.
    """

    def __init__(self):
        super().__init__()
        self.setWindowTitle("IoT Firmware Loader")
        self.firmware = None
        self.all_devices = []  # all scan results (unfiltered)
        self.devices = []  # currently displayed (filtered)
        self.scan_thread = None
        # Defined up front so handlers can test them before the first flash.
        self.flash_thread = None
        self._flash_row_map = {}  # FlashThread device index -> table row

        info = get_machine_info()
        self.local_ip = info["ip"]
        self.gateway_ip = self._guess_gateway(self.local_ip)

        layout = QVBoxLayout()
        layout.setSpacing(4)
        layout.setContentsMargins(8, 6, 8, 6)

        # ── Title ──
        title = QLabel("⚡  IoT Firmware Loader (MiraV3)")
        title.setObjectName("title")
        title.setAlignment(Qt.AlignmentFlag.AlignCenter)
        layout.addWidget(title)

        copyright_label = QLabel(f"© {datetime.datetime.now().year} Smatec — R&D Software")
        copyright_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        copyright_label.setStyleSheet(
            "color: #9399b2; font-size: 11px; font-weight: 500; font-family: -apple-system, BlinkMacSystemFont, Segoe UI, Roboto;"
        )
        layout.addWidget(copyright_label)

        # ── Machine Info Group ──
        info_group = CollapsibleGroupBox("🖥  Machine Info")
        info_layout = QVBoxLayout()
        info_layout.setSpacing(2)
        info_layout.setContentsMargins(0, 0, 0, 0)

        row1 = QHBoxLayout()
        row1.addWidget(self._info_label("Hostname:"))
        row1.addWidget(self._info_value(info["hostname"]))
        row1.addStretch()
        row1.addWidget(self._info_label("IP:"))
        row1.addWidget(self._info_value(info["ip"]))
        info_layout.addLayout(row1)

        row2 = QHBoxLayout()
        row2.addWidget(self._info_label("OS:"))
        row2.addWidget(self._info_value(info["os"]))
        row2.addStretch()
        row2.addWidget(self._info_label("MAC:"))
        row2.addWidget(self._info_value(info["mac"]))
        info_layout.addLayout(row2)

        info_group.set_content_layout(info_layout)
        layout.addWidget(info_group)

        # ── Firmware Selection ──
        fw_group = CollapsibleGroupBox("📦 Firmware")
        fw_layout = QHBoxLayout()
        fw_layout.setContentsMargins(0, 0, 0, 0)

        self.fw_label = QLabel("No firmware selected")
        self.fw_label.setObjectName("info")
        self.fw_label.setWordWrap(True)
        fw_layout.addWidget(self.fw_label, 1)

        btn_fw = QPushButton("📁 Select File")
        btn_fw.clicked.connect(self.select_fw)
        btn_fw.setFixedWidth(110)
        fw_layout.addWidget(btn_fw)

        fw_group.set_content_layout(fw_layout)
        layout.addWidget(fw_group)

        # ── Network Scan ──
        scan_group = CollapsibleGroupBox("📡 Network Scan")
        scan_layout = QVBoxLayout()
        scan_layout.setContentsMargins(0, 0, 0, 0)

        net_row = QHBoxLayout()
        net_row.addWidget(QLabel("Network:"))
        self.net_input = QLineEdit(get_default_network(self.local_ip))
        self.net_input.setPlaceholderText("e.g. 192.168.4.0/24")
        net_row.addWidget(self.net_input, 1)

        btn_scan = QPushButton("🔍 Scan LAN")
        btn_scan.setObjectName("scan")
        btn_scan.clicked.connect(self.scan)
        btn_scan.setFixedWidth(110)
        net_row.addWidget(btn_scan)
        scan_layout.addLayout(net_row)

        self.scan_progress_bar = QProgressBar()
        self.scan_progress_bar.setObjectName("scan_bar")
        self.scan_progress_bar.setRange(0, 0)  # 0..0 = busy indicator
        self.scan_progress_bar.setFormat("")
        self.scan_progress_bar.setVisible(False)
        scan_layout.addWidget(self.scan_progress_bar)

        self.scan_status = QLabel("")
        self.scan_status.setObjectName("info")
        scan_layout.addWidget(self.scan_status)

        scan_group.set_content_layout(scan_layout)
        layout.addWidget(scan_group)

        # ── Device Table ──
        dev_group = QGroupBox("📋 Devices Found")
        dev_layout = QVBoxLayout()
        dev_layout.setSpacing(4)
        dev_layout.setContentsMargins(4, 4, 4, 4)

        self.table = QTableWidget()
        self.table.setColumnCount(5)
        self.table.setHorizontalHeaderLabels(["", "IP", "Name", "MAC", "Status"])
        self.table.setAlternatingRowColors(True)
        header = self.table.horizontalHeader()
        # Column 0 holds only the check box; the rest share the width.
        header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
        header.resizeSection(0, 40)
        for col in range(1, 5):
            header.setSectionResizeMode(col, QHeaderView.ResizeMode.Stretch)
        self.table.verticalHeader().setVisible(False)
        self.table.setSelectionBehavior(
            QTableWidget.SelectionBehavior.SelectRows
        )
        dev_layout.addWidget(self.table)

        filter_row = QHBoxLayout()
        self.device_count_label = QLabel("Total: 0 devices")
        self.device_count_label.setObjectName("info")
        filter_row.addWidget(self.device_count_label)
        filter_row.addStretch()

        btn_select_all = QPushButton("☑ Select All")
        btn_select_all.clicked.connect(self._select_all_devices)
        filter_row.addWidget(btn_select_all)

        btn_deselect_all = QPushButton("☐ Deselect All")
        btn_deselect_all.clicked.connect(self._deselect_all_devices)
        filter_row.addWidget(btn_deselect_all)

        self.show_all_cb = QCheckBox("Show all (include gateway & self)")
        self.show_all_cb.setChecked(False)
        self.show_all_cb.stateChanged.connect(self._refresh_table)
        filter_row.addWidget(self.show_all_cb)
        dev_layout.addLayout(filter_row)

        dev_group.setLayout(dev_layout)
        layout.addWidget(dev_group, stretch=1)

        # ── Flash Controls ──
        flash_group = QGroupBox("🚀 Flash")
        flash_layout = QVBoxLayout()
        flash_layout.setSpacing(4)
        flash_layout.setContentsMargins(4, 4, 4, 4)

        self.progress = QProgressBar()
        self.progress.setFormat("%v / %m devices (%p%)")
        flash_layout.addWidget(self.progress)

        # Parallel count row
        parallel_row = QHBoxLayout()
        parallel_row.addWidget(QLabel("Concurrent devices:"))
        self.parallel_spin = QSpinBox()
        self.parallel_spin.setRange(0, 100)
        self.parallel_spin.setValue(10)
        self.parallel_spin.setSpecialValueText("Unlimited")
        self.parallel_spin.setToolTip("0 = unlimited (all devices at once)")
        self.parallel_spin.setFixedWidth(80)
        parallel_row.addWidget(self.parallel_spin)
        parallel_row.addStretch()
        flash_layout.addLayout(parallel_row)

        btn_flash = QPushButton("⚡ Flash Selected Devices")
        btn_flash.setObjectName("flash")
        btn_flash.clicked.connect(self.flash_all)
        flash_layout.addWidget(btn_flash)

        flash_group.setLayout(flash_layout)
        layout.addWidget(flash_group)

        self.setLayout(layout)

    # ── Helpers ──

    @staticmethod
    def _is_running(thread):
        """Return True when ``thread`` exists and is still executing."""
        return thread is not None and thread.isRunning()

    def _guess_gateway(self, ip):
        """Guess gateway IP (x.x.x.1) from local IP; ``""`` if ``ip`` is unusable."""
        try:
            parts = ip.split(".")
            return f"{parts[0]}.{parts[1]}.{parts[2]}.1"
        except Exception:
            return ""

    def _info_label(self, text):
        """Create a muted caption label (styled through the ``info`` object name)."""
        lbl = QLabel(text)
        lbl.setObjectName("info")
        return lbl

    def _info_value(self, text):
        """Create a bold value label for the machine-info rows."""
        lbl = QLabel(text)
        lbl.setStyleSheet("color: #cdd6f4; font-weight: bold;")
        return lbl

    def _get_filtered_devices(self):
        """Return devices filtered based on show_all checkbox."""
        if self.show_all_cb.isChecked():
            return list(self.all_devices)
        excluded = {self.local_ip, self.gateway_ip}
        return [d for d in self.all_devices if d["ip"] not in excluded]

    def _refresh_table(self):
        """Re-populate table based on current filter state.

        Every row is rebuilt from its device dict, so check states and the
        status column are reset to the values derived from the dict.
        """
        self.devices = self._get_filtered_devices()
        self.table.setRowCount(len(self.devices))
        special_ips = {self.local_ip, self.gateway_ip}

        for i, d in enumerate(self.devices):
            # Checkbox column
            cb_item = QTableWidgetItem()
            cb_item.setCheckState(Qt.CheckState.Checked)
            cb_item.setFlags(cb_item.flags() | Qt.ItemFlag.ItemIsUserCheckable)
            self.table.setItem(i, 0, cb_item)

            ip_item = QTableWidgetItem(d["ip"])
            name_item = QTableWidgetItem(d.get("name", ""))
            mac_item = QTableWidgetItem(d["mac"].upper())
            status_item = QTableWidgetItem(d["status"])

            # Dim gateway & self if showing all
            if d["ip"] in special_ips:
                for item in (ip_item, name_item, mac_item, status_item):
                    item.setForeground(QColor("#4a5568"))
                cb_item.setCheckState(Qt.CheckState.Unchecked)

            self.table.setItem(i, 1, ip_item)
            self.table.setItem(i, 2, name_item)
            self.table.setItem(i, 3, mac_item)
            self.table.setItem(i, 4, status_item)

        total = len(self.devices)
        hidden = len(self.all_devices) - total
        label = f"Total: {total} devices"
        if hidden > 0:
            label += f" (hiding {hidden})"
        self.device_count_label.setText(label)

    def _set_all_check_states(self, state):
        """Apply ``state`` to the check box of every table row."""
        for i in range(self.table.rowCount()):
            item = self.table.item(i, 0)
            if item:
                item.setCheckState(state)

    def _select_all_devices(self):
        """Check all device checkboxes."""
        self._set_all_check_states(Qt.CheckState.Checked)

    def _deselect_all_devices(self):
        """Uncheck all device checkboxes."""
        self._set_all_check_states(Qt.CheckState.Unchecked)

    # ── Actions ──

    def select_fw(self):
        """Let the user pick a firmware file and remember its path."""
        file, _ = QFileDialog.getOpenFileName(
            self, "Select Firmware", "",
            "Firmware Files (*.bin *.hex *.uf2);;All Files (*)"
        )
        if file:
            self.firmware = file
            name = file.split("/")[-1]
            self.fw_label.setText(f"✅ {name}")
            self.fw_label.setStyleSheet("color: #a6e3a1;")

    def scan(self):
        """Validate the network field and start a background scan."""
        # Replacing self.scan_thread while the previous worker is still
        # running would drop the only reference to a live QThread.
        if self._is_running(self.scan_thread):
            QMessageBox.information(
                self, "Scan In Progress",
                "A network scan is already running."
            )
            return

        # Flash status updates address table rows; clearing the table now
        # would make them land on the wrong rows.
        if self._is_running(self.flash_thread):
            QMessageBox.warning(
                self, "Flash In Progress",
                "Firmware flashing is in progress.\n"
                "Please wait until it finishes."
            )
            return

        network_str = self.net_input.text().strip()

        try:
            network = ipaddress.ip_network(network_str, strict=False)
        except ValueError:
            QMessageBox.warning(
                self, "Invalid Network",
                f"'{network_str}' is not a valid network.\n"
                "Example: 192.168.4.0/24"
            )
            return

        self.scan_status.setText("⏳ Preparing scan...")
        self.scan_status.setStyleSheet("color: #f9e2af;")
        # Drop the previous results entirely (table, filtered list and the
        # unfiltered list) so toggling the filter during the scan cannot
        # bring stale rows back.
        self.all_devices = []
        self._refresh_table()

        self.scan_progress_bar.setRange(0, 0)
        self.scan_progress_bar.setFormat("  Starting...")
        self.scan_progress_bar.setVisible(True)

        self.scan_thread = ScanThread(network)
        self.scan_thread.finished.connect(self._on_scan_done)
        self.scan_thread.error.connect(self._on_scan_error)
        self.scan_thread.scan_progress.connect(self._on_scan_progress)
        self.scan_thread.stage.connect(self._on_scan_stage)
        self.scan_thread.start()

    def _on_scan_done(self, results):
        """Store the scan results (sorted by IP) and show them in the table."""
        self.scan_progress_bar.setVisible(False)

        for dev in results:
            dev["status"] = "READY"
        self.all_devices = sorted(
            results, key=lambda d: ipaddress.ip_address(d["ip"])
        )

        # Apply filter and populate table
        self._refresh_table()

        total_all = len(self.all_devices)
        total_shown = len(self.devices)
        self.scan_status.setText(f"✅ Scan complete — {total_all} found, {total_shown} shown")
        self.scan_status.setStyleSheet("color: #a6e3a1;")

    def _on_scan_stage(self, stage):
        """Reflect the scanner's current phase in the progress bar and label."""
        if stage == "ping":
            self.scan_progress_bar.setFormat("⚡ Pinging hosts... %v / %m")
            self.scan_status.setText("⏳ Pinging all hosts...")
            self.scan_status.setStyleSheet("color: #f9e2af;")
            return

        # Phases without a known total: (progress bar format, status text).
        busy_stages = {
            "arp": ("  Reading ARP cache...", "⏳ Reading ARP cache..."),
            "scapy": ("  ARP broadcast scan...", "⏳ Running ARP broadcast scan..."),
            "hostname": ("  Resolving hostnames...", "⏳ Resolving hostnames..."),
        }
        if stage in busy_stages:
            bar_format, status_text = busy_stages[stage]
            self.scan_progress_bar.setRange(0, 0)
            self.scan_progress_bar.setFormat(bar_format)
            self.scan_status.setText(status_text)

    def _on_scan_progress(self, done, total):
        """Show ping-sweep progress as ``done`` out of ``total``."""
        self.scan_progress_bar.setRange(0, total)
        self.scan_progress_bar.setValue(done)
        self.scan_status.setText(f"⏳ Pinging hosts... {done} / {total}")
        self.scan_status.setStyleSheet("color: #f9e2af;")

    def _on_scan_error(self, error_msg):
        """Report a failed scan to the user."""
        self.scan_progress_bar.setVisible(False)
        self.scan_status.setText("❌ Scan failed")
        self.scan_status.setStyleSheet("color: #f38ba8;")
        QMessageBox.critical(
            self, "Scan Error", f"Failed to scan network:\n{error_msg}"
        )

    def _get_selected_devices(self):
        """Return list of (table_row_index, device_dict) for checked devices."""
        selected = []
        for i in range(self.table.rowCount()):
            item = self.table.item(i, 0)
            if item and item.checkState() == Qt.CheckState.Checked:
                if i < len(self.devices):
                    selected.append((i, self.devices[i]))
        return selected

    def flash_all(self):
        """Flash the selected firmware to every checked device."""
        # A second run would overwrite the row map of the running one and
        # flash the same devices twice in parallel.
        if self._is_running(self.flash_thread):
            QMessageBox.warning(
                self, "Flash In Progress",
                "Firmware flashing is in progress.\n"
                "Please wait until it finishes."
            )
            return

        if not self.firmware:
            QMessageBox.warning(
                self, "No Firmware",
                "Please select a firmware file first."
            )
            return

        if not self.devices:
            QMessageBox.information(
                self, "No Devices",
                "No devices found to flash.\n"
                "Please scan the network first."
            )
            return

        selected = self._get_selected_devices()
        if not selected:
            QMessageBox.information(
                self, "No Selection",
                "No devices selected.\n"
                "Check the boxes next to the devices you want to flash."
            )
            return

        total = len(selected)
        self.progress.setMaximum(total)
        self.progress.setValue(0)

        # Build list with row indices for UI updates
        self._flash_row_map = {idx: row for idx, (row, _dev) in enumerate(selected)}
        flash_devices = [dev for _row, dev in selected]

        # Toggling the filter rebuilds the table and would invalidate the
        # row map; keep it locked until all devices are processed.
        self.show_all_cb.setEnabled(False)

        # Run flashing in background thread so UI doesn't freeze
        self.flash_thread = FlashThread(
            flash_devices, self.firmware,
            max_workers=self.parallel_spin.value()
        )
        self.flash_thread.device_status.connect(self._on_flash_status)
        self.flash_thread.device_done.connect(self._on_flash_done)
        self.flash_thread.all_done.connect(self._on_flash_all_done)
        self.flash_thread.start()

    def _on_flash_status(self, index, msg):
        """Update status column while flashing."""
        row = self._flash_row_map.get(index, index)
        self.table.setItem(row, 4, QTableWidgetItem(f"⏳ {msg}"))

    def _on_flash_done(self, index, result):
        """One device finished flashing."""
        row = self._flash_row_map.get(index, index)
        if result.startswith("DONE"):
            item = QTableWidgetItem(f"✅ {result}")
            item.setForeground(QColor("#a6e3a1"))
            # Auto-uncheck so it won't be flashed again
            cb = self.table.item(row, 0)
            if cb:
                cb.setCheckState(Qt.CheckState.Unchecked)
        else:
            item = QTableWidgetItem(f"❌ {result}")
            item.setForeground(QColor("#f38ba8"))
        self.table.setItem(row, 4, item)
        self.progress.setValue(self.progress.value() + 1)

    def _on_flash_all_done(self):
        """All flashing complete."""
        # The row map is no longer in use; the filter may be changed again.
        self.show_all_cb.setEnabled(True)
        QMessageBox.information(self, "Flash Complete", "All devices have been processed.")
|
|
|
|
|
|
def _centered_geometry(screen_x, screen_y, screen_w, screen_h,
                       max_width=750, height_ratio=0.6):
    """Return ``(x, y, w, h)`` of a window centred in the given screen area.

    The width is capped at ``max_width`` and the height is ``height_ratio``
    of the area's height.  Both offsets include the area's own origin, so the
    result is also correct when the available area does not start at (0, 0).
    """
    w = min(max_width, screen_w)
    h = int(screen_h * height_ratio)
    x = screen_x + (screen_w - w) // 2
    y = screen_y + (screen_h - h) // 2
    return x, y, w, h


if __name__ == "__main__":
    app = QApplication(sys.argv)
    app.setStyleSheet(STYLE)

    window = App()

    # 60% height, limited width (750px), centered
    screen = app.primaryScreen().availableGeometry()
    # The horizontal offset previously ignored screen.x(), unlike the
    # vertical one which already added screen.y().
    window.setGeometry(*_centered_geometry(
        screen.x(), screen.y(), screen.width(), screen.height()
    ))
    window.show()

    sys.exit(app.exec())