package org.thingsboard.server.actors.rpc;

import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.rpc.RpcSessionActor;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.common.msg.cluster.ServerType;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:org/thingsboard/server/actors/rpc/RpcManagerActor.class */
public class RpcManagerActor extends ContextAwareActor {
    private final Map<ServerAddress, SessionActorInfo> sessionActors;
    private final Map<ServerAddress, Queue<ClusterAPIProtos.ClusterMessage>> pendingMsgs;
    private final ServerAddress instance;
    private final SupervisorStrategy strategy;

    /* loaded from: input_file:org/thingsboard/server/actors/rpc/RpcManagerActor$ActorCreator.class */
    public static class ActorCreator extends ContextBasedCreator<RpcManagerActor> {
        private static final long serialVersionUID = 1;

        public ActorCreator(ActorSystemContext actorSystemContext) {
            super(actorSystemContext);
        }

        /* renamed from: create, reason: merged with bridge method [inline-methods] */
        public RpcManagerActor m11create() {
            return new RpcManagerActor(this.context);
        }
    }

    private RpcManagerActor(ActorSystemContext actorSystemContext) {
        super(actorSystemContext);
        this.strategy = new OneForOneStrategy(3, Duration.create("1 minute"), th -> {
            this.log.warn("Unknown failure", th);
            return SupervisorStrategy.resume();
        });
        this.sessionActors = new HashMap();
        this.pendingMsgs = new HashMap();
        this.instance = actorSystemContext.getDiscoveryService().getCurrentServer().getServerAddress();
        actorSystemContext.getDiscoveryService().getOtherServers().stream().filter(serverInstance -> {
            return serverInstance.getServerAddress().compareTo(this.instance) > 0;
        }).forEach(serverInstance2 -> {
            onCreateSessionRequest(new RpcSessionCreateRequestMsg(UUID.randomUUID(), serverInstance2.getServerAddress(), null));
        });
    }

    @Override // org.thingsboard.server.actors.service.ContextAwareActor
    protected boolean process(TbActorMsg tbActorMsg) {
        return false;
    }

    @Override // org.thingsboard.server.actors.service.ContextAwareActor
    public void onReceive(Object obj) {
        if (obj instanceof ClusterAPIProtos.ClusterMessage) {
            onMsg((ClusterAPIProtos.ClusterMessage) obj);
            return;
        }
        if (obj instanceof RpcBroadcastMsg) {
            onMsg((RpcBroadcastMsg) obj);
            return;
        }
        if (obj instanceof RpcSessionCreateRequestMsg) {
            onCreateSessionRequest((RpcSessionCreateRequestMsg) obj);
            return;
        }
        if (obj instanceof RpcSessionConnectedMsg) {
            onSessionConnected((RpcSessionConnectedMsg) obj);
            return;
        }
        if (obj instanceof RpcSessionDisconnectedMsg) {
            onSessionDisconnected((RpcSessionDisconnectedMsg) obj);
        } else if (obj instanceof RpcSessionClosedMsg) {
            onSessionClosed((RpcSessionClosedMsg) obj);
        } else if (obj instanceof ClusterEventMsg) {
            onClusterEvent((ClusterEventMsg) obj);
        }
    }

    private void onMsg(RpcBroadcastMsg rpcBroadcastMsg) {
        this.log.debug("Forwarding msg to session actors {}", rpcBroadcastMsg);
        this.sessionActors.keySet().forEach(serverAddress -> {
            onMsg(rpcBroadcastMsg.getMsg().m99toBuilder().setServerAddress(ClusterAPIProtos.ServerAddress.newBuilder().setHost(serverAddress.getHost()).setPort(serverAddress.getPort()).build()).m135build());
        });
        this.pendingMsgs.values().forEach(queue -> {
            queue.add(rpcBroadcastMsg.getMsg());
        });
    }

