/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.OperationType;
import com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import java.time.Instant;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongodbStreamFetchTask
implements FetchTask<SourceSplitBase> {
    private static final Logger log = LoggerFactory.getLogger(MongodbStreamFetchTask.class);
    private final IncrementalSplit streamSplit;
    private volatile boolean taskRunning = false;
    private MongodbSourceConfig sourceConfig;
    private final Time time = new SystemTime();
    private boolean supportsStartAtOperationTime = true;
    private boolean supportsStartAfter = true;

    public MongodbStreamFetchTask(IncrementalSplit streamSplit) {
        this.streamSplit = streamSplit;
    }

    public void execute(FetchTask.Context context) {
        MongodbFetchTaskContext taskContext = (MongodbFetchTaskContext)context;
        this.sourceConfig = taskContext.getSourceConfig();
        ChangeStreamDescriptor descriptor = taskContext.getChangeStreamDescriptor();
        ChangeEventQueue<DataChangeEvent> queue = taskContext.getQueue();
        MongoClient mongoClient = MongodbUtils.createMongoClient(this.sourceConfig);
        MongoChangeStreamCursor<BsonDocument> changeStreamCursor = this.openChangeStreamCursor(descriptor);
        HeartbeatManager heartbeatManager = this.openHeartbeatManagerIfNeeded(changeStreamCursor);
        long startPoll = this.time.milliseconds();
        long nextUpdate = startPoll + (long)this.sourceConfig.getPollAwaitTimeMillis();
        this.taskRunning = true;
        try {
            while (this.taskRunning) {
                ChangeStreamOffset currentOffset;
                Optional next;
                try {
                    next = Optional.ofNullable(changeStreamCursor.tryNext());
                }
                catch (MongoCommandException e) {
                    if (MongodbUtils.checkIfChangeStreamCursorExpires(e)) {
                        log.warn("Change stream cursor has expired, trying to recreate cursor");
                        boolean resumeTokenExpires = MongodbUtils.checkIfResumeTokenExpires(e);
                        if (resumeTokenExpires) {
                            log.warn("Resume token has expired, fallback to timestamp restart mode");
                        }
                        changeStreamCursor = this.openChangeStreamCursor(descriptor, resumeTokenExpires);
                        next = Optional.ofNullable(changeStreamCursor.tryNext());
                    }
                    throw e;
                }
                SourceRecord changeRecord = null;
                if (!next.isPresent()) {
                    long untilNext = nextUpdate - this.time.milliseconds();
                    if (untilNext > 0L) {
                        log.debug("Waiting {} ms to poll change records", (Object)untilNext);
                        this.time.sleep(untilNext);
                        continue;
                    }
                    if (heartbeatManager != null) {
                        changeRecord = heartbeatManager.heartbeat().map(this::normalizeHeartbeatRecord).orElse(null);
                    }
                    nextUpdate = this.time.milliseconds() + (long)this.sourceConfig.getPollAwaitTimeMillis();
                } else {
                    BsonDocument changeStreamDocument = (BsonDocument)next.get();
                    OperationType operationType = this.getOperationType(changeStreamDocument);
                    switch (operationType) {
                        case INSERT: 
                        case UPDATE: 
                        case REPLACE: 
                        case DELETE: {
                            MongoNamespace namespace = this.getMongoNamespace(changeStreamDocument);
                            BsonDocument resumeToken = changeStreamDocument.getDocument("_id");
                            BsonDocument valueDocument = this.normalizeChangeStreamDocument(changeStreamDocument);
                            log.trace("Adding {} to {}", (Object)valueDocument, (Object)namespace.getFullName());
                            changeRecord = MongodbRecordUtils.buildSourceRecord(MongodbRecordUtils.createPartitionMap(this.sourceConfig.getHosts(), namespace.getDatabaseName(), namespace.getCollectionName()), MongodbRecordUtils.createSourceOffsetMap(resumeToken, false), namespace.getFullName(), changeStreamDocument.getDocument("_id"), valueDocument);
                            break;
                        }
                        default: {
                            log.info("Ignored {} record: {}", (Object)operationType, (Object)changeStreamDocument);
                        }
                    }
                }
                if (changeRecord != null && !this.isBoundedRead()) {
                    queue.enqueue((Object)new DataChangeEvent(changeRecord));
                }
                if (!this.isBoundedRead()) continue;
                if (changeRecord != null) {
                    currentOffset = new ChangeStreamOffset(MongodbRecordUtils.getResumeToken(changeRecord));
                    if (currentOffset.isAtOrBefore(this.streamSplit.getStopOffset())) {
                        queue.enqueue((Object)new DataChangeEvent(changeRecord));
                    }
                } else {
                    currentOffset = new ChangeStreamOffset(MongodbUtils.getCurrentClusterTime(mongoClient));
                }
                if (!currentOffset.isAtOrAfter(this.streamSplit.getStopOffset())) continue;
                SourceRecord watermark = WatermarkEvent.create(MongodbRecordUtils.createWatermarkPartitionMap(descriptor.toString()), (String)"__mongodb_watermarks", (String)this.streamSplit.splitId(), (WatermarkKind)WatermarkKind.END, (Offset)currentOffset);
                queue.enqueue((Object)new DataChangeEvent(watermark));
                break;
            }
        }
        catch (Exception e) {
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Poll change stream records failed");
        }
        finally {
            this.taskRunning = false;
            if (changeStreamCursor != null) {
                changeStreamCursor.close();
            }
        }
    }

    public boolean isRunning() {
        return this.taskRunning;
    }

    public void shutdown() {
        this.taskRunning = false;
    }

    public IncrementalSplit getSplit() {
        return this.streamSplit;
    }

    private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(ChangeStreamDescriptor changeStreamDescriptor) {
        return this.openChangeStreamCursor(changeStreamDescriptor, false);
    }

    private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(ChangeStreamDescriptor changeStreamDescriptor, boolean forceTimestampStartup) {
        ChangeStreamOffset offset = new ChangeStreamOffset(this.streamSplit.getStartupOffset().getOffset());
        ChangeStreamIterable<Document> changeStreamIterable = MongodbUtils.getChangeStreamIterable(this.sourceConfig, changeStreamDescriptor);
        BsonDocument resumeToken = offset.getResumeToken();
        BsonTimestamp timestamp = offset.getTimestamp();
        if (resumeToken != null && !forceTimestampStartup) {
            if (this.supportsStartAfter) {
                log.info("Open the change stream after the previous offset: {}", (Object)resumeToken);
                changeStreamIterable.startAfter(resumeToken);
            } else {
                log.info("Open the change stream after the previous offset using resumeAfter: {}", (Object)resumeToken);
                changeStreamIterable.resumeAfter(resumeToken);
            }
        } else if (this.supportsStartAtOperationTime) {
            log.info("Open the change stream at the timestamp: {}", (Object)timestamp);
            changeStreamIterable.startAtOperationTime(timestamp);
        } else {
            if (forceTimestampStartup) {
                log.error("Open change stream failed. Unable to resume from timestamp");
                throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Open change stream failed. Unable to resume from timestamp");
            }
            log.warn("Open the change stream of the latest offset");
        }
        try {
            return (MongoChangeStreamCursor)changeStreamIterable.withDocumentClass(BsonDocument.class).cursor();
        }
        catch (MongoCommandException e) {
            if (e.getErrorCode() == 9 || e.getErrorCode() == 40415) {
                if (e.getErrorMessage().contains("startAtOperationTime")) {
                    this.supportsStartAtOperationTime = false;
                    return this.openChangeStreamCursor(changeStreamDescriptor);
                }
                if (e.getErrorMessage().contains("startAfter")) {
                    this.supportsStartAfter = false;
                    return this.openChangeStreamCursor(changeStreamDescriptor);
                }
                throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Open change stream failed");
            }
            if (e.getErrorCode() == 20) {
                throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Illegal $changeStream operation: %s %s", e.getErrorMessage(), e.getErrorCode()));
            }
            if (e.getErrorCode() == 13) {
                throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Unauthorized $changeStream operation: %s %s", e.getErrorMessage(), e.getErrorCode()));
            }
            if (!forceTimestampStartup && MongodbUtils.checkIfResumeTokenExpires(e)) {
                log.info("Failed to open cursor with resume token, fallback to timestamp startup");
                return this.openChangeStreamCursor(changeStreamDescriptor, true);
            }
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Open change stream failed");
        }
    }

    @Nullable
    private HeartbeatManager openHeartbeatManagerIfNeeded(MongoChangeStreamCursor<BsonDocument> changeStreamCursor) {
        if (this.sourceConfig.getHeartbeatIntervalMillis() > 0) {
            return new HeartbeatManager(this.time, changeStreamCursor, this.sourceConfig.getHeartbeatIntervalMillis(), "__mongodb_heartbeats", MongodbRecordUtils.createHeartbeatPartitionMap(this.sourceConfig.getHosts()));
        }
        return null;
    }

    @Nonnull
    private BsonDocument normalizeChangeStreamDocument(@Nonnull BsonDocument changeStreamDocument) {
        BsonDocument normalizedDocument = this.normalizeKeyDocument(changeStreamDocument);
        changeStreamDocument.put("_id", normalizedDocument);
        changeStreamDocument.put("ts_ms", new BsonInt64(System.currentTimeMillis()));
        BsonDocument source = new BsonDocument();
        source.put("snapshot", new BsonString("false"));
        if (!changeStreamDocument.containsKey("clusterTime")) {
            log.warn("Cannot extract clusterTime from change stream event, fallback to current timestamp.");
            changeStreamDocument.put("clusterTime", MongodbRecordUtils.currentBsonTimestamp());
        }
        BsonTimestamp clusterTime = changeStreamDocument.getTimestamp("clusterTime");
        Instant clusterInstant = Instant.ofEpochSecond(clusterTime.getTime());
        source.put("ts_ms", new BsonInt64(clusterInstant.toEpochMilli()));
        changeStreamDocument.put("source", source);
        return changeStreamDocument;
    }

    @Nonnull
    private BsonDocument normalizeKeyDocument(@Nonnull BsonDocument changeStreamDocument) {
        BsonDocument documentKey = changeStreamDocument.getDocument("documentKey");
        BsonDocument primaryKey = new BsonDocument("_id", documentKey.get("_id"));
        return new BsonDocument("_id", primaryKey);
    }

    @Nonnull
    private SourceRecord normalizeHeartbeatRecord(@Nonnull SourceRecord heartbeatRecord) {
        Struct heartbeatValue = new Struct(SchemaBuilder.struct().field("ts_ms", Schema.INT64_SCHEMA).build());
        heartbeatValue.put("ts_ms", (Object)Instant.now().toEpochMilli());
        return new SourceRecord(heartbeatRecord.sourcePartition(), heartbeatRecord.sourceOffset(), heartbeatRecord.topic(), heartbeatRecord.keySchema(), heartbeatRecord.key(), SchemaBuilder.struct().field("ts_ms", Schema.INT64_SCHEMA).build(), (Object)heartbeatValue);
    }

    @Nonnull
    private MongoNamespace getMongoNamespace(@Nonnull BsonDocument changeStreamDocument) {
        BsonDocument ns = changeStreamDocument.getDocument("ns");
        return new MongoNamespace(ns.getString("db").getValue(), ns.getString("coll").getValue());
    }

    private OperationType getOperationType(BsonDocument changeStreamDocument) {
        return OperationType.fromString(changeStreamDocument.getString("operationType").getValue());
    }

    private boolean isBoundedRead() {
        return !ChangeStreamOffset.NO_STOPPING_OFFSET.equals(this.streamSplit.getStopOffset());
    }
}

