/*
 * Decompiled with CFR 0.152.
 */
package com.elitesland.boot.elasticsearch.canal.factory;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.elitesland.boot.elasticsearch.CanalClient;
import com.elitesland.boot.elasticsearch.canal.config.CanalProperties;
import com.elitesland.boot.elasticsearch.canal.config.support.CanalHandlerCustomizer;
import com.elitesland.boot.elasticsearch.canal.model.RowData;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.util.StopWatch;

/**
 * Collects every {@link CanalClient} bean, indexes it by the table it listens to and
 * starts a background loop that pulls row changes from a Canal server and forwards
 * them to the matching clients.
 *
 * <p>Usage: construct the factory and call {@link #build()} once. The lookup tables
 * are populated inside {@code build()} before the listener thread is started and are
 * only read afterwards.
 */
public class CanalClientFactory {
    private static final Logger logger = LoggerFactory.getLogger(CanalClientFactory.class);

    /** Name given to the dedicated thread that runs the polling loop. */
    private static final String LISTENER_THREAD_NAME = "es-canal-listener";
    /** Pause between two attempts to obtain a connector. */
    private static final long CONNECT_RETRY_SECONDS = 5L;
    /** Subscription filter passed to the connector (every schema, every table). */
    private static final String SUBSCRIBE_FILTER = ".*\\..*";

    private final BeanFactory beanFactory;
    private final CanalProperties canalProperties;
    /** Registered clients keyed by {@code "database.table"}, or by the bare table name when no database is configured. */
    private Map<String, Set<CanalClient>> canalClients;
    /** Target type each client wants its row data converted to. */
    private Map<CanalClient, Class<? extends Serializable>> clientType;
    private ObjectMapper objectMapper;
    private CanalHandlerCustomizer canalHandlerCustomizer;

    /**
     * @param beanFactory     source of the {@link CanalClient} and {@link CanalHandlerCustomizer} beans
     * @param canalProperties server address, batch size, refresh period and default database
     */
    public CanalClientFactory(BeanFactory beanFactory, CanalProperties canalProperties) {
        this.beanFactory = beanFactory;
        this.canalProperties = canalProperties;
    }

    /**
     * Discovers the clients, prepares the object mapper, registers the clients and
     * starts the listener in the background. Does nothing besides logging when no
     * {@link CanalClient} bean exists.
     */
    public void build() {
        StopWatch watch = new StopWatch();
        logger.info("start build Elasticsearch Canal Client factory...");
        watch.start();
        Set<CanalClient> clients = this.getClients();
        if (clients.isEmpty()) {
            logger.info("not found available Canal Client");
            watch.stop();
            return;
        }
        this.buildObjectMapper();
        this.registerCanalClient(clients);
        this.startListener();
        watch.stop();
        logger.info("finish build Elasticsearch Canal Client factory in {}ms", watch.getTotalTimeMillis());
    }

    /**
     * Runs {@link #registerListener()} on its own daemon thread. The loop blocks and
     * sleeps indefinitely, so it must not occupy a worker of the shared common pool.
     */
    private void startListener() {
        Thread listener = new Thread(this::registerListener, LISTENER_THREAD_NAME);
        // Daemon, so the endless polling loop never keeps the JVM alive on shutdown.
        listener.setDaemon(true);
        listener.start();
    }

    /** Builds the {@link ObjectMapper} after letting the {@link CanalHandlerCustomizer} bean adjust the builder. */
    private void buildObjectMapper() {
        Jackson2ObjectMapperBuilder builder = Jackson2ObjectMapperBuilder.json();
        this.canalHandlerCustomizer = this.beanFactory.getBean(CanalHandlerCustomizer.class);
        this.canalHandlerCustomizer.objectMapperBuilder(builder);
        this.objectMapper = builder.build();
    }

