package com.elitesland.boot.elasticsearch.canal.factory;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.elitesland.boot.elasticsearch.CanalClient;
import com.elitesland.boot.elasticsearch.canal.config.CanalProperties;
import com.elitesland.boot.elasticsearch.canal.config.support.CanalHandlerCustomizer;
import com.elitesland.boot.elasticsearch.canal.model.RowData;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.util.StopWatch;

/* loaded from: input_file:com/elitesland/boot/elasticsearch/canal/factory/CanalClientFactory.class */
/**
 * Discovers every {@link CanalClient} bean, indexes the clients by the {@code schema.table} they
 * listen to, and starts a background listener that pulls row changes from a Canal server and
 * forwards them to the matching clients.
 *
 * <p>{@link #build()} is the only entry point. The lookup maps are populated before the listener
 * thread is started and are only read afterwards.
 */
public class CanalClientFactory {
    private static final Logger logger = LoggerFactory.getLogger(CanalClientFactory.class);

    /** Seconds to wait before retrying when no usable connector could be obtained. */
    private static final long CONNECT_RETRY_SECONDS = 5L;

    private final BeanFactory beanFactory;
    private final CanalProperties canalProperties;
    /** Clients keyed by the {@code schema.table} name they registered for. */
    private Map<String, Set<CanalClient>> canalClients;
    /** Target type each client's row data is converted to before the callback is invoked. */
    private Map<CanalClient, Class<? extends Serializable>> clientType;
    private ObjectMapper objectMapper;
    private CanalHandlerCustomizer canalHandlerCustomizer;

    /**
     * @param beanFactory source of the {@link CanalClient} and {@link CanalHandlerCustomizer} beans
     * @param canalProperties connection and polling settings
     */
    public CanalClientFactory(BeanFactory beanFactory, CanalProperties canalProperties) {
        this.beanFactory = beanFactory;
        this.canalProperties = canalProperties;
    }

    /**
     * Registers all available clients and starts the background listener. Returns without
     * starting anything when no {@link CanalClient} bean exists.
     */
    public void build() {
        StopWatch stopWatch = new StopWatch();
        logger.info("start build Elasticsearch Canal Client factory...");
        stopWatch.start();
        Set<CanalClient> clients = getClients();
        if (clients.isEmpty()) {
            logger.info("not found available Canal Client");
            stopWatch.stop();
            return;
        }
        buildObjectMapper();
        registerCanalClient(clients);
        // The listener loops forever and blocks on I/O and sleep, so it gets its own daemon thread
        // instead of permanently occupying a ForkJoinPool.commonPool worker. Any failure that ends
        // the listener is logged rather than being lost inside the future.
        CompletableFuture.runAsync(this::registerListener, CanalClientFactory::startListenerThread)
                .exceptionally(failure -> {
                    logger.error("Elasticsearch Canal listener terminated unexpectedly", failure);
                    return null;
                });
        stopWatch.stop();
        logger.info("finish build Elasticsearch Canal Client factory in {}ms", stopWatch.getTotalTimeMillis());
    }

    /** Runs {@code task} on a dedicated daemon thread so it never keeps the JVM alive. */
    private static void startListenerThread(Runnable task) {
        Thread thread = new Thread(task, "elasticsearch-canal-listener");
        thread.setDaemon(true);
        thread.start();
    }

    /** Builds the {@link ObjectMapper} used for row conversion, after letting the customizer adjust the builder. */
    private void buildObjectMapper() {
        Jackson2ObjectMapperBuilder builder = Jackson2ObjectMapperBuilder.json();
        this.canalHandlerCustomizer = this.beanFactory.getBean(CanalHandlerCustomizer.class);
        this.canalHandlerCustomizer.objectMapperBuilder(builder);
        this.objectMapper = builder.build();
    }

    /** Obtains a connector and polls it until the listener thread is interrupted. */
    private void registerListener() {
        CanalConnector connector = getConnector();
        if (connector == null) {
            // Interrupted while waiting for the server; nothing to listen to.
            return;
        }
        loadData(connector);
    }

    /**
     * Polls the connector in a loop: fetches a batch, dispatches it, then acknowledges it on
     * success or rolls it back on failure, and finally sleeps for the configured refresh period.
     * The loop ends, and the connector is disconnected, when the thread is interrupted.
     *
     * @param canalConnector a connected and subscribed connector
     */
    private void loadData(CanalConnector canalConnector) {
        while (true) {
            try {
                Message batch = canalConnector.getWithoutAck(this.canalProperties.getBatchSize().intValue());
                long batchId = batch.getId();
                if (batchId != -1 && !batch.getEntries().isEmpty()) {
                    try {
                        dispatch(batch.getEntries());
                        canalConnector.ack(batchId);
                    } catch (Exception e) {
                        logger.error("数据处理失败", e);
                        canalConnector.rollback(batchId);
                    }
                }
            } catch (Exception e) {
                // NOTE(review): the same connector is reused after a fetch failure; whether it
                // recovers on its own depends on the connector implementation - confirm.
                logger.error("Elasticsearch Canal 加载数据失败", e);
            }
            try {
                TimeUnit.SECONDS.sleep(this.canalProperties.getPeriodRefresh().getSeconds());
            } catch (InterruptedException e) {
                // Restore the flag and stop instead of spinning forever on an interrupted thread.
                Thread.currentThread().interrupt();
                logger.warn("Elasticsearch Canal listener interrupted, stop loading data");
                break;
            }
        }
        disconnectQuietly(canalConnector);
    }

