package org.thingsboard.server.service.cluster.rpc;

import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
import org.thingsboard.server.service.cluster.discovery.ServerInstance;
import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;

@Service
/* loaded from: input_file:org/thingsboard/server/service/cluster/rpc/ClusterGrpcService.class */
public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceImplBase implements ClusterRpcService {
    private static final Logger log = LoggerFactory.getLogger(ClusterGrpcService.class);

    @Autowired
    private ServerInstanceService instanceService;

    @Autowired
    private DataDecodingEncodingService encodingService;
    private RpcMsgListener listener;
    private Server server;
    private ServerInstance instance;
    private ConcurrentMap<UUID, BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>>> pendingSessionMap = new ConcurrentHashMap();

    @Override // org.thingsboard.server.service.cluster.rpc.ClusterRpcService
    public void init(RpcMsgListener rpcMsgListener) {
        this.listener = rpcMsgListener;
        log.info("Initializing RPC service!");
        this.instance = this.instanceService.getSelf();
        this.server = ServerBuilder.forPort(this.instance.getPort()).addService(this).build();
        log.info("Going to start RPC server using port: {}", Integer.valueOf(this.instance.getPort()));
        try {
            this.server.start();
            log.info("RPC service initialized!");
        } catch (IOException e) {
            log.error("Failed to start RPC server!", e);
            throw new RuntimeException("Failed to start RPC server!");
        }
    }

    @Override // org.thingsboard.server.service.cluster.rpc.ClusterRpcService
    public void onSessionCreated(UUID uuid, StreamObserver<ClusterAPIProtos.ClusterMessage> streamObserver) {
        BlockingQueue<StreamObserver<ClusterAPIProtos.ClusterMessage>> remove = this.pendingSessionMap.remove(uuid);
        if (remove == null) {
            log.warn("Failed to lookup pending session!");
            return;
        }
        try {
            remove.put(streamObserver);
        } catch (InterruptedException e) {
            log.warn("Failed to report created session!");
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc.ClusterRpcServiceImplBase
    public StreamObserver<ClusterAPIProtos.ClusterMessage> handleMsgs(StreamObserver<ClusterAPIProtos.ClusterMessage> streamObserver) {
        log.info("Processing new session.");
        return createSession(new RpcSessionCreateRequestMsg(UUID.randomUUID(), null, streamObserver));
    }

    @PreDestroy
    public void stop() {
        if (this.server != null) {
            log.info("Going to onStop RPC server");
            this.server.shutdownNow();
            try {
                this.server.awaitTermination();
                log.info("RPC server stopped!");
            } catch (InterruptedException e) {
                log.warn("Failed to onStop RPC server!");
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override // org.thingsboard.server.service.cluster.rpc.ClusterRpcService
    public void broadcast(RpcBroadcastMsg rpcBroadcastMsg) {
        this.listener.onBroadcastMsg(rpcBroadcastMsg);
    }

    private StreamObserver<ClusterAPIProtos.ClusterMessage> createSession(RpcSessionCreateRequestMsg rpcSessionCreateRequestMsg) {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
        this.pendingSessionMap.put(rpcSessionCreateRequestMsg.getMsgUid(), arrayBlockingQueue);
        this.listener.onRpcSessionCreateRequestMsg(rpcSessionCreateRequestMsg);
        try {
            StreamObserver<ClusterAPIProtos.ClusterMessage> streamObserver = (StreamObserver) arrayBlockingQueue.take();
            log.info("Processed new session.");
            return streamObserver;
        } catch (Exception e) {
            log.info("Failed to process session.", e);
            throw new RuntimeException(e);
        }
    }

    @Override // org.thingsboard.server.service.cluster.rpc.ClusterRpcService
    public void tell(ClusterAPIProtos.ClusterMessage clusterMessage) {
        this.listener.onSendMsg(clusterMessage);
    }

    @Override // org.thingsboard.server.service.cluster.rpc.ClusterRpcService
    public void tell(ServerAddress serverAddress, TbActorMsg tbActorMsg) {
        this.listener.onSendMsg(this.encodingService.convertToProtoDataMessage(serverAddress, tbActorMsg));
    }

    @Override // org.thingsboard.server.service.cluster.rpc.ClusterRpcService
    public void tell(ServerAddress serverAddress, ClusterAPIProtos.MessageType messageType, byte[] bArr) {
        this.listener.onSendMsg(ClusterAPIProtos.ClusterMessage.newBuilder().setServerAddress(ClusterAPIProtos.ServerAddress.newBuilder().setHost(serverAddress.getHost()).setPort(serverAddress.getPort()).build()).setMessageType(messageType).setPayload(ByteString.copyFrom(bArr)).m135build());
    }
}
