package com.zyplayer.doc.db.framework.sse.util;

import cn.hutool.core.util.StrUtil;
import com.zyplayer.doc.core.exception.ConfirmException;
import com.zyplayer.doc.db.framework.json.DocDbResponseJson;
import com.zyplayer.doc.db.framework.sse.enums.DbSseEmitterParameterEnum;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/* loaded from: input_file:com/zyplayer/doc/db/framework/sse/util/DbSseCacheUtil.class */
/**
 * Static registry of live SSE connections, keyed by client id.
 *
 * <p>Each registry entry is a small attribute map whose keys come from
 * {@link DbSseEmitterParameterEnum}: the {@link SseEmitter} itself, an optional
 * {@link ScheduledFuture} associated with the connection, and the login id that
 * opened the connection.
 */
public class DbSseCacheUtil {
    private static final Logger logger = LoggerFactory.getLogger(DbSseCacheUtil.class);

    /**
     * Client id -> connection attributes. Left public and non-final because
     * code outside this class may reference it directly.
     */
    public static Map<String, Map<String, Object>> sseCache = new ConcurrentHashMap<>();

    /**
     * Returns the emitter registered for the given client.
     *
     * @param str client id
     * @return the emitter, or {@code null} when the client has no (non-empty) entry
     */
    public static SseEmitter getSseEmitterByClientId(String str) {
        return (SseEmitter) lookupAttribute(str, DbSseEmitterParameterEnum.EMITTER);
    }

    /**
     * Returns the scheduled task registered for the given client.
     *
     * @param str client id
     * @return the future, or {@code null} when the client has no entry or no future was registered
     */
    public static ScheduledFuture<?> getSseFutureByClientId(String str) {
        return (ScheduledFuture<?>) lookupAttribute(str, DbSseEmitterParameterEnum.FUTURE);
    }

    /**
     * Returns the value stored under the LOGINID key for the given client.
     *
     * <p>NOTE(review): {@link #addConnection} stores the login id as a {@code String}, so the
     * cast below throws {@link ClassCastException} for every connection registered through this
     * class. The declared return type looks like a copy/paste slip; it is kept unchanged here
     * only to avoid breaking the public signature. Confirm with callers and change the return
     * type to {@code String}.
     *
     * @param str client id
     * @return {@code null} when the client has no (non-empty) entry
     */
    public static ScheduledFuture<?> getLoginIdByClientId(String str) {
        return (ScheduledFuture<?>) lookupAttribute(str, DbSseEmitterParameterEnum.LOGINID);
    }

    /**
     * Finds the client id of a connection opened by the given login id.
     *
     * @param str login id; {@code null} never matches
     * @return the first matching client id in iteration order, or {@code null} when none matches
     */
    public static String getClientIdByLoginId(String str) {
        if (str == null || !existSseCache()) {
            return null;
        }
        String loginIdKey = DbSseEmitterParameterEnum.LOGINID.getValue();
        for (Map.Entry<String, Map<String, Object>> entry : sseCache.entrySet()) {
            // Use the entry's own value: a second sseCache.get(...) could return null
            // if the connection is removed while we iterate.
            if (str.equals(entry.getValue().get(loginIdKey))) {
                return entry.getKey();
            }
        }
        return null;
    }

    /**
     * @return {@code true} when at least one connection is registered
     */
    public static boolean existSseCache() {
        return !sseCache.isEmpty();
    }

    /**
     * Checks that the given client id is registered and belongs to the given login id.
     *
     * @param str  client id
     * @param str2 login id expected to own the connection
     * @return {@code true} when the entry exists and its stored login id equals {@code str2}
     */
    public static boolean connectionValidity(String str, String str2) {
        if (str == null) {
            return false;
        }
        // Single lookup so the entry cannot vanish between the null check and the read.
        Map<String, Object> attributes = sseCache.get(str);
        return attributes != null
                && Objects.equals(str2, attributes.get(DbSseEmitterParameterEnum.LOGINID.getValue()));
    }

    /**
     * Registers a new connection.
     *
     * @param str             client id
     * @param str2            login id that owns the connection
     * @param sseEmitter      emitter used to push events to the client
     * @param scheduledFuture optional task tied to the connection; may be {@code null}
     * @throws ConfirmException when an emitter is already registered for {@code str}
     */
    public static void addConnection(String str, String str2, SseEmitter sseEmitter, ScheduledFuture<?> scheduledFuture) {
        // compute() makes "check for an existing emitter, then insert" one atomic step on a
        // ConcurrentHashMap, so two concurrent registrations cannot silently overwrite each other.
        // If the function throws, the existing mapping is left untouched.
        sseCache.compute(str, (clientId, existing) -> {
            if (existing != null && existing.get(DbSseEmitterParameterEnum.EMITTER.getValue()) != null) {
                throw new ConfirmException("连接已存在:" + str);
            }
            return newConnectionAttributes(str2, sseEmitter, scheduledFuture);
        });
    }

