package com.elitescloud.boot.mq.config.support;

import cn.hutool.core.lang.Assert;
import com.elitescloud.boot.mq.MessageQueueListener;
import com.elitescloud.boot.mq.MessageQueueTemplate;
import com.elitescloud.boot.mq.config.support.AbstractMessageQueueDelegate;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

/* loaded from: input_file:com/elitescloud/boot/mq/config/support/DefaultMessageQueueTemplate.class */
/**
 * {@link MessageQueueTemplate} implementation that either hands a message to the listeners
 * registered in the inherited {@code listenerMap} (in-process, via the {@link TaskExecutor}),
 * or serializes it to JSON and sends it through the injected {@link MessageChannel}.
 */
public class DefaultMessageQueueTemplate extends AbstractMessageQueueDelegate implements MessageQueueTemplate {
    private static final Logger log = LoggerFactory.getLogger(DefaultMessageQueueTemplate.class);

    private final TaskExecutor taskExecutor;
    private MessageChannel messageChannel;

    /**
     * Creates the template.
     *
     * @param taskExecutor executor used to run local listeners off the caller's thread
     * @param objectMapper mapper passed to the superclass; used here to serialize payloads
     * @param list         listeners passed to the superclass
     */
    public DefaultMessageQueueTemplate(TaskExecutor taskExecutor, ObjectMapper objectMapper, List<MessageQueueListener<?>> list) {
        super(objectMapper, list);
        this.taskExecutor = taskExecutor;
    }

    /**
     * Sends a message, preferring local delivery: when {@code listenerMap} holds listeners for
     * {@code str} they are invoked asynchronously on the task executor and nothing is sent to
     * the message channel; otherwise the message is sent through the message channel.
     *
     * @param str channel name; must not be blank
     * @param t   message payload; must not be null
     * @param <T> payload type
     */
    @Override
    public <T extends Serializable> void sendMessage(String str, T t) {
        Assert.notBlank(str, "消息渠道为空");
        Assert.notNull(t, "消息体为空");
        List<AbstractMessageQueueDelegate.MessageQueueListenerWrapper> localListeners =
                this.listenerMap.getOrDefault(str, Collections.emptyList());
        if (localListeners.isEmpty()) {
            executeSend(str, t);
            return;
        }
        log.info("本地消息：{}, 消费者：{}", str, localListeners.size());
        this.taskExecutor.execute(() -> dispatchLocally(str, t, localListeners));
    }

    /**
     * Always sends the message through the message channel, without consulting local listeners.
     *
     * @param str channel name; must not be blank
     * @param t   message payload; must not be null
     * @param <T> payload type
     */
    @Override
    public <T extends Serializable> void publishMessage(String str, T t) {
        Assert.notBlank(str, "消息渠道为空");
        Assert.notNull(t, "消息体为空");
        executeSend(str, t);
    }

    /**
     * Injection point for the output channel bound to
     * {@code MessageQueueConstant.CLOUDT_MESSAGE_CHANNEL}.
     *
     * @param messageChannel channel used by {@code executeSend}
     */
    @Autowired
    @Output(MessageQueueConstant.CLOUDT_MESSAGE_CHANNEL)
    public void setMessageChannel(MessageChannel messageChannel) {
        this.messageChannel = messageChannel;
    }

    /**
     * Invokes every local listener in turn. A failure in one listener is logged and does not
     * prevent the remaining listeners from receiving the message.
     */
    private <T extends Serializable> void dispatchLocally(
            String str, T t, List<AbstractMessageQueueDelegate.MessageQueueListenerWrapper> localListeners) {
        for (AbstractMessageQueueDelegate.MessageQueueListenerWrapper wrapper : localListeners) {
            try {
                wrapper.getMessageQueueListener().onConsume(str, t);
            } catch (Throwable th) {
                // Deliberately broad: one misbehaving listener must not stop the others.
                log.error("{}消费消息异常：", describeListener(wrapper), th);
            }
        }
    }

    /**
     * Returns the class name of the wrapped listener so the error log identifies the failing
     * consumer; falls back to the wrapper's class name when no listener is present.
     */
    private static String describeListener(AbstractMessageQueueDelegate.MessageQueueListenerWrapper wrapper) {
        Object listener = wrapper.getMessageQueueListener();
        return (listener != null ? listener : wrapper).getClass().getName();
    }

    /**
     * Serializes the payload to JSON and sends it through the message channel, carrying the
     * channel name in the {@code MessageQueueConstant.CLOUDT_MESSAGE_CHANNEL_ORIGINAL} header.
     *
     * @throws IllegalStateException    if no message channel has been set
     * @throws IllegalArgumentException if the payload cannot be serialized to JSON
     */
    private <T extends Serializable> void executeSend(String str, T t) {
        MessageChannel channel = this.messageChannel;
        if (channel == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new IllegalStateException("消息通道未初始化，无法发布消息：" + str);
        }
        log.info("发布消息：{}", str);
        String json;
        try {
            json = this.objectMapper.writeValueAsString(t);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(str + "序列化消息体异常：", e);
        }
        boolean sent = channel.send(
                MessageBuilder.withPayload(json)
                        .setHeader(MessageQueueConstant.CLOUDT_MESSAGE_CHANNEL_ORIGINAL, str)
                        .build());
        if (!sent) {
            // send() reports non-fatal delivery failure through its return value; surface it.
            log.warn("消息发布失败：{}", str);
        }
    }
}
