152 lines
3.9 KiB
TypeScript
152 lines
3.9 KiB
TypeScript
import mqtt, { IClientOptions, MqttClient, Packet } from 'mqtt';
|
|
|
|
type MessageHandler = (topic: string, message: Buffer, packet: Packet) => void;
|
|
type ConnectionHandler = () => void;
|
|
type ErrorHandler = (error: Error) => void;
|
|
|
|
interface MqttCredentials {
|
|
username: string;
|
|
password: string;
|
|
}
|
|
|
|
class MQTTClientManager {
|
|
private client: MqttClient | null = null;
|
|
private messageHandlers = new Set<MessageHandler>();
|
|
private connectHandlers = new Set<ConnectionHandler>();
|
|
private closeHandlers = new Set<ConnectionHandler>();
|
|
private errorHandlers = new Set<ErrorHandler>();
|
|
|
|
/**
|
|
* Kết nối tới MQTT broker.
|
|
* @param credentials Thông tin xác thực (username, password)
|
|
* @param url Địa chỉ MQTT broker (mặc định là /mqtt)
|
|
*/
|
|
connect(credentials: MqttCredentials, url: string = '/mqtt') {
|
|
if (this.client?.connected) return;
|
|
|
|
// Build WebSocket URL
|
|
let mqttUrl = url;
|
|
if (url.startsWith('/')) {
|
|
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
|
|
mqttUrl = `${protocol}//${window.location.host}${url}`;
|
|
}
|
|
|
|
const opts: IClientOptions = {
|
|
clean: true,
|
|
username: credentials.username,
|
|
password: credentials.password,
|
|
reconnectPeriod: 5000,
|
|
connectTimeout: 30 * 1000,
|
|
};
|
|
|
|
this.client = mqtt.connect(mqttUrl, opts);
|
|
|
|
this.client.on('connect', () => {
|
|
console.log('MQTT Connected successfully!');
|
|
this.connectHandlers.forEach((fn) => fn());
|
|
});
|
|
|
|
this.client.on('close', () => {
|
|
console.log('MQTT Connection closed');
|
|
this.closeHandlers.forEach((fn) => fn());
|
|
});
|
|
|
|
this.client.on('error', (error: Error) => {
|
|
console.error('MQTT Error:', error);
|
|
this.errorHandlers.forEach((fn) => fn(error));
|
|
});
|
|
|
|
this.client.on('message', (topic, message, packet) => {
|
|
this.messageHandlers.forEach((fn) => fn(topic, message, packet));
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Ngắt kết nối MQTT và giải phóng tài nguyên.
|
|
*/
|
|
disconnect() {
|
|
if (this.client) {
|
|
this.client.end();
|
|
this.client = null;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Subscribe vào một topic.
|
|
* @param topic Topic cần subscribe
|
|
*/
|
|
subscribe(topic: string | string[]) {
|
|
this.client?.subscribe(topic);
|
|
}
|
|
|
|
/**
|
|
* Unsubscribe khỏi một topic.
|
|
* @param topic Topic cần unsubscribe
|
|
*/
|
|
unsubscribe(topic: string | string[]) {
|
|
this.client?.unsubscribe(topic);
|
|
}
|
|
|
|
/**
|
|
* Publish message tới một topic.
|
|
* @param topic Topic để publish
|
|
* @param payload Payload (string hoặc object sẽ được stringify)
|
|
*/
|
|
publish(topic: string, payload: string | object) {
|
|
const payloadStr =
|
|
typeof payload === 'string' ? payload : JSON.stringify(payload);
|
|
this.client?.publish(topic, payloadStr);
|
|
}
|
|
|
|
/**
|
|
* Đăng ký callback khi nhận được message.
|
|
* @param cb Hàm callback xử lý message
|
|
* @returns Hàm hủy đăng ký callback
|
|
*/
|
|
onMessage(cb: MessageHandler) {
|
|
this.messageHandlers.add(cb);
|
|
return () => this.messageHandlers.delete(cb);
|
|
}
|
|
|
|
/**
|
|
* Đăng ký callback khi kết nối thành công.
|
|
*/
|
|
onConnect(cb: ConnectionHandler) {
|
|
this.connectHandlers.add(cb);
|
|
return () => this.connectHandlers.delete(cb);
|
|
}
|
|
|
|
/**
|
|
* Đăng ký callback khi kết nối bị đóng.
|
|
*/
|
|
onClose(cb: ConnectionHandler) {
|
|
this.closeHandlers.add(cb);
|
|
return () => this.closeHandlers.delete(cb);
|
|
}
|
|
|
|
/**
|
|
* Đăng ký callback khi có lỗi.
|
|
*/
|
|
onError(cb: ErrorHandler) {
|
|
this.errorHandlers.add(cb);
|
|
return () => this.errorHandlers.delete(cb);
|
|
}
|
|
|
|
/**
|
|
* Kiểm tra trạng thái kết nối MQTT.
|
|
* @returns true nếu đã kết nối, ngược lại là false
|
|
*/
|
|
isConnected() {
|
|
return this.client?.connected ?? false;
|
|
}
|
|
|
|
/**
|
|
* Lấy instance client MQTT gốc (nếu cần thao tác nâng cao).
|
|
*/
|
|
getClient() {
|
|
return this.client;
|
|
}
|
|
}
|
|
|
|
export const mqttClient = new MQTTClientManager();
|