    /**
     * Forwards every row of every entry to the clients registered for the entry's
     * {@code schema.table}. Transaction begin/end markers and entries without a registered client
     * are skipped.
     *
     * @throws RuntimeException wrapping any failure while parsing or processing an entry
     */
    private void dispatch(List<CanalEntry.Entry> list) {
        for (CanalEntry.Entry entry : list) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            Set<CanalClient> listeners = this.canalClients.get(entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName());
            if (CollUtil.isEmpty(listeners)) {
                continue;
            }
            try {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                CanalEntry.EventType eventType = rowChange.getEventType();
                for (CanalEntry.RowData row : rowChange.getRowDatasList()) {
                    process(listeners, eventType, row);
                }
            } catch (Exception e) {
                throw new RuntimeException("Elasticsearch Canal 解析数据异常, data:" + entry, e);
            }
        }
    }

    /**
     * Invokes the callback matching {@code eventType} on every client: {@code onDelete} with the
     * before-image, {@code onInsert} with the after-image, and {@code onUpdate} with both images
     * for any other event type.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void process(Set<CanalClient> set, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (eventType == CanalEntry.EventType.DELETE) {
            RowData before = convertRow(rowData.getBeforeColumnsList());
            for (CanalClient client : set) {
                bindData(before, client);
                client.onDelete(before);
            }
            return;
        }
        if (eventType == CanalEntry.EventType.INSERT) {
            RowData after = convertRow(rowData.getAfterColumnsList());
            for (CanalClient client : set) {
                bindData(after, client);
                client.onInsert(after);
            }
            return;
        }
        RowData before = convertRow(rowData.getBeforeColumnsList());
        RowData after = convertRow(rowData.getAfterColumnsList());
        for (CanalClient client : set) {
            bindData(before, client);
            bindData(after, client);
            client.onUpdate(before, after);
        }
    }

    /** Converts the row's value map to the data type registered for {@code client} and stores it on the row. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void bindData(RowData row, CanalClient client) {
        row.setData((Serializable) this.objectMapper.convertValue(row.getValueMap(), this.clientType.get(client)));
    }

    /**
     * Copies Canal columns into a {@link RowData}: a column list with the raw names and values,
     * and a value map keyed by the customizer-converted field name.
     *
     * @param list columns of one row image; may be empty
     * @return a row whose column list and value map are never {@code null}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private RowData convertRow(List<CanalEntry.Column> list) {
        int size = list.size();
        // Raw collections are kept on purpose: the element types expected by RowData's setters
        // are declared outside this file.
        List columns = new ArrayList(size);
        Map values = new HashMap(size);
        RowData rowData = new RowData();
        rowData.setColumnList(columns);
        rowData.setValueMap(values);
        for (CanalEntry.Column source : list) {
            RowData.Column target = new RowData.Column();
            target.setName(source.getName());
            target.setValue(source.getValue());
            columns.add(target);
            values.put(this.canalHandlerCustomizer.fieldNameConvert(source.getName()), source.getValue());
        }
        return rowData;
    }

    /**
     * Retries every {@value #CONNECT_RETRY_SECONDS} seconds until a connector that passes
     * {@code checkValid()} is available. Connectors that are rejected, or whose validation throws,
     * are disconnected so no connection is leaked.
     *
     * @return a usable connector, or {@code null} if the thread was interrupted while waiting
     */
    private CanalConnector getConnector() {
        while (true) {
            CanalConnector candidate = null;
            try {
                candidate = createConnector();
                if (candidate.checkValid()) {
                    return candidate;
                }
                logger.warn("canal server 不可用");
            } catch (Exception e) {
                logger.error("创建Canal Connector失败", e);
            }
            disconnectQuietly(candidate);
            try {
                TimeUnit.SECONDS.sleep(CONNECT_RETRY_SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("Elasticsearch Canal listener interrupted while waiting for canal server");
                return null;
            }
        }
    }

    /**
     * Creates a single-node connector from the configured server settings, connects it,
     * subscribes to every table of the configured databases, and rolls back to the last
     * acknowledged position.
     *
     * @throws IllegalArgumentException if the host or destination is blank
     */
    private CanalConnector createConnector() {
        String host = Assert.notBlank(this.canalProperties.getServer().getHost(), "Canal Server的配置host为空");
        InetSocketAddress address = new InetSocketAddress(host, this.canalProperties.getServer().getPort());
        String destination = Assert.notBlank(this.canalProperties.getServer().getDestination(), "Canal Server的配置destination为空");
        String username = StrUtil.nullToDefault(this.canalProperties.getServer().getUsername(), "");
        String password = StrUtil.nullToDefault(this.canalProperties.getServer().getPassword(), "");
        // One filter expression per database, each matching every table of that database.
        String filter = this.canalProperties.getDatabaseSet().stream()
                .map(database -> database + "\\..*")
                .collect(Collectors.joining(","));

        CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, username, password);
        connector.connect();
        try {
            connector.subscribe(filter);
            connector.rollback();
        } catch (RuntimeException e) {
            // Do not leave a half-initialised connection open.
            disconnectQuietly(connector);
            throw e;
        }
        return connector;
    }

    /** Disconnects {@code connector} if it is non-null, logging instead of propagating any failure. */
    private static void disconnectQuietly(CanalConnector connector) {
        if (connector == null) {
            return;
        }
        try {
            connector.disconnect();
        } catch (Exception e) {
            logger.warn("Failed to disconnect Canal Connector", e);
        }
    }

    /**
     * Indexes each client under {@code database.table} (the database falls back to the default
     * one) and records the data type its rows are converted to.
     *
     * @throws IllegalArgumentException if a client declares a blank table
     * @throws IllegalStateException if a client's data type cannot be resolved
     */
    private void registerCanalClient(Set<CanalClient> set) {
        int size = set.size();
        this.canalClients = new HashMap<>(size);
        this.clientType = new HashMap<>(size);
        for (CanalClient canalClient : set) {
            String clientName = canalClient.getClass().getName();
            String database = StrUtil.blankToDefault(canalClient.database(), getDefaultDatabase());
            String table = Assert.notBlank(canalClient.table(), "{}中的table为空", clientName);
            String key = (StrUtil.isBlank(database) ? "" : database + ".") + table;
            this.canalClients.computeIfAbsent(key, ignored -> new HashSet<>(4)).add(canalClient);
            this.clientType.put(canalClient, resolveDataType(canalClient.getClass()));
            logger.info("register Elasticsearch Canal Client 【{}】 listen to 【{}】", clientName, key);
        }
    }

    /**
     * Resolves the first type argument a client class supplies to {@link CanalClient}. The class
     * and its superclasses are searched for a direct {@code CanalClient<X>} declaration, so the
     * result does not depend on interface order. If none is found, the first generic interface of
     * the class itself is used, which is the only rule the previous implementation applied.
     *
     * @throws IllegalStateException if no concrete type argument can be found
     */
    private static Class<? extends Serializable> resolveDataType(Class<?> clientClass) {
        for (Class<?> current = clientClass; current != null; current = current.getSuperclass()) {
            for (java.lang.reflect.Type candidate : current.getGenericInterfaces()) {
                if (candidate instanceof ParameterizedType
                        && ((ParameterizedType) candidate).getRawType() == CanalClient.class) {
                    Class<? extends Serializable> resolved = firstClassArgument(candidate);
                    if (resolved != null) {
                        return resolved;
                    }
                }
            }
        }
        java.lang.reflect.Type[] direct = clientClass.getGenericInterfaces();
        Class<? extends Serializable> fallback = direct.length > 0 ? firstClassArgument(direct[0]) : null;
        if (fallback == null) {
            throw new IllegalStateException("Cannot resolve the data type of Canal Client " + clientClass.getName());
        }
        return fallback;
    }

    /** Returns the first type argument of {@code type} when it is a concrete class, otherwise {@code null}. */
    @SuppressWarnings("unchecked")
    private static Class<? extends Serializable> firstClassArgument(java.lang.reflect.Type type) {
        if (!(type instanceof ParameterizedType)) {
            return null;
        }
        java.lang.reflect.Type[] arguments = ((ParameterizedType) type).getActualTypeArguments();
        if (arguments.length == 0 || !(arguments[0] instanceof Class)) {
            return null;
        }
        // Unchecked: the Serializable bound is not verified here; it is what the original code assumed.
        return (Class<? extends Serializable>) arguments[0];
    }

    /** Collects every {@link CanalClient} bean known to the bean factory. */
    private Set<CanalClient> getClients() {
        return this.beanFactory.getBeanProvider(CanalClient.class).stream().collect(Collectors.toSet());
    }

    /**
     * Returns the configured database, or the first entry of the database set when none is
     * configured.
     *
     * @throws IllegalArgumentException if the database set is empty
     */
    private String getDefaultDatabase() {
        Assert.notEmpty(this.canalProperties.getDatabaseSet(), "databaseSet不可为空");
        String configured = this.canalProperties.getDatabase();
        return StrUtil.isNotBlank(configured) ? configured : this.canalProperties.getDatabaseSet().get(0);
    }
}
