package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;

import com.mongodb.MongoException;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.WriteModel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/mongodb/sink/MongodbWriter.class */
/**
 * Sink writer that buffers serialized rows as MongoDB write models and sends them with
 * {@code bulkWrite}.
 *
 * <p>Two modes are selected by the {@code transaction} option:
 *
 * <ul>
 *   <li>non-transactional: buffered requests are flushed from {@link #write(SeaTunnelRow)} when
 *       the size or interval threshold is reached, and again from {@link #prepareCommit()} and
 *       {@link #close()};
 *   <li>transactional: nothing is written by this class; {@link #prepareCommit()} hands the
 *       buffered documents over inside a {@link MongodbCommitInfo}.
 * </ul>
 *
 * <p>Only {@link #doBulkWrite()} is {@code synchronized}; {@link #write(SeaTunnelRow)} and
 * {@link #prepareCommit()} touch the request buffer without holding that lock.
 */
public class MongodbWriter implements SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> {
    private static final Logger log = LoggerFactory.getLogger(MongodbWriter.class);

    /**
     * Number of documents put into one {@link DocumentBulk} by {@link #prepareCommit()}.
     * NOTE(review): presumably mirrors DocumentBulk's own capacity limit - confirm.
     */
    private static final int TRANSACTION_BULK_SIZE = 1024;

    private MongodbClientProvider collectionProvider;
    private final DocumentSerializer<SeaTunnelRow> serializer;
    /** Flush threshold on the number of buffered requests; {@code -1} disables the check. */
    private long bulkActions;
    private final List<WriteModel<BsonDocument>> bulkRequests;
    private int maxRetries;
    private long retryIntervalMs;
    /** Flush threshold on the time since the last send, in ms; {@code -1} disables the check. */
    private long batchIntervalMs;
    // NOTE(review): starts at 0, so when a batch interval is configured the very first write()
    // already satisfies the interval check and is flushed on its own - confirm this is intended.
    private volatile long lastSendTime = 0;
    private boolean transaction;
    private final SinkWriter.Context context;

    /**
     * Creates a writer.
     *
     * @param documentSerializer converts each row into a MongoDB write model
     * @param mongodbWriterOptions connection, batching, retry and transaction settings
     * @param context sink writer context (stored, not otherwise used by this class)
     */
    public MongodbWriter(DocumentSerializer<SeaTunnelRow> documentSerializer, MongodbWriterOptions mongodbWriterOptions, SinkWriter.Context context) {
        initOptions(mongodbWriterOptions);
        this.context = context;
        this.serializer = documentSerializer;
        this.bulkRequests = new ArrayList<>();
    }

    /** Copies the settings out of the options object and builds the collection provider. */
    private void initOptions(MongodbWriterOptions mongodbWriterOptions) {
        this.maxRetries = mongodbWriterOptions.getRetryMax();
        this.retryIntervalMs = mongodbWriterOptions.getRetryInterval();
        this.collectionProvider = MongodbCollectionProvider.builder()
                .connectionString(mongodbWriterOptions.getConnectString())
                .database(mongodbWriterOptions.getDatabase())
                .collection(mongodbWriterOptions.getCollection())
                .build();
        this.bulkActions = mongodbWriterOptions.getFlushSize();
        this.batchIntervalMs = mongodbWriterOptions.getBatchIntervalMs();
        this.transaction = mongodbWriterOptions.transaction;
    }

    /**
     * Buffers one row and, in non-transactional mode, flushes when a threshold is reached.
     *
     * <p>Rows of kind {@link RowKind#UPDATE_BEFORE} are ignored.
     *
     * @param seaTunnelRow the row to write
     */
    public void write(SeaTunnelRow seaTunnelRow) {
        if (seaTunnelRow.getRowKind() == RowKind.UPDATE_BEFORE) {
            return;
        }
        this.bulkRequests.add(this.serializer.serializeToWriteModel(seaTunnelRow));
        if (this.transaction) {
            // In transactional mode the buffer is only drained by prepareCommit().
            return;
        }
        if (isOverMaxBatchSizeLimit() || isOverMaxBatchIntervalLimit()) {
            doBulkWrite();
        }
    }

