package com.el.edp.cache.support;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.api.listener.StatusListener;
import org.redisson.connection.ConnectionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/el/edp/cache/support/EdpNearCacheChangePropagator.class */
public class EdpNearCacheChangePropagator implements ConnectionListener, StatusListener {
    private static final Logger log = LoggerFactory.getLogger(EdpNearCacheChangePropagator.class);
    private static final String CHANNEL_NEAR_CACHE_CHANGE = "edp:cache:change";
    private final RTopic topic;
    private volatile boolean topicAvailable;
    private LinkedBlockingQueue<EdpNearCacheChange> changesUnpropagated = new LinkedBlockingQueue<>();
    private Map<EdpNearCacheChange, Integer> changesSubscribed = new ConcurrentHashMap();

    public EdpNearCacheChangePropagator(RedissonClient redissonClient) {
        redissonClient.getNodesGroup().addConnectionListener(this);
        RTopic topic = redissonClient.getTopic(CHANNEL_NEAR_CACHE_CHANGE);
        topic.addListener(this);
        this.topic = topic;
    }

    public void registerCacheChangeConsumer(MessageListener<? extends EdpNearCacheChange> messageListener) {
        this.topic.addListener(EdpNearCacheChange.class, messageListener);
    }

    public void registerChangeSubscribed(EdpNearCacheChange edpNearCacheChange) {
        this.changesSubscribed.compute(edpNearCacheChange, (edpNearCacheChange2, num) -> {
            return Integer.valueOf(num == null ? 1 : num.intValue() + 1);
        });
    }

    private boolean unregisterChangeSubscribed(EdpNearCacheChange edpNearCacheChange) {
        return this.changesSubscribed.compute(edpNearCacheChange, (edpNearCacheChange2, num) -> {
            if (num == null || num.intValue() == 0) {
                return null;
            }
            return Integer.valueOf(num.intValue() - 1);
        }) == null;
    }

    public void propagate(EdpNearCacheChange edpNearCacheChange) {
        if (!this.topicAvailable) {
            log.info("[EDP-NCS] coherent topic is BREAK, cache change will been kept: {}", edpNearCacheChange);
            this.changesUnpropagated.add(edpNearCacheChange);
        } else if (!unregisterChangeSubscribed(edpNearCacheChange)) {
            log.info("[EDP-NCS] INBOUND cache change SHOULD NOT be propagated: {}", edpNearCacheChange);
        } else {
            this.topic.publishAsync(edpNearCacheChange);
            log.trace("[EDP-NCS] OUTBOUND cache change: {}", edpNearCacheChange);
        }
    }

    public void onConnect(InetSocketAddress inetSocketAddress) {
        log.info("[EDP-NCS] redis server was (re)CONNECTED: {}", inetSocketAddress);
    }

    public void onDisconnect(InetSocketAddress inetSocketAddress) {
        this.topicAvailable = false;
        log.info("[EDP-NCS] redis server was DISCONNECTED: {}", inetSocketAddress);
    }

    public void onSubscribe(String str) {
        this.topicAvailable = true;
        log.info("[EDP-NCS] coherent topic is (re)READY: {}", str);
        propagateKeptChanges();
    }

    private void propagateKeptChanges() {
        if (this.changesUnpropagated.isEmpty()) {
            return;
        }
        ArrayList arrayList = new ArrayList(this.changesUnpropagated.size());
        if (this.changesUnpropagated.drainTo(arrayList) > 0) {
            arrayList.forEach(this::propagate);
        }
    }

    public void onUnsubscribe(String str) {
    }
}
