package com.hccake.ballcat.common.core.thread;

import com.hccake.ballcat.common.util.JsonUtils;
import java.util.ArrayList;
import java.util.List;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/hccake/ballcat/common/core/thread/AbstractQueueThread.class */
public abstract class AbstractQueueThread<E> extends Thread implements InitializingBean, ApplicationListener<ContextClosedEvent> {
    private static final Logger log = LoggerFactory.getLogger(AbstractQueueThread.class);
    private static final int DEFAULT_BATCH_SIZE = 500;
    private static final long DEFAULT_BATCH_TIMEOUT_MS = 30000;
    private static final long POLL_TIMEOUT_MS = 5000;

    public int getBatchSize() {
        return DEFAULT_BATCH_SIZE;
    }

    public long getBatchTimeout() {
        return DEFAULT_BATCH_TIMEOUT_MS;
    }

    public static long getPollTimeoutMs() {
        return POLL_TIMEOUT_MS;
    }

    public abstract void put(@NotNull E e);

    public void init() {
    }

    public boolean isRun() {
        return !isInterrupted();
    }

    public void preProcess() {
    }

    @Nullable
    public abstract E poll(long j) throws InterruptedException;

    public void receiveProcess(List<E> list, E e) {
        list.add(e);
    }

    public abstract void process(List<E> list) throws Exception;

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        init();
        while (isRun()) {
            ArrayList arrayList = new ArrayList(getBatchSize());
            try {
                preProcess();
                fillList(arrayList);
                if (isRun()) {
                    process(arrayList);
                } else {
                    shutdownHandler(arrayList);
                }
            } catch (Throwable th) {
                error(th, arrayList);
            }
        }
    }

    protected void fillList(List<E> list) {
        long j = 0;
        int i = 0;
        while (i < getBatchSize()) {
            E e = get();
            if (e != null) {
                int i2 = i;
                i++;
                if (i2 == 0) {
                    j = System.currentTimeMillis();
                }
                receiveProcess(list, e);
            }
            if (!isRun() || (!CollectionUtils.isEmpty(list) && System.currentTimeMillis() - j >= getBatchTimeout())) {
                return;
            }
        }
    }

    private E get() {
        E e = null;
        try {
            e = poll(getPollTimeoutMs());
        } catch (InterruptedException e2) {
            interrupt();
            log.error("{} 类的poll线程被中断!id: {}", getClass().getSimpleName(), Long.valueOf(getId()));
        }
        return e;
    }

    public abstract void error(Throwable th, List<E> list);

    public void afterPropertiesSet() throws Exception {
        setName(getClass().getSimpleName());
        if (isAlive()) {
            return;
        }
        start();
    }

    public void onApplicationEvent(ContextClosedEvent contextClosedEvent) {
        log.warn("{} 类的线程开始关闭! id: {} ", getClass().getSimpleName(), Long.valueOf(getId()));
        shutdown();
    }

    public void shutdown() {
        interrupt();
    }

    public void shutdownHandler(List<E> list) {
        try {
            log.error("{} 类 线程: {} 被关闭. 数据:{}", new Object[]{getClass().getSimpleName(), Long.valueOf(getId()), JsonUtils.toJson(list)});
        } catch (Throwable th) {
            log.error("{} 类 线程: {} 被关闭. 数据:{}", new Object[]{getClass().getSimpleName(), Long.valueOf(getId()), list});
        }
    }
}
