/*
 * Decompiled with CFR 0.152.
 */
package com.elitescloud.boot.mq.config.support.queue.redisstream;

import cn.hutool.core.text.CharSequenceUtil;
import com.elitescloud.boot.context.ExecutorContextHolder;
import com.elitescloud.boot.context.TenantContextHolder;
import com.elitescloud.boot.mq.common.MessageQueueStorage;
import com.elitescloud.boot.mq.config.CloudtMqProperties;
import com.elitescloud.boot.mq.config.support.queue.AbstractMqProvider;
import com.elitescloud.boot.provider.TenantClientProvider;
import com.elitescloud.boot.provider.TenantDataIsolateProvider;
import com.elitescloud.boot.redis.util.RedisUtils;
import com.elitescloud.boot.support.app.CloudtAppHolder;
import com.elitescloud.boot.wrapper.RedisWrapper;
import com.elitescloud.cloudt.system.dto.SysTenantDTO;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.util.StringUtils;

/**
 * Message-queue provider that publishes messages to, and consumes them from, a single
 * Redis stream ({@value #STREAM_KEY}).
 *
 * <p>Each stream entry is a flat map carrying the sender's tenant code, the logical
 * channel, the message id and the payload. On startup ({@link #run(ApplicationArguments)})
 * a stream listener is registered that hands every received entry to
 * {@code consumeMessage(channel, messageId, payload)} within the sender's tenant context.
 */
