/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import java.util.ArrayList;
import java.util.Collections;
import javax.annotation.Nonnull;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.dialect.MongodbDialect;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.fetch.MongodbStreamFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongodbScanFetchTask
implements FetchTask<SourceSplitBase> {
    private static final Logger log = LoggerFactory.getLogger(MongodbScanFetchTask.class);
    private final SnapshotSplit snapshotSplit;
    private volatile boolean taskRunning = false;

    public MongodbScanFetchTask(SnapshotSplit snapshotSplit) {
        this.snapshotSplit = snapshotSplit;
    }

    public void execute(FetchTask.Context context) throws Exception {
        MongodbFetchTaskContext taskContext = (MongodbFetchTaskContext)context;
        MongodbSourceConfig sourceConfig = taskContext.getSourceConfig();
        MongodbDialect dialect = taskContext.getDialect();
        ChangeEventQueue<DataChangeEvent> changeEventQueue = taskContext.getQueue();
        this.taskRunning = true;
        TableId collectionId = this.snapshotSplit.getTableId();
        ChangeStreamOffset lowWatermark = dialect.displayCurrentOffset(sourceConfig);
        log.info("Snapshot step 1 - Determining low watermark {} for split {}", (Object)lowWatermark, (Object)this.snapshotSplit);
        changeEventQueue.enqueue((Object)new DataChangeEvent(WatermarkEvent.create(MongodbRecordUtils.createWatermarkPartitionMap(collectionId.identifier()), (String)"__mongodb_watermarks", (String)this.snapshotSplit.splitId(), (WatermarkKind)WatermarkKind.LOW, (Offset)lowWatermark)));
        log.info("Snapshot step 2 - Snapshotting data");
        try (MongoCursor<RawBsonDocument> cursor = this.getSnapshotCursor(this.snapshotSplit, sourceConfig);){
            while (cursor.hasNext()) {
                this.checkTaskRunning();
                BsonDocument valueDocument = this.normalizeSnapshotDocument(collectionId, cursor.next());
                BsonDocument keyDocument = new BsonDocument("_id", valueDocument.get("_id"));
                SourceRecord snapshotRecord = this.buildSourceRecord(sourceConfig, collectionId, keyDocument, valueDocument);
                changeEventQueue.enqueue((Object)new DataChangeEvent(snapshotRecord));
            }
            ChangeStreamOffset highWatermark = dialect.displayCurrentOffset(sourceConfig);
            log.info("Snapshot step 3 - Determining high watermark {} for split {}", (Object)highWatermark, (Object)this.snapshotSplit);
            changeEventQueue.enqueue((Object)new DataChangeEvent(WatermarkEvent.create(MongodbRecordUtils.createWatermarkPartitionMap(collectionId.identifier()), (String)"__mongodb_watermarks", (String)this.snapshotSplit.splitId(), (WatermarkKind)WatermarkKind.HIGH, (Offset)highWatermark)));
            log.info("Snapshot step 4 - Back fill stream split for snapshot split {}", (Object)this.snapshotSplit);
            IncrementalSplit dataBackfillSplit = this.createBackfillStreamSplit(lowWatermark, highWatermark);
            boolean streamBackfillRequired = dataBackfillSplit.getStopOffset().isAfter(dataBackfillSplit.getStartupOffset());
            if (!streamBackfillRequired) {
                changeEventQueue.enqueue((Object)new DataChangeEvent(WatermarkEvent.create(MongodbRecordUtils.createWatermarkPartitionMap(collectionId.identifier()), (String)"__mongodb_watermarks", (String)dataBackfillSplit.splitId(), (WatermarkKind)WatermarkKind.END, (Offset)dataBackfillSplit.getStopOffset())));
            } else {
                MongodbStreamFetchTask dataBackfillTask = new MongodbStreamFetchTask(dataBackfillSplit);
                dataBackfillTask.execute(taskContext);
            }
        }
        catch (Exception e) {
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, String.format("Execute snapshot read subtask for mongodb split %s fail", this.snapshotSplit));
        }
        finally {
            this.taskRunning = false;
        }
    }

    @Nonnull
    private MongoCursor<RawBsonDocument> getSnapshotCursor(@Nonnull SnapshotSplit snapshotSplit, MongodbSourceConfig sourceConfig) {
        MongoClient mongoClient = MongodbUtils.createMongoClient(sourceConfig);
        MongoCollection<RawBsonDocument> collection = MongodbUtils.getMongoCollection(mongoClient, snapshotSplit.getTableId(), RawBsonDocument.class);
        BsonDocument startKey = (BsonDocument)snapshotSplit.getSplitStart()[1];
        BsonDocument endKey = (BsonDocument)snapshotSplit.getSplitEnd()[1];
        BsonDocument hint = (BsonDocument)snapshotSplit.getSplitStart()[0];
        log.info("Initializing snapshot split processing: TableId={}, StartKey={}, EndKey={}, Hint={}", new Object[]{snapshotSplit.getTableId(), startKey, endKey, hint});
        return collection.find().min(startKey).max(endKey).hint(hint).batchSize(sourceConfig.getBatchSize()).noCursorTimeout(true).cursor();
    }

    @Nonnull
    private SourceRecord buildSourceRecord(@Nonnull MongodbSourceConfig sourceConfig, @Nonnull TableId collectionId, BsonDocument keyDocument, BsonDocument valueDocument) {
        return MongodbRecordUtils.buildSourceRecord(MongodbRecordUtils.createPartitionMap(sourceConfig.getHosts(), collectionId.catalog(), collectionId.table()), MongodbRecordUtils.createSourceOffsetMap(keyDocument.getDocument("_id"), true), collectionId.identifier(), keyDocument, valueDocument);
    }

    private void checkTaskRunning() {
        if (!this.taskRunning) {
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Interrupted while snapshotting collection");
        }
    }

    public boolean isRunning() {
        return this.taskRunning;
    }

    public void shutdown() {
        this.taskRunning = false;
    }

    public SnapshotSplit getSplit() {
        return this.snapshotSplit;
    }

    private IncrementalSplit createBackfillStreamSplit(ChangeStreamOffset lowWatermark, ChangeStreamOffset highWatermark) {
        return new IncrementalSplit(this.snapshotSplit.splitId(), Collections.singletonList(this.snapshotSplit.getTableId()), (Offset)lowWatermark, (Offset)highWatermark, new ArrayList());
    }

    private BsonDocument normalizeSnapshotDocument(@Nonnull TableId collectionId, @Nonnull BsonDocument originalDocument) {
        return new BsonDocument().append("_id", new BsonDocument("_id", originalDocument.get("_id"))).append("operationType", new BsonString("insert")).append("ns", new BsonDocument("db", new BsonString(collectionId.catalog())).append("coll", new BsonString(collectionId.table()))).append("documentKey", new BsonDocument("_id", originalDocument.get("_id"))).append("fullDocument", originalDocument).append("ts_ms", new BsonInt64(System.currentTimeMillis())).append("source", new BsonDocument("snapshot", new BsonString("true")).append("ts_ms", new BsonInt64(0L)));
    }
}