    /**
     * Prepares the buffered data for commit.
     *
     * <p>Non-transactional mode: flushes the buffer and returns an empty {@link Optional}.
     * Transactional mode: converts the buffered requests to documents, splits them in buffer
     * order into bulks of at most {@value #TRANSACTION_BULK_SIZE} documents, clears the buffer
     * and returns the bulks (possibly an empty list) wrapped in a {@link MongodbCommitInfo}.
     *
     * @return the commit info in transactional mode, otherwise empty
     */
    public Optional<MongodbCommitInfo> prepareCommit() {
        if (!this.transaction) {
            doBulkWrite();
            return Optional.empty();
        }
        List<BsonDocument> documents = this.bulkRequests.stream()
                .map(this::convertModelToBsonDocument)
                .collect(Collectors.toList());
        List<DocumentBulk> bulks = new ArrayList<>();
        // Chunk by index so the bulks keep the order in which rows were written.
        for (int from = 0; from < documents.size(); from += TRANSACTION_BULK_SIZE) {
            int to = Math.min(from + TRANSACTION_BULK_SIZE, documents.size());
            bulks.add(convertBsonDocumentListToDocumentBulk(documents.subList(from, to)));
        }
        this.bulkRequests.clear();
        return Optional.of(new MongodbCommitInfo(bulks));
    }

    /**
     * Extracts the document carried by a write model.
     *
     * @param writeModel the buffered write model
     * @return the inserted document for an {@link InsertOneModel}; the update specification
     *     ({@code getUpdate()}, the filter is not carried over) for an {@link UpdateOneModel};
     *     {@code null} for any other model type
     */
    private BsonDocument convertModelToBsonDocument(WriteModel<BsonDocument> writeModel) {
        if (writeModel instanceof InsertOneModel) {
            return ((InsertOneModel<BsonDocument>) writeModel).getDocument();
        }
        if (writeModel instanceof UpdateOneModel) {
            return (BsonDocument) ((UpdateOneModel<BsonDocument>) writeModel).getUpdate();
        }
        // NOTE(review): other model types yield null, which prepareCommit() then adds to the
        // bulk as-is - verify the committer tolerates this or that such models never occur.
        return null;
    }

    /** Copies the given documents, in order, into a new {@link DocumentBulk}. */
    private DocumentBulk convertBsonDocumentListToDocumentBulk(List<BsonDocument> list) {
        DocumentBulk documentBulk = new DocumentBulk();
        for (BsonDocument document : list) {
            documentBulk.add(document);
        }
        return documentBulk;
    }

    /** No-op: this writer keeps no prepared state that would need to be rolled back. */
    public void abortPrepare() {
    }

    /**
     * Flushes any remaining requests (non-transactional mode only) and closes the provider.
     *
     * <p>The provider is closed even when the final flush throws, so a failed flush does not
     * leak the underlying client.
     */
    public void close() {
        try {
            if (!this.transaction) {
                doBulkWrite();
            }
        } finally {
            if (this.collectionProvider != null) {
                this.collectionProvider.close();
            }
        }
    }

    /**
     * Sends all buffered requests as one ordered bulk write, retrying on {@link MongoException}.
     *
     * <p>Up to {@code maxRetries + 1} attempts are made. After a failed attempt {@code i}
     * (0-based) the thread sleeps {@code retryIntervalMs * (i + 1)} before trying again. On
     * success the buffer is cleared; on failure it is left untouched, so every retry resends the
     * complete list.
     *
     * @throws MongodbConnectorException if the last attempt fails, if the thread is interrupted
     *     while waiting to retry, or if {@code maxRetries} is negative (no attempt is made)
     */
    synchronized void doBulkWrite() {
        if (this.bulkRequests.isEmpty()) {
            return;
        }
        for (int attempt = 0; attempt <= this.maxRetries; attempt++) {
            try {
                // Recorded before the attempt, so the interval check counts from the send start.
                this.lastSendTime = System.currentTimeMillis();
                this.collectionProvider.getDefaultCollection().bulkWrite(this.bulkRequests, new BulkWriteOptions().ordered(true));
                this.bulkRequests.clear();
                return;
            } catch (MongoException e) {
                log.debug("Bulk Write to MongoDB failed, retry times = {}", attempt, e);
                if (attempt >= this.maxRetries) {
                    throw new MongodbConnectorException(CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "Bulk Write to MongoDB failed", e);
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(this.retryIntervalMs * (attempt + 1));
                } catch (InterruptedException interrupted) {
                    // Restore the flag for callers and keep the interruption visible in the trace.
                    Thread.currentThread().interrupt();
                    e.addSuppressed(interrupted);
                    throw new MongodbConnectorException(CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "Unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
        // Only reachable when maxRetries is negative and the loop body never ran.
        throw new MongodbConnectorException(CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, "Bulk Write to MongoDB failed after max retries");
    }

    /** Returns whether the size check is enabled and the buffer has reached {@code bulkActions}. */
    private boolean isOverMaxBatchSizeLimit() {
        return this.bulkActions != -1 && ((long) this.bulkRequests.size()) >= this.bulkActions;
    }

    /**
     * Returns whether the interval check is enabled and at least {@code batchIntervalMs} have
     * passed since {@code lastSendTime}.
     */
    private boolean isOverMaxBatchIntervalLimit() {
        return this.batchIntervalMs != -1 && System.currentTimeMillis() - this.lastSendTime >= this.batchIntervalMs;
    }
}
