package com.elitescloud.boot.mq.config.support;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.crypto.digest.MD5;
import com.elitescloud.boot.mq.MessageQueueTemplate;
import com.elitescloud.boot.mq.common.MessageDuplicateStrategy;
import com.elitescloud.boot.mq.common.MessageQueueStorage;
import com.elitescloud.boot.mq.common.model.StoreMessageDTO;
import com.elitescloud.boot.mq.config.CloudtMqProperties;
import com.elitescloud.boot.mq.config.support.AbstractMessageQueueDelegate;
import com.elitescloud.boot.redis.util.RedisUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

/* loaded from: input_file:com/elitescloud/boot/mq/config/support/DefaultMessageQueueTemplate.class */
public class DefaultMessageQueueTemplate extends AbstractMessageQueueDelegate implements MessageQueueTemplate {
    private static final Logger log = LoggerFactory.getLogger(DefaultMessageQueueTemplate.class);
    private final TaskExecutor taskExecutor;
    private final CloudtMqProperties mqProperties;
    private final RedisUtils redisUtils;
    private List<MessageQueueStorage> queueStorageList;
    private List<MessageDuplicateStrategy> duplicateStrategyList;
    private MessageChannel messageChannel;

    public DefaultMessageQueueTemplate(TaskExecutor taskExecutor, ObjectMapper objectMapper, CloudtMqProperties cloudtMqProperties, RedisUtils redisUtils) {
        super(objectMapper);
        this.queueStorageList = Collections.emptyList();
        this.duplicateStrategyList = Collections.emptyList();
        this.taskExecutor = taskExecutor;
        this.mqProperties = cloudtMqProperties;
        this.redisUtils = redisUtils;
    }

    @Override // com.elitescloud.boot.mq.MessageQueueTemplate
    public <T extends Serializable> void sendMessage(String str, T t) {
        sendMessage(null, str, t);
    }

    @Override // com.elitescloud.boot.mq.MessageQueueTemplate
    public <T extends Serializable> void sendMessage(String str, String str2, T t) {
        Assert.notBlank(str2, "消息渠道为空", new Object[0]);
        Assert.notNull(t, "消息体为空", new Object[0]);
        LocalDateTime now = LocalDateTime.now();
        if (!canSend(str)) {
            log.info("{}的消息发送已禁用", str);
            storage(str, str2, t, now, true, false, "已禁用该应用发送");
            return;
        }
        List<AbstractMessageQueueDelegate.MessageQueueListenerWrapper> orDefault = super.getListenerMap().getOrDefault(str2, Collections.emptyList());
        if (orDefault.isEmpty()) {
            executeSend(() -> {
                sendByMq(str2, t);
            }, str, str2, t, false, now);
        } else {
            log.info("本地消息：{}, 消费者：{}", str2, Integer.valueOf(orDefault.size()));
            executeSend(() -> {
                this.taskExecutor.execute(() -> {
                    Iterator it = orDefault.iterator();
                    while (it.hasNext()) {
                        AbstractMessageQueueDelegate.MessageQueueListenerWrapper messageQueueListenerWrapper = (AbstractMessageQueueDelegate.MessageQueueListenerWrapper) it.next();
                        try {
                            messageQueueListenerWrapper.getMessageQueueListener().onConsume(str2, t);
                        } catch (Throwable th) {
                            log.error("{}消费消息异常：", messageQueueListenerWrapper.getClass().getName(), th);
                        }
                    }
                });
            }, str, str2, t, true, now);
        }
    }

    @Override // com.elitescloud.boot.mq.MessageQueueTemplate
    public <T extends Serializable> void publishMessage(String str, T t) {
        publishMessage(null, str, t);
    }

    @Override // com.elitescloud.boot.mq.MessageQueueTemplate
    public <T extends Serializable> void publishMessage(String str, String str2, T t) {
        Assert.notBlank(str2, "消息渠道为空", new Object[0]);
        Assert.notNull(t, "消息体为空", new Object[0]);
        LocalDateTime now = LocalDateTime.now();
        if (canSend(str)) {
            executeSend(() -> {
                sendByMq(str2, t);
            }, str, str2, t, false, now);
        } else {
            log.info("{}的消息发送已禁用", str);
            storage(str, str2, t, now, false, false, "已禁用该应用发送");
        }
    }

