/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.driver.internal.core.session;

import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.InvalidKeyspaceException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metadata.DistanceEvent;
import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent;
import com.datastax.oss.driver.internal.core.metadata.TopologyEvent;
import com.datastax.oss.driver.internal.core.pool.ChannelPool;
import com.datastax.oss.driver.internal.core.pool.ChannelPoolFactory;
import com.datastax.oss.driver.internal.core.session.ReprepareOnUp;
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.ReplayingEventFilter;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
import com.datastax.oss.driver.shaded.guava.common.collect.MapMaker;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.netty.util.concurrent.EventExecutor;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class PoolManager
implements AsyncAutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(PoolManager.class);
    private volatile CqlIdentifier keyspace;
    private final ConcurrentMap<Node, ChannelPool> pools = new ConcurrentHashMap<Node, ChannelPool>(16, 0.75f, 1);
    private final ConcurrentMap<ByteBuffer, RepreparePayload> repreparePayloads;
    private final String logPrefix;
    private final EventExecutor adminExecutor;
    private final DriverExecutionProfile config;
    private final SingleThreaded singleThreaded;

    public PoolManager(InternalDriverContext context) {
        this.logPrefix = context.getSessionName();
        this.adminExecutor = context.getNettyOptions().adminEventExecutorGroup().next();
        this.config = context.getConfig().getDefaultProfile();
        this.singleThreaded = new SingleThreaded(context);
        if (this.config.getBoolean(DefaultDriverOption.PREPARED_CACHE_WEAK_VALUES, true)) {
            LOG.debug("[{}] Prepared statements cache configured to use weak values", (Object)this.logPrefix);
            this.repreparePayloads = new MapMaker().weakValues().makeMap();
        } else {
            LOG.debug("[{}] Prepared statements cache configured to use strong values", (Object)this.logPrefix);
            this.repreparePayloads = new MapMaker().makeMap();
        }
    }

    public CompletionStage<Void> init(CqlIdentifier keyspace) {
        RunOrSchedule.on(this.adminExecutor, () -> this.singleThreaded.init(keyspace));
        return this.singleThreaded.initFuture;
    }

    public CqlIdentifier getKeyspace() {
        return this.keyspace;
    }

    public CompletionStage<Void> setKeyspace(CqlIdentifier newKeyspace) {
        CqlIdentifier oldKeyspace = this.keyspace;
        if (Objects.equals(oldKeyspace, newKeyspace)) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.config.getBoolean(DefaultDriverOption.REQUEST_WARN_IF_SET_KEYSPACE)) {
            LOG.warn("[{}] Detected a keyspace change at runtime ({} => {}). This is an anti-pattern that should be avoided in production (see '{}' in the configuration).", new Object[]{this.logPrefix, oldKeyspace == null ? "<none>" : oldKeyspace.asInternal(), newKeyspace.asInternal(), DefaultDriverOption.REQUEST_WARN_IF_SET_KEYSPACE.getPath()});
        }
        this.keyspace = newKeyspace;
        CompletableFuture<Void> result = new CompletableFuture<Void>();
        RunOrSchedule.on(this.adminExecutor, () -> this.singleThreaded.setKeyspace(newKeyspace, result));
        return result;
    }

    public Map<Node, ChannelPool> getPools() {
        return this.pools;
    }

    public ConcurrentMap<ByteBuffer, RepreparePayload> getRepreparePayloads() {
        return this.repreparePayloads;
    }

    @Override
    @NonNull
    public CompletionStage<Void> closeFuture() {
        return this.singleThreaded.closeFuture;
    }

    @Override
    @NonNull
    public CompletionStage<Void> closeAsync() {
        RunOrSchedule.on(this.adminExecutor, () -> this.singleThreaded.close());
        return this.singleThreaded.closeFuture;
    }

    @Override
    @NonNull
    public CompletionStage<Void> forceCloseAsync() {
        RunOrSchedule.on(this.adminExecutor, () -> this.singleThreaded.forceClose());
        return this.singleThreaded.closeFuture;
    }

    private class SingleThreaded {
        private final InternalDriverContext context;
        private final ChannelPoolFactory channelPoolFactory;
        private final CompletableFuture<Void> initFuture = new CompletableFuture();
        private boolean initWasCalled;
        private final CompletableFuture<Void> closeFuture = new CompletableFuture();
        private boolean closeWasCalled;
        private boolean forceCloseWasCalled;
        private final Object distanceListenerKey;
        private final ReplayingEventFilter<DistanceEvent> distanceEventFilter = new ReplayingEventFilter<DistanceEvent>(this::processDistanceEvent);
        private final Object stateListenerKey;
        private final ReplayingEventFilter<NodeStateEvent> stateEventFilter = new ReplayingEventFilter<NodeStateEvent>(this::processStateEvent);
        private final Object topologyListenerKey;
        private final Map<Node, CompletionStage<ChannelPool>> pending = new HashMap<Node, CompletionStage<ChannelPool>>();
        private final Map<Node, DistanceEvent> pendingDistanceEvents = new WeakHashMap<Node, DistanceEvent>();
        private final Map<Node, NodeStateEvent> pendingStateEvents = new WeakHashMap<Node, NodeStateEvent>();

        private SingleThreaded(InternalDriverContext context) {
            this.context = context;
            this.channelPoolFactory = context.getChannelPoolFactory();
            this.distanceListenerKey = context.getEventBus().register(DistanceEvent.class, RunOrSchedule.on(PoolManager.this.adminExecutor, this::onDistanceEvent));
            this.stateListenerKey = context.getEventBus().register(NodeStateEvent.class, RunOrSchedule.on(PoolManager.this.adminExecutor, this::onStateEvent));
            this.topologyListenerKey = context.getEventBus().register(TopologyEvent.class, RunOrSchedule.on(PoolManager.this.adminExecutor, this::onTopologyEvent));
        }

        private void init(CqlIdentifier keyspace) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            if (this.initWasCalled) {
                return;
            }
            this.initWasCalled = true;
            LOG.debug("[{}] Starting initialization", (Object)PoolManager.this.logPrefix);
            PoolManager.this.keyspace = keyspace;
            this.distanceEventFilter.start();
            this.stateEventFilter.start();
            Collection<Node> nodes = this.context.getMetadataManager().getMetadata().getNodes().values();
            ArrayList poolStages = new ArrayList(nodes.size());
            for (Node node : nodes) {
                NodeDistance distance = node.getDistance();
                if (distance == NodeDistance.IGNORED) {
                    LOG.debug("[{}] Skipping {} because it is IGNORED", (Object)PoolManager.this.logPrefix, (Object)node);
                    continue;
                }
                if (node.getState() == NodeState.FORCED_DOWN) {
                    LOG.debug("[{}] Skipping {} because it is FORCED_DOWN", (Object)PoolManager.this.logPrefix, (Object)node);
                    continue;
                }
                LOG.debug("[{}] Creating a pool for {}", (Object)PoolManager.this.logPrefix, (Object)node);
                poolStages.add(this.channelPoolFactory.init(node, keyspace, distance, this.context, PoolManager.this.logPrefix));
            }
            CompletableFutures.whenAllDone(poolStages, () -> this.onPoolsInit(poolStages), PoolManager.this.adminExecutor);
        }

        private void onPoolsInit(List<CompletionStage<ChannelPool>> poolStages) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            LOG.debug("[{}] All pools have finished initializing", (Object)PoolManager.this.logPrefix);
            boolean allInvalidKeyspaces = poolStages.size() > 0;
            for (CompletionStage<ChannelPool> poolStage : poolStages) {
                ChannelPool pool = CompletableFutures.getCompleted(poolStage.toCompletableFuture());
                boolean invalidKeyspace = pool.isInvalidKeyspace();
                if (invalidKeyspace) {
                    LOG.debug("[{}] Pool to {} reports an invalid keyspace", (Object)PoolManager.this.logPrefix, (Object)pool.getNode());
                }
                allInvalidKeyspaces &= invalidKeyspace;
                PoolManager.this.pools.put(pool.getNode(), pool);
            }
            if (allInvalidKeyspaces) {
                this.initFuture.completeExceptionally(new InvalidKeyspaceException("Invalid keyspace " + PoolManager.this.keyspace.asCql(true)));
                this.forceClose();
            } else {
                LOG.debug("[{}] Initialization complete, ready", (Object)PoolManager.this.logPrefix);
                this.initFuture.complete(null);
                this.distanceEventFilter.markReady();
                this.stateEventFilter.markReady();
            }
        }

        private void onDistanceEvent(DistanceEvent event) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            this.distanceEventFilter.accept(event);
        }

        private void onStateEvent(NodeStateEvent event) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            this.stateEventFilter.accept(event);
        }

        private void processDistanceEvent(DistanceEvent event) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            DefaultNode node = event.node;
            NodeDistance newDistance = event.distance;
            if (this.pending.containsKey(node)) {
                this.pendingDistanceEvents.put(node, event);
            } else if (newDistance == NodeDistance.IGNORED) {
                ChannelPool pool = (ChannelPool)PoolManager.this.pools.remove(node);
                if (pool != null) {
                    LOG.debug("[{}] {} became IGNORED, destroying pool", (Object)PoolManager.this.logPrefix, (Object)node);
                    pool.closeAsync().exceptionally(error -> {
                        Loggers.warnWithException(LOG, "[{}] Error closing pool", PoolManager.this.logPrefix, error);
                        return null;
                    });
                }
            } else {
                NodeState state = node.getState();
                if (state == NodeState.FORCED_DOWN) {
                    LOG.warn("[{}] {} became {} but it is FORCED_DOWN, ignoring", new Object[]{PoolManager.this.logPrefix, node, newDistance});
                    return;
                }
                ChannelPool pool = (ChannelPool)PoolManager.this.pools.get(node);
                if (pool == null) {
                    LOG.debug("[{}] {} became {} and no pool found, initializing it", new Object[]{PoolManager.this.logPrefix, node, newDistance});
                    CompletionStage<ChannelPool> poolFuture = this.channelPoolFactory.init(node, PoolManager.this.keyspace, newDistance, this.context, PoolManager.this.logPrefix);
                    this.pending.put(node, poolFuture);
                    poolFuture.thenAcceptAsync(this::onPoolInitialized, PoolManager.this.adminExecutor).exceptionally(UncaughtExceptions::log);
                } else {
                    LOG.debug("[{}] {} became {}, resizing it", new Object[]{PoolManager.this.logPrefix, node, newDistance});
                    pool.resize(newDistance);
                }
            }
        }

        private void processStateEvent(NodeStateEvent event) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            DefaultNode node = event.node;
            NodeState oldState = event.oldState;
            NodeState newState = event.newState;
            if (this.pending.containsKey(node)) {
                this.pendingStateEvents.put(node, event);
            } else if (newState == null || newState == NodeState.FORCED_DOWN) {
                ChannelPool pool = (ChannelPool)PoolManager.this.pools.remove(node);
                if (pool != null) {
                    LOG.debug("[{}] {} was {}, destroying pool", new Object[]{PoolManager.this.logPrefix, node, newState == null ? "removed" : newState.name()});
                    pool.closeAsync().exceptionally(error -> {
                        Loggers.warnWithException(LOG, "[{}] Error closing pool", PoolManager.this.logPrefix, error);
                        return null;
                    });
                }
            } else if (oldState == NodeState.FORCED_DOWN && newState == NodeState.UP && node.getDistance() != NodeDistance.IGNORED) {
                LOG.debug("[{}] {} was forced back UP, initializing pool", (Object)PoolManager.this.logPrefix, (Object)node);
                this.createOrReconnectPool(node);
            }
        }

        private void onTopologyEvent(TopologyEvent event) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            if (event.type == TopologyEvent.Type.SUGGEST_UP) {
                this.context.getMetadataManager().getMetadata().findNode(event.broadcastRpcAddress).ifPresent(node -> {
                    if (node.getDistance() != NodeDistance.IGNORED) {
                        LOG.debug("[{}] Received a SUGGEST_UP event for {}, reconnecting pool now", (Object)PoolManager.this.logPrefix, node);
                        ChannelPool pool = (ChannelPool)PoolManager.this.pools.get(node);
                        if (pool != null) {
                            pool.reconnectNow();
                        }
                    }
                });
            }
        }

        private void createOrReconnectPool(Node node) {
            ChannelPool pool = (ChannelPool)PoolManager.this.pools.get(node);
            if (pool == null) {
                CompletionStage<ChannelPool> poolFuture = this.channelPoolFactory.init(node, PoolManager.this.keyspace, node.getDistance(), this.context, PoolManager.this.logPrefix);
                this.pending.put(node, poolFuture);
                poolFuture.thenAcceptAsync(this::onPoolInitialized, PoolManager.this.adminExecutor).exceptionally(UncaughtExceptions::log);
            } else {
                pool.reconnectNow();
            }
        }

        private void onPoolInitialized(ChannelPool pool) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            Node node = pool.getNode();
            if (this.closeWasCalled) {
                LOG.debug("[{}] Session closed while a pool to {} was initializing, closing it", (Object)PoolManager.this.logPrefix, (Object)node);
                pool.forceCloseAsync();
            } else {
                LOG.debug("[{}] New pool to {} initialized", (Object)PoolManager.this.logPrefix, (Object)node);
                if (Objects.equals(PoolManager.this.keyspace, pool.getInitialKeyspaceName())) {
                    this.reprepareStatements(pool);
                } else {
                    pool.setKeyspace(PoolManager.this.keyspace).handleAsync((result, error) -> {
                        if (error != null) {
                            Loggers.warnWithException(LOG, "Error while switching keyspace to " + PoolManager.this.keyspace, error);
                        }
                        this.reprepareStatements(pool);
                        return null;
                    }, PoolManager.this.adminExecutor);
                }
            }
        }

        private void reprepareStatements(ChannelPool pool) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            if (PoolManager.this.config.getBoolean(DefaultDriverOption.REPREPARE_ENABLED)) {
                new ReprepareOnUp(PoolManager.this.logPrefix + "|" + pool.getNode().getEndPoint(), pool, PoolManager.this.adminExecutor, PoolManager.this.repreparePayloads, this.context, () -> RunOrSchedule.on(PoolManager.this.adminExecutor, () -> this.onPoolReady(pool))).start();
            } else {
                LOG.debug("[{}] Reprepare on up is disabled, skipping", (Object)PoolManager.this.logPrefix);
                this.onPoolReady(pool);
            }
        }

        private void onPoolReady(ChannelPool pool) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            Node node = pool.getNode();
            this.pending.remove(node);
            PoolManager.this.pools.put(node, pool);
            DistanceEvent distanceEvent = this.pendingDistanceEvents.remove(node);
            NodeStateEvent stateEvent = this.pendingStateEvents.remove(node);
            if (stateEvent != null && stateEvent.newState == NodeState.FORCED_DOWN) {
                LOG.debug("[{}] Received {} while the pool was initializing, processing it now", (Object)PoolManager.this.logPrefix, (Object)stateEvent);
                this.processStateEvent(stateEvent);
            } else if (distanceEvent != null) {
                LOG.debug("[{}] Received {} while the pool was initializing, processing it now", (Object)PoolManager.this.logPrefix, (Object)distanceEvent);
                this.processDistanceEvent(distanceEvent);
            }
        }

        private void setKeyspace(CqlIdentifier newKeyspace, CompletableFuture<Void> doneFuture) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            if (this.closeWasCalled) {
                doneFuture.complete(null);
                return;
            }
            LOG.debug("[{}] Switching to keyspace {}", (Object)PoolManager.this.logPrefix, (Object)newKeyspace);
            ArrayList poolReadyFutures = Lists.newArrayListWithCapacity(PoolManager.this.pools.size());
            for (ChannelPool pool : PoolManager.this.pools.values()) {
                poolReadyFutures.add(pool.setKeyspace(newKeyspace));
            }
            CompletableFutures.completeFrom(CompletableFutures.allDone(poolReadyFutures), doneFuture);
        }

        private void close() {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            if (this.closeWasCalled) {
                return;
            }
            this.closeWasCalled = true;
            LOG.debug("[{}] Starting shutdown", (Object)PoolManager.this.logPrefix);
            this.context.getEventBus().unregister(this.distanceListenerKey, DistanceEvent.class);
            this.context.getEventBus().unregister(this.stateListenerKey, NodeStateEvent.class);
            this.context.getEventBus().unregister(this.topologyListenerKey, TopologyEvent.class);
            ArrayList closePoolStages = new ArrayList(PoolManager.this.pools.size());
            for (ChannelPool pool : PoolManager.this.pools.values()) {
                closePoolStages.add(pool.closeAsync());
            }
            CompletableFutures.whenAllDone(closePoolStages, () -> this.onAllPoolsClosed(closePoolStages), PoolManager.this.adminExecutor);
        }

        private void forceClose() {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            if (this.forceCloseWasCalled) {
                return;
            }
            this.forceCloseWasCalled = true;
            LOG.debug("[{}] Starting forced shutdown (was {}closed before)", (Object)PoolManager.this.logPrefix, (Object)(this.closeWasCalled ? "" : "not "));
            if (this.closeWasCalled) {
                for (ChannelPool pool : PoolManager.this.pools.values()) {
                    pool.forceCloseAsync();
                }
            } else {
                ArrayList closePoolStages = new ArrayList(PoolManager.this.pools.size());
                for (ChannelPool pool : PoolManager.this.pools.values()) {
                    closePoolStages.add(pool.forceCloseAsync());
                }
                CompletableFutures.whenAllDone(closePoolStages, () -> this.onAllPoolsClosed(closePoolStages), PoolManager.this.adminExecutor);
            }
        }

        private void onAllPoolsClosed(List<CompletionStage<Void>> closePoolStages) {
            assert (PoolManager.this.adminExecutor.inEventLoop());
            Throwable firstError = null;
            for (CompletionStage<Void> closePoolStage : closePoolStages) {
                CompletableFuture<Void> closePoolFuture = closePoolStage.toCompletableFuture();
                assert (closePoolFuture.isDone());
                if (!closePoolFuture.isCompletedExceptionally()) continue;
                Throwable error = CompletableFutures.getFailed(closePoolFuture);
                if (firstError == null) {
                    firstError = error;
                    continue;
                }
                firstError.addSuppressed(error);
            }
            if (firstError != null) {
                this.closeFuture.completeExceptionally(firstError);
            } else {
                LOG.debug("[{}] Shutdown complete", (Object)PoolManager.this.logPrefix);
                this.closeFuture.complete(null);
            }
        }
    }
}

