package com.elitescloud.boot.mq.config.support;

import cn.hutool.core.collection.CollUtil;
import com.elitescloud.boot.mq.common.MessageQueueConstant;
import com.elitescloud.boot.mq.common.MessageRetryProvider;
import com.elitescloud.boot.mq.common.MessageRetryService;
import com.elitescloud.boot.mq.common.model.FailMessageDTO;
import com.elitescloud.boot.mq.common.model.RetryMessageDTO;
import com.elitescloud.boot.mq.config.CloudtMqProperties;
import com.elitescloud.boot.provider.TenantDataIsolateProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.lang.NonNull;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.util.Assert;

/* loaded from: input_file:com/elitescloud/boot/mq/config/support/MessageRetryServiceImpl.class */
public class MessageRetryServiceImpl extends AbstractMessageQueueDelegate implements SchedulingConfigurer, MessageRetryService {
    /** Class logger; the nested {@code MessageRetryQueueWrapper} logs through it as well. */
    private static final Logger log = LoggerFactory.getLogger(MessageRetryServiceImpl.class);
    /** Store of messages awaiting retry: queried, claimed ({@code trySend}), updated and deleted through it. */
    private final MessageRetryProvider messageRetryProvider;
    /** Retry configuration: the enable switch, the retry-count limit and the list of retry intervals. */
    private final CloudtMqProperties mqProperties;
    /** Supplies the per-message lock taken before a message is re-sent. */
    private final RedissonClient redissonClient;
    /** Runs the re-publish call under the tenant id carried by the retried message. */
    private final TenantDataIsolateProvider tenantDataIsolateProvider;
    /** In-memory delay queue of pending retries; created on first use by {@code addRetryMessage}. */
    private MessageRetryQueueWrapper queueWrapper;
    /** Output channel retried messages are published to; injected via {@code setMessageChannel}. */
    private MessageChannel messageChannel;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/elitescloud/boot/mq/config/support/MessageRetryServiceImpl$MessageRetryQueueWrapper.class */
    public static class MessageRetryQueueWrapper {
        private final DelayQueue<RetryMessage> queue = new DelayQueue<>();
        private final Set<String> messageIdAll = new HashSet();
        private final int size;
        private final BiConsumer<RetryMessage, Integer> consumer;

        public MessageRetryQueueWrapper(int i, BiConsumer<RetryMessage, Integer> biConsumer) {
            this.size = i;
            this.consumer = biConsumer;
            consumeMessage();
        }

        public void addMessage(RetryMessage retryMessage) {
            String messageId = retryMessage.getRetryMessageDTO().getMessageId();
            Assert.hasText(messageId, "添加消息重试队列失败，消息ID为空");
            if (this.messageIdAll.contains(messageId)) {
                return;
            }
            if (this.queue.size() >= this.size) {
                MessageRetryServiceImpl.log.info("重试消息队列已满");
            } else {
                this.queue.add((DelayQueue<RetryMessage>) retryMessage);
                this.messageIdAll.add(messageId);
            }
        }

        private void consumeMessage() {
            while (true) {
                try {
                    RetryMessage take = this.queue.take();
                    this.messageIdAll.remove(take.getRetryMessageDTO().getMessageId());
                    try {
                        this.consumer.accept(take, Integer.valueOf(this.queue.size()));
                    } catch (Exception e) {
                        MessageRetryServiceImpl.log.error("消费消息异常：", e);
                    }
                } catch (InterruptedException e2) {
                    MessageRetryServiceImpl.log.error("从消息队列获取延迟消息异常", e2);
                }
            }
        }
    }

    /**
     * Wires the service with its collaborators.
     *
     * @param objectMapper              passed on to the {@code AbstractMessageQueueDelegate} constructor
     * @param redissonClient            source of the per-message locks
     * @param messageRetryProvider      store of the messages awaiting retry
     * @param cloudtMqProperties        retry configuration
     * @param tenantDataIsolateProvider executes the re-publish under the message's tenant
     */
    public MessageRetryServiceImpl(
            ObjectMapper objectMapper,
            RedissonClient redissonClient,
            MessageRetryProvider messageRetryProvider,
            CloudtMqProperties cloudtMqProperties,
            TenantDataIsolateProvider tenantDataIsolateProvider) {
        super(objectMapper);
        this.mqProperties = cloudtMqProperties;
        this.messageRetryProvider = messageRetryProvider;
        this.tenantDataIsolateProvider = tenantDataIsolateProvider;
        this.redissonClient = redissonClient;
    }

