package io.choerodon.eureka.event;

import com.netflix.appinfo.InstanceInfo;
import io.choerodon.eureka.event.endpoint.EurekaEventService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.remoting.RemoteAccessException;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestClientException;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/choerodon/eureka/event/AbstractEurekaEventObserver.class */
public abstract class AbstractEurekaEventObserver implements Observer, EurekaEventService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEurekaEventObserver.class);
    private final AntPathMatcher matcher = new AntPathMatcher();
    private final LinkedList<EurekaEventPayload> eventCache = new SynchronizedLinkedList(new LinkedList());

    @Autowired
    private EurekaEventProperties properties;

    public AbstractEurekaEventObserver() {
        EurekaEventHandler.getInstance().getObservable().addObserver(this);
    }

    public void setProperties(EurekaEventProperties eurekaEventProperties) {
        this.properties = eurekaEventProperties;
    }

    public List<EurekaEventPayload> getEventCache() {
        return this.eventCache;
    }

    @Override // io.choerodon.eureka.event.endpoint.EurekaEventService
    public List<EurekaEventPayload> unfinishedEvents(String str) {
        if (StringUtils.isEmpty(str)) {
            return this.eventCache;
        }
        ArrayList arrayList = new ArrayList(4);
        Iterator<EurekaEventPayload> it = this.eventCache.iterator();
        while (it.hasNext()) {
            EurekaEventPayload next = it.next();
            if (str.equalsIgnoreCase(next.getAppName())) {
                arrayList.add(next);
            }
        }
        return arrayList;
    }

    @Override // io.choerodon.eureka.event.endpoint.EurekaEventService
    public List<EurekaEventPayload> retryEvents(String str, String str2) {
        return !StringUtils.isEmpty(str) ? Collections.singletonList(manualRetryEventById(str)) : !StringUtils.isEmpty(str2) ? manualRetryEventsByService(str2) : manualRetryAllEvents();
    }

    private EurekaEventPayload manualRetryEventById(String str) {
        for (EurekaEventPayload eurekaEventPayload : new ArrayList(this.eventCache)) {
            if (str.equals(eurekaEventPayload.getId())) {
                consumerEvent(eurekaEventPayload);
                return eurekaEventPayload;
            }
        }
        return null;
    }

    private List<EurekaEventPayload> manualRetryEventsByService(String str) {
        ArrayList arrayList = new ArrayList();
        for (EurekaEventPayload eurekaEventPayload : new ArrayList(this.eventCache)) {
            if (str.equalsIgnoreCase(eurekaEventPayload.getAppName())) {
                consumerEvent(eurekaEventPayload);
                arrayList.add(eurekaEventPayload);
            }
        }
        return arrayList;
    }

    private List<EurekaEventPayload> manualRetryAllEvents() {
        ArrayList arrayList = new ArrayList();
        for (EurekaEventPayload eurekaEventPayload : new ArrayList(this.eventCache)) {
            consumerEvent(eurekaEventPayload);
            arrayList.add(eurekaEventPayload);
        }
        return arrayList;
    }

    @Override // java.util.Observer
    public void update(Observable observable, Object obj) {
        if (obj instanceof EurekaEventPayload) {
            EurekaEventPayload eurekaEventPayload = (EurekaEventPayload) obj;
            if (Arrays.stream(this.properties.getSkipServices()).anyMatch(str -> {
                return this.matcher.match(str, eurekaEventPayload.getAppName());
            })) {
                LOGGER.info("Skip event that is skipServices, {}", eurekaEventPayload);
                return;
            }
            if (InstanceInfo.InstanceStatus.UP.name().equals(eurekaEventPayload.getStatus())) {
                LOGGER.info("Receive UP event, payload: {}", eurekaEventPayload);
            } else {
                LOGGER.info("Receive DOWN event, payload: {}", eurekaEventPayload);
            }
            putPayloadInCache(eurekaEventPayload);
            consumerEventWithAutoRetry(eurekaEventPayload);
        }
    }

    public void putPayloadInCache(EurekaEventPayload eurekaEventPayload) {
        if (this.eventCache.size() >= this.properties.getMaxCacheSize().intValue()) {
            LOGGER.warn("Remove first payload because of maxCacheSize limit, remove payload: {}", this.eventCache.getFirst());
            this.eventCache.removeFirst();
        }
        this.eventCache.add(eurekaEventPayload);
    }

    private void consumerEvent(EurekaEventPayload eurekaEventPayload) {
        if (InstanceInfo.InstanceStatus.UP.name().equals(eurekaEventPayload.getStatus())) {
            LOGGER.debug("Consumer UP event, payload: {}", eurekaEventPayload);
            receiveUpEvent(eurekaEventPayload);
        } else {
            LOGGER.debug("Consumer DOWN event, payload: {}", eurekaEventPayload);
            receiveDownEvent(eurekaEventPayload);
        }
        this.eventCache.remove(eurekaEventPayload);
    }

    private void consumerEventWithAutoRetry(EurekaEventPayload eurekaEventPayload) {
        rx.Observable.just(eurekaEventPayload).map(eurekaEventPayload2 -> {
            consumerEvent(eurekaEventPayload);
            return eurekaEventPayload;
        }).retryWhen(observable -> {
            return observable.zipWith(rx.Observable.range(1, this.properties.getRetryTime().intValue()), (th, num) -> {
                if (num.intValue() >= this.properties.getRetryTime().intValue()) {
                    if ((th instanceof RemoteAccessException) || (th instanceof RestClientException)) {
                        LOGGER.warn("error.eurekaEventObserver.fetchError, payload {}", eurekaEventPayload, th);
                    } else {
                        LOGGER.warn("error.eurekaEventObserver.consumerError, payload {}", eurekaEventPayload, th);
                    }
                }
                return num;
            }).flatMap(num2 -> {
                return rx.Observable.timer(this.properties.getRetryInterval().intValue(), TimeUnit.SECONDS);
            });
        }).subscribeOn(Schedulers.io()).subscribe(eurekaEventPayload3 -> {
        });
    }

    public abstract void receiveUpEvent(EurekaEventPayload eurekaEventPayload);

    public abstract void receiveDownEvent(EurekaEventPayload eurekaEventPayload);
}
