Files
Mira_Firmware_Loader/core/ssh_new_flash.py

229 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Luồng SSH cho chế độ "Nạp Mới FW" (Factory Reset / Raw device).
Đặc điểm:
- Thiết bị vừa reset cứng → Telnet port 23 mở, SSH chưa có pass.
- Bước 0 (tuỳ chọn): đặt password qua Telnet → SSH (set_passwd=True).
- Bước 14: kết nối SSH, upload SCP, verify, sync + sysupgrade.
Public API:
set_device_password(ip, user, old_password, new_password, status_cb)
flash_device_new_ssh(ip, firmware_path, user, password,
backup_password, set_passwd, status_cb)
"""
import socket
import time
import random
import paramiko
from core.ssh_utils import (
_create_ssh_client,
_upload_firmware,
_verify_firmware,
_sync_and_sysupgrade,
)
# ──────────────────────────────────────────────────
# Telnet client thủ công (telnetlib bị xoá Python 3.13+)
# ──────────────────────────────────────────────────
class _SimpleTelnet:
"""Minimal Telnet client dùng raw socket."""
def __init__(self, host, port=23, timeout=10):
self._sock = socket.create_connection((host, port), timeout=timeout)
def read_very_eager(self):
try:
self._sock.setblocking(False)
data = b""
while True:
try:
chunk = self._sock.recv(4096)
if not chunk:
break
data += chunk
except (BlockingIOError, OSError):
break
self._sock.setblocking(True)
return data
except Exception:
return b""
def write(self, data: bytes):
self._sock.sendall(data)
def close(self):
try:
self._sock.close()
except Exception:
pass
# ──────────────────────────────────────────────────
# Đặt mật khẩu thiết bị (Telnet → SSH fallback)
# ──────────────────────────────────────────────────
def set_device_password(ip, user="root", old_password="",
new_password="admin123a", status_cb=None):
"""
Đặt mật khẩu thiết bị OpenWrt.
Thứ tự thử:
1. Telnet port 23 — thiết bị vừa reset (chưa có pass, SSH chưa mở)
2. SSH — thiết bị cũ có SSH nhưng cần đổi pass
Returns:
"DONE" — thành công
"FAIL: …" — thất bại sau tất cả các retry
"""
# Jitter để không hammering khi chạy song song nhiều thiết bị
time.sleep(random.uniform(0.1, 1.5))
# ── 1. Thử Telnet ──────────────────────────────────────────────
if status_cb:
status_cb("Checking Telnet port for raw device...")
try:
tn = _SimpleTelnet(ip, timeout=5)
if status_cb:
status_cb("Telnet connected! Setting password...")
time.sleep(1)
tn.read_very_eager() # Flush banner OpenWrt
tn.write(b"passwd\n"); time.sleep(1)
tn.write(new_password.encode("ascii") + b"\n"); time.sleep(1)
tn.write(new_password.encode("ascii") + b"\n"); time.sleep(1)
tn.write(b"exit\n"); time.sleep(0.5)
tn.close()
if status_cb:
status_cb("Password set via Telnet. Waiting for SSH to start...")
time.sleep(3) # Chờ Dropbear (SSH daemon) khởi động
return "DONE"
except Exception:
# Telnet không truy cập được → thử SSH bên dưới
pass
# ── 2. Fallback SSH ─────────────────────────────────────────────
if status_cb:
status_cb("Connecting SSH for password update...")
last_err = None
for attempt in range(3):
try:
client = _create_ssh_client(ip, user, old_password, timeout=10)
if status_cb:
status_cb("Setting password via SSH...")
shell = client.invoke_shell()
time.sleep(1)
if shell.recv_ready():
shell.recv(65535)
shell.send("passwd\n"); time.sleep(2)
shell.send(f"{new_password}\n"); time.sleep(1)
shell.send(f"{new_password}\n"); time.sleep(2)
if shell.recv_ready():
shell.recv(65535)
shell.send("exit\n"); time.sleep(0.5)
shell.close()
client.close()
if status_cb:
status_cb("Password set ✓")
return "DONE"
except paramiko.AuthenticationException as e:
return f"FAIL: Cannot connect SSH (Old pass incorrect?) — {e}"
except Exception as e:
last_err = e
try:
client.close()
except Exception:
pass
time.sleep(2)
return f"FAIL: Cannot connect SSH after 3 attempts — {last_err}"
# ──────────────────────────────────────────────────
# Flash firmware — Nạp Mới (Factory Reset)
# ──────────────────────────────────────────────────
def flash_device_new_ssh(ip, firmware_path, user="root", password="admin123a",
backup_password="", set_passwd=False, status_cb=None):
"""
Flash firmware lên thiết bị OpenWrt vừa reset / chưa có mật khẩu.
Luồng:
0. (Tuỳ chọn) Đặt mật khẩu qua Telnet / SSH
1. Kết nối SSH
2. Upload firmware qua SCP lên /tmp/
3. Verify file tồn tại
4. sync + sysupgrade
Args:
ip : địa chỉ IP thiết bị
firmware_path : đường dẫn file .bin trên máy tính
user : SSH username (mặc định "root")
password : mật khẩu SSH (hoặc mật khẩu mới sẽ đặt)
backup_password : mật khẩu dự phòng nếu password chính không vào được
set_passwd : True = chạy bước đặt mật khẩu trước khi flash
status_cb : callback(str) để cập nhật trạng thái lên UI
Returns:
"DONE" — flash thành công
"FAIL: …" — thất bại, kèm lý do
"""
# ── STEP 0: Đặt mật khẩu (tuỳ chọn) ──────────────────────────
if set_passwd:
result = set_device_password(ip, user, "", password, status_cb)
if result.startswith("FAIL"):
# Thử với backup_password nếu có
if backup_password:
result = set_device_password(ip, user, backup_password, password, status_cb)
# Thử xem password đã được đặt chưa (idempotent)
if result.startswith("FAIL"):
result = set_device_password(ip, user, password, password, status_cb)
if result.startswith("FAIL"):
return result
else:
time.sleep(random.uniform(0.1, 1.5)) # Jitter khi không set_passwd
# ── STEP 1: Kết nối SSH ─────────────────────────────────────────
if status_cb:
status_cb("Connecting SSH...")
try:
client = _create_ssh_client(ip, user, password)
except paramiko.AuthenticationException:
return "FAIL: SSH authentication failed"
except paramiko.SSHException as e:
return f"FAIL: SSH error — {e}"
except Exception as e:
return f"FAIL: Cannot connect — {e}"
# ── STEP 24: Upload → Verify → sysupgrade ─────────────────────
try:
remote_path = _upload_firmware(client, firmware_path, status_cb)
_verify_firmware(client, remote_path, status_cb)
return _sync_and_sysupgrade(client, remote_path, status_cb)
except RuntimeError as e:
return f"FAIL: {e}"
except Exception as e:
return f"FAIL: {e}"
finally:
try:
client.close()
except Exception:
pass