package com.xinqiyi.framework.mq.initial;

import com.xinqiyi.framework.mq.config.MqConfig;
import com.xinqiyi.framework.mq.config.consumer.ChildMqConsumerConfig;
import com.xinqiyi.framework.mq.config.consumer.MqConsumerConfig;
import com.xinqiyi.framework.mq.config.consumer.MqConsumerListenerConfig;
import com.xinqiyi.framework.mq.intercept.DefaultMqConsumeIntercept;
import com.xinqiyi.framework.mq.intercept.DefaultOrderMqConsumeIntercept;
import com.xinqiyi.framework.mq.listener.DefaultMqConsumerListener;
import com.xinqiyi.framework.mq.listener.DefaultOrderMqConsumerListener;
import com.xinqiyi.framework.util.ApplicationContextHelper;
import com.xinqiyi.framework.util.PropertiesHelper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.cglib.proxy.Callback;
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.lang.NonNull;

/* loaded from: input_file:com/xinqiyi/framework/mq/initial/BaseMqConsumerInitial.class */
/**
 * Base class that turns the configured MQ consumer definitions into consumer beans.
 *
 * <p>For every enabled {@link ChildMqConsumerConfig} of the current {@link MqConsumerConfig} it
 * builds one listener instance per configured topic subscription, assembles the
 * {@link MqConsumerProperties}, and hands both to {@link #createConsumerBean}. Subclasses supply
 * the concrete consumer type, the configuration to read, and whether consumption is ordered.
 *
 * @param <TConsumer> consumer type produced by {@link #createConsumerBean}
 * @param <TListener> listener type stored per {@link Subscription}
 */
public abstract class BaseMqConsumerInitial<TConsumer, TListener> implements ApplicationContextAware {
    private ApplicationContext applicationContext;
    protected PropertiesHelper propConf;
    private List<MqConsumerConfig> consumerConfigList;
    private static final int INITIAL_SUBSCRIPTION_MAP_SIZE = 16;
    // The defaults below (thread count through batch message count) are declared here but are
    // not referenced anywhere in this class.
    private static final int DEFAULT_MQ_CONSUME_THREAD_NUM = 50;
    private static final int DEFAULT_MQ_CONSUME_TIMEOUT = 5;
    private static final int DEFAULT_MQ_MAX_CACHED_MESSAGE_AMOUNT = 1000;
    private static final int DEFAULT_MQ_MAX_CACHED_MESSAGE_SIZE_IN_MIB = 512;
    private static final String DEFAULT_MQ_CONSUME_SERVER_ADDRESS = "http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80";
    private static final int DEFAULT_ORDER_MQ_CONSUME_SUSPEND_TIME_MILLS = 1000;
    private static final int DEFAULT_ORDER_MQ_MAX_RE_CONSUME_TIMES = 10;
    private static final int DEFAULT_BATCH_CONSUME_MESSAGE_COUNT = 10;
    private static final Logger log = LoggerFactory.getLogger(BaseMqConsumerInitial.class);
    private static final String DEFAULT_MQ_CONSUME_LISTENER_CLASS_NAME = DefaultMqConsumerListener.class.getName();

    /**
     * Creates the concrete consumer for one consumer group.
     *
     * @param mqConsumerProperties connection and tuning properties assembled from the configuration
     * @param map listener instance to register for each subscription; never empty when called
     *     from {@link #initialMessageConsumer()}
     * @return the created consumer (the return value is not used by this class)
     * @throws MQClientException if the consumer cannot be created
     */
    protected abstract TConsumer createConsumerBean(MqConsumerProperties mqConsumerProperties, Map<Subscription, TListener> map) throws MQClientException;

    /** @return {@code true} when this initializer creates ordered-message consumers */
    protected abstract boolean isOrderMqConsume();

    /** @return the consumer configuration this initializer works from */
    protected abstract MqConsumerConfig getCurrentMqConsumerConfig();

    /** @return a label for this consumer type; only used in log output here */
    protected abstract String getCurrentMqConsumerType();

