package com.zyplayer.doc.db.framework.db.transfer;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.util.IdUtil;
import com.zyplayer.doc.core.exception.ConfirmException;
import com.zyplayer.doc.data.config.security.DocUserUtil;
import com.zyplayer.doc.data.repository.manage.entity.DbTransferTask;
import com.zyplayer.doc.data.repository.support.consts.DocSysModuleType;
import com.zyplayer.doc.data.repository.support.consts.DocSysType;
import com.zyplayer.doc.data.service.manage.DbTransferTaskService;
import com.zyplayer.doc.data.utils.ThreadPoolUtil;
import com.zyplayer.doc.db.framework.consts.DbAuthType;
import com.zyplayer.doc.db.framework.db.bean.DatabaseFactoryBean;
import com.zyplayer.doc.db.framework.db.bean.DatabaseRegistrationBean;
import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteParam;
import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteResult;
import com.zyplayer.doc.db.framework.db.mapper.base.ExecuteType;
import com.zyplayer.doc.db.framework.db.mapper.base.SqlExecutor;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/zyplayer/doc/db/framework/db/transfer/TransferDataServer.class */
public class TransferDataServer {

    @Resource
    SqlExecutor sqlExecutor;

    @Resource
    DbTransferTaskService dbTransferTaskService;

    @Resource
    DatabaseRegistrationBean databaseRegistrationBean;
    private final Integer batchInsertNum = 100;
    private final Integer executeCountLogNum = 5000;
    private static Logger logger = LoggerFactory.getLogger(TransferDataServer.class);
    private static final Map<Long, String> taskExecuteMap = new ConcurrentHashMap();

    public void cancel(Long l) {
        String str = taskExecuteMap.get(l);
        if (StringUtils.isBlank(str)) {
            throw new ConfirmException("该任务不在执行中，取消失败");
        }
        if (!this.sqlExecutor.cancel(str)) {
            throw new ConfirmException("终止该任务失败");
        }
        this.dbTransferTaskService.addExecuteInfo(l, TransferTaskStatus.CANCEL.getCode(), String.format("[%s] %s 手动终止了此任务", DateTime.now(), DocUserUtil.getCurrentUser().getUsername()));
    }

    public void transferData(Long l) {
        DbTransferTask dbTransferTask = (DbTransferTask) this.dbTransferTaskService.getById(l);
        if (dbTransferTask == null || dbTransferTask.getDelFlag().intValue() == 1) {
            throw new ConfirmException("未找到该任务记录，创建任务失败");
        }
        if (Objects.equals(dbTransferTask.getLastExecuteStatus(), 1)) {
            throw new ConfirmException("任务正在执行中，请勿重复执行");
        }
        boolean haveAuth = DocUserUtil.haveAuth(new String[]{"DB_DATASOURCE_MANAGE"});
        boolean haveCustomAuth = DocUserUtil.haveCustomAuth(DbAuthType.SELECT.getName(), Integer.valueOf(DocSysType.DB.getType()), Integer.valueOf(DocSysModuleType.Db.DATASOURCE.getType()), dbTransferTask.getQueryDatasourceId());
        boolean haveCustomAuth2 = DocUserUtil.haveCustomAuth(DbAuthType.UPDATE.getName(), Integer.valueOf(DocSysType.DB.getType()), Integer.valueOf(DocSysModuleType.Db.DATASOURCE.getType()), dbTransferTask.getQueryDatasourceId());
        if (!haveAuth && !haveCustomAuth && !haveCustomAuth2) {
            throw new ConfirmException("没有查询数据源的查询权限，创建任务失败");
        }
        boolean haveCustomAuth3 = DocUserUtil.haveCustomAuth(DbAuthType.UPDATE.getName(), Integer.valueOf(DocSysType.DB.getType()), Integer.valueOf(DocSysModuleType.Db.DATASOURCE.getType()), dbTransferTask.getStorageDatasourceId());
        if (!haveAuth && !haveCustomAuth3) {
            throw new ConfirmException("没有目标数据源的写入权限，创建任务失败");
        }
        this.dbTransferTaskService.resetExecuteInfo(l);
        ThreadPoolUtil.getThreadPool().submit(() -> {
            this.dbTransferTaskService.addExecuteInfo(l, TransferTaskStatus.EXECUTING.getCode(), String.format("[%s] 任务开始执行", DateTime.now()));
            String simpleUUID = IdUtil.simpleUUID();
            taskExecuteMap.put(l, simpleUUID);
            transferData(dbTransferTask, simpleUUID);
        });
    }

