package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;

import com.google.auto.service.AutoService;
import java.util.Optional;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataDocumentSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.commit.MongodbSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.DocumentBulk;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.state.MongodbCommitInfo;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/**
 * SeaTunnel sink that writes {@link SeaTunnelRow} records through a {@link MongodbWriter}.
 *
 * <p>The sink is registered for {@link SeaTunnelSink} via {@link AutoService}. {@link
 * #prepare(Config)} translates the job configuration into {@link MongodbWriterOptions}; the
 * serializers and the aggregated committer are only supplied when the resulting options have
 * {@code transaction} enabled, otherwise the corresponding accessors return an empty {@link
 * Optional}.
 */
@AutoService({SeaTunnelSink.class})
public class MongodbSink implements SeaTunnelSink<SeaTunnelRow, DocumentBulk, MongodbCommitInfo, MongodbAggregatedCommitInfo> {
    /** Writer settings built by {@link #prepare(Config)}; {@code null} until it has run. */
    private MongodbWriterOptions options;

    /** Row schema handed over through {@link #setTypeInfo(SeaTunnelRowType)}. */
    private SeaTunnelRowType seaTunnelRowType;

    /**
     * Builds the writer options from the sink configuration.
     *
     * <p>The URI, database and collection options are mandatory. Every other option is applied
     * only when its key is present in {@code config}.
     *
     * @param config sink configuration to read the options from
     * @throws IllegalArgumentException if the URI, database or collection option is missing;
     *     previously this case left the sink unconfigured and surfaced later as a
     *     {@link NullPointerException}
     * @throws PrepareFailException declared on the signature; not raised directly by the code in
     *     this method
     */
    public void prepare(Config config) throws PrepareFailException {
        // Fail fast: without these three settings no options object can be built, and every
        // other method of this class dereferences it.
        requireOption(config, MongodbConfig.URI.key());
        requireOption(config, MongodbConfig.DATABASE.key());
        requireOption(config, MongodbConfig.COLLECTION.key());

        MongodbWriterOptions.Builder builder =
                MongodbWriterOptions.builder()
                        .withConnectString(config.getString(MongodbConfig.URI.key()))
                        .withDatabase(config.getString(MongodbConfig.DATABASE.key()))
                        .withCollection(config.getString(MongodbConfig.COLLECTION.key()));

        if (config.hasPath(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key())) {
            builder.withFlushSize(config.getInt(MongodbConfig.BUFFER_FLUSH_MAX_ROWS.key()));
        }
        if (config.hasPath(MongodbConfig.BUFFER_FLUSH_INTERVAL.key())) {
            builder.withBatchIntervalMs(config.getLong(MongodbConfig.BUFFER_FLUSH_INTERVAL.key()));
        }

        if (config.hasPath(MongodbConfig.PRIMARY_KEY.key())) {
            builder.withPrimaryKey(
                    config.getStringList(MongodbConfig.PRIMARY_KEY.key()).toArray(new String[0]));
        }
        // NOTE(review): a fallback key that is present is applied after the primary key above,
        // so it replaces that value (and a later fallback replaces an earlier one). This ordering
        // is kept unchanged - confirm that the precedence is intended.
        MongodbConfig.PRIMARY_KEY.getFallbackKeys().forEach(fallbackKey -> {
            if (config.hasPath(fallbackKey)) {
                builder.withPrimaryKey(config.getStringList(fallbackKey).toArray(new String[0]));
            }
        });

        if (config.hasPath(MongodbConfig.UPSERT_ENABLE.key())) {
            builder.withUpsertEnable(config.getBoolean(MongodbConfig.UPSERT_ENABLE.key()));
        }
        if (config.hasPath(MongodbConfig.RETRY_MAX.key())) {
            builder.withRetryMax(config.getInt(MongodbConfig.RETRY_MAX.key()));
        }
        if (config.hasPath(MongodbConfig.RETRY_INTERVAL.key())) {
            builder.withRetryInterval(config.getLong(MongodbConfig.RETRY_INTERVAL.key()));
        }
        if (config.hasPath(MongodbConfig.TRANSACTION.key())) {
            builder.withTransaction(config.getBoolean(MongodbConfig.TRANSACTION.key()));
        }

        this.options = builder.build();
    }

    /**
     * Returns the connector identity constant defined in {@link MongodbConfig}.
     *
     * @return {@link MongodbConfig#CONNECTOR_IDENTITY}
     */
    public String getPluginName() {
        return MongodbConfig.CONNECTOR_IDENTITY;
    }

    /**
     * Stores the row type that {@link #createWriter(SinkWriter.Context)} later uses to build the
     * row-to-BSON converter.
     *
     * @param seaTunnelRowType schema of the rows this sink will receive
     */
    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        this.seaTunnelRowType = seaTunnelRowType;
    }

    /**
     * Creates a writer backed by a {@link RowDataDocumentSerializer} for the stored row type.
     *
     * @param context writer context, passed through to the {@link MongodbWriter}
     * @return a new {@link MongodbWriter} using the prepared options
     */
    public SinkWriter<SeaTunnelRow, MongodbCommitInfo, DocumentBulk> createWriter(SinkWriter.Context context) {
        RowDataDocumentSerializer documentSerializer =
                new RowDataDocumentSerializer(
                        RowDataToBsonConverters.createConverter(this.seaTunnelRowType),
                        this.options,
                        new MongoKeyExtractor(this.options));
        return new MongodbWriter(documentSerializer, this.options, context);
    }

    /**
     * Returns the serializer for writer state.
     *
     * @return a {@link DefaultSerializer} when the options enable transactions, otherwise empty
     */
    public Optional<Serializer<DocumentBulk>> getWriterStateSerializer() {
        if (!this.options.transaction) {
            return Optional.empty();
        }
        return Optional.of(new DefaultSerializer<>());
    }

    /**
     * Creates the aggregated committer.
     *
     * @return a {@link MongodbSinkAggregatedCommitter} built from the prepared options when the
     *     options enable transactions, otherwise empty
     */
    public Optional<SinkAggregatedCommitter<MongodbCommitInfo, MongodbAggregatedCommitInfo>> createAggregatedCommitter() {
        if (!this.options.transaction) {
            return Optional.empty();
        }
        return Optional.of(new MongodbSinkAggregatedCommitter(this.options));
    }

    /**
     * Returns the serializer for aggregated commit information.
     *
     * @return a {@link DefaultSerializer} when the options enable transactions, otherwise empty
     */
    public Optional<Serializer<MongodbAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
        if (!this.options.transaction) {
            return Optional.empty();
        }
        return Optional.of(new DefaultSerializer<>());
    }

    /**
     * Returns the serializer for per-writer commit information.
     *
     * @return a {@link DefaultSerializer} when the options enable transactions, otherwise empty
     */
    public Optional<Serializer<MongodbCommitInfo>> getCommitInfoSerializer() {
        if (!this.options.transaction) {
            return Optional.empty();
        }
        return Optional.of(new DefaultSerializer<>());
    }

    /** Throws an {@link IllegalArgumentException} naming {@code key} when it is absent from {@code config}. */
    private static void requireOption(Config config, String key) {
        if (!config.hasPath(key)) {
            throw new IllegalArgumentException(
                    "Missing required option '" + key + "' for the "
                            + MongodbConfig.CONNECTOR_IDENTITY + " sink");
        }
    }
}
