/*
 * Decompiled with CFR 0.152.
 */
package com.elitescloud.cloudt.system.service.impl;

import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.ObjectUtil;
import com.elitescloud.boot.core.base.BaseServiceImpl;
import com.elitescloud.boot.mq.common.MessageRetryService;
import com.elitescloud.boot.mq.common.model.RetryMessageDTO;
import com.elitescloud.boot.mq.config.CloudtMqAutoConfiguration;
import com.elitescloud.boot.task.retry.RetryTask;
import com.elitescloud.cloudt.common.base.ApiResult;
import com.elitescloud.cloudt.core.annotation.TenantOrgTransaction;
import com.elitescloud.cloudt.core.annotation.TenantTransaction;
import com.elitescloud.cloudt.core.annotation.common.TenantIsolateType;
import com.elitescloud.cloudt.system.convert.MqMessageConvert;
import com.elitescloud.cloudt.system.model.entity.SysMqConsumeDO;
import com.elitescloud.cloudt.system.model.entity.SysMqMessageDO;
import com.elitescloud.cloudt.system.model.entity.SysMqRetryDO;
import com.elitescloud.cloudt.system.provider.dto.SysMqConsumeResultDTO;
import com.elitescloud.cloudt.system.provider.dto.SysMqMessageStorageDTO;
import com.elitescloud.cloudt.system.provider.dto.SysMqSendResultDTO;
import com.elitescloud.cloudt.system.service.SysMqMessageMngService;
import com.elitescloud.cloudt.system.service.repo.MqConsumeRepoProc;
import com.elitescloud.cloudt.system.service.repo.MqMessageRepoProc;
import com.elitescloud.cloudt.system.service.repo.MqMessageRetryRepoProc;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;

