/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils;

import com.mongodb.ConnectionString;
import com.mongodb.MongoCommandException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import io.debezium.relational.TableId;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.CollectionDiscoveryUtils;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongodbUtils {
    private static final Logger log = LoggerFactory.getLogger(MongodbUtils.class);
    private static final Map<TableId, MongoCollection<?>> cache = new ConcurrentHashMap();

    public static ChangeStreamDescriptor getChangeStreamDescriptor(@Nonnull MongodbSourceConfig sourceConfig, List<String> discoveredDatabases, List<String> discoveredCollections) {
        ChangeStreamDescriptor changeStreamFilter;
        List<String> databaseList = sourceConfig.getDatabaseList();
        List<String> collectionList = sourceConfig.getCollectionList();
        if (collectionList != null) {
            if (MongodbUtils.isIncludeListExplicitlySpecified(collectionList, discoveredCollections)) {
                changeStreamFilter = ChangeStreamDescriptor.collection(TableId.parse((String)discoveredCollections.get(0)));
            } else {
                Pattern namespaceRegex = CollectionDiscoveryUtils.includeListAsFlatPattern(collectionList);
                if (databaseList != null) {
                    if (MongodbUtils.isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
                        changeStreamFilter = ChangeStreamDescriptor.database(discoveredDatabases.get(0), namespaceRegex);
                    } else {
                        Pattern databaseRegex = CollectionDiscoveryUtils.includeListAsFlatPattern(databaseList);
                        changeStreamFilter = ChangeStreamDescriptor.deployment(databaseRegex, namespaceRegex);
                    }
                } else {
                    changeStreamFilter = ChangeStreamDescriptor.deployment(null, namespaceRegex);
                }
            }
        } else if (databaseList != null) {
            if (MongodbUtils.isIncludeListExplicitlySpecified(databaseList, discoveredDatabases)) {
                changeStreamFilter = ChangeStreamDescriptor.database(discoveredDatabases.get(0));
            } else {
                Pattern databaseRegex = CollectionDiscoveryUtils.includeListAsFlatPattern(databaseList);
                changeStreamFilter = ChangeStreamDescriptor.deployment(databaseRegex);
            }
        } else {
            changeStreamFilter = ChangeStreamDescriptor.deployment();
        }
        return changeStreamFilter;
    }

    public static boolean isIncludeListExplicitlySpecified(List<String> includeList, List<String> discoveredList) {
        if (includeList == null || includeList.size() != 1) {
            return false;
        }
        if (discoveredList == null || discoveredList.size() != 1) {
            return false;
        }
        String firstOfIncludeList = includeList.get(0);
        String firstOfDiscoveredList = discoveredList.get(0);
        return firstOfDiscoveredList.equals(firstOfIncludeList);
    }

    @Nonnull
    public static ChangeStreamIterable<Document> getChangeStreamIterable(MongodbSourceConfig sourceConfig, @Nonnull ChangeStreamDescriptor descriptor) {
        return MongodbUtils.getChangeStreamIterable(MongodbUtils.createMongoClient(sourceConfig), descriptor.getDatabase(), descriptor.getCollection(), descriptor.getDatabaseRegex(), descriptor.getNamespaceRegex(), sourceConfig.getBatchSize(), sourceConfig.isUpdateLookup());
    }

    @Nonnull
    public static ChangeStreamIterable<Document> getChangeStreamIterable(MongoClient mongoClient, @Nonnull ChangeStreamDescriptor descriptor, int batchSize, boolean updateLookup) {
        return MongodbUtils.getChangeStreamIterable(mongoClient, descriptor.getDatabase(), descriptor.getCollection(), descriptor.getDatabaseRegex(), descriptor.getNamespaceRegex(), batchSize, updateLookup);
    }

    @Nonnull
    public static ChangeStreamIterable<Document> getChangeStreamIterable(MongoClient mongoClient, String database, String collection, Pattern databaseRegex, Pattern namespaceRegex, int batchSize, boolean updateLookup) {
        ChangeStreamIterable<Document> changeStream;
        if (StringUtils.isNotEmpty((CharSequence)database) && StringUtils.isNotEmpty((CharSequence)collection)) {
            MongoCollection<Document> coll = mongoClient.getDatabase(database).getCollection(collection);
            log.info("Preparing change stream for collection {}.{}", (Object)database, (Object)collection);
            changeStream = coll.watch();
        } else if (StringUtils.isNotEmpty((CharSequence)database) && namespaceRegex != null) {
            MongoDatabase db = mongoClient.getDatabase(database);
            ArrayList<Bson> pipeline = new ArrayList<Bson>();
            pipeline.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
            Bson nsFilter = Filters.regex("_ns_", namespaceRegex);
            pipeline.add(Aggregates.match(nsFilter));
            log.info("Preparing change stream for database {} with namespace regex filter {}", (Object)database, (Object)namespaceRegex);
            changeStream = db.watch(pipeline);
        } else if (StringUtils.isNotEmpty((CharSequence)database)) {
            MongoDatabase db = mongoClient.getDatabase(database);
            log.info("Preparing change stream for database {}", (Object)database);
            changeStream = db.watch();
        } else if (namespaceRegex != null) {
            ArrayList<Bson> pipeline = new ArrayList<Bson>();
            pipeline.add(CollectionDiscoveryUtils.ADD_NS_FIELD);
            Bson nsFilter = Filters.regex("_ns_", namespaceRegex);
            if (databaseRegex != null) {
                Bson dbFilter = Filters.regex("ns.db", databaseRegex);
                nsFilter = Filters.and(dbFilter, nsFilter);
                log.info("Preparing change stream for deployment with database regex filter {} and namespace regex filter {}", (Object)databaseRegex, (Object)namespaceRegex);
            } else {
                log.info("Preparing change stream for deployment with namespace regex filter {}", (Object)namespaceRegex);
            }
            pipeline.add(Aggregates.match(nsFilter));
            changeStream = mongoClient.watch(pipeline);
        } else if (databaseRegex != null) {
            ArrayList<Bson> pipeline = new ArrayList<Bson>();
            pipeline.add(Aggregates.match(Filters.regex("ns.db", databaseRegex)));
            log.info("Preparing change stream for deployment  with database regex filter {}", (Object)databaseRegex);
            changeStream = mongoClient.watch(pipeline);
        } else {
            log.info("Preparing change stream for deployment");
            changeStream = mongoClient.watch();
        }
        if (batchSize > 0) {
            changeStream.batchSize(batchSize);
        }
        if (updateLookup) {
            changeStream.fullDocument(FullDocument.UPDATE_LOOKUP);
        }
        return changeStream;
    }

    public static BsonDocument getLatestResumeToken(MongoClient mongoClient, ChangeStreamDescriptor descriptor) {
        ChangeStreamIterable<Document> changeStreamIterable = MongodbUtils.getChangeStreamIterable(mongoClient, descriptor, 1, false);
        try (MongoCursor changeStreamCursor = changeStreamIterable.cursor();){
            ChangeStreamDocument firstResult = (ChangeStreamDocument)changeStreamCursor.tryNext();
            BsonDocument bsonDocument = firstResult != null ? firstResult.getResumeToken() : changeStreamCursor.getResumeToken();
            return bsonDocument;
        }
    }

    public static boolean isCommandSucceed(BsonDocument commandResult) {
        return commandResult != null && MongodbSourceOptions.COMMAND_SUCCEED_FLAG.equals(commandResult.getDouble("ok"));
    }

    public static String commandErrorMessage(BsonDocument commandResult) {
        return Optional.ofNullable(commandResult).map(doc -> doc.getString("errmsg")).map(BsonString::getValue).orElse(null);
    }

    @Nonnull
    public static BsonDocument collStats(@Nonnull MongoClient mongoClient, @Nonnull TableId collectionId) {
        BsonDocument collStatsCommand = new BsonDocument("collStats", new BsonString(collectionId.table()));
        return mongoClient.getDatabase(collectionId.catalog()).runCommand((Bson)collStatsCommand, BsonDocument.class);
    }

    @Nonnull
    public static BsonDocument splitVector(MongoClient mongoClient, TableId collectionId, BsonDocument keyPattern, int maxChunkSizeMB) {
        return MongodbUtils.splitVector(mongoClient, collectionId, keyPattern, maxChunkSizeMB, null, null);
    }

    @Nonnull
    public static BsonDocument splitVector(@Nonnull MongoClient mongoClient, @Nonnull TableId collectionId, BsonDocument keyPattern, int maxChunkSizeMB, BsonDocument min, BsonDocument max) {
        BsonDocument splitVectorCommand = new BsonDocument("splitVector", new BsonString(collectionId.identifier())).append("keyPattern", keyPattern).append("maxChunkSize", new BsonInt32(maxChunkSizeMB));
        Optional.ofNullable(min).ifPresent(v -> splitVectorCommand.append("min", (BsonValue)v));
        Optional.ofNullable(max).ifPresent(v -> splitVectorCommand.append("max", (BsonValue)v));
        return mongoClient.getDatabase(collectionId.catalog()).runCommand((Bson)splitVectorCommand, BsonDocument.class);
    }

    public static BsonTimestamp getCurrentClusterTime(MongoClient mongoClient) {
        BsonDocument isMasterResult = MongodbUtils.isMaster(mongoClient);
        if (!MongodbUtils.isCommandSucceed(isMasterResult)) {
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, "Failed to execute isMaster command: " + MongodbUtils.commandErrorMessage(isMasterResult));
        }
        return isMasterResult.getDocument("$clusterTime").getTimestamp("clusterTime");
    }

    @Nonnull
    public static BsonDocument isMaster(@Nonnull MongoClient mongoClient) {
        BsonDocument isMasterCommand = new BsonDocument("isMaster", new BsonInt32(1));
        return mongoClient.getDatabase("admin").runCommand((Bson)isMasterCommand, BsonDocument.class);
    }

    @Nonnull
    public static List<BsonDocument> readChunks(MongoClient mongoClient, @Nonnull BsonDocument collectionMetadata) {
        MongoCollection<BsonDocument> chunks = MongodbUtils.getMongoCollection(mongoClient, TableId.parse((String)"config.chunks"), BsonDocument.class);
        ArrayList<BsonDocument> collectionChunks = new ArrayList<BsonDocument>();
        Bson filter = Filters.or(new BsonDocument("ns", collectionMetadata.get("_id")), new BsonDocument("uuid", collectionMetadata.get("uuid")));
        chunks.find(filter).projection(Projections.include("min", "max", "shard")).sort(Sorts.ascending("min")).into(collectionChunks);
        return collectionChunks;
    }

    public static BsonDocument readCollectionMetadata(MongoClient mongoClient, @Nonnull TableId collectionId) {
        MongoCollection<BsonDocument> collection = MongodbUtils.getMongoCollection(mongoClient, TableId.parse((String)"config.collections"), BsonDocument.class);
        return (BsonDocument)collection.find(Filters.eq("_id", collectionId.identifier())).projection(Projections.include("_id", "uuid", "dropped", "documentKey")).first();
    }

    @Nonnull
    public static <T> MongoCollection<T> getMongoCollection(MongoClient mongoClient, TableId collectionId, Class<T> documentClass) {
        return MongodbUtils.getCollection(mongoClient, collectionId, documentClass);
    }

    @Nonnull
    public static <T> MongoCollection<T> getCollection(MongoClient mongoClient, TableId collectionId, Class<T> documentClass) {
        MongoCollection<?> cachedCollection = cache.get(collectionId);
        if (cachedCollection == null) {
            MongoCollection<T> collection = mongoClient.getDatabase(collectionId.catalog()).getCollection(collectionId.table(), documentClass);
            cache.put(collectionId, collection);
            return collection;
        }
        return cachedCollection;
    }

    public static MongoClient createMongoClient(MongodbSourceConfig sourceConfig) {
        return MongodbClientProvider.INSTANCE.getOrCreateMongoClient(sourceConfig);
    }

    @Nonnull
    public static ConnectionString buildConnectionString(String username, String password, String hosts, String connectionOptions) {
        StringBuilder sb = new StringBuilder("mongodb://");
        if (MongodbUtils.hasCredentials(username, password)) {
            MongodbUtils.appendCredentials(sb, username, password);
        }
        sb.append(hosts);
        if (StringUtils.isNotEmpty((CharSequence)connectionOptions)) {
            sb.append("/?").append(connectionOptions);
        }
        return new ConnectionString(sb.toString());
    }

    private static boolean hasCredentials(String username, String password) {
        return StringUtils.isNotEmpty((CharSequence)username) && StringUtils.isNotEmpty((CharSequence)password);
    }

    private static void appendCredentials(@Nonnull StringBuilder sb, String username, String password) {
        sb.append(MongodbUtils.encodeValue(username)).append(":").append(MongodbUtils.encodeValue(password)).append("@");
    }

    public static String encodeValue(String value) {
        try {
            return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
        }
        catch (UnsupportedEncodingException e) {
            throw new MongodbConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT, e.getMessage());
        }
    }

    public static boolean checkIfChangeStreamCursorExpires(MongoCommandException e) {
        return MongodbSourceOptions.INVALID_CHANGE_STREAM_ERRORS.contains(e.getCode());
    }

    public static boolean checkIfResumeTokenExpires(MongoCommandException e) {
        if (e.getCode() != 280) {
            return false;
        }
        String errorMessage = e.getErrorMessage().toLowerCase(Locale.ROOT);
        return errorMessage.contains("resume token") && (errorMessage.contains("not found") || errorMessage.contains("does not exist") || errorMessage.contains("invalid resume token") || errorMessage.contains("no longer be in the oplog"));
    }
}