    /**
     * Registers the periodic job that reloads stored retry messages into the in-memory queue.
     *
     * <p>The job runs with a fixed delay of 30 minutes. Each run does nothing when retry support
     * is switched off in the configuration; any exception raised by the reload is logged and
     * swallowed so that it cannot propagate out of the scheduled task.
     *
     * @param scheduledTaskRegistrar registrar the task is added to
     */
    @Override
    public void configureTasks(@NonNull ScheduledTaskRegistrar scheduledTaskRegistrar) {
        long delayMillis = TimeUnit.MINUTES.toMillis(30L);
        scheduledTaskRegistrar.addFixedDelayTask(() -> {
            if (Boolean.FALSE.equals(this.mqProperties.getSupportRetry())) {
                return;
            }
            try {
                addRetryMessageToQueue();
            } catch (Exception e) {
                log.error("消息重试异常：", e);
            }
        }, delayMillis);
    }

    /**
     * Schedules a failed message for retry (implements {@code MessageRetryService}).
     *
     * <p>The message is converted to a {@code RetryMessageDTO}. When the conversion yields
     * {@code null} (retry support disabled) or the DTO has no next send time, the message is
     * deleted from the retry store with a matching reason and nothing is queued. Otherwise the
     * message is placed on the in-memory delay queue, which is created on first use.
     *
     * @param failMessageDTO the message whose delivery failed
     */
    @Override // com.elitescloud.boot.mq.common.MessageRetryService
    public void addRetryMessage(FailMessageDTO failMessageDTO) {
        RetryMessageDTO retryMessageDTO = convert2RetryMessageDTO(failMessageDTO);
        if (retryMessageDTO == null || retryMessageDTO.getSendTimeNext() == null) {
            String reason = retryMessageDTO == null ? "重试功能已禁用" : "已达到最大重试次数";
            this.messageRetryProvider.deleteMessage(failMessageDTO.getMessageId(), reason);
            return;
        }
        MessageRetryQueueWrapper wrapper;
        // The lazy creation is guarded so that concurrent first callers share one queue
        // instead of each creating (and then overwriting) their own.
        synchronized (this) {
            if (this.queueWrapper == null) {
                initQueue();
            }
            wrapper = this.queueWrapper;
        }
        wrapper.addMessage(new RetryMessage(retryMessageDTO));
    }

    /**
     * Produces a new message id by delegating to the inherited
     * {@code AbstractMessageQueueDelegate} implementation, exposing it through
     * {@code MessageRetryService}.
     *
     * @return the id generated by the superclass
     */
    @Override
    public String generateMessageId() {
        final String messageId = super.generateMessageId();
        return messageId;
    }

    /**
     * Computes when a message should next be retried (implements {@code MessageRetryService}).
     *
     * <p>The interval is picked from the configured interval list by retry count; counts beyond
     * the end of the list reuse the last interval and negative counts use the first one. The
     * full interval is added, including any sub-second part.
     *
     * @param localDateTime the time the interval is added to
     * @param i             number of retries already performed
     * @return the next retry time, or {@code null} when retry support is disabled, the configured
     *         retry limit has been reached, or no retry intervals are configured
     */
    @Override // com.elitescloud.boot.mq.common.MessageRetryService
    public LocalDateTime generateNextRetryTime(LocalDateTime localDateTime, int i) {
        if (Boolean.FALSE.equals(this.mqProperties.getSupportRetry())) {
            log.info("消息重试已关闭，无需重试");
            return null;
        }
        Integer retryTimes = this.mqProperties.getRetryTimes();
        if (retryTimes != null && i >= retryTimes) {
            return null;
        }
        List<Duration> retryIntervals = this.mqProperties.getRetryIntervals();
        if (CollUtil.isEmpty(retryIntervals)) {
            log.error("消息重试间隔未设置，无法重试");
            return null;
        }
        // Clamp into the valid index range so neither a large nor a negative count can
        // run off the list.
        int index = Math.min(Math.max(i, 0), retryIntervals.size() - 1);
        // plus(Duration) keeps sub-second precision; converting to whole seconds first
        // would turn e.g. a 500 ms interval into an immediate retry.
        return localDateTime.plus(retryIntervals.get(index));
    }

    /**
     * Setter-injection point for the output channel that retried messages are published to.
     *
     * @param messageChannel the channel bound to {@code MessageQueueConstant.CLOUDT_MESSAGE_CHANNEL_OUTPUT}
     */
    @Output(MessageQueueConstant.CLOUDT_MESSAGE_CHANNEL_OUTPUT)
    @Autowired
    public void setMessageChannel(final MessageChannel messageChannel) {
        this.messageChannel = messageChannel;
    }

    /** Maximum number of messages held in the in-memory retry queue. */
    private static final int RETRY_QUEUE_CAPACITY = 2000;

    /**
     * Creates the in-memory retry queue and installs the callback that re-sends each message
     * once its delay has expired.
     */
    private void initQueue() {
        this.queueWrapper = new MessageRetryQueueWrapper(RETRY_QUEUE_CAPACITY,
                (retryMessage, remaining) -> resendUnderLock(retryMessage.getRetryMessageDTO()));
    }

