package cn.iocoder.yudao.module.iot.plugin.emqx.upstream;

import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceMqttMessageHandler;
import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceWebhookVertxHandler;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.mqtt.MqttClient;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.class */
public class IotDeviceUpstreamServer {
    private static final int RECONNECT_DELAY_MS = 5000;
    private static final int CONNECTION_TIMEOUT_MS = 10000;
    private final Vertx vertx;
    private final HttpServer server;
    private final MqttClient client;
    private final IotPluginEmqxProperties emqxProperties;
    private final IotDeviceMqttMessageHandler mqttMessageHandler;
    private volatile boolean isRunning = false;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(IotDeviceUpstreamServer.class);
    private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE;

    public IotDeviceUpstreamServer(IotPluginEmqxProperties iotPluginEmqxProperties, IotDeviceUpstreamApi iotDeviceUpstreamApi, Vertx vertx, MqttClient mqttClient) {
        this.vertx = vertx;
        this.emqxProperties = iotPluginEmqxProperties;
        this.client = mqttClient;
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.post(IotDeviceAuthVertxHandler.PATH).handler(new IotDeviceAuthVertxHandler(iotDeviceUpstreamApi));
        router.post(IotDeviceWebhookVertxHandler.PATH).handler(new IotDeviceWebhookVertxHandler(iotDeviceUpstreamApi));
        this.server = vertx.createHttpServer().requestHandler(router);
        this.mqttMessageHandler = new IotDeviceMqttMessageHandler(iotDeviceUpstreamApi, mqttClient);
    }

    public void start() {
        if (this.isRunning) {
            log.warn("[start][服务已经在运行中，请勿重复启动]");
        } else {
            log.info("[start][开始启动服务]");
            CompletableFuture.allOf(this.server.listen(this.emqxProperties.getAuthPort().intValue()).toCompletionStage().toCompletableFuture().thenAccept(httpServer -> {
                log.info("[start][HTTP 服务器启动完成，端口: {}]", Integer.valueOf(this.server.actualPort()));
            }), connectMqtt().toCompletionStage().toCompletableFuture().thenAccept(r4 -> {
                this.client.closeHandler(r4 -> {
                    log.warn("[closeHandler][MQTT 连接已断开，准备重连]");
                    reconnectWithDelay();
                });
                setupMessageHandler();
            })).orTimeout(10000L, TimeUnit.MILLISECONDS).whenComplete((r5, th) -> {
                if (th != null) {
                    log.error("[start][服务启动失败]", th);
                } else {
                    this.isRunning = true;
                    log.info("[start][所有服务启动完成]");
                }
            });
        }
    }

    private void setupMessageHandler() {
        MqttClient mqttClient = this.client;
        IotDeviceMqttMessageHandler iotDeviceMqttMessageHandler = this.mqttMessageHandler;
        Objects.requireNonNull(iotDeviceMqttMessageHandler);
        mqttClient.publishHandler(iotDeviceMqttMessageHandler::handle);
        log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
    }

    private void reconnectWithDelay() {
        if (this.isRunning) {
            this.vertx.setTimer(5000L, l -> {
                log.info("[reconnectWithDelay][开始重新连接 MQTT]");
                connectMqtt();
            });
        } else {
            log.info("[reconnectWithDelay][服务已停止，不再尝试重连]");
        }
    }

    private Future<Void> connectMqtt() {
        return this.client.connect(this.emqxProperties.getMqttPort().intValue(), this.emqxProperties.getMqttHost()).compose(mqttConnAckMessage -> {
            log.info("[connectMqtt][MQTT客户端连接成功]");
            return subscribeToTopics();
        }).recover(th -> {
            log.error("[connectMqtt][连接MQTT Broker失败:]", th);
            reconnectWithDelay();
            return Future.failedFuture(th);
        });
    }

    private Future<Void> subscribeToTopics() {
        String[] mqttTopics = this.emqxProperties.getMqttTopics();
        if (ArrayUtil.isEmpty(mqttTopics)) {
            log.warn("[subscribeToTopics][未配置MQTT主题，跳过订阅]");
            return Future.succeededFuture();
        }
        log.info("[subscribeToTopics][开始订阅设备上行消息主题]");
        Future<Void> succeededFuture = Future.succeededFuture();
        for (String str : mqttTopics) {
            String trim = str.trim();
            if (!trim.isEmpty()) {
                succeededFuture = succeededFuture.compose(r6 -> {
                    return this.client.subscribe(trim, DEFAULT_QOS.value()).map(num -> {
                        log.info("[subscribeToTopics][成功订阅主题: {}]", trim);
                        return null;
                    }).recover(th -> {
                        log.error("[subscribeToTopics][订阅主题失败: {}]", trim, th);
                        return Future.succeededFuture();
                    });
                });
            }
        }
        return succeededFuture;
    }

    public void stop() {
        if (!this.isRunning) {
            log.warn("[stop][服务未运行，无需停止]");
            return;
        }
        log.info("[stop][开始关闭服务]");
        this.isRunning = false;
        try {
            if (this.server != null) {
                this.server.close().toCompletionStage().toCompletableFuture().join();
            }
            if (this.client != null) {
                this.client.disconnect().toCompletionStage().toCompletableFuture().join();
            }
            if (this.vertx != null) {
                this.vertx.close().toCompletionStage().toCompletableFuture().join();
            }
            log.info("[stop][关闭完成]");
        } catch (Exception e) {
            log.error("[stop][关闭服务异常]", e);
            throw new RuntimeException("关闭 IoT 设备上行服务失败", e);
        }
    }
}