    private void transferData(DbTransferTask dbTransferTask, String str) {
        Long queryDatasourceId = dbTransferTask.getQueryDatasourceId();
        Long storageDatasourceId = dbTransferTask.getStorageDatasourceId();
        String querySql = dbTransferTask.getQuerySql();
        String storageSql = dbTransferTask.getStorageSql();
        LinkedList linkedList = new LinkedList();
        DatabaseFactoryBean orCreateFactoryById = this.databaseRegistrationBean.getOrCreateFactoryById(queryDatasourceId);
        ExecuteParam executeParam = new ExecuteParam();
        executeParam.setDatasourceId(queryDatasourceId);
        executeParam.setExecuteType(ExecuteType.SELECT);
        executeParam.setExecuteId(str);
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (Objects.equals(dbTransferTask.getNeedCount(), 1)) {
                executeParam.setSql(SqlParseUtil.getSelectCountSql(querySql));
                ExecuteResult execute = this.sqlExecutor.execute(executeParam);
                if (CollectionUtils.isEmpty(execute.getResult())) {
                    this.dbTransferTaskService.addExecuteInfo(dbTransferTask.getId(), TransferTaskStatus.ERROR.getCode(), String.format("[%s] 获取总条数失败", DateTime.now()));
                    return;
                }
                this.dbTransferTaskService.addExecuteInfo(dbTransferTask.getId(), TransferTaskStatus.EXECUTING.getCode(), String.format("[%s] 待处理总条数：%s，查询总条数耗时：%sms", DateTime.now(), execute.getResult().get(0).values().stream().findAny().orElse(0), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)));
            } else {
                this.dbTransferTaskService.addExecuteInfo(dbTransferTask.getId(), TransferTaskStatus.EXECUTING.getCode(), String.format("[%s] 未开启查询总条数，跳过条数查询", DateTime.now()));
            }
            AtomicLong atomicLong = new AtomicLong(0L);
            executeParam.setSql(querySql);
            ExecuteResult execute2 = this.sqlExecutor.execute(orCreateFactoryById, executeParam, map -> {
                linkedList.add(map);
                if (atomicLong.incrementAndGet() % this.executeCountLogNum.intValue() == 0) {
                    this.dbTransferTaskService.addExecuteInfo(dbTransferTask.getId(), TransferTaskStatus.EXECUTING.getCode(), String.format("[%s] 已处理条数：%s", DateTime.now(), Long.valueOf(atomicLong.get())));
                }
                if (linkedList.size() >= this.batchInsertNum.intValue()) {
                    writeData(storageDatasourceId, storageSql, linkedList);
                }
            });
            if (linkedList.size() > 0) {
                writeData(storageDatasourceId, storageSql, linkedList);
            }
            if (StringUtils.isNotBlank(execute2.getErrMsg())) {
                this.dbTransferTaskService.addExecuteInfo(dbTransferTask.getId(), TransferTaskStatus.ERROR.getCode(), String.format("[%s] 执行出错：%s", DateTime.now(), execute2.getErrMsg()));
            } else {
                this.dbTransferTaskService.addExecuteInfo(dbTransferTask.getId(), TransferTaskStatus.SUCCESS.getCode(), String.format("[%s] 任务执行成功，处理总条数：%s，总耗时：%sms", DateTime.now(), Long.valueOf(atomicLong.get()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)));
            }
        } catch (Exception e) {
            logger.error("SQL执行出错：", e);
            this.dbTransferTaskService.addExecuteInfo(dbTransferTask.getId(), TransferTaskStatus.ERROR.getCode(), String.format("[%s] 处理出错：%s", DateTime.now(), ExceptionUtils.getStackTrace(e)));
        }
    }

    private void writeData(Long l, String str, List<Map<String, Object>> list) {
        for (ExecuteParam executeParam : SqlParseUtil.getExecuteParamList(str, list)) {
            executeParam.setDatasourceId(l);
            executeParam.setExecuteId(IdUtil.simpleUUID());
            this.sqlExecutor.execute(executeParam);
        }
        list.clear();
    }
}