    /**
     * Re-publishes one message while holding the lock keyed by its id and version.
     *
     * <p>Flow:
     * <ul>
     *   <li>Wait up to one minute for the lock. If it cannot be obtained, the attempt is skipped
     *       and no result is recorded, since nothing was sent.</li>
     *   <li>Ask the retry store via {@code trySend} whether the message should still be sent;
     *       if not, stop without publishing.</li>
     *   <li>Publish the message to the output channel under the message's tenant id.</li>
     *   <li>Record the outcome exactly once via {@code updateRetryResult}: success when no
     *       exception occurred, otherwise failure with the exception message.</li>
     *   <li>Release the lock only if this thread actually holds it.</li>
     * </ul>
     *
     * <p>NOTE(review): as in the original code, a message for which {@code trySend} returns
     * {@code false} is still recorded as a successful result — confirm this is intended.
     *
     * @param retryMessageDTO the message to re-send
     */
    private void resendUnderLock(RetryMessageDTO retryMessageDTO) {
        String messageId = retryMessageDTO.getMessageId();
        RLock lock = this.redissonClient.getLock("mq:" + messageId + ":" + retryMessageDTO.getVersion());
        Throwable failure = null;
        boolean locked = false;
        try {
            locked = lock.tryLock(1L, TimeUnit.MINUTES);
            if (!locked) {
                log.warn("获取消息{}的重试锁超时，跳过本次重试", messageId);
                return;
            }
            if (!this.messageRetryProvider.trySend(messageId, retryMessageDTO.getVersion())) {
                log.info("消息{}不需要再重试", messageId);
                return;
            }
            this.tenantDataIsolateProvider.byTenantDirectly(() -> {
                super.publishMqMessage(this.messageChannel, retryMessageDTO.getChannel(), messageId, retryMessageDTO.getMessageContent());
                return null;
            }, retryMessageDTO.getSysTenantId());
        } catch (Throwable th) {
            log.error("发送重试消息异常：{}", messageId, th);
            failure = th;
        } finally {
            // Skip the bookkeeping only for the "lock not acquired" case, where nothing happened.
            if (locked || failure != null) {
                try {
                    this.messageRetryProvider.updateRetryResult(messageId, failure == null, failure == null ? null : failure.getMessage());
                } catch (Exception e) {
                    log.error("更新消息重试结果异常：", e);
                }
            }
            // Unlocking a lock this thread does not hold makes Redisson throw.
            if (locked && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
            // Restore the interrupt flag last so the bookkeeping above is not disturbed by it.
            if (failure instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Turns a failed message into its retry representation.
     *
     * <p>An argument that already is a {@code RetryMessageDTO} is returned unchanged. Any other
     * message is copied into a fresh DTO with a retry count of zero and a next send time derived
     * from the original send time.
     *
     * @param failMessageDTO the failed message
     * @return the retry DTO, or {@code null} when retry support is disabled
     */
    private RetryMessageDTO convert2RetryMessageDTO(FailMessageDTO failMessageDTO) {
        boolean retryDisabled = Boolean.FALSE.equals(this.mqProperties.getSupportRetry());
        if (retryDisabled) {
            log.info("消息重试已关闭，无需重试");
            return null;
        }
        if (failMessageDTO instanceof RetryMessageDTO) {
            return (RetryMessageDTO) failMessageDTO;
        }

        RetryMessageDTO converted = new RetryMessageDTO();
        converted.setRetryTimes(0);
        LocalDateTime firstSendTime = failMessageDTO.getSendTime();
        int retriesSoFar = converted.getRetryTimes().intValue();
        LocalDateTime nextSendTime = generateNextRetryTime(firstSendTime, retriesSoFar);
        converted.setSendTimeNext(nextSendTime);
        converted.setMessageId(failMessageDTO.getMessageId());
        converted.setChannel(failMessageDTO.getChannel());
        converted.setMessageContent(failMessageDTO.getMessageContent());
        converted.setSendTime(failMessageDTO.getSendTime());
        return converted;
    }

    /**
     * Loads stored retry messages in batches of 50 and feeds each one to
     * {@link #addRetryMessage(FailMessageDTO)}.
     *
     * <p>The id of the last message handled is passed as the first argument of the next query
     * (presumably a paging cursor — verify against {@code MessageRetryProvider}). Loading stops
     * as soon as a query returns no messages.
     */
    private void addRetryMessageToQueue() {
        String lastMessageId = null;
        List<RetryMessageDTO> batch = this.messageRetryProvider.queryMessage(lastMessageId, 50);
        while (!CollUtil.isEmpty(batch)) {
            for (RetryMessageDTO pending : batch) {
                addRetryMessage(pending);
                lastMessageId = pending.getMessageId();
            }
            batch = this.messageRetryProvider.queryMessage(lastMessageId, 50);
        }
    }
}