    /**
     * Creates a consumer for every usable child configuration.
     *
     * <p>A child configuration is skipped (with a warning) when it is {@code null}, has no group
     * id, or is disabled. It is skipped silently when none of its listeners could be created.
     *
     * @throws Exception if {@link #createConsumerBean} fails or the configuration cannot be read
     */
    public void initialMessageConsumer() throws Exception {
        MqConsumerConfig consumerConfig = getCurrentMqConsumerConfig();
        if (!consumerConfig.isEnabled()) {
            log.info("MqConsumer.Enabled=False; MqConsumerType={}", getCurrentMqConsumerType());
            return;
        }
        Map<String, ChildMqConsumerConfig> childConfigs = consumerConfig.getConfigList();
        if (MapUtils.isEmpty(childConfigs)) {
            // Nothing to start; a null map used to end in a NullPointerException here.
            log.warn("MqConsumer.ConfigList.Is Empty. MqConsumerType={}", getCurrentMqConsumerType());
            return;
        }
        for (Map.Entry<String, ChildMqConsumerConfig> entry : childConfigs.entrySet()) {
            String configName = entry.getKey();
            ChildMqConsumerConfig childConfig = entry.getValue();
            if (childConfig == null) {
                log.warn("MqConsumer.Config.Is Null. ConfigName={}", configName);
                continue;
            }
            if (StringUtils.isEmpty(childConfig.getGroupId())) {
                log.warn("MqConsumer.GroupId.Is Null. ConfigName={}", configName);
                continue;
            }
            if (!childConfig.isEnabled()) {
                // Log the configuration name, as the message says, not the whole config object.
                log.warn("MQConsumer.Enabled=False; ConsumerConfigName={}", configName);
                continue;
            }
            Map<Subscription, TListener> subscriptions = initSubscription(childConfig);
            if (MapUtils.isEmpty(subscriptions)) {
                continue;
            }
            createConsumerBean(buildConsumerProperties(childConfig), subscriptions);
            if (log.isDebugEnabled()) {
                // NOTE(review): this prints the whole config object; confirm its toString()
                // does not expose the secret key.
                log.debug("MQ Consumer Start! ConsumeConfig={}", childConfig);
            }
        }
    }

    /**
     * Copies one child configuration into a fresh {@link MqConsumerProperties}.
     *
     * <p>Access key, secret key and server address fall back to the {@link MqConfig} bean when
     * the child configuration leaves them empty; the bean is only looked up when needed.
     */
    private MqConsumerProperties buildConsumerProperties(ChildMqConsumerConfig childConfig) {
        String accessKey = childConfig.getAccessKey();
        String secretKey = childConfig.getSecretKey();
        String serverAddress = childConfig.getServerAddress();
        if (StringUtils.isEmpty(accessKey) || StringUtils.isEmpty(secretKey) || StringUtils.isEmpty(serverAddress)) {
            MqConfig globalConfig = (MqConfig) ApplicationContextHelper.getBean(MqConfig.class);
            if (StringUtils.isEmpty(accessKey)) {
                accessKey = globalConfig.getAccessKey();
            }
            if (StringUtils.isEmpty(secretKey)) {
                secretKey = globalConfig.getSecretKey();
            }
            if (StringUtils.isEmpty(serverAddress)) {
                serverAddress = globalConfig.getServerAddress();
            }
        }

        MqConsumerProperties properties = new MqConsumerProperties();
        properties.setGroupId(childConfig.getGroupId());
        properties.setAccessKey(accessKey);
        properties.setSecretKey(secretKey);
        properties.setConsumeThreadNums(childConfig.getConsumeThreadNum());
        properties.setConsumeTimeout(childConfig.getConsumeTimeout());
        properties.setNameServerAddress(serverAddress);
        properties.setMqInstanceId(childConfig.getMqInstanceId());
        properties.setMaxCachedMessageAmount(childConfig.getMaxCachedMessageAmount());
        properties.setMaxCachedMessageSizeInMiB(childConfig.getMaxCachedMessageSizeInMib());
        if (childConfig.isBroadcastMq()) {
            properties.setMessageModel("BROADCASTING");
        }
        if (childConfig.isBatchMqConsume()) {
            // The same configured count feeds both batch-related properties.
            properties.setMaxBatchMessageCount(childConfig.getMaxBatchMessageCount());
            properties.setConsumeMessageBatchMaxSize(childConfig.getMaxBatchMessageCount());
        }
        if (isOrderMqConsume()) {
            properties.setSuspendTimeMillis(childConfig.getSuspendTimeMills());
            properties.setMaxReconsumeTimes(childConfig.getMaxReconsumeTimes());
        }
        return properties;
    }