@Service
@TenantTransaction(isolateType=TenantIsolateType.DEFAULT)
@TenantOrgTransaction(useTenantOrg=false)
@ConditionalOnClass(value={CloudtMqAutoConfiguration.class})
public class SysMqMessageMngServiceImpl
extends BaseServiceImpl
implements SysMqMessageMngService {
    private static final Logger log = LoggerFactory.getLogger(SysMqMessageMngServiceImpl.class);
    @Autowired
    private MqMessageRepoProc repoProc;
    @Autowired
    private MqMessageRetryRepoProc retryRepoProc;
    @Autowired
    private MqConsumeRepoProc consumeRepoProc;
    @Autowired(required=false)
    private MessageRetryService messageRetryService;

    @Override
    @Transactional(rollbackFor={Exception.class})
    public ApiResult<String> saveMessage(SysMqMessageStorageDTO messageDTO) {
        SysMqMessageDO messageDO = MqMessageConvert.INSTANCE.dto2Do(messageDTO);
        messageDO.setSysTenantId(super.currentTenantId());
        messageDO.setRetryTimes(0);
        messageDO.setRetried(false);
        messageDO.setConsumed(false);
        this.repoProc.save((Serializable)((Object)messageDO));
        return ApiResult.ok((Object)messageDO.getOriginalMessageId());
    }

    @Override
    @Transactional(rollbackFor={Exception.class})
    public ApiResult<String> updateSendResult(SysMqSendResultDTO sendResult) {
        if (CharSequenceUtil.isBlank((CharSequence)sendResult.getMessageId())) {
            log.error("\u66f4\u65b0\u53d1\u9001\u7ed3\u679c\u5931\u8d25\uff0c\u6d88\u606fID\u4e3a\u7a7a");
            return ApiResult.fail((String)"\u66f4\u65b0\u53d1\u9001\u7ed3\u679c\u5931\u8d25\uff0c\u6d88\u606fID\u4e3a\u7a7a");
        }
        boolean sendSuccess = Boolean.TRUE.equals(sendResult.getSuccess());
        this.repoProc.updateSendResult(sendResult.getMessageId(), sendSuccess, sendResult.getFailReason());
        String originalMessageId = this.repoProc.getOriginalMessageId(sendResult.getMessageId());
        if (sendSuccess) {
            if (StringUtils.hasText((String)originalMessageId)) {
                this.repoProc.updateSendResult(originalMessageId, true, null);
                this.retryRepoProc.deleteRetryByMessageId(originalMessageId, false);
            }
            return ApiResult.ok((Object)sendResult.getMessageId());
        }
        this.createRetryMessage(sendResult.getMessageId(), false);
        return ApiResult.ok((Object)sendResult.getMessageId());
    }

    @Override
    @Transactional(rollbackFor={Exception.class})
    public ApiResult<String> updateConsumeResult(SysMqConsumeResultDTO consumeResult) {
        String originalMessageId = this.repoProc.getOriginalMessageId(consumeResult.getMessageId());
        this.updateConsumeResult(consumeResult, consumeResult.getMessageId(), originalMessageId);
        if (StringUtils.hasText((String)originalMessageId) && Boolean.TRUE.equals(consumeResult.getSuccess())) {
            this.retryRepoProc.deleteRetryByMessageId(originalMessageId);
        }
        if (!Boolean.TRUE.equals(consumeResult.getSuccess())) {
            this.createRetryMessage(consumeResult.getMessageId(), true);
        }
        return ApiResult.ok((Object)consumeResult.getMessageId());
    }

    @Override
    @Transactional(rollbackFor={Exception.class})
    public ApiResult<Boolean> deleteRetryMessage(String messageId, String reason) {
        String originalMessageId = this.repoProc.getOriginalMessageId(messageId);
        if (CharSequenceUtil.isBlank((CharSequence)originalMessageId)) {
            log.error("\u672a\u67e5\u8be2\u5230\u539f\u59cb\u6d88\u606f\uff1a{}", (Object)messageId);
            return ApiResult.fail((String)"\u672a\u67e5\u8be2\u5230\u539f\u59cb\u6d88\u606f");
        }
        this.retryRepoProc.deleteRetryByMessageId(originalMessageId);
        this.repoProc.deleteByMessageId(messageId);
        this.repoProc.updateRetryFailResult(originalMessageId, reason);
        return ApiResult.ok((Object)true);
    }

    @Override
    @Transactional(rollbackFor={Exception.class})
    public ApiResult<Boolean> trySend(String messageId, Integer version) {
        boolean canSend;
        String originalMessageId = this.repoProc.getOriginalMessageId(messageId);
        if (CharSequenceUtil.isBlank((CharSequence)originalMessageId)) {
            log.info("\u672a\u67e5\u8be2\u5230\u539f\u59cb\u6d88\u606f\uff1a{}", (Object)messageId);
            return ApiResult.ok((Object)false);
        }
        Integer retryVersion = this.retryRepoProc.getVersionByMessageId(originalMessageId);
        boolean bl = canSend = retryVersion != null && retryVersion.intValue() == ((Integer)ObjectUtil.defaultIfNull((Object)version, (Object)0)).intValue();
        if (canSend) {
            this.repoProc.updateRetrySendTime(messageId, LocalDateTime.now());
        }
        return ApiResult.ok((Object)canSend);
    }

    @Override
    @Transactional(rollbackFor={Exception.class})
    public ApiResult<Boolean> deleteExpiredMessage(LocalDateTime expiredTime) {
        this.repoProc.deleteExpiredMessage(expiredTime);
        this.retryRepoProc.deleteExpiredMessage(expiredTime);
        this.consumeRepoProc.deleteExpiredMessage(expiredTime);
        return ApiResult.ok((Object)true);
    }

    @Override
    public ApiResult<List<RetryMessageDTO>> queryRetryMessage(String lastMessageId, int size) {
        List<SysMqRetryDO> retryList = this.retryRepoProc.queryRetry(lastMessageId, size);
        if (retryList.isEmpty()) {
            return ApiResult.ok(Collections.emptyList());
        }
        Set<String> messageIds = retryList.stream().map(SysMqRetryDO::getMessageId).collect(Collectors.toSet());
        Map<String, SysMqMessageDO> messageMap = this.repoProc.listByMessageId(messageIds).stream().collect(Collectors.toMap(SysMqMessageDO::getMessageId, t -> t, (t1, t2) -> t1));
        List messageList = this.retryRepoProc.queryRetry(lastMessageId, size).stream().filter(t -> messageMap.containsKey(t.getMessageId())).map(t -> {
            SysMqMessageDO message = (SysMqMessageDO)((Object)((Object)messageMap.get(t.getMessageId())));
            RetryMessageDTO messageDTO = new RetryMessageDTO();
            messageDTO.setRetryTimes(t.getRetryTimes().intValue());
            messageDTO.setRetryTime(t.getSendTimeNext());
            messageDTO.setSysTenantId(message.getSysTenantId());
            messageDTO.setTaskId(message.getLastRetryMessageId());
            messageDTO.setChannel(message.getChannel());
            messageDTO.setMessageContent(message.getMessageContent());
            messageDTO.setSendTime(t.getSendTime());
            messageDTO.setVersion(t.getAuditDataVersion().intValue());
            return messageDTO;
        }).filter(t -> StringUtils.hasText((String)t.getTaskId())).collect(Collectors.toList());
        return ApiResult.ok(messageList);
    }

    private void updateConsumeResult(SysMqConsumeResultDTO consumeResult, String messageId, String originalMessageId) {
        Long msgId = this.repoProc.getIdByMessageId(messageId);
        if (msgId == null) {
            log.error("\u672a\u627e\u5230\u6d88\u606f\u8bb0\u5f55\uff1a{}", (Object)messageId);
            return;
        }
        if (Boolean.TRUE.equals(consumeResult.getSuccess())) {
            this.repoProc.updateConsumed(msgId, consumeResult.getConsumeTime());
            if (CharSequenceUtil.isNotBlank((CharSequence)originalMessageId)) {
                this.repoProc.updateConsumedByMessageId(originalMessageId, consumeResult.getConsumeTime());
            }
        }
        SysMqConsumeDO consumeDO = new SysMqConsumeDO();
        consumeDO.setMessageId(messageId);
        consumeDO.setRecordId(msgId);
        consumeDO.setAppCode(consumeResult.getAppCode());
        consumeDO.setConsumerName(consumeResult.getConsumerName());
        consumeDO.setConsumerIp(consumeResult.getConsumerIp());
        consumeDO.setConsumeTime(consumeResult.getConsumeTime());
        consumeDO.setSuccess(consumeResult.getSuccess());
        consumeDO.setFailReason(consumeResult.getFailReason());
        consumeDO.setOriginalMessageId(CharSequenceUtil.blankToDefault((CharSequence)originalMessageId, (String)messageId));
        consumeDO.setCostTime(consumeResult.getCostTime());
        this.consumeRepoProc.save((Serializable)((Object)consumeDO));
    }

    private void createRetryMessage(String messageId, Boolean consumer) {
        if (this.messageRetryService == null) {
            return;
        }
        String originalMessageId = this.repoProc.getOriginalMessageId(messageId);
        SysMqMessageDO originalMessageDO = this.repoProc.getByMessageId(CharSequenceUtil.blankToDefault((CharSequence)originalMessageId, (String)messageId));
        if (originalMessageDO == null) {
            log.info("\u672a\u67e5\u8be2\u5230\u6d88\u606f\u4fe1\u606f\uff1a{}\uff0c\u65e0\u6cd5\u521b\u5efa\u91cd\u8bd5\u6d88\u606f", (Object)messageId);
            return;
        }
        if (Boolean.TRUE.equals(originalMessageDO.getConsumed())) {
            log.info("\u5df2\u6210\u529f\u6d88\u8d39\u8fc7\u6d88\u606f\uff1a{}\uff0c\u65e0\u9700\u91cd\u8bd5", (Object)originalMessageDO.getMessageId());
            return;
        }
        SysMqMessageDO messageDO = this.createRetryMessage(originalMessageDO);
        SysMqRetryDO retryDO = this.upsertRetryRecord(messageDO, consumer);
        this.createRetryRequest(messageDO, retryDO);
    }

    private SysMqMessageDO createRetryMessage(SysMqMessageDO original) {
        SysMqMessageDO messageDO = new SysMqMessageDO();
        messageDO.setMessageId(this.messageRetryService.generateMessageId());
        messageDO.setSysTenantId(original.getSysTenantId());
        messageDO.setAppCode(original.getAppCode());
        messageDO.setChannel(original.getChannel());
        messageDO.setMessageContent(original.getMessageContent());
        messageDO.setBusinessKey(original.getBusinessKey());
        messageDO.setLocal(original.getLocal());
        messageDO.setRetried(true);
        messageDO.setOriginalMessageId(original.getMessageId());
        messageDO.setConsumed(false);
        this.repoProc.save((Serializable)((Object)messageDO));
        this.repoProc.updateLastRetryMessageId(original.getMessageId(), messageDO.getMessageId());
        return messageDO;
    }

    private SysMqRetryDO upsertRetryRecord(SysMqMessageDO messageDO, Boolean consumer) {
        String messageId = CharSequenceUtil.blankToDefault((CharSequence)messageDO.getOriginalMessageId(), (String)messageDO.getMessageId());
        SysMqRetryDO retryDO = this.retryRepoProc.getByMessageId(messageId, consumer);
        if (retryDO != null) {
            retryDO.setAuditDataVersion(retryDO.getAuditDataVersion() + 1);
            retryDO.setRetryTimes(retryDO.getRetryTimes() + 1);
            retryDO.setSendTime(LocalDateTime.now());
            retryDO.setSendTimeNext(this.messageRetryService.generateNextRetryTime(retryDO.getSendTime(), retryDO.getRetryTimes().intValue()));
            this.retryRepoProc.save((Serializable)((Object)retryDO));
            return retryDO;
        }
        retryDO = new SysMqRetryDO();
        retryDO.setMessageId(messageId);
        retryDO.setConsumer(consumer);
        retryDO.setRetryTimes(0);
        retryDO.setSendTime((LocalDateTime)ObjectUtil.defaultIfNull((Object)messageDO.getSendTime(), (Object)LocalDateTime.now()));
        retryDO.setSendTimeNext(this.messageRetryService.generateNextRetryTime(retryDO.getSendTime(), retryDO.getRetryTimes().intValue()));
        retryDO.setAuditDataVersion(0);
        this.retryRepoProc.save((Serializable)((Object)retryDO));
        return retryDO;
    }

    private void createRetryRequest(SysMqMessageDO messageDO, SysMqRetryDO retryDO) {
        RetryMessageDTO failMessageDTO = new RetryMessageDTO();
        failMessageDTO.setSysTenantId(messageDO.getSysTenantId());
        failMessageDTO.setTaskId(messageDO.getMessageId());
        failMessageDTO.setChannel(messageDO.getChannel());
        failMessageDTO.setMessageContent(messageDO.getMessageContent());
        failMessageDTO.setSendTime(retryDO.getSendTime());
        failMessageDTO.setVersion(retryDO.getAuditDataVersion().intValue());
        failMessageDTO.setRetryTimes(retryDO.getRetryTimes().intValue());
        failMessageDTO.setRetryTime(retryDO.getSendTimeNext());
        this.messageRetryService.addRetryTask((RetryTask)failMessageDTO);
    }
}

