update scan ip with MAC, update UI show history
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import ipaddress
|
||||
@@ -45,6 +46,58 @@ def _ping_one(ip, is_win):
|
||||
return False
|
||||
|
||||
|
||||
def _get_mac_from_arp(ip):
|
||||
"""
|
||||
Lấy MAC address của một IP qua ARP table.
|
||||
|
||||
Chỉ được gọi SAU KHI IP đã phản hồi ping thành công — đảm bảo ARP
|
||||
cache vừa được OS cập nhật với thông tin mới nhất (không bị stale).
|
||||
|
||||
Trả về chuỗi MAC dạng 'AA:BB:CC:DD:EE:FF' hoặc 'N/A' nếu không tra được.
|
||||
"""
|
||||
try:
|
||||
if sys.platform == "win32":
|
||||
# arp -a <ip> → " 192.168.4.5 aa-bb-cc-dd-ee-ff dynamic"
|
||||
r = subprocess.run(
|
||||
["arp", "-a", str(ip)],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
timeout=3,
|
||||
creationflags=_NO_WINDOW
|
||||
)
|
||||
output = r.stdout.decode(errors="ignore")
|
||||
# Dạng Windows: aa-bb-cc-dd-ee-ff
|
||||
match = re.search(
|
||||
r"([0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}"
|
||||
r"[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2}[-:][0-9a-fA-F]{2})",
|
||||
output
|
||||
)
|
||||
else:
|
||||
# macOS / Linux: arp -n <ip>
|
||||
# macOS output: "? (192.168.4.5) at aa:bb:cc:dd:ee:ff on en0 ..."
|
||||
# Linux output: "192.168.4.5 ether aa:bb:cc:dd:ee:ff C eth0"
|
||||
r = subprocess.run(
|
||||
["arp", "-n", str(ip)],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.DEVNULL,
|
||||
timeout=3
|
||||
)
|
||||
output = r.stdout.decode(errors="ignore")
|
||||
match = re.search(
|
||||
r"([0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}"
|
||||
r"[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2}[:-][0-9a-fA-F]{2})",
|
||||
output
|
||||
)
|
||||
|
||||
if match:
|
||||
# Chuẩn hoá sang dấu ':' và chữ hoa
|
||||
mac = match.group(1).replace("-", ":").upper()
|
||||
return mac
|
||||
except Exception:
|
||||
pass
|
||||
return "N/A"
|
||||
|
||||
|
||||
def _ping_sweep(network, progress_cb=None):
|
||||
"""Ping tất cả host trong network đồng thời.
|
||||
Trả về list IP đã phản hồi. Gọi progress_cb(done, total) sau mỗi ping.
|
||||
@@ -83,10 +136,37 @@ def _ping_sweep(network, progress_cb=None):
|
||||
|
||||
|
||||
def scan_network(network, progress_cb=None, stage_cb=None):
|
||||
"""Scan network: chỉ dùng ping để xác định thiết bị online."""
|
||||
"""Scan network: ping → lấy MAC từ ARP (chỉ cho IP đang online).
|
||||
|
||||
Flow:
|
||||
1. Ping sweep — xác định thiết bị online
|
||||
2. MAC lookup — query ARP cho từng IP vừa phản hồi ping (song song)
|
||||
=> ARP cache vừa được OS cập nhật sau ping, không bị stale.
|
||||
"""
|
||||
# ── Stage 1: Ping ──
|
||||
if stage_cb:
|
||||
stage_cb("ping")
|
||||
alive_ips = _ping_sweep(network, progress_cb)
|
||||
|
||||
results = [{"ip": ip_str, "mac": "N/A"} for ip_str in alive_ips]
|
||||
# ── Stage 2: MAC lookup ──
|
||||
if stage_cb:
|
||||
stage_cb("mac")
|
||||
|
||||
results = []
|
||||
if alive_ips:
|
||||
# Chạy song song để tra MAC nhanh hơn
|
||||
mac_workers = min(32, len(alive_ips))
|
||||
macs = {}
|
||||
with ThreadPoolExecutor(max_workers=mac_workers) as executor:
|
||||
future_to_ip = {executor.submit(_get_mac_from_arp, ip): ip for ip in alive_ips}
|
||||
for future in as_completed(future_to_ip):
|
||||
ip = future_to_ip[future]
|
||||
try:
|
||||
macs[ip] = future.result()
|
||||
except Exception:
|
||||
macs[ip] = "N/A"
|
||||
|
||||
for ip_str in alive_ips:
|
||||
results.append({"ip": ip_str, "mac": macs.get(ip_str, "N/A")})
|
||||
|
||||
return sorted(results, key=lambda d: ipaddress.ip_address(d["ip"]))
|
||||
|
||||
158
main.py
158
main.py
@@ -535,17 +535,77 @@ class App(QWidget):
|
||||
item.setCheckState(Qt.CheckState.Unchecked)
|
||||
|
||||
def _show_history(self):
|
||||
"""Show history of flashed devices in this session."""
|
||||
"""Show history of flashed devices in this session using a table dialog."""
|
||||
if not self.flashed_macs:
|
||||
QMessageBox.information(self, "Flash History", "No devices have been flashed in this session.")
|
||||
else:
|
||||
lines = []
|
||||
for mac in sorted(self.flashed_macs.keys()):
|
||||
ip, _, result, ts = self.flashed_macs[mac]
|
||||
icon = "✅" if result.startswith("DONE") else "❌"
|
||||
lines.append(f"[{ts}] {icon} {ip} ({mac}) — {result}")
|
||||
msg = f"Flash History ({len(self.flashed_macs)} device(s)):\n\n" + "\n".join(lines)
|
||||
QMessageBox.information(self, "Flash History", msg)
|
||||
return
|
||||
|
||||
from PyQt6.QtWidgets import QDialog
|
||||
dialog = QDialog(self)
|
||||
dialog.setWindowTitle("Flash History")
|
||||
dialog.resize(600, 400)
|
||||
dialog.setStyleSheet("QDialog { background-color: #1a1b2e; } QTableWidget { font-size: 12px; }")
|
||||
|
||||
layout = QVBoxLayout(dialog)
|
||||
layout.setContentsMargins(10, 10, 10, 10)
|
||||
|
||||
table = QTableWidget()
|
||||
table.setColumnCount(4)
|
||||
table.setHorizontalHeaderLabels(["Time", "IP", "MAC", "Result"])
|
||||
table.setAlternatingRowColors(True)
|
||||
table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
|
||||
table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
|
||||
|
||||
header = table.horizontalHeader()
|
||||
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
|
||||
header.resizeSection(0, 80)
|
||||
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Fixed)
|
||||
header.resizeSection(1, 120)
|
||||
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Fixed)
|
||||
header.resizeSection(2, 140)
|
||||
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
|
||||
table.verticalHeader().setVisible(False)
|
||||
|
||||
table.setRowCount(len(self.flashed_macs))
|
||||
|
||||
success_count = 0
|
||||
fail_count = 0
|
||||
|
||||
for row, mac in enumerate(sorted(self.flashed_macs.keys())):
|
||||
ip, _, result, ts = self.flashed_macs[mac]
|
||||
ok = result.startswith("DONE")
|
||||
if ok:
|
||||
success_count += 1
|
||||
else:
|
||||
fail_count += 1
|
||||
|
||||
time_item = QTableWidgetItem(ts)
|
||||
time_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
table.setItem(row, 0, time_item)
|
||||
table.setItem(row, 1, QTableWidgetItem(ip))
|
||||
table.setItem(row, 2, QTableWidgetItem(mac))
|
||||
|
||||
res_item = QTableWidgetItem(f"✅ {result}" if ok else f"❌ {result}")
|
||||
res_item.setForeground(QColor("#a6e3a1") if ok else QColor("#f38ba8"))
|
||||
table.setItem(row, 3, res_item)
|
||||
|
||||
layout.addWidget(table)
|
||||
|
||||
summary_lbl = QLabel(f"Total: {len(self.flashed_macs)} | ✅ {success_count} | ❌ {fail_count}")
|
||||
summary_lbl.setStyleSheet("color: #cdd6f4; font-size: 13px; font-weight: bold;")
|
||||
layout.addWidget(summary_lbl)
|
||||
|
||||
btn_close = QPushButton("Close")
|
||||
btn_close.setFixedHeight(30)
|
||||
btn_close.setFixedWidth(100)
|
||||
btn_close.clicked.connect(dialog.accept)
|
||||
|
||||
btn_layout = QHBoxLayout()
|
||||
btn_layout.addStretch()
|
||||
btn_layout.addWidget(btn_close)
|
||||
layout.addLayout(btn_layout)
|
||||
|
||||
dialog.exec()
|
||||
|
||||
# ── Actions ──
|
||||
|
||||
@@ -614,6 +674,11 @@ class App(QWidget):
|
||||
self.scan_progress_bar.setFormat("⚡ Pinging hosts... %v / %m")
|
||||
self.scan_status.setText("⏳ Pinging all hosts...")
|
||||
self.scan_status.setStyleSheet("color: #f9e2af;")
|
||||
elif stage == "mac":
|
||||
self.scan_progress_bar.setRange(0, 0)
|
||||
self.scan_progress_bar.setFormat(" Reading MAC addresses...")
|
||||
self.scan_status.setText("⏳ Reading MAC addresses from ARP...")
|
||||
self.scan_status.setStyleSheet("color: #f9e2af;")
|
||||
elif stage == "arp":
|
||||
self.scan_progress_bar.setRange(0, 0)
|
||||
self.scan_progress_bar.setFormat(" Reading ARP cache...")
|
||||
@@ -775,7 +840,6 @@ class App(QWidget):
|
||||
row = self._flash_row_map.get(index, index)
|
||||
mac_item = self.table.item(row, 2)
|
||||
ip_item = self.table.item(row, 1)
|
||||
import datetime
|
||||
now_str = datetime.datetime.now().strftime("%H:%M:%S")
|
||||
mac_str = mac_item.text().strip() if mac_item else ""
|
||||
ip_str = ip_item.text().strip() if ip_item else ""
|
||||
@@ -1103,15 +1167,76 @@ class App(QWidget):
|
||||
# ── Auto Flash Actions ──
|
||||
|
||||
def _show_auto_history(self):
|
||||
"""Show history of auto-flashed devices using a table dialog."""
|
||||
if not self._auto_history:
|
||||
QMessageBox.information(self, "Flash History", "No devices have been flashed in this session.")
|
||||
return
|
||||
lines = []
|
||||
for ip, mac, result, ts in self._auto_history:
|
||||
icon = "✅" if result.startswith("DONE") else "❌"
|
||||
lines.append(f"[{ts}] {icon} {ip} ({mac}) — {result}")
|
||||
msg = f"Flash History ({len(self._auto_history)} device(s)):\n\n" + "\n".join(lines)
|
||||
QMessageBox.information(self, "Flash History", msg)
|
||||
|
||||
from PyQt6.QtWidgets import QDialog
|
||||
dialog = QDialog(self)
|
||||
dialog.setWindowTitle("Auto Flash History")
|
||||
dialog.resize(600, 400)
|
||||
dialog.setStyleSheet("QDialog { background-color: #1a1b2e; } QTableWidget { font-size: 12px; }")
|
||||
|
||||
layout = QVBoxLayout(dialog)
|
||||
layout.setContentsMargins(10, 10, 10, 10)
|
||||
|
||||
table = QTableWidget()
|
||||
table.setColumnCount(4)
|
||||
table.setHorizontalHeaderLabels(["Time", "IP", "MAC", "Result"])
|
||||
table.setAlternatingRowColors(True)
|
||||
table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
|
||||
table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
|
||||
|
||||
header = table.horizontalHeader()
|
||||
header.setSectionResizeMode(0, QHeaderView.ResizeMode.Fixed)
|
||||
header.resizeSection(0, 80)
|
||||
header.setSectionResizeMode(1, QHeaderView.ResizeMode.Fixed)
|
||||
header.resizeSection(1, 120)
|
||||
header.setSectionResizeMode(2, QHeaderView.ResizeMode.Fixed)
|
||||
header.resizeSection(2, 140)
|
||||
header.setSectionResizeMode(3, QHeaderView.ResizeMode.Stretch)
|
||||
table.verticalHeader().setVisible(False)
|
||||
|
||||
table.setRowCount(len(self._auto_history))
|
||||
|
||||
success_count = 0
|
||||
fail_count = 0
|
||||
|
||||
for row, (ip, mac, result, ts) in enumerate(self._auto_history):
|
||||
ok = result.startswith("DONE")
|
||||
if ok:
|
||||
success_count += 1
|
||||
else:
|
||||
fail_count += 1
|
||||
|
||||
time_item = QTableWidgetItem(ts)
|
||||
time_item.setTextAlignment(Qt.AlignmentFlag.AlignCenter)
|
||||
table.setItem(row, 0, time_item)
|
||||
table.setItem(row, 1, QTableWidgetItem(ip))
|
||||
table.setItem(row, 2, QTableWidgetItem(mac))
|
||||
|
||||
res_item = QTableWidgetItem(f"✅ {result}" if ok else f"❌ {result}")
|
||||
res_item.setForeground(QColor("#a6e3a1") if ok else QColor("#f38ba8"))
|
||||
table.setItem(row, 3, res_item)
|
||||
|
||||
layout.addWidget(table)
|
||||
|
||||
summary_lbl = QLabel(f"Total: {len(self._auto_history)} | ✅ {success_count} | ❌ {fail_count}")
|
||||
summary_lbl.setStyleSheet("color: #cdd6f4; font-size: 13px; font-weight: bold;")
|
||||
layout.addWidget(summary_lbl)
|
||||
|
||||
btn_close = QPushButton("Close")
|
||||
btn_close.setFixedHeight(30)
|
||||
btn_close.setFixedWidth(100)
|
||||
btn_close.clicked.connect(dialog.accept)
|
||||
|
||||
btn_layout = QHBoxLayout()
|
||||
btn_layout.addStretch()
|
||||
btn_layout.addWidget(btn_close)
|
||||
layout.addLayout(btn_layout)
|
||||
|
||||
dialog.exec()
|
||||
|
||||
def _auto_select_firmware(self):
|
||||
file, _ = QFileDialog.getOpenFileName(
|
||||
@@ -1269,6 +1394,7 @@ class App(QWidget):
|
||||
self._auto_fail_count += 1
|
||||
self.auto_result_table.setItem(row, 3, item)
|
||||
self._auto_history.append((ip, mac.upper(), result, now_str))
|
||||
|
||||
total = len(self._auto_device_rows)
|
||||
done = self._auto_success_count + self._auto_fail_count
|
||||
self.auto_summary_label.setText(
|
||||
|
||||
@@ -1 +1 @@
|
||||
1.2.2
|
||||
1.2.3
|
||||
|
||||
Reference in New Issue
Block a user