    /**
     * Removes the connection for the given client and cancels its scheduled task, if any.
     *
     * @param str client id
     */
    public static void removeConnection(String str) {
        // Remove first and cancel from the removed entry: this cancels the future even when
        // the entry has no emitter, and only the caller that actually removed the entry cancels.
        Map<String, Object> removed = sseCache.remove(str);
        if (removed != null) {
            Object future = removed.get(DbSseEmitterParameterEnum.FUTURE.getValue());
            if (future instanceof ScheduledFuture) {
                ((ScheduledFuture<?>) future).cancel(true);
            }
        }
        logger.info("移除连接:{}", str);
    }

    /**
     * Cancels (with interruption) the scheduled task of a still-registered client.
     *
     * @param str client id
     */
    public static void cancelScheduledFuture(String str) {
        ScheduledFuture<?> future = getSseFutureByClientId(str);
        if (future != null) {
            future.cancel(true);
        }
    }

    /**
     * Builds a callback that logs the end of the connection and removes it from the registry.
     *
     * @param str client id
     * @return the callback
     */
    public static Runnable completionCallBack(String str) {
        return () -> {
            logger.info("结束连接:{}", str);
            // removeConnection already cancels the scheduled task; a separate cancel after
            // removal could never find the entry.
            removeConnection(str);
        };
    }

    /**
     * Builds a callback that logs a timeout and removes the connection from the registry.
     *
     * @param str client id
     * @return the callback
     */
    public static Runnable timeoutCallBack(String str) {
        return () -> {
            logger.info("连接超时:{}", str);
            removeConnection(str);
        };
    }

    /**
     * Builds a callback that logs a push failure (including its cause) and removes the
     * connection from the registry.
     *
     * @param str client id
     * @return the callback
     */
    public static Consumer<Throwable> errorCallBack(String str) {
        return th -> {
            // Trailing Throwable argument is logged by SLF4J as the exception.
            logger.info("推送消息异常:{}", str, th);
            removeConnection(str);
        };
    }

    /**
     * Sends the message to every registered client. Does nothing when no client is
     * registered; logs and returns when the message is empty.
     *
     * @param str message text
     */
    public static void sendMessageToAllClient(String str) {
        if (!existSseCache()) {
            return;
        }
        if (StrUtil.isEmpty(str)) {
            logger.info("群发消息为空");
            return;
        }
        // A failed send removes that client from sseCache while this loop is running.
        for (String clientId : sseCache.keySet()) {
            sendMessageToClientByClientId(clientId, DocDbResponseJson.ok(str));
        }
    }

    /**
     * Sends the message to a single client. Logs and returns when either argument is empty.
     *
     * @param str  client id
     * @param str2 message text
     */
    public static void sendMessageToOneClient(String str, String str2) {
        if (StrUtil.isEmpty(str)) {
            logger.info("客户端ID为空");
            return;
        }
        if (StrUtil.isEmpty(str2)) {
            logger.info("向客户端{}推送消息为空", str);
            return;
        }
        sendMessageToClientByClientId(str, DocDbResponseJson.ok(str2));
    }

    /**
     * Pushes the payload to the client as a JSON SSE event. A client without a usable
     * connection is logged as an error; a connection that fails while sending is removed.
     *
     * @param str               client id
     * @param docDbResponseJson payload to send
     */
    public static void sendMessageToClientByClientId(String str, DocDbResponseJson docDbResponseJson) {
        Map<String, Object> attributes = str == null ? null : sseCache.get(str);
        if (attributes == null || attributes.isEmpty()) {
            logger.error("推送消息失败:客户端{}未创建长链接,失败消息:{}", str, docDbResponseJson);
            return;
        }
        SseEmitter emitter = (SseEmitter) attributes.get(DbSseEmitterParameterEnum.EMITTER.getValue());
        if (emitter == null) {
            // Entry exists but carries no emitter: it can never deliver, so drop it.
            logger.error("推送消息失败:客户端{}未创建长链接,失败消息:{}", str, docDbResponseJson);
            removeConnection(str);
            return;
        }
        try {
            emitter.send(SseEmitter.event().data(docDbResponseJson, MediaType.APPLICATION_JSON));
        } catch (Exception e) {
            // Broad catch is deliberate: any send failure means the connection is unusable.
            logger.error("推送消息失败,报错异常:", e);
            removeConnection(str);
        }
    }

    /** Reads one attribute of a client's entry; {@code null} when the entry is absent or empty. */
    private static Object lookupAttribute(String clientId, DbSseEmitterParameterEnum attribute) {
        Map<String, Object> attributes = sseCache.get(clientId);
        if (attributes == null || attributes.isEmpty()) {
            return null;
        }
        return attributes.get(attribute.getValue());
    }

    /** Builds the attribute map stored in the registry for a new connection. */
    private static Map<String, Object> newConnectionAttributes(String loginId, SseEmitter emitter, ScheduledFuture<?> future) {
        Map<String, Object> attributes = new ConcurrentHashMap<>();
        attributes.put(DbSseEmitterParameterEnum.EMITTER.getValue(), emitter);
        if (future != null) {
            attributes.put(DbSseEmitterParameterEnum.FUTURE.getValue(), future);
        }
        attributes.put(DbSseEmitterParameterEnum.LOGINID.getValue(), loginId);
        return attributes;
    }
}
