/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection.wal2json;

import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.TransactionMessage;
import io.debezium.connector.postgresql.connection.wal2json.DateTimeFormat;
import io.debezium.connector.postgresql.connection.wal2json.Wal2JsonReplicationMessage;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.function.Function;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.seatunnel.shade.org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message decoder for wal2json output that is delivered in chunks
 * (the replication slot is opened with {@code write-in-chunks} enabled, see
 * {@link #optionsWithoutMetadata}).
 *
 * <p>The decoder keeps a small state machine across calls:
 * <ul>
 *   <li>A chunk starting with <code>{</code> that has no {@code kind} field is treated as the
 *       transaction header; its {@code xid} and {@code timestamp} fields are read and a
 *       {@code BEGIN} message is emitted.</li>
 *   <li>Subsequent chunks starting with <code>{</code> or {@code ,} carry a single change each.
 *       A change is buffered in {@link #currentChunk} and only emitted when the next chunk
 *       arrives, so that the final change of a transaction can be flagged as the last one.</li>
 *   <li>A chunk starting with {@code ]} ends the transaction; the buffered change is emitted
 *       as the last message, followed by a {@code COMMIT} message.</li>
 * </ul>
 *
 * <p>Instances hold mutable per-stream state and perform no synchronization.
 */
public class StreamingWal2JsonMessageDecoder
extends AbstractMessageDecoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamingWal2JsonMessageDecoder.class);

    // ASCII codes of the characters that drive chunk classification.
    private static final byte TAB = 9;
    private static final byte CR = 13;
    private static final byte SPACE = 32;
    private static final byte COMMA = 44;
    private static final byte RIGHT_BRACKET = 93;
    private static final byte LEFT_BRACE = 123;
    private static final byte RIGHT_BRACE = 125;

    /** Transaction id recorded when a change arrives without a preceding transaction header. */
    private static final long UNDEFINED_LONG = -1L;

    private final DateTimeFormat dateTime = DateTimeFormat.get();
    private boolean containsMetadata = false;

    /** {@code true} between the transaction header chunk and the closing {@code ]} chunk. */
    private boolean messageInProgress = false;

    /** The most recently received change that has not been emitted yet; {@code null} if none. */
    private byte[] currentChunk;
    private Long txId;
    private Instant commitTime;

    /**
     * Decodes one chunk received from the replication stream and forwards the resulting
     * message(s) to {@code processor}.
     *
     * @param buffer array-backed buffer holding the chunk; its readable window
     *               ({@code position()} to {@code limit()}) is decoded
     * @param processor receiver of the decoded replication messages
     * @param typeRegistry type registry handed to the created change messages
     * @throws IllegalStateException if {@code buffer} is not backed by an accessible array
     * @throws IllegalArgumentException if the chunk contains only whitespace
     * @throws ConnectException if the chunk cannot be parsed or arrives in an unexpected state
     * @throws SQLException propagated from {@code processor}
     * @throws InterruptedException propagated from {@code processor}
     */
    @Override
    public void processNotEmptyMessage(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
        try {
            if (!buffer.hasArray()) {
                throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
            }
            final byte[] source = buffer.array();
            // Copy exactly the readable window of the buffer. For a sliced buffer whose window
            // runs to the end of the backing array this equals [arrayOffset, source.length).
            final int start = buffer.arrayOffset() + buffer.position();
            final int end = start + buffer.remaining();
            // Reserve two extra trailing bytes (blank by default) so that a closing "]}" can be
            // appended in place to an incomplete transaction header below.
            final byte[] content = Arrays.copyOfRange(source, start, end + 2);
            final int lastPos = content.length - 1;
            content[lastPos - 1] = SPACE;
            content[lastPos] = SPACE;
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Chunk arrived from database {}", new String(content, StandardCharsets.UTF_8));
            }

            if (messageInProgress) {
                nonInitialChunk(processor, typeRegistry, content);
                return;
            }

            if (getFirstNonWhiteChar(content) != LEFT_BRACE) {
                // Not the start of a JSON object although no transaction is open.
                outOfOrderChunk(content);
                nonInitialChunk(processor, typeRegistry, content);
                return;
            }

            if (getLastNonWhiteChar(content) != RIGHT_BRACE) {
                // The object is not terminated; close it with "]}" so it can be parsed
                // (presumably the header ends with an opened change array).
                content[lastPos - 1] = RIGHT_BRACKET;
                content[lastPos] = RIGHT_BRACE;
            }
            final Document message = DocumentReader.defaultReader().read(content);
            if (message.has("kind")) {
                // A change object arrived where a transaction header was expected.
                outOfOrderChunk(content);
                nonInitialChunk(processor, typeRegistry, content);
            } else {
                txId = message.getLong("xid");
                final String timestamp = message.getString("timestamp");
                commitTime = dateTime.systemTimestampToInstant(timestamp);
                messageInProgress = true;
                currentChunk = null;
                processor.process(new TransactionMessage(ReplicationMessage.Operation.BEGIN, txId, commitTime));
            }
        }
        catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Handles a chunk that belongs to an already opened transaction.
     *
     * <p>The previously buffered change (if any) is emitted when the next change arrives, and
     * the closing {@code ]} chunk emits the buffered change flagged as last followed by a
     * {@code COMMIT} message.
     *
     * @param processor receiver of the decoded replication messages
     * @param typeRegistry type registry handed to the created change messages
     * @param content chunk bytes; a leading {@code ,} separator is overwritten with a blank
     * @throws ConnectException if the chunk starts with an unexpected character
     * @throws IllegalArgumentException if the chunk contains only whitespace
     */
    protected void nonInitialChunk(ReplicationStream.ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content) throws IOException, SQLException, InterruptedException {
        final byte firstChar = getFirstNonWhiteChar(content);
        if (firstChar == LEFT_BRACE) {
            // First change of the transaction: only buffer it.
            currentChunk = content;
        } else if (firstChar == COMMA) {
            // Following change: emit the buffered one, then buffer this one.
            if (currentChunk != null) {
                doProcessMessage(processor, typeRegistry, currentChunk, false);
            }
            // Blank out the separator so the remainder is a standalone JSON object.
            replaceFirstNonWhiteChar(content, SPACE);
            currentChunk = content;
        } else if (firstChar == RIGHT_BRACKET) {
            // End of the change list: flush the buffered change as the last one and commit.
            doProcessMessage(processor, typeRegistry, currentChunk, true);
            messageInProgress = false;
            processor.process(new TransactionMessage(ReplicationMessage.Operation.COMMIT, txId, commitTime));
        } else {
            throw new ConnectException("Chunk arrived in unexpected state");
        }
    }

    /**
     * Prepares the decoder state for a change that arrived without a transaction header by
     * recording an artificial transaction with an undefined id and the current time.
     *
     * @param content the unexpected chunk, used for logging only
     */
    protected void outOfOrderChunk(byte[] content) {
        if (LOGGER.isWarnEnabled()) {
            LOGGER.warn("Got out of order chunk {}, recording artifical TX", new String(content, StandardCharsets.UTF_8));
        }
        txId = UNDEFINED_LONG;
        commitTime = Instant.now();
        messageInProgress = true;
        currentChunk = null;
    }

    /**
     * Returns the last byte of {@code array} that is not whitespace.
     *
     * @throws IllegalArgumentException if every byte is whitespace or the array is empty
     */
    private byte getLastNonWhiteChar(byte[] array) throws IllegalArgumentException {
        for (int i = array.length - 1; i >= 0; --i) {
            if (!isWhitespace(array[i])) {
                return array[i];
            }
        }
        throw new IllegalArgumentException("No non-white char");
    }

    /**
     * Returns the first byte of {@code array} that is not whitespace.
     *
     * @throws IllegalArgumentException if every byte is whitespace or the array is empty
     */
    private byte getFirstNonWhiteChar(byte[] array) throws IllegalArgumentException {
        for (int i = 0; i < array.length; ++i) {
            if (!isWhitespace(array[i])) {
                return array[i];
            }
        }
        throw new IllegalArgumentException("No non-white char");
    }

    /**
     * Overwrites the first non-whitespace byte of {@code array} with {@code to}; does nothing
     * if there is no such byte.
     */
    private void replaceFirstNonWhiteChar(byte[] array, byte to) {
        for (int i = 0; i < array.length; ++i) {
            if (!isWhitespace(array[i])) {
                array[i] = to;
                return;
            }
        }
    }

    /** Returns whether {@code c} is a blank or lies in the ASCII range TAB..CR. */
    private boolean isWhitespace(byte c) {
        return (c >= TAB && c <= CR) || c == SPACE;
    }

    /**
     * Parses a buffered change and forwards it to {@code processor}; forwards a no-op message
     * carrying the current transaction id and commit time when there is no buffered change.
     *
     * @param content JSON bytes of a single change, or {@code null} if none is buffered
     * @param lastMessage whether this change is the final one of the transaction
     */
    private void doProcessMessage(ReplicationStream.ReplicationMessageProcessor processor, TypeRegistry typeRegistry, byte[] content, boolean lastMessage) throws IOException, SQLException, InterruptedException {
        if (content == null) {
            LOGGER.trace("Empty change arrived");
            processor.process(new ReplicationMessage.NoopMessage(txId, commitTime));
            return;
        }
        final Document change = DocumentReader.floatNumbersAsTextReader().read(content);
        LOGGER.trace("Change arrived for decoding {}", change);
        processor.process(new Wal2JsonReplicationMessage(txId, commitTime, change, containsMetadata, lastMessage, typeRegistry));
    }

    /**
     * Adds the slot options of {@link #optionsWithoutMetadata} plus {@code include-not-null}.
     */
    @Override
    public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
        return optionsWithoutMetadata(builder, hasMinimumServerVersion).withSlotOption("include-not-null", "true");
    }

    /**
     * Adds the slot options this decoder relies on: pretty printing, chunked output, and
     * transaction ids and timestamps in the output.
     */
    @Override
    public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder, Function<Integer, Boolean> hasMinimumServerVersion) {
        return builder
                .withSlotOption("pretty-print", 1)
                .withSlotOption("write-in-chunks", 1)
                .withSlotOption("include-xids", 1)
                .withSlotOption("include-timestamp", 1);
    }

    /** Records whether decoded changes are expected to contain metadata. */
    @Override
    public void setContainsMetadata(boolean containsMetadata) {
        this.containsMetadata = containsMetadata;
    }
}

