/*
 * Decompiled with CFR 0.152.
 */
package com.elitescloud.boot.mq.config.support;

import cn.hutool.core.collection.CollUtil;
import com.elitescloud.boot.mq.common.MessageRetryProvider;
import com.elitescloud.boot.mq.common.MessageRetryService;
import com.elitescloud.boot.mq.common.model.FailMessageDTO;
import com.elitescloud.boot.mq.common.model.RetryMessageDTO;
import com.elitescloud.boot.mq.config.CloudtMqProperties;
import com.elitescloud.boot.mq.config.support.AbstractMessageQueueDelegate;
import com.elitescloud.boot.mq.config.support.RetryMessage;
import com.elitescloud.boot.provider.TenantDataIsolateProvider;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.lang.NonNull;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.util.Assert;

public class MessageRetryServiceImpl
extends AbstractMessageQueueDelegate
implements SchedulingConfigurer,
MessageRetryService {
    private static final Logger log = LoggerFactory.getLogger(MessageRetryServiceImpl.class);
    private final MessageRetryProvider messageRetryProvider;
    private final CloudtMqProperties mqProperties;
    private final RedissonClient redissonClient;
    private final TenantDataIsolateProvider tenantDataIsolateProvider;
    private MessageRetryQueueWrapper queueWrapper;
    private MessageChannel messageChannel;

    public MessageRetryServiceImpl(ObjectMapper objectMapper, RedissonClient redissonClient, MessageRetryProvider messageRetryProvider, CloudtMqProperties mqProperties, TenantDataIsolateProvider tenantDataIsolateProvider) {
        super(objectMapper);
        this.redissonClient = redissonClient;
        this.messageRetryProvider = messageRetryProvider;
        this.mqProperties = mqProperties;
        this.tenantDataIsolateProvider = tenantDataIsolateProvider;
    }

    public void configureTasks(@NonNull ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addFixedDelayTask(() -> {
            if (Boolean.FALSE.equals(this.mqProperties.getSupportRetry())) {
                return;
            }
            try {
                this.addRetryMessageToQueue();
            }
            catch (Exception e) {
                log.error("\u6d88\u606f\u91cd\u8bd5\u5f02\u5e38\uff1a", (Throwable)e);
            }
        }, TimeUnit.MINUTES.toMillis(30L));
    }

    @Override
    public void addRetryMessage(FailMessageDTO messageDTO) {
        RetryMessageDTO retryMessageDTO = this.convert2RetryMessageDTO(messageDTO);
        if (retryMessageDTO == null || retryMessageDTO.getSendTimeNext() == null) {
            this.messageRetryProvider.deleteMessage(messageDTO.getMessageId(), retryMessageDTO == null ? "\u91cd\u8bd5\u529f\u80fd\u5df2\u7981\u7528" : "\u5df2\u8fbe\u5230\u6700\u5927\u91cd\u8bd5\u6b21\u6570");
            return;
        }
        if (this.queueWrapper == null) {
            this.initQueue();
        }
        RetryMessage message = new RetryMessage(retryMessageDTO);
        this.queueWrapper.addMessage(message);
    }

    @Override
    public String generateMessageId() {
        return super.generateMessageId();
    }

    @Override
    public LocalDateTime generateNextRetryTime(LocalDateTime lastSendTime, int retryTimes) {
        if (Boolean.FALSE.equals(this.mqProperties.getSupportRetry())) {
            log.info("\u6d88\u606f\u91cd\u8bd5\u5df2\u5173\u95ed\uff0c\u65e0\u9700\u91cd\u8bd5");
            return null;
        }
        Integer max = this.mqProperties.getRetryTimes();
        if (max != null && retryTimes >= max) {
            return null;
        }
        List<Duration> intervals = this.mqProperties.getRetryIntervals();
        if (CollUtil.isEmpty(intervals)) {
            log.error("\u6d88\u606f\u91cd\u8bd5\u95f4\u9694\u672a\u8bbe\u7f6e\uff0c\u65e0\u6cd5\u91cd\u8bd5");
            return null;
        }
        Duration interval = retryTimes > intervals.size() - 1 ? intervals.get(intervals.size() - 1) : intervals.get(retryTimes);
        return lastSendTime.plusSeconds(interval.toSeconds());
    }

    @Autowired
    @Output(value="cloudt_message_delegate_channel_output")
    public void setMessageChannel(MessageChannel messageChannel) {
        this.messageChannel = messageChannel;
    }

    private void initQueue() {
        this.queueWrapper = new MessageRetryQueueWrapper(2000, (message, size) -> {
            block15: {
                RLock lock;
                Throwable exp;
                RetryMessageDTO msg;
                block13: {
                    block14: {
                        msg = message.getRetryMessageDTO();
                        exp = null;
                        lock = this.redissonClient.getLock("mq:" + msg.getMessageId() + ":" + msg.getVersion());
                        if (!lock.tryLock(1L, TimeUnit.MINUTES)) break block13;
                        if (this.messageRetryProvider.trySend(msg.getMessageId(), msg.getVersion())) break block14;
                        log.info("\u6d88\u606f{}\u4e0d\u9700\u8981\u518d\u91cd\u8bd5", (Object)msg.getMessageId());
                        try {
                            this.messageRetryProvider.updateRetryResult(msg.getMessageId(), exp == null, exp == null ? null : exp.getMessage());
                        }
                        catch (Exception e) {
                            log.error("\u66f4\u65b0\u6d88\u606f\u91cd\u8bd5\u7ed3\u679c\u5f02\u5e38\uff1a", (Throwable)e);
                        }
                        lock.unlock();
                        return;
                    }
                    this.tenantDataIsolateProvider.byTenantDirectly(() -> {
                        super.publishMqMessage(this.messageChannel, msg.getChannel(), msg.getMessageId(), msg.getMessageContent());
                        return null;
                    }, msg.getSysTenantId());
                }
                try {
                    this.messageRetryProvider.updateRetryResult(msg.getMessageId(), exp == null, exp == null ? null : exp.getMessage());
                }
                catch (Exception e) {
                    log.error("\u66f4\u65b0\u6d88\u606f\u91cd\u8bd5\u7ed3\u679c\u5f02\u5e38\uff1a", (Throwable)e);
                }
                lock.unlock();
                break block15;
                catch (Throwable e) {
                    try {
                        log.error("\u53d1\u9001\u91cd\u8bd5\u6d88\u606f\u5f02\u5e38\uff1a{}", (Object)msg.getMessageId(), (Object)e);
                        exp = e;
                    }
                    catch (Throwable throwable) {
                        try {
                            this.messageRetryProvider.updateRetryResult(msg.getMessageId(), exp == null, exp == null ? null : exp.getMessage());
                        }
                        catch (Exception e2) {
                            log.error("\u66f4\u65b0\u6d88\u606f\u91cd\u8bd5\u7ed3\u679c\u5f02\u5e38\uff1a", (Throwable)e2);
                        }
                        lock.unlock();
                        throw throwable;
                    }
                    try {
                        this.messageRetryProvider.updateRetryResult(msg.getMessageId(), exp == null, exp == null ? null : exp.getMessage());
                    }
                    catch (Exception e3) {
                        log.error("\u66f4\u65b0\u6d88\u606f\u91cd\u8bd5\u7ed3\u679c\u5f02\u5e38\uff1a", (Throwable)e3);
                    }
                    lock.unlock();
                }
            }
        });
    }

    private RetryMessageDTO convert2RetryMessageDTO(FailMessageDTO failMessageDTO) {
        if (Boolean.FALSE.equals(this.mqProperties.getSupportRetry())) {
            log.info("\u6d88\u606f\u91cd\u8bd5\u5df2\u5173\u95ed\uff0c\u65e0\u9700\u91cd\u8bd5");
            return null;
        }
        if (failMessageDTO instanceof RetryMessageDTO) {
            return (RetryMessageDTO)failMessageDTO;
        }
        RetryMessageDTO retryMessageDTO = new RetryMessageDTO();
        retryMessageDTO.setRetryTimes(0);
        retryMessageDTO.setSendTimeNext(this.generateNextRetryTime(failMessageDTO.getSendTime(), retryMessageDTO.getRetryTimes()));
        retryMessageDTO.setMessageId(failMessageDTO.getMessageId());
        retryMessageDTO.setChannel(failMessageDTO.getChannel());
        retryMessageDTO.setMessageContent(failMessageDTO.getMessageContent());
        retryMessageDTO.setSendTime(failMessageDTO.getSendTime());
        return retryMessageDTO;
    }

    private void addRetryMessageToQueue() {
        List<RetryMessageDTO> messagList;
        String lastMessageId = null;
        while (!CollUtil.isEmpty(messagList = this.messageRetryProvider.queryMessage(lastMessageId, 50))) {
            for (RetryMessageDTO retryMessageDTO : messagList) {
                this.addRetryMessage(retryMessageDTO);
                lastMessageId = retryMessageDTO.getMessageId();
            }
        }
    }

    static class MessageRetryQueueWrapper {
        private final DelayQueue<RetryMessage> queue = new DelayQueue();
        private final Set<String> messageIdAll = new HashSet<String>();
        private final int size;
        private final BiConsumer<RetryMessage, Integer> consumer;

        public MessageRetryQueueWrapper(int size, BiConsumer<RetryMessage, Integer> consumer) {
            this.size = size;
            this.consumer = consumer;
            this.consumeMessage();
        }

        public void addMessage(RetryMessage message) {
            String messageId = message.getRetryMessageDTO().getMessageId();
            Assert.hasText((String)messageId, (String)"\u6dfb\u52a0\u6d88\u606f\u91cd\u8bd5\u961f\u5217\u5931\u8d25\uff0c\u6d88\u606fID\u4e3a\u7a7a");
            if (this.messageIdAll.contains(messageId)) {
                return;
            }
            if (this.queue.size() >= this.size) {
                log.info("\u91cd\u8bd5\u6d88\u606f\u961f\u5217\u5df2\u6ee1");
                return;
            }
            this.queue.add(message);
            this.messageIdAll.add(messageId);
        }

        private void consumeMessage() {
            Runnable runnable = () -> {
                while (true) {
                    RetryMessage message;
                    try {
                        message = (RetryMessage)this.queue.take();
                    }
                    catch (InterruptedException e) {
                        log.error("\u4ece\u6d88\u606f\u961f\u5217\u83b7\u53d6\u5ef6\u8fdf\u6d88\u606f\u5f02\u5e38", (Throwable)e);
                        continue;
                    }
                    this.messageIdAll.remove(message.getRetryMessageDTO().getMessageId());
                    try {
                        this.consumer.accept(message, this.queue.size());
                        continue;
                    }
                    catch (Exception e) {
                        log.error("\u6d88\u8d39\u6d88\u606f\u5f02\u5e38\uff1a", (Throwable)e);
                        continue;
                    }
                    break;
                }
            };
            Thread threadConsumer = new Thread(runnable);
            threadConsumer.setName("cloudt-mq-retry");
            threadConsumer.setDaemon(true);
            threadConsumer.setUncaughtExceptionHandler((t, e) -> log.error("\u6d88\u606f\u91cd\u8bd5\u670d\u52a1\u5f02\u5e38\uff1a", e));
            threadConsumer.start();
        }
    }
}

