package io.choerodon.asgard.saga.consumer;

import io.choerodon.asgard.common.AbstractAsgardConsumer;
import io.choerodon.asgard.common.ApplicationContextHelper;
import io.choerodon.asgard.common.InstanceResultUtils;
import io.choerodon.asgard.common.UpdateStatusDTO;
import io.choerodon.asgard.saga.SagaDefinition;
import io.choerodon.asgard.saga.SagaProperties;
import io.choerodon.asgard.saga.annotation.SagaTask;
import io.choerodon.asgard.saga.dto.PollSagaTaskInstanceDTO;
import io.choerodon.asgard.saga.dto.SagaTaskInstanceDTO;
import io.choerodon.asgard.saga.feign.SagaConsumerClient;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;

/* loaded from: input_file:io/choerodon/asgard/saga/consumer/SagaConsumer.class */
/**
 * Consumer that polls saga task instances through a {@link SagaConsumerClient}, invokes the
 * matching {@code @SagaTask} method reflectively and reports the outcome (COMPLETED or FAILED)
 * back through the same client.
 *
 * <p>Invocation targets are looked up in {@link #invokeBeanMap} using the key
 * {@code sagaCode + taskCode}. The map is only read in this class; it is expected to be filled
 * elsewhere before tasks are polled.
 */
