package com.elitescloud.boot.mq.config.support.queue.redisstream;

import cn.hutool.core.text.CharSequenceUtil;
import com.elitescloud.boot.context.ExecutorContextHolder;
import com.elitescloud.boot.context.TenantContextHolder;
import com.elitescloud.boot.mq.common.MessageQueueStorage;
import com.elitescloud.boot.mq.config.CloudtMqProperties;
import com.elitescloud.boot.mq.config.support.queue.AbstractMqProvider;
import com.elitescloud.boot.provider.TenantClientProvider;
import com.elitescloud.boot.provider.TenantDataIsolateProvider;
import com.elitescloud.boot.redis.util.RedisUtils;
import com.elitescloud.boot.support.app.CloudtAppHolder;
import com.elitescloud.boot.wrapper.RedisWrapper;
import com.elitescloud.cloudt.system.dto.SysTenantDTO;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/elitescloud/boot/mq/config/support/queue/redisstream/RedisStreamMqProvider.class */
/**
 * Message-queue provider that publishes messages to a single Redis Stream and
 * consumes them through the stream message listener container exposed by
 * {@link RedisUtils}.
 *
 * <p>Every message is written as a map record carrying the sender's tenant code,
 * the logical channel, the message id and the payload. On the consuming side the
 * tenant referenced by the record (if any) is set on {@link TenantContextHolder}
 * before the message is handed to {@code consumeMessage}.
 */