    /**
     * Builds the subscription-to-listener map for one child configuration.
     *
     * <p>A listener that cannot be created is logged and left out; the remaining listeners are
     * still registered. A missing listener list yields an empty map.
     */
    private Map<Subscription, TListener> initSubscription(ChildMqConsumerConfig childMqConsumerConfig) {
        Map<Subscription, TListener> subscriptions = new HashMap<>(INITIAL_SUBSCRIPTION_MAP_SIZE);
        List<MqConsumerListenerConfig> listenerConfigs = childMqConsumerConfig.getListenerConfigList();
        if (listenerConfigs == null) {
            return subscriptions;
        }
        for (MqConsumerListenerConfig listenerConfig : listenerConfigs) {
            try {
                TListener listener = createListener(childMqConsumerConfig, listenerConfig);
                Subscription subscription = new Subscription();
                subscription.setTopic(listenerConfig.getTopic());
                subscription.setExpression(listenerConfig.getTagExpression());
                subscriptions.put(subscription, listener);
            } catch (Exception e) {
                // Best effort: one broken listener must not prevent the others from starting.
                log.error("BaseMqConsumerInitial.InitialSubscription Error; ListenerConfig={}", listenerConfig, e);
            }
        }
        return subscriptions;
    }

    /**
     * Instantiates and autowires the listener for one listener configuration.
     *
     * <p>The listener class defaults to {@link DefaultMqConsumerListener} or, for ordered
     * consumption, {@link DefaultOrderMqConsumerListener}. When an intercept class is configured,
     * or message filtering is enabled and the default intercept applies, the listener is created
     * as a CGLIB subclass with the intercept instance installed as its callback.
     *
     * @throws ReflectiveOperationException if a class cannot be loaded or instantiated
     */
    private TListener createListener(ChildMqConsumerConfig childConfig, MqConsumerListenerConfig listenerConfig) throws ReflectiveOperationException {
        String listenerClassName = listenerConfig.getListenerClassName();
        if (StringUtils.isEmpty(listenerClassName)) {
            listenerClassName = isOrderMqConsume()
                    ? DefaultOrderMqConsumerListener.class.getName()
                    : DEFAULT_MQ_CONSUME_LISTENER_CLASS_NAME;
        }
        Class<?> listenerClass = Class.forName(listenerClassName);

        String interceptClassName = childConfig.getInterceptClassName();
        if (StringUtils.isEmpty(interceptClassName) && childConfig.isMqFilterEnabled()) {
            interceptClassName = isOrderMqConsume()
                    ? DefaultOrderMqConsumeIntercept.class.getName()
                    : DefaultMqConsumeIntercept.class.getName();
        }

        Object instance;
        if (StringUtils.isNotEmpty(interceptClassName)) {
            Class<?> interceptClass = Class.forName(interceptClassName);
            Enhancer enhancer = new Enhancer();
            enhancer.setSuperclass(listenerClass);
            Callback callback = (Callback) interceptClass.getDeclaredConstructor().newInstance();
            this.applicationContext.getAutowireCapableBeanFactory().autowireBean(callback);
            enhancer.setCallback(callback);
            instance = enhancer.create();
        } else {
            instance = listenerClass.getDeclaredConstructor().newInstance();
        }
        this.applicationContext.getAutowireCapableBeanFactory().autowireBean(instance);

        // The listener class is chosen by name at runtime, so its type cannot be checked against
        // TListener here; the concrete subclass is responsible for configuring compatible classes.
        @SuppressWarnings("unchecked")
        TListener listener = (TListener) instance;
        return listener;
    }

    /**
     * Stores the application context used to autowire listener and intercept instances.
     *
     * @param applicationContext the context this object runs in
     */
    @Override
    public void setApplicationContext(@NonNull ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
