/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.sink;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.kafka.connect.sink.MongoSinkConfig;
import com.mongodb.kafka.connect.sink.StartedMongoSinkTask;
import com.mongodb.kafka.connect.sink.dlq.ErrorReporter;
import com.mongodb.kafka.connect.util.ConfigHelper;
import com.mongodb.kafka.connect.util.ServerApiConfig;
import com.mongodb.kafka.connect.util.SslConfigs;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoSinkTask
extends SinkTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MongoSinkTask.class);
    private static final String CONNECTOR_TYPE = "sink";
    private StartedMongoSinkTask startedTask;

    public String version() {
        return "1.10.1";
    }

    public void start(Map<String, String> props) {
        LOGGER.info("Starting MongoDB sink task");
        MongoClient client = null;
        try {
            MongoSinkConfig sinkConfig = new MongoSinkConfig(props);
            client = MongoSinkTask.createMongoClient(sinkConfig);
            this.startedTask = new StartedMongoSinkTask(sinkConfig, client, this.createErrorReporter());
        }
        catch (RuntimeException taskStartingException) {
            try {
                MongoClient autoCloseableClient = client;
                if (autoCloseableClient != null) {
                    autoCloseableClient.close();
                }
            }
            catch (RuntimeException resourceReleasingException) {
                taskStartingException.addSuppressed(resourceReleasingException);
            }
            throw new ConnectException("Failed to start MongoDB sink task", (Throwable)taskStartingException);
        }
        LOGGER.debug("Started MongoDB sink task");
    }

    public void put(Collection<SinkRecord> records) {
        this.startedTask.put(records);
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        LOGGER.debug("Flush called - noop");
    }

    public void stop() {
        LOGGER.info("Stopping MongoDB sink task");
        if (this.startedTask != null) {
            this.startedTask.close();
        }
    }

    private ErrorReporter createErrorReporter() {
        ErrorReporter result = MongoSinkTask.nopErrorReporter();
        if (this.context != null) {
            try {
                ErrantRecordReporter errantRecordReporter = this.context.errantRecordReporter();
                if (errantRecordReporter != null) {
                    result = (arg_0, arg_1) -> ((ErrantRecordReporter)errantRecordReporter).report(arg_0, arg_1);
                } else {
                    LOGGER.info("Errant record reporter not configured.");
                }
            }
            catch (NoClassDefFoundError | NoSuchMethodError e) {
                LOGGER.info("Kafka versions prior to 2.6 do not support the errant record reporter.");
            }
        }
        return result;
    }

    static ErrorReporter nopErrorReporter() {
        return (record, e) -> {};
    }

    private static MongoClient createMongoClient(MongoSinkConfig sinkConfig) {
        MongoClientSettings.Builder builder = MongoClientSettings.builder().applyConnectionString(sinkConfig.getConnectionString()).applyToSslSettings(sslBuilder -> SslConfigs.setupSsl(sslBuilder, sinkConfig));
        ServerApiConfig.setServerApi(builder, sinkConfig);
        return MongoClients.create(builder.build(), ConfigHelper.getMongoDriverInformation(CONNECTOR_TYPE, sinkConfig.getString("provider")));
    }
}

