/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSource;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetch task that streams MySQL binlog change events for an {@link IncrementalSplit}.
 *
 * <p>{@link #execute(FetchTask.Context)} builds a {@link MySqlStreamingChangeEventSource} (or a
 * timestamp-filtering variant when the startup mode is {@code TIMESTAMP}) and runs it until
 * {@link #shutdown()} flips the running flag observed by the change-event-source context.
 */
public class MySqlBinlogFetchTask
implements FetchTask<SourceSplitBase> {
    private static final Logger log = LoggerFactory.getLogger(MySqlBinlogFetchTask.class);
    private final IncrementalSplit split;
    /** Written by {@link #execute} / {@link #shutdown()} and polled by the streaming source context. */
    private volatile boolean taskRunning = false;

    /**
     * @param split the incremental split this task reads the binlog for
     */
    public MySqlBinlogFetchTask(IncrementalSplit split) {
        this.split = split;
    }

    /**
     * Marks the task as running, creates the streaming change event source that matches the
     * configured startup mode, and executes it with the partition and offset context taken from
     * the given fetch context.
     *
     * @param context fetch context; must be a {@link MySqlSourceFetchTaskContext}
     * @throws Exception whatever the underlying streaming source propagates
     */
    public void execute(FetchTask.Context context) throws Exception {
        final MySqlSourceFetchTaskContext sourceFetchContext = (MySqlSourceFetchTaskContext)context;
        this.taskRunning = true;

        MySqlStreamingChangeEventSource streamingSource = this.createStreamingSource(sourceFetchContext);
        BinlogSplitChangeEventSourceContext changeEventSourceContext = new BinlogSplitChangeEventSourceContext();

        // Once the binlog client is connected, the JDBC connection held by the fetch context is closed.
        sourceFetchContext.getBinaryLogClient().registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener(){

            @Override
            public void onConnect(BinaryLogClient client) {
                try {
                    sourceFetchContext.getConnection().close();
                    log.info("Binlog client connected, closed idle jdbc connection.");
                }
                catch (SQLException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        streamingSource.execute(changeEventSourceContext, sourceFetchContext.getPartition(), sourceFetchContext.getOffsetContext());
    }

    /**
     * Builds the streaming source: a timestamp-filtering one when the startup mode is
     * {@code TIMESTAMP}, otherwise the plain Debezium streaming source.
     */
    private MySqlStreamingChangeEventSource createStreamingSource(MySqlSourceFetchTaskContext sourceFetchContext) {
        StartupConfig startupConfig = sourceFetchContext.getSourceConfig().getStartupConfig();
        StartupMode startupMode = startupConfig.getStartupMode();
        // Compare from the constant side so that a missing startup mode falls through to the
        // plain streaming source instead of raising a NullPointerException.
        if (StartupMode.TIMESTAMP.equals(startupMode)) {
            log.info("Starting MySQL binlog reader,with timestamp filter {}", startupConfig.getTimestamp());
            return new TimestampFilterMySqlStreamingChangeEventSource(sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getConnection(), sourceFetchContext.getDispatcher(), sourceFetchContext.getErrorHandler(), Clock.SYSTEM, sourceFetchContext.getTaskContext(), sourceFetchContext.getStreamingChangeEventSourceMetrics(), startupConfig.getTimestamp());
        }
        return new MySqlStreamingChangeEventSource(sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getConnection(), (EventDispatcher<MySqlPartition, TableId>)sourceFetchContext.getDispatcher(), sourceFetchContext.getErrorHandler(), Clock.SYSTEM, sourceFetchContext.getTaskContext(), sourceFetchContext.getStreamingChangeEventSourceMetrics());
    }

    /**
     * @return {@code true} after {@link #execute} has started and until {@link #shutdown()} is called
     */
    public boolean isRunning() {
        return this.taskRunning;
    }

    /** Clears the running flag; the streaming source context reports not-running from then on. */
    public void shutdown() {
        this.taskRunning = false;
    }

    /**
     * @return the split passed to the constructor
     */
    public SourceSplitBase getSplit() {
        return this.split;
    }

    /** Change-event-source context whose running state mirrors the enclosing task's flag. */
    private class BinlogSplitChangeEventSourceContext
    implements ChangeEventSource.ChangeEventSourceContext {
        private BinlogSplitChangeEventSourceContext() {
        }

        public boolean isRunning() {
            return MySqlBinlogFetchTask.this.taskRunning;
        }
    }

    /**
     * Streaming source that drops binlog events whose header timestamp is earlier than a target
     * timestamp, while still advancing the offset context past each dropped event.
     *
     * <p>NOTE(review): the event header timestamp and the target timestamp are compared directly,
     * so both are assumed to use the same unit (presumably epoch milliseconds) - confirm.
     */
    private static class TimestampFilterMySqlStreamingChangeEventSource
    extends MySqlStreamingChangeEventSource {
        /** Minimum distance, in event-timestamp units, between two "skip binlog" log lines. */
        private static final long LOG_INTERVAL_MS = 10000L;

        private final Long targetTimestamp;
        /** Header timestamp of the event for which the last "skip binlog" line was logged. */
        private long logTimestamp;
        private boolean loggedWaitingMessage;

        public TimestampFilterMySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlConnection connection, JdbcSourceEventDispatcher<MySqlPartition> dispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics, Long targetTimestamp) {
            super(connectorConfig, connection, (EventDispatcher<MySqlPartition, TableId>)dispatcher, errorHandler, clock, taskContext, metrics);
            this.targetTimestamp = targetTimestamp;
        }

        /**
         * Delegates to the parent handler unless the event carries a non-zero timestamp that is
         * strictly earlier than a non-null, non-zero target timestamp; such events are skipped
         * and only their position is recorded in the offset context.
         */
        @Override
        protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
            if (event == null) {
                super.handleEvent(partition, offsetContext, event);
                return;
            }
            EventHeader header = event.getHeader();
            long eventTs = header.getTimestamp();
            boolean filterDisabled = this.targetTimestamp == null || this.targetTimestamp == 0L;
            if (eventTs == 0L || filterDisabled || eventTs >= this.targetTimestamp) {
                super.handleEvent(partition, offsetContext, event);
                return;
            }

            // Throttle the skip message: log once, then re-arm only after the skipped events'
            // timestamps have advanced by at least LOG_INTERVAL_MS since the last logged one.
            if (!this.loggedWaitingMessage) {
                log.info("skip binlog, currentTime:{}, filterTime:{}", eventTs, this.targetTimestamp);
                this.loggedWaitingMessage = true;
                this.logTimestamp = eventTs;
            }
            if (eventTs - this.logTimestamp >= LOG_INTERVAL_MS) {
                this.loggedWaitingMessage = false;
            }
            this.updateOffsetPosition(offsetContext, header);
        }

        /**
         * Records the skipped event's position (when the header is an {@link EventHeaderV4}) and
         * server id in the offset context and completes the event. Failures are logged and
         * otherwise ignored so that skipping an event never aborts the stream.
         */
        private void updateOffsetPosition(MySqlOffsetContext offsetContext, EventHeader eventHeader) {
            try {
                if (eventHeader instanceof EventHeaderV4) {
                    EventHeaderV4 headerV4 = (EventHeaderV4)eventHeader;
                    offsetContext.setEventPosition(headerV4.getPosition(), headerV4.getEventLength());
                }
                offsetContext.setBinlogServerId(eventHeader.getServerId());
                offsetContext.completeEvent();
            }
            catch (Exception e) {
                // Best effort: keep the original message and attach the throwable for its stack trace.
                log.warn("Failed to update offset for skipped event: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * Streaming source that reads the binlog for a split and, when the split has a stop offset,
     * emits an END watermark and marks the context finished once that offset is reached.
     */
    public static class MySqlBinlogSplitReadTask
    extends MySqlStreamingChangeEventSource {
        private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class);
        private final IncrementalSplit binlogSplit;
        private final MySqlOffsetContext offsetContext;
        private final JdbcSourceEventDispatcher<MySqlPartition> dispatcher;
        private final ErrorHandler errorHandler;
        private ChangeEventSource.ChangeEventSourceContext context;

        public MySqlBinlogSplitReadTask(MySqlConnectorConfig connectorConfig, MySqlOffsetContext offsetContext, MySqlConnection connection, JdbcSourceEventDispatcher<MySqlPartition> dispatcher, ErrorHandler errorHandler, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics, IncrementalSplit binlogSplit) {
            super(connectorConfig, connection, (EventDispatcher<MySqlPartition, TableId>)dispatcher, errorHandler, Clock.SYSTEM, taskContext, metrics);
            this.binlogSplit = binlogSplit;
            this.dispatcher = dispatcher;
            this.offsetContext = offsetContext;
            this.errorHandler = errorHandler;
        }

        /**
         * Remembers the context and runs the parent source. The offset context supplied at
         * construction time is used; the {@code offsetContext} argument is ignored.
         */
        @Override
        public void execute(ChangeEventSource.ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException {
            this.context = context;
            super.execute(context, partition, this.offsetContext);
        }

        /**
         * Handles the event via the parent, then, for a bounded split, checks whether the current
         * offset is at or after the stop offset; if so dispatches the END watermark and marks the
         * context as finished.
         */
        @Override
        protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
            super.handleEvent(partition, offsetContext, event);
            if (!this.isBoundedRead()) {
                return;
            }
            BinlogOffset currentBinlogOffset = MySqlBinlogSplitReadTask.getBinlogPosition(offsetContext.getOffset());
            if (!currentBinlogOffset.isAtOrAfter(this.binlogSplit.getStopOffset())) {
                return;
            }
            try {
                this.dispatcher.dispatchWatermarkEvent(partition.getSourcePartition(), this.binlogSplit, (Offset)currentBinlogOffset, WatermarkKind.END);
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                LOG.error("Send signal event error.", e);
                this.errorHandler.setProducerThrowable(new DebeziumException("Error processing binlog signal event", e));
            }
            // A bounded read is expected to run with the snapshot task's context; the cast fails
            // with a ClassCastException if any other context type was passed to execute().
            ((MySqlSnapshotFetchTask.SnapshotBinlogSplitChangeEventSourceContext)this.context).finished();
        }

        /** @return {@code true} when the split's stop offset is not the no-stopping sentinel */
        private boolean isBoundedRead() {
            return !BinlogOffset.NO_STOPPING_OFFSET.equals(this.binlogSplit.getStopOffset());
        }

        /**
         * Converts an offset map into a {@link BinlogOffset} by stringifying every value with
         * {@code toString()}; {@code null} values are kept as {@code null}.
         *
         * @param offset offset map; must not be {@code null}
         * @return binlog offset built from the stringified entries
         */
        public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
            Map<String, String> offsetStrMap = new HashMap<String, String>();
            for (Map.Entry<String, ?> entry : offset.entrySet()) {
                Object value = entry.getValue();
                offsetStrMap.put(entry.getKey(), value == null ? null : value.toString());
            }
            return new BinlogOffset(offsetStrMap);
        }
    }
}

