/*
 * Decompiled with CFR 0.152.
 */
package com.elitescloud.boot.mq.config.support;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.crypto.digest.MD5;
import com.elitescloud.boot.common.CloudtBootLoggerFactory;
import com.elitescloud.boot.mq.MessageQueueListener;
import com.elitescloud.boot.mq.MessageQueueTemplate;
import com.elitescloud.boot.mq.common.BaseMessage;
import com.elitescloud.boot.mq.common.MessageDuplicateStrategy;
import com.elitescloud.boot.mq.common.MessageQueueStorage;
import com.elitescloud.boot.mq.common.model.StoreMessageDTO;
import com.elitescloud.boot.mq.config.CloudtMqProperties;
import com.elitescloud.boot.mq.config.support.queue.MqProvider;
import com.elitescloud.boot.redis.util.RedisUtils;
import com.elitescloud.boot.threadpool.common.ThreadPoolHolder;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

public class DefaultMessageQueueTemplate
implements MessageQueueTemplate {
    private static final Logger logger = CloudtBootLoggerFactory.MQ.getLogger(DefaultMessageQueueTemplate.class);
    private final MqProvider mqProvider;
    private final ThreadPoolExecutor taskExecutor;
    private final CloudtMqProperties mqProperties;
    private final RedisUtils redisUtils;
    private List<MessageQueueStorage> queueStorageList = Collections.emptyList();
    private List<MessageDuplicateStrategy> duplicateStrategyList = Collections.emptyList();

    public DefaultMessageQueueTemplate(MqProvider mqProvider, CloudtMqProperties mqProperties, RedisUtils redisUtils) {
        this.mqProvider = mqProvider;
        this.taskExecutor = ThreadPoolHolder.createThreadPool((String)mqProperties.getThreadPrefix(), (Integer)mqProperties.getThreadCoreSize(), (Integer)mqProperties.getThreadMaxSize());
        this.mqProperties = mqProperties;
        this.redisUtils = redisUtils;
    }

    @Override
    public <T extends Serializable> void sendMessage(String channel, T payload) {
        this.sendMessage(null, channel, payload);
    }

    @Override
    public <T extends Serializable> void sendMessage(String appCode, String channel, T payload) {
        this.sendMessage(appCode, channel, payload, false);
    }

    @Override
    public <T extends Serializable> void sendMessageSync(String appCode, String channel, T payload) {
        this.sendMessage(appCode, channel, payload, true);
    }

    @Override
    public <T extends Serializable> void publishMessage(String channel, T payload) {
        this.publishMessage(null, channel, payload);
    }

    @Override
    public <T extends Serializable> void publishMessage(String appCode, String channel, T payload) {
        this.publishMessage(appCode, channel, payload, false);
    }

    @Override
    public <T extends Serializable> void publishMessageSync(String appCode, String channel, T payload) {
        this.publishMessage(appCode, channel, payload, true);
    }

    public void setQueueStorageList(List<MessageQueueStorage> queueStorageList) {
        this.queueStorageList = (List)ObjectUtil.defaultIfNull(queueStorageList, Collections.emptyList());
    }

    public void setDuplicateStrategyList(List<MessageDuplicateStrategy> duplicateStrategyList) {
        this.duplicateStrategyList = (List)ObjectUtil.defaultIfNull(duplicateStrategyList, Collections.emptyList());
    }

    private <T extends Serializable> void sendMessage(String appCode, String channel, T payload, boolean sync) {
        Assert.notBlank((CharSequence)channel, (String)"\u6d88\u606f\u6e20\u9053\u4e3a\u7a7a", (Object[])new Object[0]);
        Assert.notNull(payload, (String)"\u6d88\u606f\u4f53\u4e3a\u7a7a", (Object[])new Object[0]);
        if (Boolean.FALSE.equals(this.mqProperties.getEnabled())) {
            logger.error("MQ\u5df2\u7981\u7528");
            return;
        }
        LocalDateTime startTime = LocalDateTime.now();
        String messageId = this.mqProvider.generateMessageId();
        if (this.isDisabledApp(appCode)) {
            logger.error("{}\u7684\u6d88\u606f\u53d1\u9001\u5df2\u7981\u7528", (Object)appCode);
            this.storageRecord(appCode, channel, messageId, payload, startTime, true, false, false, "\u5df2\u7981\u7528\u8be5\u5e94\u7528\u53d1\u9001");
            return;
        }
        List<MessageQueueListener<? extends Serializable>> listeners = this.mqProvider.getListeners(channel);
        if (listeners.isEmpty()) {
            this.executeSend(() -> this.mqProvider.publishMessage(channel, messageId, payload), appCode, channel, messageId, payload, false, startTime, sync);
            return;
        }
        logger.info("\u672c\u5730\u6d88\u606f\uff1a{}, \u6d88\u8d39\u8005\uff1a{}", (Object)channel, (Object)listeners.size());
        Runnable runnable = () -> {
            for (MessageQueueListener listener : listeners) {
                try {
                    listener.onConsume(channel, payload);
                }
                catch (Throwable e) {
                    logger.error("{}\u6d88\u8d39\u6d88\u606f\u5f02\u5e38\uff1a", (Object)listener.getClass().getName(), (Object)e);
                }
            }
        };
        if (sync) {
            this.executeSend(runnable, appCode, channel, messageId, payload, true, startTime, true);
        } else {
            this.executeSend(() -> this.taskExecutor.execute(runnable), appCode, channel, messageId, payload, true, startTime, false);
        }
    }

    private <T extends Serializable> void publishMessage(String appCode, String channel, T payload, boolean sync) {
        Assert.notBlank((CharSequence)channel, (String)"\u6d88\u606f\u6e20\u9053\u4e3a\u7a7a", (Object[])new Object[0]);
        Assert.notNull(payload, (String)"\u6d88\u606f\u4f53\u4e3a\u7a7a", (Object[])new Object[0]);
        if (Boolean.FALSE.equals(this.mqProperties.getEnabled())) {
            logger.error("MQ is disabled");
            return;
        }
        LocalDateTime startTime = LocalDateTime.now();
        String messageId = this.mqProvider.generateMessageId();
        if (this.isDisabledApp(appCode)) {
            logger.info("{}\u7684\u6d88\u606f\u53d1\u9001\u5df2\u7981\u7528", (Object)appCode);
            this.storageRecord(appCode, channel, messageId, payload, startTime, false, false, false, "\u5df2\u7981\u7528\u8be5\u5e94\u7528\u53d1\u9001");
            return;
        }
        this.executeSend(() -> this.mqProvider.publishMessage(channel, messageId, payload), appCode, channel, messageId, payload, false, startTime, sync);
    }

    private boolean isDisabledApp(String appCode) {
        if (CharSequenceUtil.isBlank((CharSequence)appCode)) {
            return false;
        }
        Set<String> apps = this.mqProperties.getDisabledApps();
        if (CollUtil.isEmpty(apps)) {
            return false;
        }
        return apps.contains("*") || apps.contains(appCode);
    }

    private void executeSend(Runnable sendThread, String appCode, String channel, String messageId, Serializable payload, boolean local, LocalDateTime startTime, boolean sync) {
        if (this.isDuplicate(appCode, channel, payload)) {
            this.storageRecord(appCode, channel, messageId, payload, startTime, local, false, false, "\u6d88\u606f\u5df2\u91cd\u590d");
            return;
        }
        final Supplier<Void> sender = () -> {
            this.storageRecord(appCode, channel, messageId, payload, startTime, local, true, true, null);
            Throwable exp = null;
            long start = System.currentTimeMillis();
            try {
                sendThread.run();
            }
            catch (Throwable e) {
                exp = e;
                throw e;
            }
            finally {
                this.updateSendResult(messageId, local, exp, System.currentTimeMillis() - start, sync);
            }
            return null;
        };
        if (TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronizationManager.registerSynchronization((TransactionSynchronization)new TransactionSynchronization(){

                public void afterCommit() {
                    logger.info("after transaction committed");
                    sender.get();
                }
            });
        } else {
            logger.info("there is no transaction");
            sender.get();
        }
    }

    private boolean isDuplicate(String appCode, String channel, Serializable payload) {
        if (Boolean.TRUE.equals(this.mqProperties.getAllowDuplicate())) {
            return false;
        }
        if (!this.duplicateStrategyList.isEmpty()) {
            for (MessageDuplicateStrategy messageDuplicateStrategy : this.duplicateStrategyList) {
                if (!messageDuplicateStrategy.isDuplicate(appCode, channel, payload)) continue;
                logger.info("\u81ea\u5b9a\u4e49\u6d88\u606f\u5224\u91cd\u7b56\u7565\u5224\u65ad\u5df2\u91cd\u590d\uff1a{}\uff0c{}", (Object)appCode, (Object)channel);
                return true;
            }
            return false;
        }
        String digest = null;
        try {
            String txt = appCode + "&" + channel + "&" + this.mqProvider.serializeMessage(payload);
            digest = MD5.create().digestHex(txt, StandardCharsets.UTF_8);
        }
        catch (Exception e) {
            throw new IllegalStateException("\u5224\u65ad\u6d88\u606f\u662f\u5426\u91cd\u590d\u5f02\u5e38", e);
        }
        Object obj = this.redisUtils.get(digest);
        if (obj != null) {
            logger.info("\u6d88\u606f\u5df2\u91cd\u590d\uff1a{}\uff0c{}", (Object)appCode, (Object)channel);
            return true;
        }
        this.redisUtils.set(digest, (Object)"true", this.mqProperties.getDuplicateInterval().toSeconds(), TimeUnit.SECONDS);
        return false;
    }

    private String storageRecord(String appCode, String channel, String messageId, Serializable payload, LocalDateTime sendTime, boolean local, boolean success, boolean retry, String failReason) {
        if (Boolean.FALSE.equals(this.mqProperties.getAllowStorage())) {
            return null;
        }
        if (this.queueStorageList.isEmpty()) {
            return null;
        }
        LocalDateTime endTime = LocalDateTime.now();
        StoreMessageDTO messageDTO = new StoreMessageDTO();
        messageDTO.setMessageId(messageId);
        messageDTO.setAppCode(appCode);
        messageDTO.setChannel(channel);
        messageDTO.setMessageContent(this.mqProvider.serializeMessage(payload));
        if (payload instanceof BaseMessage) {
            messageDTO.setBusinessKey(((BaseMessage)payload).getBusinessKey());
        }
        messageDTO.setSendTime(sendTime);
        messageDTO.setFinishTime(endTime);
        messageDTO.setLocal(local);
        messageDTO.setSuccess(success);
        messageDTO.setFailReason(failReason);
        messageDTO.setRetryable(retry);
        for (MessageQueueStorage messageQueueStorage : this.queueStorageList) {
            try {
                messageQueueStorage.saveMessage(messageDTO);
            }
            catch (Throwable e) {
                logger.error("\u6301\u4e45\u5316\u6d88\u606f\u53d1\u9001\u8bb0\u5f55\u5f02\u5e38\uff1a", e);
            }
        }
        return messageId;
    }

    public void updateSendResult(String messageId, boolean local, Throwable e, Long costTime, boolean sync) {
        if (CharSequenceUtil.isBlank((CharSequence)messageId)) {
            return;
        }
        String failReason = e == null ? null : e.getMessage();
        Runnable task = () -> {
            if (local) {
                for (MessageQueueStorage messageQueueStorage : this.queueStorageList) {
                    try {
                        messageQueueStorage.updateConsumeResult(messageId, e == null, failReason, (Long)ObjectUtil.defaultIfNull((Object)costTime, (Object)0L));
                    }
                    catch (Throwable t) {
                        logger.error("\u66f4\u65b0\u6d88\u8d39\u7ed3\u679c\u5f02\u5e38\uff1a", e);
                    }
                }
                return;
            }
            for (MessageQueueStorage messageQueueStorage : this.queueStorageList) {
                try {
                    messageQueueStorage.updateSendResult(messageId, e == null, failReason);
                }
                catch (Throwable t) {
                    logger.error("\u66f4\u65b0\u53d1\u9001\u7ed3\u679c\u5f02\u5e38\uff1a", e);
                }
            }
        };
        if (sync) {
            try {
                task.run();
            }
            catch (Exception ex) {
                logger.error("\u66f4\u65b0\u6d88\u606f\u7ed3\u679c\u5f02\u5e38\uff1a{}", (Object)messageId, (Object)ex);
            }
            return;
        }
        CompletableFuture.runAsync(task, this.taskExecutor).whenComplete((res, exp) -> {
            if (exp != null) {
                logger.error("\u66f4\u65b0\u6d88\u606f\u7ed3\u679c\u5f02\u5e38\uff1a{}", (Object)messageId, exp);
            }
        });
    }
}