public class SagaConsumer extends AbstractAsgardConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SagaConsumer.class);

    /** Invocation targets keyed by {@code sagaCode + taskCode}. */
    static final Map<String, SagaTaskInvokeBean> invokeBeanMap = new HashMap<>();

    private SagaConsumerClient consumerClient;
    /** Lazily created poll request, reused across polls (see {@link #getPollDTO()}). */
    private PollSagaTaskInstanceDTO pollDTO;
    private SagaProperties properties;

    /**
     * Creates the consumer; all arguments are forwarded unchanged to the superclass constructor.
     */
    public SagaConsumer(String str, String str2, PlatformTransactionManager platformTransactionManager, Executor executor, ScheduledExecutorService scheduledExecutorService, ApplicationContextHelper applicationContextHelper, long j) {
        super(str, str2, platformTransactionManager, executor, scheduledExecutorService, applicationContextHelper, j);
    }

    /** Sets the client used to poll task instances and to query/update their status. */
    public void setConsumerClient(SagaConsumerClient sagaConsumerClient) {
        this.consumerClient = sagaConsumerClient;
    }

    /** Sets the saga properties; the consumer's max poll size is read from them. */
    public void setProperties(SagaProperties sagaProperties) {
        this.properties = sagaProperties;
    }

    /**
     * Polls one batch of task instances and submits each of them to the executor.
     *
     * <p>Every polled id is added to {@code runningTasks} before the task is submitted.
     * Failures of the asynchronous invocation are logged at WARN level and not propagated.
     *
     * @param str not used by this implementation
     */
    @Override
    public void scheduleRunning(String str) {
        this.consumerClient.pollBatch(getPollDTO()).forEach(polled -> {
            LOGGER.trace("SagaConsumer polled sagaTaskInstances: {}", polled);
            this.runningTasks.add(polled.getId());
            CompletableFuture.supplyAsync(() -> invoke(polled), this.executor)
                    .exceptionally(th -> {
                        LOGGER.warn("@SagaTask method code: {}, id: {} supplyAsync failed", polled.getTaskCode(), polled.getId(), th);
                        return null;
                    })
                    .thenAccept(finished -> {
                        // exceptionally() maps a failure to null, so there is nothing to report in that case.
                        if (finished != null) {
                            LOGGER.trace("@SagaTask method code: {}, id: {} supplyAsync completed", finished.getTaskCode(), finished.getId());
                        }
                    });
        });
    }

    /**
     * Returns the poll request, creating it on first use from the instance, the service, the
     * configured max poll size and {@code runningTasks}.
     */
    private PollSagaTaskInstanceDTO getPollDTO() {
        if (this.pollDTO == null) {
            this.pollDTO = new PollSagaTaskInstanceDTO(this.instance, this.service, this.properties.getConsumer().getMaxPollSize(), this.runningTasks);
        }
        return this.pollDTO;
    }

    /**
     * Invokes the {@code @SagaTask} method for the given instance synchronously, without any
     * transaction handling or status reporting.
     *
     * @param sagaTaskInstanceDTO the task instance whose saga code, task code, user details and
     *                            input are used for the invocation
     * @return the method result converted to JSON by {@link InstanceResultUtils#resultToJson}
     * @throws RuntimeException wrapping any exception raised during lookup, invocation or
     *                          result conversion
     */
    public String invokeByRun(SagaTaskInstanceDTO sagaTaskInstanceDTO) {
        try {
            beforeInvoke(sagaTaskInstanceDTO.getUserDetails());
            SagaTaskInvokeBean invokeBean = findInvokeBean(sagaTaskInstanceDTO);
            invokeBean.method.setAccessible(true);
            Object result = invokeBean.method.invoke(invokeBean.object, sagaTaskInstanceDTO.getInput());
            return InstanceResultUtils.resultToJson(result, this.objectMapper);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Runs exactly once on both the success and the failure path.
            afterInvoke();
        }
    }

    /**
     * Looks up the invocation target registered for the instance's saga code + task code.
     *
     * @throws IllegalStateException if nothing is registered under that key
     */
    private SagaTaskInvokeBean findInvokeBean(SagaTaskInstanceDTO sagaTaskInstanceDTO) {
        String key = sagaTaskInstanceDTO.getSagaCode() + sagaTaskInstanceDTO.getTaskCode();
        SagaTaskInvokeBean invokeBean = invokeBeanMap.get(key);
        if (invokeBean == null) {
            throw new IllegalStateException("No @SagaTask invoke bean registered for key: " + key);
        }
        return invokeBean;
    }

    /**
     * Runs one polled task instance inside a transaction and reports the result.
     *
     * <p>On success the status is updated to COMPLETED with the JSON output, the id is removed
     * from {@code runningTasks} and the transaction is committed. On any exception the failure
     * is logged and delegated to {@link #invokeError}.
     *
     * @return the same instance that was passed in
     */
    private SagaTaskInstanceDTO invoke(SagaTaskInstanceDTO sagaTaskInstanceDTO) {
        SagaTaskInvokeBean invokeBean = findInvokeBean(sagaTaskInstanceDTO);
        SagaTask sagaTask = invokeBean.sagaTask;
        PlatformTransactionManager taskTransactionManager = getSagaTaskTransactionManager(sagaTask.transactionManager());
        // The status must be created by the same manager that later commits or rolls it back.
        TransactionStatus status = createTransactionStatus(taskTransactionManager, sagaTask.transactionIsolation().value());
        beforeInvoke(sagaTaskInstanceDTO.getUserDetails());
        try {
            invokeBean.method.setAccessible(true);
            Object result = invokeBean.method.invoke(invokeBean.object, sagaTaskInstanceDTO.getInput());
            this.consumerClient.updateStatus(sagaTaskInstanceDTO.getId(), UpdateStatusDTO.UpdateStatusDTOBuilder.newInstance()
                    .withStatus(SagaDefinition.TaskInstanceStatus.COMPLETED.name())
                    .withOutput(InstanceResultUtils.resultToJson(result, this.objectMapper))
                    .withId(sagaTaskInstanceDTO.getId())
                    .withObjectVersionNumber(sagaTaskInstanceDTO.getObjectVersionNumber())
                    .build());
            this.runningTasks.remove(sagaTaskInstanceDTO.getId());
            taskTransactionManager.commit(status);
        } catch (Exception e) {
            LOGGER.info("@SagaTask method code: {}, id: {} invoke error", new Object[]{sagaTaskInstanceDTO.getTaskCode(), sagaTaskInstanceDTO.getId(), InstanceResultUtils.getLoggerException(e)});
            invokeError(taskTransactionManager, status, sagaTaskInstanceDTO, InstanceResultUtils.getErrorInfoFromException(e));
        } finally {
            // Runs exactly once, even if the error handling above throws.
            afterInvoke();
        }
        return sagaTaskInstanceDTO;
    }

    /**
     * Rolls back the task's transaction and reports the instance as FAILED.
     *
     * <p>A rollback failure is logged and does not prevent the status update. The status is
     * updated once; if that update throws, {@link #retryUpdateStatusFailed} is scheduled on the
     * executor instead.
     *
     * @param platformTransactionManager manager used for the rollback
     * @param transactionStatus          status of the transaction to roll back
     * @param sagaTaskInstanceDTO        the failed task instance
     * @param str                        exception message to report with the FAILED status
     */
    private void invokeError(PlatformTransactionManager platformTransactionManager, TransactionStatus transactionStatus, SagaTaskInstanceDTO sagaTaskInstanceDTO, String str) {
        try {
            platformTransactionManager.rollback(transactionStatus);
        } catch (Exception e) {
            LOGGER.warn("@SagaTask method code: {}, id: {} transaction rollback error", sagaTaskInstanceDTO.getTaskCode(), sagaTaskInstanceDTO.getId(), e);
        } finally {
            try {
                this.consumerClient.updateStatus(sagaTaskInstanceDTO.getId(), UpdateStatusDTO.UpdateStatusDTOBuilder.newInstance()
                        .withStatus(SagaDefinition.TaskInstanceStatus.FAILED.name())
                        .withExceptionMessage(str)
                        .withId(sagaTaskInstanceDTO.getId())
                        .withObjectVersionNumber(sagaTaskInstanceDTO.getObjectVersionNumber())
                        .build());
                this.runningTasks.remove(sagaTaskInstanceDTO.getId());
            } catch (Exception updateFailure) {
                LOGGER.warn("@SagaTask method code: {}, id: {} updateStatus to FAILED failed, scheduling retry", sagaTaskInstanceDTO.getTaskCode(), sagaTaskInstanceDTO.getId(), updateFailure);
                CompletableFuture.supplyAsync(() -> retryUpdateStatusFailed(sagaTaskInstanceDTO.getId(), str), this.executor);
            }
        }
    }

    /**
     * Re-queries the instance (sleeping one second before every attempt) until the query
     * succeeds, then reports it as FAILED using the freshly queried object version number.
     *
     * <p>If the thread is interrupted, the id is removed from {@code runningTasks}, the interrupt
     * flag is restored and the method returns without updating the status. If the query returns
     * {@code null}, the id is removed and the method returns as well. An exception from the final
     * {@code updateStatus} call is not caught here and propagates to the caller.
     *
     * @param l   id of the task instance
     * @param str exception message to report with the FAILED status
     * @return the id that was passed in
     */
    private Long retryUpdateStatusFailed(Long l, String str) {
        SagaTaskInstanceDTO queryStatus;
        while (true) {
            try {
                Thread.sleep(1000L);
                queryStatus = this.consumerClient.queryStatus(l);
                break;
            } catch (InterruptedException e) {
                this.runningTasks.remove(l);
                Thread.currentThread().interrupt();
                LOGGER.error("@SagaTask method id: {} retry to updateStatus failed, thread is Interrupted", l, e);
                // Stop retrying: with the interrupt flag set, the next sleep would throw immediately and spin.
                return l;
            } catch (Exception e2) {
                LOGGER.debug("@SagaTask method id: {} auto retry to updateStatus failed", l, e2);
            }
        }
        if (queryStatus == null) {
            this.runningTasks.remove(l);
            LOGGER.error("@SagaTask method id: {} queryStatus failed", l);
            return l;
        }
        this.consumerClient.updateStatus(l, UpdateStatusDTO.UpdateStatusDTOBuilder.newInstance()
                .withStatus(SagaDefinition.TaskInstanceStatus.FAILED.name())
                .withExceptionMessage(str)
                .withId(l)
                .withObjectVersionNumber(queryStatus.getObjectVersionNumber())
                .build());
        this.runningTasks.remove(l);
        return l;
    }
}