    private void onMsg(ClusterAPIProtos.ClusterMessage clusterMessage) {
        if (!clusterMessage.hasServerAddress()) {
            this.log.warn("Cluster msg doesn't have server address [{}]", clusterMessage);
            return;
        }
        ServerAddress serverAddress = new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort(), ServerType.CORE);
        SessionActorInfo sessionActorInfo = this.sessionActors.get(serverAddress);
        if (sessionActorInfo != null) {
            this.log.debug("{} Forwarding msg to session actor: {}", serverAddress, clusterMessage);
            sessionActorInfo.getActor().tell(clusterMessage, ActorRef.noSender());
            return;
        }
        this.log.debug("{} Storing msg to pending queue: {}", serverAddress, clusterMessage);
        Queue<ClusterAPIProtos.ClusterMessage> queue = this.pendingMsgs.get(serverAddress);
        if (queue == null) {
            queue = new LinkedList();
            this.pendingMsgs.put(new ServerAddress(clusterMessage.getServerAddress().getHost(), clusterMessage.getServerAddress().getPort(), ServerType.CORE), queue);
        }
        queue.add(clusterMessage);
    }

    public void postStop() {
        this.sessionActors.clear();
        this.pendingMsgs.clear();
    }

    private void onClusterEvent(ClusterEventMsg clusterEventMsg) {
        ServerAddress serverAddress = clusterEventMsg.getServerAddress();
        if (serverAddress.compareTo(this.instance) > 0) {
            if (clusterEventMsg.isAdded()) {
                onCreateSessionRequest(new RpcSessionCreateRequestMsg(UUID.randomUUID(), serverAddress, null));
            } else {
                onSessionClose(false, serverAddress);
            }
        }
    }

    private void onSessionConnected(RpcSessionConnectedMsg rpcSessionConnectedMsg) {
        register(rpcSessionConnectedMsg.getRemoteAddress(), rpcSessionConnectedMsg.getId(), context().sender());
    }

    private void onSessionDisconnected(RpcSessionDisconnectedMsg rpcSessionDisconnectedMsg) {
        onSessionClose(rpcSessionDisconnectedMsg.isClient() && isRegistered(rpcSessionDisconnectedMsg.getRemoteAddress()), rpcSessionDisconnectedMsg.getRemoteAddress());
    }

    private void onSessionClosed(RpcSessionClosedMsg rpcSessionClosedMsg) {
        onSessionClose(rpcSessionClosedMsg.isClient() && isRegistered(rpcSessionClosedMsg.getRemoteAddress()), rpcSessionClosedMsg.getRemoteAddress());
    }

    private boolean isRegistered(ServerAddress serverAddress) {
        Iterator<ServerInstance> it = this.systemContext.getDiscoveryService().getOtherServers().iterator();
        while (it.hasNext()) {
            if (it.next().getServerAddress().equals(serverAddress)) {
                return true;
            }
        }
        return false;
    }

    private void onSessionClose(boolean z, ServerAddress serverAddress) {
        this.log.info("[{}] session closed. Should reconnect: {}", serverAddress, Boolean.valueOf(z));
        SessionActorInfo sessionActorInfo = this.sessionActors.get(serverAddress);
        if (sessionActorInfo == null || context().sender() == null || !context().sender().equals(sessionActorInfo.actor)) {
            return;
        }
        context().stop(sessionActorInfo.actor);
        this.sessionActors.remove(serverAddress);
        this.pendingMsgs.remove(serverAddress);
        if (z) {
            onCreateSessionRequest(new RpcSessionCreateRequestMsg(sessionActorInfo.sessionId, serverAddress, null));
        }
    }

    private void onCreateSessionRequest(RpcSessionCreateRequestMsg rpcSessionCreateRequestMsg) {
        if (rpcSessionCreateRequestMsg.getRemoteAddress() == null) {
            createSessionActor(rpcSessionCreateRequestMsg);
        } else {
            if (this.sessionActors.containsKey(rpcSessionCreateRequestMsg.getRemoteAddress())) {
                return;
            }
            register(rpcSessionCreateRequestMsg.getRemoteAddress(), rpcSessionCreateRequestMsg.getMsgUid(), createSessionActor(rpcSessionCreateRequestMsg));
        }
    }

    private void register(ServerAddress serverAddress, UUID uuid, ActorRef actorRef) {
        this.sessionActors.put(serverAddress, new SessionActorInfo(uuid, actorRef));
        this.log.info("[{}][{}] Registering session actor.", serverAddress, uuid);
        Queue<ClusterAPIProtos.ClusterMessage> remove = this.pendingMsgs.remove(serverAddress);
        if (remove == null) {
            this.log.info("[{}][{}] No pending messages to forward.", serverAddress, uuid);
        } else {
            this.log.info("[{}][{}] Forwarding {} pending messages.", new Object[]{serverAddress, uuid, Integer.valueOf(remove.size())});
            remove.forEach(clusterMessage -> {
                actorRef.tell(new RpcSessionTellMsg(clusterMessage), ActorRef.noSender());
            });
        }
    }

    private ActorRef createSessionActor(RpcSessionCreateRequestMsg rpcSessionCreateRequestMsg) {
        this.log.info("[{}] Creating session actor.", rpcSessionCreateRequestMsg.getMsgUid());
        ActorRef actorOf = context().actorOf(Props.create(new RpcSessionActor.ActorCreator(this.systemContext, rpcSessionCreateRequestMsg.getMsgUid())).withDispatcher(DefaultActorService.RPC_DISPATCHER_NAME));
        actorOf.tell(rpcSessionCreateRequestMsg, context().self());
        return actorOf;
    }

    public SupervisorStrategy supervisorStrategy() {
        return this.strategy;
    }
}
