/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.kafka.connect.source;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.event.CommandFailedEvent;
import com.mongodb.event.CommandListener;
import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.kafka.connect.source.MongoCopyDataManager;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.StartedMongoSourceTask;
import com.mongodb.kafka.connect.source.statistics.JmxStatisticsManager;
import com.mongodb.kafka.connect.source.statistics.StatisticsManager;
import com.mongodb.kafka.connect.util.Assertions;
import com.mongodb.kafka.connect.util.ConfigHelper;
import com.mongodb.kafka.connect.util.ResumeTokenUtils;
import com.mongodb.kafka.connect.util.ServerApiConfig;
import com.mongodb.kafka.connect.util.SslConfigs;
import com.mongodb.kafka.connect.util.jmx.SourceTaskStatistics;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MongoSourceTask
extends SourceTask {
    static final Logger LOGGER = LoggerFactory.getLogger(MongoSourceTask.class);
    private static final String CONNECTOR_TYPE = "source";
    public static final String ID_FIELD = "_id";
    static final String COPY_KEY = "copy";
    private static final String NS_KEY = "ns";
    private static final int UNKNOWN_FIELD_ERROR = 40415;
    private static final int FAILED_TO_PARSE_ERROR = 9;
    private StartedMongoSourceTask startedTask;

    public String version() {
        return "1.10.1";
    }

    public void start(Map<String, String> props) {
        LOGGER.info("Starting MongoDB source task");
        StatisticsManager statisticsManager = null;
        MongoClient mongoClient = null;
        MongoCopyDataManager copyDataManager = null;
        try {
            MongoSourceConfig sourceConfig = new MongoSourceConfig(props);
            boolean shouldCopyData = MongoSourceTask.shouldCopyData(this.context, sourceConfig);
            String connectorName = JmxStatisticsManager.getConnectorName(props);
            final StatisticsManager statsManager = statisticsManager = new JmxStatisticsManager(shouldCopyData, connectorName);
            CommandListener statisticsCommandListener = new CommandListener(){

                @Override
                public void commandSucceeded(CommandSucceededEvent event) {
                    MongoSourceTask.mongoCommandSucceeded(event, statsManager.currentStatistics());
                }

                @Override
                public void commandFailed(CommandFailedEvent event) {
                    MongoSourceTask.mongoCommandFailed(event, statsManager.currentStatistics());
                }
            };
            MongoClientSettings.Builder builder = MongoClientSettings.builder().applyConnectionString(sourceConfig.getConnectionString()).addCommandListener(statisticsCommandListener).applyToSslSettings(sslBuilder -> SslConfigs.setupSsl(sslBuilder, sourceConfig));
            ServerApiConfig.setServerApi(builder, sourceConfig);
            mongoClient = MongoClients.create(builder.build(), ConfigHelper.getMongoDriverInformation(CONNECTOR_TYPE, sourceConfig.getString("provider")));
            copyDataManager = shouldCopyData ? new MongoCopyDataManager(sourceConfig, mongoClient) : null;
            this.startedTask = new StartedMongoSourceTask(() -> this.context, sourceConfig, mongoClient, copyDataManager, statisticsManager);
        }
        catch (RuntimeException taskStartingException) {
            try (StatisticsManager autoCloseableStatisticsManager = statisticsManager;
                 MongoClient autoCloseableMongoClient = mongoClient;){
                MongoCopyDataManager autoCloseableCopyDataManager = copyDataManager;
                if (autoCloseableCopyDataManager != null) {
                    autoCloseableCopyDataManager.close();
                }
            }
            catch (RuntimeException resourceReleasingException) {
                taskStartingException.addSuppressed(resourceReleasingException);
            }
            throw new ConnectException("Failed to start MongoDB source task", (Throwable)taskStartingException);
        }
        LOGGER.info("Started MongoDB source task");
    }

    StartedMongoSourceTask startedTask() {
        return Assertions.assertNotNull(this.startedTask);
    }

    public List<SourceRecord> poll() {
        return this.startedTask.poll();
    }

    public void stop() {
        LOGGER.info("Stopping MongoDB source task");
        if (this.startedTask != null) {
            this.startedTask.close();
        }
    }

    static boolean doesNotSupportsStartAfter(MongoCommandException e) {
        return (e.getErrorCode() == 9 || e.getErrorCode() == 40415) && e.getErrorMessage().contains("startAfter");
    }

    static Map<String, Object> createPartitionMap(MongoSourceConfig sourceConfig) {
        String partitionName = sourceConfig.getString("offset.partition.name");
        if (partitionName.isEmpty()) {
            partitionName = MongoSourceTask.createDefaultPartitionName(sourceConfig);
        }
        return Collections.singletonMap(NS_KEY, partitionName);
    }

    static Map<String, Object> createLegacyPartitionMap(MongoSourceConfig sourceConfig) {
        return Collections.singletonMap(NS_KEY, MongoSourceTask.createLegacyPartitionName(sourceConfig));
    }

    static String createLegacyPartitionName(MongoSourceConfig sourceConfig) {
        return String.format("%s/%s.%s", sourceConfig.getString("connection.uri"), sourceConfig.getString("database"), sourceConfig.getString("collection"));
    }

    static String createDefaultPartitionName(MongoSourceConfig sourceConfig) {
        ConnectionString connectionString = sourceConfig.getConnectionString();
        StringBuilder builder = new StringBuilder();
        builder.append(connectionString.isSrvProtocol() ? "mongodb+srv://" : "mongodb://");
        builder.append(String.join((CharSequence)",", connectionString.getHosts()));
        builder.append("/");
        builder.append(sourceConfig.getString("database"));
        if (!sourceConfig.getString("collection").isEmpty()) {
            builder.append(".");
            builder.append(sourceConfig.getString("collection"));
        }
        return builder.toString();
    }

    private static boolean shouldCopyData(SourceTaskContext context, MongoSourceConfig sourceConfig) {
        Map<String, Object> offset = MongoSourceTask.getOffset(context, sourceConfig);
        return sourceConfig.getStartupConfig().startupMode() == MongoSourceConfig.StartupConfig.StartupMode.COPY_EXISTING && (offset == null || offset.containsKey(COPY_KEY));
    }

    static Map<String, Object> getOffset(SourceTaskContext context, MongoSourceConfig sourceConfig) {
        if (context != null) {
            Map offset = context.offsetStorageReader().offset(MongoSourceTask.createPartitionMap(sourceConfig));
            if (offset == null && sourceConfig.getString("offset.partition.name").isEmpty()) {
                offset = context.offsetStorageReader().offset(MongoSourceTask.createLegacyPartitionMap(sourceConfig));
            }
            return offset;
        }
        return null;
    }

    public void commitRecord(SourceRecord record, RecordMetadata metadata) {
        this.startedTask.commitRecord(record, metadata);
    }

    static void mongoCommandSucceeded(CommandSucceededEvent event, SourceTaskStatistics currentStatistics) {
        String commandName = event.getCommandName();
        long elapsedTimeMs = event.getElapsedTime(TimeUnit.MILLISECONDS);
        if ("getMore".equals(commandName)) {
            currentStatistics.getGetmoreCommandsSuccessful().sample(elapsedTimeMs);
        } else if ("aggregate".equals(commandName) || "find".equals(commandName)) {
            currentStatistics.getInitialCommandsSuccessful().sample(elapsedTimeMs);
        }
        ResumeTokenUtils.getResponseOffsetSecs(event.getResponse()).ifPresent(offset -> currentStatistics.getLatestMongodbTimeDifferenceSecs().sample(offset));
    }

    static void mongoCommandFailed(CommandFailedEvent event, SourceTaskStatistics currentStatistics) {
        Throwable e = event.getThrowable();
        if (e instanceof MongoCommandException && MongoSourceTask.doesNotSupportsStartAfter((MongoCommandException)e)) {
            return;
        }
        String commandName = event.getCommandName();
        long elapsedTimeMs = event.getElapsedTime(TimeUnit.MILLISECONDS);
        if ("getMore".equals(commandName)) {
            currentStatistics.getGetmoreCommandsFailed().sample(elapsedTimeMs);
        } else if ("aggregate".equals(commandName) || "find".equals(commandName)) {
            currentStatistics.getInitialCommandsFailed().sample(elapsedTimeMs);
        }
    }
}

