package com.xinqiyi.framework.mq.intercept;

import com.xinqiyi.framework.mq.MqProcessResult;
import com.xinqiyi.framework.mq.filter.MqConsumeFilterFactory;
import com.xinqiyi.framework.mq.filter.impl.MqFilterRedisKeyBuilder;
import com.xinqiyi.framework.mq.util.MsgConvertUtil;
import com.xinqiyi.framework.redis.lock.RedisReentrantLock;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cglib.proxy.MethodInterceptor;
import org.springframework.cglib.proxy.MethodProxy;

/* loaded from: input_file:com/xinqiyi/framework/mq/intercept/DefaultMqConsumeIntercept.class */
/**
 * CGLIB {@link MethodInterceptor} that surrounds a consumer's {@code consumeMessage} method with
 * the hooks exposed by {@link MqConsumeFilterFactory}.
 *
 * <p>Calls to any other method are forwarded to the target unchanged. For {@code consumeMessage}
 * the first argument decides the path:
 * <ul>
 *   <li>a single {@link Message}: the message is consumed under a {@link RedisReentrantLock}
 *       keyed by message id, keys and topic;</li>
 *   <li>a {@link List} of messages: every message is checked first, and the target is invoked
 *       once for the whole batch.</li>
 * </ul>
 *
 * <p>Exceptions of type {@link Exception} raised while intercepting are logged and swallowed;
 * in that case {@link #intercept} returns {@code null}.
 */
public class DefaultMqConsumeIntercept implements MethodInterceptor {
    private static final Logger log = LoggerFactory.getLogger(DefaultMqConsumeIntercept.class);
    private static final String MQ_CONSUME_METHOD_NAME = "consumeMessage";

    /** Time (in milliseconds) to wait for the per-message consume lock; 0 means "do not wait". */
    private static final int LOCK_MQ_CONSUME_TIMEOUT = 0;

    /** Position of the message (or message list) in the intercepted method's arguments. */
    private static final int MESSAGE_ARG_INDEX = 0;

    /** {@code checkMqCanConsume} code that this class logs as "Already Consumed". */
    private static final int FILTER_ALREADY_CONSUMED = 1;

    /** {@code checkMqCanConsume} code that this class logs as "In Consuming". */
    private static final int FILTER_IN_CONSUMING = 0;

    @Autowired
    private MqConsumeFilterFactory consumeFilterFactory;

    /**
     * Builds the Redis key used to lock the consumption of one message.
     *
     * @param messageExt message being consumed
     * @return key derived from the message id, keys and topic
     */
    private String buildConsumeLockKey(MessageExt messageExt) {
        return MqFilterRedisKeyBuilder.buildMqFilterRedisKey(
                messageExt.getMsgId(), messageExt.getKeys(), messageExt.getTopic());
    }

    /**
     * Renders the id and keys of every message for debug logging.
     *
     * @param messages messages to describe
     * @return text of the form {@code MsgId=..;MsgKey=..$$$} repeated per message
     */
    private static String describeMessages(List<MessageExt> messages) {
        StringBuilder description = new StringBuilder();
        for (MessageExt message : messages) {
            description.append("MsgId=").append(message.getMsgId()).append(";")
                    .append("MsgKey=").append(message.getKeys()).append("$$$");
        }
        return description.toString();
    }

    /**
     * Deserializes the message body to text for the filter hooks.
     *
     * @param messageExt message whose body is read
     * @return the deserialized body, or {@link MsgConvertUtil#EMPTY_STRING} when it cannot be
     *         deserialized (the hooks are still invoked in that case)
     */
    private String deserializeBody(MessageExt messageExt) {
        try {
            return MsgConvertUtil.objectDeserialize(messageExt.getBody()).toString();
        } catch (Exception e) {
            // Best effort: an unreadable body must not prevent the message from being filtered.
            log.debug("DefaultMqConsumeIntercept.DeserializeBody.Failed.MsgId={}", messageExt.getMsgId(), e);
            return MsgConvertUtil.EMPTY_STRING;
        }
    }

