package com.elitescloud.boot.task.retry;

import cn.hutool.core.collection.CollUtil;
import com.elitescloud.boot.provider.TenantDataIsolateProvider;
import com.elitescloud.boot.task.retry.RetryTask;
import com.elitescloud.boot.util.ExceptionsUtil;
import com.elitescloud.boot.util.LockUtil;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.chrono.ChronoLocalDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.util.Assert;

/* loaded from: input_file:com/elitescloud/boot/task/retry/AbstractRetryService.class */
/**
 * Base class for services that re-execute failed tasks after a delay.
 *
 * <p>Tasks handed to {@link #addRetryTask} are parked in an in-memory {@link DelayQueue} until their
 * retry time is reached; a single daemon thread then executes them through {@link #executeTask}
 * while holding a lock obtained via {@code LockUtil}. In addition, a fixed-delay job registered in
 * {@link #configureTasks} periodically pages through the {@link RetryTaskProvider} and feeds the
 * returned tasks into the same queue.
 *
 * @param <T> concrete retry task type
 */
public abstract class AbstractRetryService<T extends RetryTask> implements SchedulingConfigurer, RetryableService<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractRetryService.class);

    /** Maximum number of tasks held in the in-memory delay queue. */
    private static final int QUEUE_CAPACITY = 2000;

    /** Page size used when pulling pending tasks from the provider. */
    private static final int QUERY_BATCH_SIZE = 50;

    private final RetryTaskProvider<T> retryTaskProvider;
    protected final TenantDataIsolateProvider tenantDataIsolateProvider;

    /** Guards the lazy creation of {@link #retryTaskQueueWrapper}. */
    private final Object queueInitLock = new Object();

    /**
     * Lazily created queue; {@code volatile} so that a fully constructed wrapper is visible to every
     * thread that calls {@link #addRetryTask}.
     */
    private volatile RetryTaskQueueWrapper<T> retryTaskQueueWrapper = null;

    /**
     * Delay-queue element wrapping a retry task; it becomes available once the task's retry time
     * has passed.
     *
     * @param <T> concrete retry task type
     */
    public static class DelayTask<T extends RetryTask> implements Delayed {
        private final T retryTask;
        private final LocalDateTime sendTime;

        /**
         * @param t task to wrap; its retry time must not be {@code null}
         * @throws IllegalArgumentException if the task has no retry time
         */
        public DelayTask(T t) {
            this.retryTask = t;
            this.sendTime = t.getRetryTime();
            Assert.notNull(this.sendTime, "重试时间为空");
        }

        /** Returns the time remaining until the retry time, expressed in {@code timeUnit}. */
        @Override
        public long getDelay(@NonNull TimeUnit timeUnit) {
            return timeUnit.convert(Duration.between(LocalDateTime.now(), this.sendTime));
        }

        /**
         * Orders tasks by retry time. Foreign {@link Delayed} implementations are ordered by their
         * remaining delay instead of being reported as equal, so the comparison stays consistent.
         */
        @Override
        public int compareTo(@NonNull Delayed delayed) {
            if (delayed == this) {
                return 0;
            }
            if (delayed instanceof DelayTask) {
                return this.sendTime.compareTo(((DelayTask<?>) delayed).getSendTime());
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), delayed.getDelay(TimeUnit.NANOSECONDS));
        }

        public T getRetryTask() {
            return this.retryTask;
        }

        public LocalDateTime getSendTime() {
            return this.sendTime;
        }
    }

    /**
     * Bounded, de-duplicating delay queue plus the daemon thread that drains it.
     *
     * <p>{@link #addTask} may be called from any thread while the consumer thread removes entries,
     * so the set of queued IDs is a concurrent set.
     *
     * @param <T> concrete retry task type
     */
    public static class RetryTaskQueueWrapper<T extends RetryTask> {
        private final String threadPrefix;
        private final DelayQueue<DelayTask<T>> queue = new DelayQueue<>();
        /** IDs of the tasks currently waiting in {@link #queue}; used to reject duplicates. */
        private final Set<String> taskIdsAll = ConcurrentHashMap.newKeySet();
        private final int size;
        private final BiConsumer<T, Integer> consumer;

        /**
         * Creates the queue and immediately starts the consumer thread.
         *
         * @param str        prefix of the consumer thread name
         * @param i          maximum number of queued tasks
         * @param biConsumer callback receiving each due task and the number of tasks still queued
         */
        public RetryTaskQueueWrapper(String str, int i, BiConsumer<T, Integer> biConsumer) {
            this.threadPrefix = str;
            this.size = i;
            this.consumer = biConsumer;
            // All fields are assigned before the worker starts, so the thread sees a complete object.
            consumeMessage();
        }

        /**
         * Queues a task unless a task with the same ID is already waiting or the queue is full.
         *
         * @param t task to queue; must have a non-blank ID and a retry time
         * @throws IllegalArgumentException if the task ID is blank or the retry time is missing
         */
        public void addTask(T t) {
            String taskId = t.getTaskId();
            Assert.hasText(taskId, "添加重试队列失败，任务ID为空");
            if (this.taskIdsAll.contains(taskId)) {
                return;
            }
            if (this.queue.size() >= this.size) {
                log.info("重试队列已满");
                return;
            }
            // Build the element first so a validation failure cannot leave a stale ID behind.
            DelayTask<T> delayTask = new DelayTask<>(t);
            // add() is atomic: only the thread that actually inserted the ID enqueues the task.
            if (this.taskIdsAll.add(taskId)) {
                this.queue.add(delayTask);
            }
        }

        /** Starts the daemon thread that takes due tasks from the queue and hands them to the consumer. */
        private void consumeMessage() {
            Thread worker = DaemonThreadFactory.INSTANCE.newThread(() -> {
                while (true) {
                    DelayTask<T> due;
                    try {
                        due = this.queue.take();
                    } catch (InterruptedException e) {
                        // Deliberately keep running: nothing in this class restarts the worker,
                        // so leaving the loop would silently stop all retries.
                        log.error("从消息队列获取延迟任务异常", e);
                        continue;
                    }
                    T task = due.getRetryTask();
                    // Free the ID before executing so the task can be re-queued for a later attempt.
                    this.taskIdsAll.remove(task.getTaskId());
                    try {
                        this.consumer.accept(task, this.queue.size());
                    } catch (Exception e) {
                        log.error("延时任务处理异常：", e);
                    }
                }
            });
            worker.setName(this.threadPrefix + "retry");
            worker.setDaemon(true);
            worker.setUncaughtExceptionHandler((thread, th) -> log.error("重试服务异常：", th));
            worker.start();
        }
    }

    protected AbstractRetryService(RetryTaskProvider<T> retryTaskProvider, TenantDataIsolateProvider tenantDataIsolateProvider) {
        this.retryTaskProvider = retryTaskProvider;
        this.tenantDataIsolateProvider = tenantDataIsolateProvider;
    }

    /** Whether retrying is enabled at all. */
    protected abstract boolean supportRetry();

    /** Maximum number of retry attempts per task. */
    protected abstract int retryTimes();

    /** Waiting time before each attempt; the last entry is reused once the list is exhausted. */
    protected abstract List<Duration> retryIntervals();

    /** Performs the actual retry of the given task. */
    protected abstract void executeTask(T t);

    /** Prefix used for the consumer thread name and for the lock key. */
    protected String threadPrefix() {
        return "common-";
    }

    /** Delay between two scans of the provider for pending tasks. */
    protected Duration scheduleDelay() {
        return Duration.ofMinutes(30L);
    }

    /** Registers the fixed-delay job that loads pending tasks from the provider into the queue. */
    @Override
    public void configureTasks(@NonNull ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.addFixedDelayTask(() -> {
            if (!supportRetry()) {
                return;
            }
            try {
                addRetryTaskToQueue();
            } catch (Exception e) {
                log.error("定时执行添加任务重试异常：", e);
            }
        }, scheduleDelay().toMillis());
    }

    /**
     * Schedules a task for retry, or deletes it through the provider when it has no retry time.
     *
     * @param t task to schedule; its ID must not be blank
     * @throws IllegalArgumentException if the task ID is blank
     */
    @Override
    public void addRetryTask(T t) {
        Assert.hasText(t.getTaskId(), "任务ID为空");
        if (t.getRetryTime() == null) {
            String reason = supportRetry() ? "已达最大重试次数" : "重试功能已禁用";
            log.info("删除重试任务：{}，{}，{}", t.getTaskId(), Integer.valueOf(t.getRetryTimes()), reason);
            this.retryTaskProvider.deleteTask(t.getTaskId(), reason);
            return;
        }
        queueWrapper().addTask(t);
    }

    /**
     * Computes the time of the next attempt.
     *
     * @param localDateTime base time the interval is added to
     * @param i             number of attempts already made; used as index into {@link #retryIntervals()}
     * @return the next retry time, or {@code null} when retrying is disabled, the maximum number of
     *         attempts is reached, or no intervals are configured
     */
    @Override
    public LocalDateTime generateNextRetryTime(LocalDateTime localDateTime, int i) {
        if (!supportRetry()) {
            log.info("重试已关闭，无需重试");
            return null;
        }
        if (i >= retryTimes()) {
            log.info("已达最大重试次数，不再重试");
            return null;
        }
        List<Duration> intervals = retryIntervals();
        if (CollUtil.isEmpty(intervals)) {
            log.error("消息重试间隔未设置，无法重试");
            return null;
        }
        // Clamp on both sides: beyond the end reuse the last interval, below zero use the first.
        int index = Math.max(0, Math.min(i, intervals.size() - 1));
        return localDateTime.plusSeconds(intervals.get(index).toSeconds());
    }

    /** Pages through the provider, using the last seen task ID as cursor, and queues every task. */
    private void addRetryTaskToQueue() {
        String lastTaskId = null;
        while (true) {
            List<T> page = this.retryTaskProvider.queryTask(lastTaskId, QUERY_BATCH_SIZE);
            if (CollUtil.isEmpty(page)) {
                return;
            }
            for (T task : page) {
                addRetryTask(task);
                lastTaskId = task.getTaskId();
            }
        }
    }

    /** Returns the queue wrapper, creating it (and its consumer thread) exactly once on first use. */
    private RetryTaskQueueWrapper<T> queueWrapper() {
        RetryTaskQueueWrapper<T> wrapper = this.retryTaskQueueWrapper;
        if (wrapper == null) {
            synchronized (this.queueInitLock) {
                wrapper = this.retryTaskQueueWrapper;
                if (wrapper == null) {
                    wrapper = new RetryTaskQueueWrapper<>(threadPrefix(), QUEUE_CAPACITY,
                            (retryTask, remaining) -> executeRetry(retryTask));
                    this.retryTaskQueueWrapper = wrapper;
                }
            }
        }
        return wrapper;
    }

    /**
     * Executes one due task under a lock keyed by task ID and version, then reports the outcome
     * to the provider if the task was actually attempted.
     */
    private void executeRetry(T retryTask) {
        String taskId = retryTask.getTaskId();
        // Set once executeTask is about to run; only then is a result written back.
        AtomicBoolean attempted = new AtomicBoolean(false);
        Throwable failure = null;
        try {
            LockUtil.executeByLock(threadPrefix() + "retry-" + taskId + "-" + retryTask.getVersion(), () -> {
                if (!this.retryTaskProvider.trySend(taskId, Integer.valueOf(retryTask.getVersion()))) {
                    log.info("任务不需要再重试：{}, {}", taskId, Integer.valueOf(retryTask.getVersion()));
                    return null;
                }
                log.info("重试任务：{}", taskId);
                if (!supportRetry()) {
                    this.retryTaskProvider.updateRetryResult(taskId, false, "自动重试功能未启用");
                    return null;
                }
                attempted.set(true);
                // NOTE(review): presumably runs the task outside any tenant scope - confirm against
                // TenantDataIsolateProvider.
                this.tenantDataIsolateProvider.byDefaultDirectly(() -> {
                    executeTask(retryTask);
                    return null;
                });
                log.info("重试任务成功：{}", taskId);
                return null;
            }, Duration.ofMinutes(1L));
        } catch (Throwable e) {
            // Never let a failing task escape: it is recorded as a failed attempt below.
            log.error("执行重试任务异常：{}", taskId, e);
            failure = e;
        } finally {
            if (attempted.get()) {
                reportResult(taskId, failure);
            }
        }
    }

    /** Writes the outcome of an attempt back to the provider; failures to do so are only logged. */
    private void reportResult(String taskId, Throwable failure) {
        boolean success = failure == null;
        try {
            this.retryTaskProvider.updateRetryResult(taskId, success,
                    success ? null : ExceptionsUtil.stackTraceAllToString(failure));
        } catch (Exception e) {
            log.error("更新任务重试结果异常：", e);
        }
    }
}