    /**
     * Body of the listener thread: obtains a connector, polls it until the thread is
     * interrupted and finally disconnects. Unexpected failures are logged instead of
     * being lost silently.
     */
    private void registerListener() {
        CanalConnector connector = null;
        try {
            connector = this.getConnector();
            if (connector == null) {
                // Interrupted while waiting for the server; nothing to poll.
                return;
            }
            this.loadData(connector);
        } catch (RuntimeException e) {
            logger.error("Elasticsearch Canal listener terminated unexpectedly", e);
        } finally {
            this.disconnectQuietly(connector);
        }
    }

    /**
     * Polls the connector in a loop. A non-empty batch is dispatched and acknowledged;
     * if dispatching or acknowledging fails the batch is rolled back. Between two
     * rounds the thread sleeps for the configured refresh period. The loop ends when
     * the thread is interrupted.
     *
     * <p>NOTE(review): a failure of {@code getWithoutAck} is only logged and the same
     * connector is used again on the next round - confirm whether a reconnect is
     * needed after the connection to the server is lost.
     *
     * @param connector connected and subscribed connector
     */
    private void loadData(CanalConnector connector) {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Message message = connector.getWithoutAck(this.canalProperties.getBatchSize().intValue());
                long batchId = message.getId();
                if (batchId != -1L && !message.getEntries().isEmpty()) {
                    this.handleBatch(connector, batchId, message.getEntries());
                }
            } catch (Exception e) {
                logger.error("Elasticsearch Canal \u52a0\u8f7d\u6570\u636e\u5931\u8d25", e);
            }
            try {
                TimeUnit.SECONDS.sleep(this.canalProperties.getPeriodRefresh().getSeconds());
            } catch (InterruptedException interruptedException) {
                logger.error("sleep\u5f02\u5e38", interruptedException);
                // Restore the flag and stop; continuing would ignore the stop request.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Dispatches one batch and acknowledges it, rolling the batch back when either step fails. */
    private void handleBatch(CanalConnector connector, long batchId, List<CanalEntry.Entry> entries) {
        try {
            this.dispatch(entries);
            connector.ack(batchId);
        } catch (Exception exception) {
            logger.error("\u6570\u636e\u5904\u7406\u5931\u8d25", exception);
            connector.rollback(batchId);
        }
    }

    /**
     * Forwards the row changes of every entry to the clients registered for the
     * entry's table. Transaction begin/end entries and entries without a registered
     * client are skipped.
     *
     * @throws RuntimeException when an entry's stored value cannot be parsed
     */
    private void dispatch(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            Set<CanalClient> clients = this.findClients(entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
            if (CollUtil.isEmpty(clients)) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom((ByteString) entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Elasticsearch Canal \u89e3\u6790\u6570\u636e\u5f02\u5e38, data:" + entry, e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                this.process(clients, eventType, rowData);
            }
        }
    }

    /**
     * Looks up the clients for a table. Clients are registered under
     * {@code "database.table"} when a database is known and under the bare table name
     * otherwise, so both keys are consulted.
     *
     * @return the matching clients; may be {@code null} or empty when none is registered
     */
    private Set<CanalClient> findClients(String schemaName, String tableName) {
        Set<CanalClient> qualified = this.canalClients.get(schemaName + "." + tableName);
        Set<CanalClient> unqualified = this.canalClients.get(tableName);
        if (CollUtil.isEmpty(unqualified)) {
            return qualified;
        }
        if (CollUtil.isEmpty(qualified)) {
            return unqualified;
        }
        Set<CanalClient> merged = new HashSet<>(qualified);
        merged.addAll(unqualified);
        return merged;
    }

    /**
     * Converts one Canal row and invokes the callback matching the event type on every
     * client: {@code onDelete} with the before-image, {@code onInsert} with the
     * after-image, and {@code onUpdate} with both images for any other event type.
     * The same {@link RowData} instance is handed to each client in turn, with its
     * data re-bound to that client's target type right before the call.
     */
    private void process(Set<CanalClient> clients, CanalEntry.EventType eventType, CanalEntry.RowData canalRowData) {
        if (eventType == CanalEntry.EventType.DELETE) {
            RowData removed = this.convertRow(canalRowData.getBeforeColumnsList());
            for (CanalClient client : clients) {
                this.bindData(removed, client);
                client.onDelete(removed);
            }
            return;
        }
        if (eventType == CanalEntry.EventType.INSERT) {
            RowData inserted = this.convertRow(canalRowData.getAfterColumnsList());
            for (CanalClient client : clients) {
                this.bindData(inserted, client);
                client.onInsert(inserted);
            }
            return;
        }
        RowData before = this.convertRow(canalRowData.getBeforeColumnsList());
        RowData after = this.convertRow(canalRowData.getAfterColumnsList());
        for (CanalClient client : clients) {
            this.bindData(before, client);
            this.bindData(after, client);
            client.onUpdate(before, after);
        }
    }

    /** Converts the row's value map to the client's registered type and stores it as the row's data. */
    private void bindData(RowData rowData, CanalClient client) {
        rowData.setData((Serializable) this.objectMapper.convertValue(rowData.getValueMap(), this.clientType.get(client)));
    }

    /**
     * Copies Canal columns into a {@link RowData}: a column list holding the original
     * names and a value map keyed by the name returned from
     * {@link CanalHandlerCustomizer#fieldNameConvert}.
     *
     * <p>NOTE(review): {@code col.getValue()} is stored as-is; whether SQL NULL columns
     * should be mapped to {@code null} is not decided here - confirm.
     *
     * @return a row whose list and map are empty when {@code canalColumns} is empty
     */
    private RowData convertRow(List<CanalEntry.Column> canalColumns) {
        int size = canalColumns.size();
        List<RowData.Column> columnList = new ArrayList<RowData.Column>(size);
        Map<String, Serializable> valueMap = new HashMap<String, Serializable>(size);
        RowData rowData = new RowData();
        rowData.setColumnList(columnList);
        rowData.setValueMap(valueMap);
        for (CanalEntry.Column col : canalColumns) {
            RowData.Column column = new RowData.Column();
            column.setName(col.getName());
            column.setValue((Serializable) col.getValue());
            columnList.add(column);
            valueMap.put(this.canalHandlerCustomizer.fieldNameConvert(col.getName()), (Serializable) col.getValue());
        }
        return rowData;
    }

    /**
     * Keeps trying to create a valid connector, waiting between attempts. A connector
     * that could not be created, reports itself invalid or fails the validity check is
     * disconnected and discarded before the next attempt.
     *
     * @return a usable connector, or {@code null} when the thread was interrupted while waiting
     */
    private CanalConnector getConnector() {
        while (true) {
            CanalConnector connector = null;
            try {
                connector = this.createConnector();
                if (connector.checkValid()) {
                    return connector;
                }
                logger.warn("canal server \u4e0d\u53ef\u7528");
            } catch (Exception e) {
                logger.error("\u521b\u5efaCanal Connector\u5931\u8d25", e);
            }
            // Reaching here means the attempt failed; release whatever was opened.
            this.disconnectQuietly(connector);
            try {
                TimeUnit.SECONDS.sleep(CONNECT_RETRY_SECONDS);
            } catch (InterruptedException e) {
                logger.error("sleep \u4e2d\u65ad", e);
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    /**
     * Creates a single-node connector from the configured server settings, connects,
     * subscribes to all tables and rolls back to the last acknowledged position. The
     * connector is disconnected again if any of these steps fails.
     *
     * @throws IllegalArgumentException when host or destination is blank (raised by {@code Assert.notBlank})
     */
    private CanalConnector createConnector() {
        String host = Assert.notBlank(this.canalProperties.getServer().getHost(), "Canal Server\u7684\u914d\u7f6ehost\u4e3a\u7a7a");
        String destination = Assert.notBlank(this.canalProperties.getServer().getDestination(), "Canal Server\u7684\u914d\u7f6edestination\u4e3a\u7a7a");
        String username = StrUtil.nullToDefault(this.canalProperties.getServer().getUsername(), "");
        String password = StrUtil.nullToDefault(this.canalProperties.getServer().getPassword(), "");
        SocketAddress address = new InetSocketAddress(host, this.canalProperties.getServer().getPort());
        CanalConnector connector = CanalConnectors.newSingleConnector(address, destination, username, password);
        try {
            connector.connect();
            connector.subscribe(SUBSCRIBE_FILTER);
            connector.rollback();
        } catch (RuntimeException e) {
            this.disconnectQuietly(connector);
            throw e;
        }
        return connector;
    }

    /** Disconnects the connector if there is one, logging instead of propagating any failure. */
    private void disconnectQuietly(CanalConnector connector) {
        if (connector == null) {
            return;
        }
        try {
            connector.disconnect();
        } catch (Exception e) {
            logger.warn("failed to disconnect Canal Connector", e);
        }
    }

    /**
     * Indexes the clients by table key and records each client's data type. The key is
     * {@code "database.table"}, where the database comes from the client or, if blank,
     * from the properties; when both are blank the key is the bare table name.
     *
     * @throws IllegalArgumentException when a client reports a blank table (raised by {@code Assert.notBlank})
     * @throws IllegalStateException    when a client's data type cannot be determined
     */
    private void registerCanalClient(Set<CanalClient> clients) {
        int size = clients.size();
        this.canalClients = new HashMap<String, Set<CanalClient>>(size);
        this.clientType = new HashMap<CanalClient, Class<? extends Serializable>>(size);
        for (CanalClient client : clients) {
            String clientName = client.getClass().getName();
            String database = StrUtil.blankToDefault(client.database(), this.canalProperties.getDatabase());
            String table = Assert.notBlank(client.table(), "{}\u4e2d\u7684table\u4e3a\u7a7a", clientName);
            String key = StrUtil.isBlank(database) ? table : database + "." + table;
            this.canalClients.computeIfAbsent(key, k -> new HashSet<>(4)).add(client);
            this.clientType.put(client, this.resolveDataType(client));
            logger.info("register Elasticsearch Canal Client \u3010{}\u3011 listen to \u3010{}\u3011", clientName, key);
        }
    }

    /**
     * Determines the type argument a client declares. The client's class and its
     * superclasses are scanned for a directly implemented {@code CanalClient<X>};
     * if none is found, the first parameterized interface with a concrete class
     * argument is used instead.
     *
     * @throws IllegalStateException when no such declaration exists
     */
    @SuppressWarnings("unchecked") // the type argument is only used as a conversion target, never instantiated here
    private Class<? extends Serializable> resolveDataType(CanalClient client) {
        Class<?> fallback = null;
        for (Class<?> type = client.getClass(); type != null; type = type.getSuperclass()) {
            for (Type declared : type.getGenericInterfaces()) {
                if (!(declared instanceof ParameterizedType)) {
                    continue;
                }
                ParameterizedType parameterized = (ParameterizedType) declared;
                Type[] arguments = parameterized.getActualTypeArguments();
                if (arguments.length == 0 || !(arguments[0] instanceof Class)) {
                    continue;
                }
                Class<?> candidate = (Class<?>) arguments[0];
                if (parameterized.getRawType() == CanalClient.class) {
                    return (Class<? extends Serializable>) candidate;
                }
                if (fallback == null) {
                    fallback = candidate;
                }
            }
        }
        if (fallback == null) {
            throw new IllegalStateException("cannot resolve the data type of Canal Client " + client.getClass().getName());
        }
        return (Class<? extends Serializable>) fallback;
    }

    /** Returns every {@link CanalClient} bean currently provided by the bean factory. */
    private Set<CanalClient> getClients() {
        return this.beanFactory.getBeanProvider(CanalClient.class).stream().collect(Collectors.toSet());
    }
}