public class RedisStreamMqProvider extends AbstractMqProvider implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(RedisStreamMqProvider.class);
    /** Name of the Redis stream shared by all channels. */
    private static final String STREAM_KEY = "cloudt_message_delegate_channel";
    /** Entry field: tenant code of the sender (empty string when there was no session tenant). */
    private static final String MSG_TENANT_CODE = "_tenant_code";
    /** Entry field: logical channel the message was sent to. */
    private static final String MSG_CHANNEL = "_channel";
    /** Entry field: message id. */
    private static final String MSG_MESSAGE_ID = "_message_id";
    /** Entry field: message payload. */
    private static final String MSG_PAYLOAD = "_payload";
    private final CloudtMqProperties properties;
    private final RedisUtils redisUtils;
    private final RedisWrapper redisWrapper;
    private final TenantClientProvider tenantClientProvider;
    private final TenantDataIsolateProvider tenantDataIsolateProvider;
    private final String applicationName;

    /**
     * Creates the provider.
     *
     * @param properties                MQ configuration; also supplies the optional consumer group name
     * @param queueStorageList          storages handed to the parent provider
     * @param redisUtils                access to the Redis template, key generator and stream listener container
     * @param redisWrapper              wrapper through which the stream write is executed
     * @param tenantClientProvider      resolves the session tenant and tenants by code
     * @param tenantDataIsolateProvider runs the consumer callback for a given tenant
     * @param applicationName           fallback consumer group name and prefix of the consumer name
     */
    public RedisStreamMqProvider(CloudtMqProperties properties, List<MessageQueueStorage> queueStorageList, RedisUtils redisUtils, RedisWrapper redisWrapper, TenantClientProvider tenantClientProvider, TenantDataIsolateProvider tenantDataIsolateProvider, String applicationName) {
        super(properties, queueStorageList);
        this.properties = properties;
        this.redisUtils = redisUtils;
        this.redisWrapper = redisWrapper;
        this.tenantClientProvider = tenantClientProvider;
        this.tenantDataIsolateProvider = tenantDataIsolateProvider;
        this.applicationName = applicationName;
    }

    /**
     * Runs the parent start-up logic and then registers the stream consumer.
     *
     * @param args application arguments, forwarded to the parent
     * @throws Exception if the parent start-up or the consumer registration fails
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        super.run(args);
        this.initConsumer();
    }

    /**
     * Appends one message to the Redis stream.
     *
     * @param channel   logical channel of the message
     * @param messageId id of the message
     * @param payload   message payload
     */
    @Override
    protected void sendMessage(String channel, String messageId, String payload) {
        SysTenantDTO tenant = this.tenantClientProvider.getSessionTenant();
        Map<String, String> message = new HashMap<>(8);
        // An empty tenant code marks "no tenant"; the consumer checks it with hasText().
        message.put(MSG_TENANT_CODE, tenant == null ? "" : tenant.getTenantCode());
        message.put(MSG_MESSAGE_ID, messageId);
        message.put(MSG_CHANNEL, channel);
        message.put(MSG_PAYLOAD, payload);
        this.redisWrapper.apply(() -> {
            // Raw type kept on purpose: the key type expected by the template's stream
            // operations is declared in RedisUtils, which is not visible from this class.
            MapRecord record = StreamRecords.newRecord().in(STREAM_KEY).ofMap(message);
            RecordId recordId = this.redisUtils.getRedisTemplate().opsForStream().add(record);
            logger.info("\u53d1\u9001RedisStream\u6d88\u606f\uff1a{}\uff0c{}", messageId, recordId);
            return null;
        }, null);
    }

    /**
     * Registers a listener on the stream listener container.
     *
     * <p>The consumer group is the configured one, or the application name when none is
     * configured; the consumer name is {@code applicationName:serverIp}. Reading starts at
     * the group's last consumed offset. Every map record is dispatched in the sender's
     * tenant context and acknowledged afterwards, whether or not consumption succeeded.
     */
    private void initConsumer() {
        String groupName = CharSequenceUtil.blankToDefault(this.properties.getConsumerGroup(), this.applicationName);
        String consumerName = this.applicationName + ":" + CloudtAppHolder.getServerIp();
        Consumer consumer = this.redisUtils.createStreamConsumer(STREAM_KEY, groupName, consumerName);
        // Raw type kept on purpose: the key type is dictated by the listener container
        // returned from RedisUtils, which is not visible from this class.
        StreamOffset stream = StreamOffset.create(this.redisUtils.getKeyGenerator().computeKey(STREAM_KEY), ReadOffset.lastConsumed());
        logger.info("MQ Redis Stream Consumer\uff1a{}, {}", stream, consumer);
        this.redisUtils.getStreamMessageListenerContainer().receive(consumer, stream, record -> {
            if (!(record instanceof MapRecord)) {
                return;
            }
            Map<?, ?> msg = (Map<?, ?>) record.getValue();
            String tenantCode = (String) msg.get(MSG_TENANT_CODE);
            String channel = (String) msg.get(MSG_CHANNEL);
            String messageId = (String) msg.get(MSG_MESSAGE_ID);
            String payload = (String) msg.get(MSG_PAYLOAD);
            logger.debug("start Redis Stream Message\uff1a{}, {}, {}, {}", channel, messageId, tenantCode, payload);
            try {
                this.consumeInTenantContext(tenantCode, channel, messageId, payload);
            } catch (Exception e) {
                // The trailing Throwable is logged as the exception, not as a placeholder value.
                logger.error("\u6d88\u8d39MQ\u6d88\u606f\u5f02\u5e38\uff1a{}, {}", messageId, payload, e);
            } finally {
                try {
                    // Acknowledge while the tenant/executor context is still set, as before.
                    this.redisUtils.getRedisTemplate().opsForStream().acknowledge(groupName, record);
                } finally {
                    // Always reset the thread-bound contexts, even when the acknowledge call
                    // throws; otherwise they would leak into the next record on this thread.
                    // NOTE(review): an acknowledge failure still propagates to the listener
                    // container as before - confirm how the container reacts to listener errors.
                    TenantContextHolder.clearCurrentTenant();
                    ExecutorContextHolder.clear();
                    logger.debug("finish Redis Stream Message\uff1a{}", messageId);
                }
            }
        });
    }

    /**
     * Sets up the executor and tenant context for one message and dispatches it to
     * {@code consumeMessage}. The contexts are NOT cleared here; the caller does that
     * after acknowledging the record.
     *
     * @param tenantCode tenant code taken from the stream entry; blank or {@code null} means no tenant
     * @param channel    logical channel of the message
     * @param messageId  id of the message
     * @param payload    message payload
     * @throws Exception whatever tenant resolution or message consumption throws
     */
    private void consumeInTenantContext(String tenantCode, String channel, String messageId, String payload) throws Exception {
        ExecutorContextHolder.create(ExecutorContextHolder.Source.REDIS_STREAM, null, true);
        SysTenantDTO tenant = StringUtils.hasText(tenantCode) ? this.tenantClientProvider.getTenantByCode(tenantCode) : null;
        if (tenant != null) {
            TenantContextHolder.setCurrentTenant(tenant);
        }
        this.tenantDataIsolateProvider.byTenantDirectly(() -> {
            this.consumeMessage(channel, messageId, payload);
            return null;
        }, tenant);
    }
}