public class RedisStreamMqProvider extends AbstractMqProvider implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(RedisStreamMqProvider.class);

    /** Name of the Redis Stream shared by all channels. */
    private static final String STREAM_KEY = "cloudt_message_delegate_channel";

    /** Record field holding the tenant code of the sender (empty string when there is none). */
    private static final String MSG_TENANT_CODE = "_tenant_code";
    /** Record field holding the logical channel name. */
    private static final String MSG_CHANNEL = "_channel";
    /** Record field holding the message id. */
    private static final String MSG_MESSAGE_ID = "_message_id";
    /** Record field holding the message payload. */
    private static final String MSG_PAYLOAD = "_payload";

    private final CloudtMqProperties properties;
    private final RedisUtils redisUtils;
    private final RedisWrapper<?, ?> redisWrapper;
    private final TenantClientProvider tenantClientProvider;
    private final TenantDataIsolateProvider tenantDataIsolateProvider;
    private final String applicationName;

    /**
     * Creates the provider.
     *
     * @param cloudtMqProperties        MQ configuration; also supplies the optional consumer group name
     * @param list                      message storages, forwarded unchanged to {@link AbstractMqProvider}
     * @param redisUtils                access to the Redis template, key generator and listener container
     * @param redisWrapper              wrapper through which the stream write is executed
     * @param tenantClientProvider      resolves the session tenant and tenants by code
     * @param tenantDataIsolateProvider runs the consumer callback for a given tenant
     * @param str                       application name; used as the default consumer group and as the
     *                                  prefix of the consumer name
     */
    public RedisStreamMqProvider(CloudtMqProperties cloudtMqProperties, List<MessageQueueStorage> list, RedisUtils redisUtils, RedisWrapper<?, ?> redisWrapper, TenantClientProvider tenantClientProvider, TenantDataIsolateProvider tenantDataIsolateProvider, String str) {
        super(cloudtMqProperties, list);
        this.properties = cloudtMqProperties;
        this.redisUtils = redisUtils;
        this.redisWrapper = redisWrapper;
        this.tenantClientProvider = tenantClientProvider;
        this.tenantDataIsolateProvider = tenantDataIsolateProvider;
        this.applicationName = str;
    }

    /**
     * Runs the parent start-up logic and then registers the stream consumer.
     *
     * @param applicationArguments application arguments, passed through to the parent
     * @throws Exception if the parent start-up logic or the consumer registration fails
     */
    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        super.run(applicationArguments);
        initConsumer();
    }

    /**
     * Appends one message to the Redis Stream as a map record.
     *
     * @param str  channel name, stored under {@code _channel}
     * @param str2 message id, stored under {@code _message_id}
     * @param str3 payload, stored under {@code _payload}
     */
    // Raw map kept on purpose: the generic parameters of the Redis template returned by
    // RedisUtils are not visible from this class, so a typed record could fail to match them.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    protected void sendMessage(String str, String str2, String str3) {
        SysTenantDTO sessionTenant = this.tenantClientProvider.getSessionTenant();

        Map fields = new HashMap(8);
        // An empty tenant code marks "no tenant"; the consumer only resolves non-blank codes.
        fields.put(MSG_TENANT_CODE, sessionTenant == null ? "" : sessionTenant.getTenantCode());
        fields.put(MSG_MESSAGE_ID, str2);
        fields.put(MSG_CHANNEL, str);
        fields.put(MSG_PAYLOAD, str3);

        this.redisWrapper.apply(() -> {
            Object recordId = this.redisUtils.getRedisTemplate().opsForStream()
                    .add(StreamRecords.newRecord().in(STREAM_KEY).ofMap(fields));
            logger.info("发送RedisStream消息：{}，{}", str2, recordId);
            return null;
        }, (Object) null);
    }

    /**
     * Registers a listener on the stream for this application's consumer group.
     *
     * <p>The group is the configured consumer group, or the application name when none is
     * configured; the consumer name is {@code <applicationName>:<serverIp>}. Reading starts
     * from the group's last consumed offset.
     *
     * <p>Each map record is consumed in the tenant context named by the record and is
     * acknowledged afterwards whether or not consumption succeeded (failures are only logged).
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void initConsumer() {
        String group = CharSequenceUtil.blankToDefault(this.properties.getConsumerGroup(), this.applicationName);
        String consumerName = this.applicationName + ":" + CloudtAppHolder.getServerIp();
        Consumer consumer = this.redisUtils.createStreamConsumer(STREAM_KEY, group, consumerName);
        StreamOffset offset = StreamOffset.create(this.redisUtils.getKeyGenerator().computeKey(STREAM_KEY), ReadOffset.lastConsumed());
        logger.info("MQ Redis Stream Consumer：{}, {}", offset, consumer);

        this.redisUtils.getStreamMessageListenerContainer().receive(consumer, offset, message -> {
            if (!(message instanceof MapRecord)) {
                // Only map records are produced by sendMessage; anything else is ignored (and not acknowledged).
                return;
            }
            Map fields = (Map) message.getValue();
            String tenantCode = (String) fields.get(MSG_TENANT_CODE);
            String channel = (String) fields.get(MSG_CHANNEL);
            String messageId = (String) fields.get(MSG_MESSAGE_ID);
            String payload = (String) fields.get(MSG_PAYLOAD);
            logger.debug("start Redis Stream Message：{}, {}, {}, {}", channel, messageId, tenantCode, payload);

            try {
                ExecutorContextHolder.create(ExecutorContextHolder.Source.REDIS_STREAM, (ExecutorContextHolder.ExecutorContext) null, true);
                SysTenantDTO tenant = StringUtils.hasText(tenantCode) ? this.tenantClientProvider.getTenantByCode(tenantCode) : null;
                if (tenant != null) {
                    TenantContextHolder.setCurrentTenant(tenant);
                }
                this.tenantDataIsolateProvider.byTenantDirectly(() -> {
                    consumeMessage(channel, messageId, payload);
                    return null;
                }, tenant);
            } catch (Exception e) {
                // Best effort: a failing consumer is logged and the record is still acknowledged below.
                logger.error("消费MQ消息异常：{}, {}", messageId, payload, e);
            } finally {
                try {
                    this.redisUtils.getRedisTemplate().opsForStream().acknowledge(group, message);
                } finally {
                    // Nested finally so that a failed acknowledgement cannot skip the context cleanup.
                    TenantContextHolder.clearCurrentTenant();
                    ExecutorContextHolder.clear();
                    logger.debug("finish Redis Stream Message：{}", messageId);
                }
            }
        });
    }
}