    @Autowired
    @Output(MessageQueueConstant.CLOUDT_MESSAGE_CHANNEL_OUTPUT)
    public void setMessageChannel(MessageChannel messageChannel) {
        this.messageChannel = messageChannel;
    }

    public void setQueueStorageList(List<MessageQueueStorage> list) {
        this.queueStorageList = (List) ObjectUtil.defaultIfNull(list, Collections.emptyList());
    }

    public void setDuplicateStrategyList(List<MessageDuplicateStrategy> list) {
        this.duplicateStrategyList = (List) ObjectUtil.defaultIfNull(list, Collections.emptyList());
    }

    private boolean canSend(String str) {
        if (CharSequenceUtil.isBlank(str)) {
            return true;
        }
        Set<String> disabledApps = this.mqProperties.getDisabledApps();
        return CollUtil.isEmpty(disabledApps) || disabledApps.contains("*") || !disabledApps.contains(str);
    }

    private void executeSend(Runnable runnable, String str, String str2, Object obj, boolean z, LocalDateTime localDateTime) {
        if (isDuplicate(str, str2, obj)) {
            storage(str, str2, obj, localDateTime, z, false, "消息已重复");
            return;
        }
        Throwable th = null;
        try {
            try {
                runnable.run();
                storage(str, str2, obj, localDateTime, z, 0 == 0, 0 == 0 ? null : th.getMessage());
            } finally {
            }
        } catch (Throwable th2) {
            storage(str, str2, obj, localDateTime, z, th == null, th == null ? null : th.getMessage());
            throw th2;
        }
    }

    private boolean isDuplicate(String str, String str2, Object obj) {
        if (Boolean.TRUE.equals(this.mqProperties.getAllowDuplicate())) {
            return false;
        }
        if (!this.duplicateStrategyList.isEmpty()) {
            Iterator<MessageDuplicateStrategy> it = this.duplicateStrategyList.iterator();
            while (it.hasNext()) {
                if (it.next().isDuplicate(str, str2, obj)) {
                    log.info("自定义消息判重策略判断已重复：{}，{}", str, str2);
                    return true;
                }
            }
            return false;
        }
        try {
            String digestHex = MD5.create().digestHex(str + "&" + str2 + "&" + this.objectMapper.writeValueAsString(obj), StandardCharsets.UTF_8);
            if (this.redisUtils.get(digestHex) != null) {
                log.info("消息已重复：{}，{}", str, str2);
                return true;
            }
            this.redisUtils.set(digestHex, "true", this.mqProperties.getDuplicateInterval().toSeconds(), TimeUnit.SECONDS);
            return false;
        } catch (Exception e) {
            throw new IllegalStateException("判断消息是否重复异常", e);
        }
    }

    private void storage(String str, String str2, Object obj, LocalDateTime localDateTime, boolean z, boolean z2, String str3) {
        if (Boolean.FALSE.equals(this.mqProperties.getAllowStorage()) || this.queueStorageList.isEmpty()) {
            return;
        }
        LocalDateTime now = LocalDateTime.now();
        StoreMessageDTO storeMessageDTO = new StoreMessageDTO();
        storeMessageDTO.setAppCode(str);
        storeMessageDTO.setChannel(str2);
        try {
            storeMessageDTO.setMessageContent(this.objectMapper.writeValueAsString(obj));
        } catch (JsonProcessingException e) {
            storeMessageDTO.setMessageContent("序列化异常：" + e.getMessage());
        }
        storeMessageDTO.setSendTime(localDateTime);
        storeMessageDTO.setFinishTime(now);
        storeMessageDTO.setLocal(Boolean.valueOf(z));
        storeMessageDTO.setSuccess(Boolean.valueOf(z2));
        storeMessageDTO.setFailReason(str3);
        Iterator<MessageQueueStorage> it = this.queueStorageList.iterator();
        while (it.hasNext()) {
            try {
                it.next().saveMessage(storeMessageDTO);
            } catch (Throwable th) {
                log.error("持久化消息发送记录异常：", th);
            }
        }
    }

    private <T extends Serializable> void sendByMq(String str, T t) {
        log.info("发布消息：{}", str);
        try {
            this.messageChannel.send(MessageBuilder.withPayload(this.objectMapper.writeValueAsString(t)).setHeader(MessageQueueConstant.CLOUDT_MESSAGE_CHANNEL_ORIGINAL, str).build());
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(str + "序列化消息体异常：", e);
        }
    }
}