    /**
     * Intercepts a batch {@code consumeMessage(List<MessageExt>, ...)} call.
     *
     * <p>Outcome, in order of precedence:
     * <ol>
     *   <li>any message reported as in-consuming: the whole batch is retried later;</li>
     *   <li>every message already consumed (this includes an empty batch): success is returned
     *       without invoking the target;</li>
     *   <li>otherwise the target is invoked with the unmodified arguments, and the after-consume
     *       hook runs for every message that was not already consumed.</li>
     * </ol>
     *
     * @param obj         proxied consumer instance
     * @param objArr      arguments of the intercepted call; the first one is the message list
     * @param methodProxy proxy used to invoke the real consumer method
     * @return the consume status, orderly or concurrent depending on {@link #isOrderMqTopic()}
     *         for the short-circuit cases, otherwise whatever the target returned
     * @throws Throwable anything thrown by the target method or the hooks
     */
    private Object startBatchMessageIntercept(Object obj, Object[] objArr, MethodProxy methodProxy) throws Throwable {
        @SuppressWarnings("unchecked") // the caller only checked "instanceof List"
        List<MessageExt> messages = (List<MessageExt>) objArr[MESSAGE_ARG_INDEX];
        int size = messages.size();
        List<String> alreadyConsumedIds = new ArrayList<>(size);
        boolean reconsumeLater = false;
        log.debug("startBatchMessageIntercept.MessageSize={}", size);

        // The before-hooks run from the last message to the first; that order is kept as is.
        for (int i = size - 1; i >= 0; i--) {
            MessageExt message = messages.get(i);
            Object verdict = startBeforeConsumeIntercept(message);
            if (verdict == ConsumeConcurrentlyStatus.CONSUME_SUCCESS) {
                alreadyConsumedIds.add(message.getMsgId());
            } else if (verdict == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
                reconsumeLater = true;
            }
        }

        if (reconsumeLater) {
            if (log.isDebugEnabled()) {
                log.debug("startBatchMessageIntercept.ReconsumeLater.Message={}", describeMessages(messages));
            }
            return isOrderMqTopic()
                    ? ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
                    : ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        if (size == alreadyConsumedIds.size()) {
            log.debug("startBatchMessageIntercept.CommitMessage.Empty");
            return isOrderMqTopic() ? ConsumeOrderlyStatus.SUCCESS : ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        // NOTE(review): already-consumed messages stay in the list handed to the target, and no
        // failure hook runs if the target throws here (unlike the single-message path) - confirm
        // both are intended.
        Object result = methodProxy.invokeSuper(obj, objArr);
        for (MessageExt message : messages) {
            if (!alreadyConsumedIds.contains(message.getMsgId())) {
                startAfterConsumeIntercept(message, result);
            }
        }
        return result;
    }

    /**
     * Asks the filter factory whether a message may be consumed and, if so, runs the
     * before-consume hook.
     *
     * @param messageExt message about to be consumed
     * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when the filter reports the
     *         message as already consumed, {@link ConsumeConcurrentlyStatus#RECONSUME_LATER}
     *         when it reports it as in-consuming, or {@code null} when consumption may proceed
     */
    private Object startBeforeConsumeIntercept(MessageExt messageExt) {
        String topic = messageExt.getTopic();
        String keys = messageExt.getKeys();
        String tags = messageExt.getTags();
        String msgId = messageExt.getMsgId();
        String body = deserializeBody(messageExt);
        log.debug("DefaultMqConsumeIntercept.ReceiveMessage.Topic={};Key={};Tag={};MsgId={}", topic, keys, tags, msgId);

        int canConsume = this.consumeFilterFactory.checkMqCanConsume(messageExt, body);
        if (canConsume == FILTER_ALREADY_CONSUMED) {
            log.warn("DefaultMqMessageConsumerListener.Consume. Already Consumed.Topic={};Key={};Tag={};MsgId={}",
                    topic, keys, tags, msgId);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        if (canConsume == FILTER_IN_CONSUMING) {
            log.warn("DefaultMqMessageConsumerListener.Consume. In Consuming.Topic={};Key={};Tag={};MsgId={}",
                    topic, keys, tags, msgId);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        this.consumeFilterFactory.beforeMqConsume(messageExt, body);
        return null;
    }

    /**
     * Reports the outcome of a consume attempt to the filter factory.
     *
     * @param messageExt message that was handed to the consumer
     * @param obj        status returned by the consumer; {@code CONSUME_SUCCESS} and orderly
     *                   {@code SUCCESS} trigger the after-consume hook, anything else (including
     *                   {@code null}) triggers the failed-consume hook
     */
    private void startAfterConsumeIntercept(MessageExt messageExt, Object obj) {
        String body = deserializeBody(messageExt);
        boolean succeeded = obj == ConsumeConcurrentlyStatus.CONSUME_SUCCESS || obj == ConsumeOrderlyStatus.SUCCESS;
        if (succeeded) {
            this.consumeFilterFactory.afterMqConsume(
                    messageExt, body, new MqProcessResult(true, MsgConvertUtil.EMPTY_STRING));
        } else {
            this.consumeFilterFactory.failedMqConsume(messageExt, body, new MqProcessResult(false, "Action=" + obj));
        }
    }

    /**
     * Intercepts a single-message {@code consumeMessage} call under a per-message Redis lock.
     *
     * <p>If the lock cannot be obtained immediately the message is retried later. Otherwise the
     * before-hook, the target method and the after-hook run while the lock is held, and the lock
     * is always released afterwards. Any {@link Exception} is reported through the failed-consume
     * hook, logged, and not rethrown.
     *
     * @param obj         proxied consumer instance
     * @param objArr      arguments of the intercepted call; the first one is the message
     * @param methodProxy proxy used to invoke the real consumer method
     * @return the short-circuit status from the lock or the before-hook, the status returned by
     *         the target, or {@code null} when an exception occurred before the target returned
     * @throws Throwable non-{@link Exception} throwables raised by the target method
     */
    private Object startSingleMessageConsume(Object obj, Object[] objArr, MethodProxy methodProxy) throws Throwable {
        MessageExt messageExt = (MessageExt) objArr[MESSAGE_ARG_INDEX];
        Object result = null;
        try {
            RedisReentrantLock consumeLock = new RedisReentrantLock(buildConsumeLockKey(messageExt));
            if (!consumeLock.tryLock((long) LOCK_MQ_CONSUME_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // Lock not acquired, so there is nothing to release.
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            try {
                Object verdict = startBeforeConsumeIntercept(messageExt);
                if (verdict != null) {
                    return verdict;
                }
                result = methodProxy.invokeSuper(obj, objArr);
                startAfterConsumeIntercept(messageExt, result);
            } finally {
                consumeLock.unlock();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the calling thread.
                Thread.currentThread().interrupt();
            }
            this.consumeFilterFactory.failedMqConsume(
                    messageExt, MsgConvertUtil.EMPTY_STRING, new MqProcessResult(false, e.getMessage()));
            log.error("DefaultMqConsumeIntercept Intercept Error", e);
        }
        return result;
    }

    /**
     * Entry point called by CGLIB for every method of the proxied consumer.
     *
     * @param obj         proxied consumer instance
     * @param method      method being invoked
     * @param objArr      arguments of the invocation
     * @param methodProxy proxy used to invoke the real method
     * @return the target's return value for non-consume methods; the consume status for
     *         {@code consumeMessage}; {@code null} when the consume call has no arguments, when
     *         its first argument is neither a {@link Message} nor a {@link List}, or when an
     *         {@link Exception} was caught
     * @throws Throwable non-{@link Exception} throwables raised by the target method
     */
    @Override
    public Object intercept(Object obj, Method method, Object[] objArr, MethodProxy methodProxy) throws Throwable {
        Object result = null;
        log.debug("DefaultMqConsumeIntercept Start Intercept Name={}", method.getName());
        try {
            if (!StringUtils.equals(MQ_CONSUME_METHOD_NAME, method.getName())) {
                return methodProxy.invokeSuper(obj, objArr);
            }
            if (ArrayUtils.isEmpty(objArr)) {
                log.error("DefaultMqConsumeIntercept Consume Method Objects Is Null");
                return null;
            }
            Object payload = objArr[MESSAGE_ARG_INDEX];
            if (payload instanceof Message) {
                result = startSingleMessageConsume(obj, objArr, methodProxy);
            } else if (payload instanceof List) {
                result = startBatchMessageIntercept(obj, objArr, methodProxy);
            }
        } catch (Exception e) {
            log.error("DefaultMqConsumeIntercept Error", e);
        }
        return result;
    }

    /**
     * Tells the batch path which status family to return when it short-circuits.
     *
     * @return {@code false} here, which selects {@link ConsumeConcurrentlyStatus}; returning
     *         {@code true} selects {@link ConsumeOrderlyStatus}
     */
    public boolean isOrderMqTopic() {
        return false;
    }
}
