package com.el.core.event.sock;

import com.el.core.event.EventConsumer;
import com.el.core.event.EventMessage;
import com.el.core.event.EventMessageConverter;
import com.el.core.event.EventType;
import java.io.IOException;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

/* loaded from: input_file:com/el/core/event/sock/WebSocketEventConsumer.class */
/**
 * {@link EventConsumer} that pushes event messages to connected WebSocket sessions.
 *
 * <p>An event is handled only when {@link #supports(EventType)} accepts its type. An event
 * with no recipients is sent to every session returned by the session manager; otherwise it
 * is sent only to the sessions the manager returns for the recipients' principal names.
 */
public class WebSocketEventConsumer implements EventConsumer {
    private static final Logger log = LoggerFactory.getLogger(WebSocketEventConsumer.class);
    private final WebSocketSessionManager sessionManager;
    private final EventMessageConverter messageConverter;

    /**
     * Creates a consumer bound to the given collaborators.
     *
     * @param webSocketSessionManager source of the WebSocket sessions to deliver to
     * @param eventMessageConverter   converts recipients to principal names and messages to JSON
     */
    public WebSocketEventConsumer(WebSocketSessionManager webSocketSessionManager, EventMessageConverter eventMessageConverter) {
        this.sessionManager = webSocketSessionManager;
        this.messageConverter = eventMessageConverter;
    }

    /**
     * Decides whether this consumer handles events of the given type.
     *
     * @param eventType the type of the incoming event
     * @return the result of {@code eventType.byInbox()}
     */
    protected boolean supports(EventType eventType) {
        return eventType.byInbox();
    }

    /**
     * Delivers the event to WebSocket sessions if its type is supported; otherwise does nothing.
     *
     * <p>An empty recipient set means broadcast to all sessions. The recipient set itself is
     * dereferenced without a null check, so {@code getRecipients()} must not return null.
     *
     * @param eventMessage the event to deliver
     */
    @Override // com.el.core.event.EventConsumer
    public void consume(EventMessage eventMessage) {
        if (!supports(eventMessage.getType())) {
            return;
        }
        Set<?> recipients = eventMessage.getRecipients();
        if (recipients.isEmpty()) {
            log.trace("[CORE-SOCK] event WILL be BROADCAST...");
            send(eventMessage, this.sessionManager.sessions());
            return;
        }
        // Raw Set kept on purpose: the element type produced by toPrincipalName and the
        // parameter type of sessionsOf are declared outside this file.
        Set principalNames = (Set) recipients.stream()
                .map(this.messageConverter::toPrincipalName)
                .collect(Collectors.toSet());
        log.trace("[CORE-SOCK] event WILL be SENT to {}...", principalNames);
        send(eventMessage, this.sessionManager.sessionsOf(principalNames));
    }

    /**
     * Sends the event as a JSON text message to every open session in the stream.
     *
     * <p>An {@link IOException} on one session is logged and does not stop delivery to the
     * remaining sessions. Closed sessions are skipped.
     *
     * @param eventMessage the event to serialize and send
     * @param stream       the candidate sessions
     */
    private void send(EventMessage eventMessage, Stream<WebSocketSession> stream) {
        stream.forEach(session -> {
            if (!session.isOpen()) {
                return;
            }
            try {
                if (log.isTraceEnabled()) {
                    // Guarded so the session getters are not called when TRACE is off.
                    log.trace("[CORE-SOCK] SENDING event to wss-{} of usr-{} at {} ...",
                            new Object[]{session.getId(), principalNameOf(session), session.getRemoteAddress()});
                }
                session.sendMessage(new TextMessage(this.messageConverter.toJSON(eventMessage)));
                log.trace("[CORE-SOCK] event SENT.");
            } catch (IOException e) {
                // Throwable passed as the last argument so SLF4J records the stack trace and cause.
                log.error("[CORE-SOCK] event sent FAILED: {}", e.getMessage(), e);
            }
        });
    }

    /** Returns the session's principal name for logging, or a placeholder when there is no principal. */
    private static String principalNameOf(WebSocketSession session) {
        java.security.Principal principal = session.getPrincipal();
        return principal != null ? principal.getName() : "<anonymous>";
    }
}
