package org.thingsboard.server.service.rpc;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.dao.rpc.RpcService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;

/**
 * Rule-engine side RPC service.
 *
 * <p>Forwards to-device RPC requests towards the TB_CORE partition that owns the target device,
 * keeps the response consumers of in-flight requests in memory keyed by request id, and completes
 * each consumer either with the response handed to {@link #processRpcResponseFromDevice} or with a
 * {@link RpcError#TIMEOUT} response once the request has expired.
 *
 * <p>Response consumers are always invoked on the internal single-thread scheduler.
 */
@Service
@TbRuleEngineComponent
public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcService {
    private static final Logger log = LoggerFactory.getLogger(DefaultTbRuleEngineRpcService.class);
    private final PartitionService partitionService;
    private final TbClusterService clusterService;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final RpcService rpcService;
    /** Response consumers of requests that have been sent but not yet answered or timed out. */
    private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> toDeviceRpcRequests = new ConcurrentHashMap<>();
    /** Never {@code null}; empty until (and unless) a core RPC service is injected. */
    private Optional<TbCoreDeviceRpcService> tbCoreRpcService = Optional.empty();
    private ScheduledExecutorService scheduler;
    private String serviceId;

    public DefaultTbRuleEngineRpcService(PartitionService partitionService, TbClusterService tbClusterService, TbServiceInfoProvider tbServiceInfoProvider, RpcService rpcService) {
        this.partitionService = partitionService;
        this.clusterService = tbClusterService;
        this.serviceInfoProvider = tbServiceInfoProvider;
        this.rpcService = rpcService;
    }

    /**
     * Optional injection point for the core RPC service.
     *
     * @param optional the core RPC service if available; {@code null} is treated as empty
     */
    @Autowired(required = false)
    public void setTbCoreRpcService(Optional<TbCoreDeviceRpcService> optional) {
        // Guard against a null argument so later isPresent() calls can never NPE.
        this.tbCoreRpcService = optional != null ? optional : Optional.empty();
    }

    /** Creates the single-thread scheduler and caches this node's service id. */
    @PostConstruct
    public void initExecutor() {
        this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("rule-engine-rpc-scheduler");
        this.serviceId = this.serviceInfoProvider.getServiceId();
    }

    /** Stops the scheduler (if it was created), discarding any pending timeout tasks. */
    @PreDestroy
    public void shutdownExecutor() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
        }
    }

    /**
     * Pushes a to-server RPC response to the transport service identified by {@code serviceId}.
     * The message is skipped (trace-logged only) when {@code serviceId} is null or empty.
     *
     * @param serviceId id of the transport service to notify
     * @param sessionId session id written into the transport message
     * @param requestId request id written into the response message
     * @param body      response payload
     */
    public void sendRpcReplyToDevice(String serviceId, UUID sessionId, int requestId, String body) {
        if (serviceId == null || serviceId.isEmpty()) {
            log.trace("sendRpcReplyToDevice: skipping message without serviceId [{}], sessionId[{}], requestId[{}], body[{}]", serviceId, sessionId, requestId, body);
            return;
        }
        TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder()
                .setRequestId(requestId)
                .setPayload(body)
                .build();
        TransportProtos.ToTransportMsg transportMsg = TransportProtos.ToTransportMsg.newBuilder()
                .setSessionIdMSB(sessionId.getMostSignificantBits())
                .setSessionIdLSB(sessionId.getLeastSignificantBits())
                .setToServerResponse(responseMsg)
                .build();
        this.clusterService.pushNotificationToTransport(serviceId, transportMsg, (TbQueueCallback) null);
    }

    /**
     * Sends an RPC request to a device and reports the outcome to {@code consumer}.
     *
     * <p>When the request is flagged as a REST API call and carries an origin service id, the raw
     * response is additionally forwarded to that core service before {@code consumer} is invoked.
     *
     * @param request  the rule-engine RPC request to forward
     * @param consumer receives the response (device reply, error or timeout)
     */
    public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest request, Consumer<RuleEngineDeviceRpcResponse> consumer) {
        ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(request.getMethod(), request.getBody());
        ToDeviceRpcRequest toDeviceRequest = new ToDeviceRpcRequest(request.getRequestUUID(), request.getTenantId(), request.getDeviceId(),
                request.isOneway(), request.getExpirationTime(), body, request.isPersisted(), request.getRetries(), request.getAdditionalInfo());
        forwardRpcRequestToDeviceActor(toDeviceRequest, response -> {
            String originServiceId = request.getOriginServiceId();
            if (request.isRestApiCall() && originServiceId != null) {
                sendRpcResponseToTbCore(originServiceId, response);
            }
            consumer.accept(RuleEngineDeviceRpcResponse.builder()
                    .deviceId(request.getDeviceId())
                    .requestId(request.getRequestId())
                    .error(response.getError())
                    .response(response.getResponse())
                    .build());
        });
    }

    /**
     * Pushes a REST API call response notification to the given core service.
     *
     * @param serviceId id of the core service to notify
     * @param requestId id of the REST API call being answered
     * @param tbMsg     message serialized into the response
     */
    public void sendRestApiCallReply(String serviceId, UUID requestId, TbMsg tbMsg) {
        TransportProtos.RestApiCallResponseMsgProto reply = TransportProtos.RestApiCallResponseMsgProto.newBuilder()
                .setRequestIdMSB(requestId.getMostSignificantBits())
                .setRequestIdLSB(requestId.getLeastSignificantBits())
                .setResponse(TbMsg.toByteString(tbMsg))
                .build();
        this.clusterService.pushNotificationToCore(serviceId, reply, (TbQueueCallback) null);
    }

    /** Looks up an RPC by id; delegates directly to {@link RpcService#findById}. */
    public Rpc findRpcById(TenantId tenantId, RpcId rpcId) {
        return this.rpcService.findById(tenantId, rpcId);
    }

    /**
     * Completes the pending request matching the response id, if any. Responses whose id is not
     * (or no longer) registered are only trace-logged.
     */
    @Override // org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService
    public void processRpcResponseFromDevice(FromDeviceRpcResponse fromDeviceRpcResponse) {
        log.trace("[{}] Received response to server-side RPC request from Core RPC Service", fromDeviceRpcResponse.getId());
        UUID requestId = fromDeviceRpcResponse.getId();
        Consumer<FromDeviceRpcResponse> callback = this.toDeviceRpcRequests.remove(requestId);
        if (callback == null) {
            log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, fromDeviceRpcResponse);
            return;
        }
        completeAsync(callback, fromDeviceRpcResponse);
    }

    /** Registers the response consumer, dispatches the request and arms its timeout. */
    private void forwardRpcRequestToDeviceActor(ToDeviceRpcRequest toDeviceRpcRequest, Consumer<FromDeviceRpcResponse> consumer) {
        log.trace("[{}][{}] Processing local rpc call to device actor [{}]", toDeviceRpcRequest.getTenantId(), toDeviceRpcRequest.getId(), toDeviceRpcRequest.getDeviceId());
        UUID requestId = toDeviceRpcRequest.getId();
        this.toDeviceRpcRequests.put(requestId, consumer);
        try {
            sendRpcRequestToDevice(toDeviceRpcRequest);
        } catch (RuntimeException e) {
            // No timeout has been scheduled yet, so nothing else would ever evict this entry.
            this.toDeviceRpcRequests.remove(requestId, consumer);
            throw e;
        }
        scheduleTimeout(toDeviceRpcRequest, requestId);
    }

    /**
     * Routes the request to the TB_CORE partition resolved for its tenant and device: via the
     * cluster queue when the partition is not local, otherwise straight to the local core service.
     */
    private void sendRpcRequestToDevice(ToDeviceRpcRequest toDeviceRpcRequest) {
        TopicPartitionInfo tpi = this.partitionService.resolve(ServiceType.TB_CORE, toDeviceRpcRequest.getTenantId(), toDeviceRpcRequest.getDeviceId());
        ToDeviceRpcRequestActorMsg actorMsg = new ToDeviceRpcRequestActorMsg(this.serviceId, toDeviceRpcRequest);
        if (tpi.isMyPartition()) {
            log.trace("[{}] Forwarding msg {} to device actor!", toDeviceRpcRequest.getDeviceId(), toDeviceRpcRequest);
            if (this.tbCoreRpcService.isPresent()) {
                this.tbCoreRpcService.get().forwardRpcRequestToDeviceActor(actorMsg);
            } else {
                log.warn("Failed to find tbCoreRpcService for local service. Possible duplication of serviceIds.");
            }
        } else {
            log.trace("[{}] Forwarding msg {} to queue actor!", toDeviceRpcRequest.getDeviceId(), toDeviceRpcRequest);
            this.clusterService.pushMsgToCore(actorMsg, (TbQueueCallback) null);
        }
    }

    /**
     * Delivers a device response to the core service that originated the call: locally when the
     * target is this service, through a cluster notification otherwise.
     */
    private void sendRpcResponseToTbCore(String originServiceId, FromDeviceRpcResponse fromDeviceRpcResponse) {
        if (!this.serviceId.equals(originServiceId)) {
            this.clusterService.pushNotificationToCore(originServiceId, fromDeviceRpcResponse, (TbQueueCallback) null);
            return;
        }
        if (this.tbCoreRpcService.isPresent()) {
            this.tbCoreRpcService.get().processRpcResponseFromRuleEngine(fromDeviceRpcResponse);
        } else {
            log.warn("Failed to find tbCoreRpcService for local service. Possible duplication of serviceIds.");
        }
    }

    /**
     * Schedules a task that, if the request is still pending when it fires, completes it with
     * {@link RpcError#TIMEOUT}. The delay is the time left until expiration (never negative)
     * plus one second.
     */
    private void scheduleTimeout(ToDeviceRpcRequest toDeviceRpcRequest, UUID requestId) {
        long remainingMs = Math.max(0L, toDeviceRpcRequest.getExpirationTime() - System.currentTimeMillis());
        long delayMs = remainingMs + TimeUnit.SECONDS.toMillis(1L);
        log.trace("[{}] processing the request: [{}]", hashCode(), requestId);
        this.scheduler.schedule(() -> {
            log.trace("[{}] timeout the request: [{}]", hashCode(), requestId);
            Consumer<FromDeviceRpcResponse> callback = this.toDeviceRpcRequests.remove(requestId);
            if (callback != null) {
                completeAsync(callback, new FromDeviceRpcResponse(requestId, (String) null, RpcError.TIMEOUT));
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Invokes {@code callback} with {@code response} on the scheduler thread. The Future returned
     * by submit() is discarded, so a failing callback is logged here instead of vanishing.
     */
    private void completeAsync(Consumer<FromDeviceRpcResponse> callback, FromDeviceRpcResponse response) {
        this.scheduler.submit(() -> {
            try {
                callback.accept(response);
            } catch (RuntimeException e) {
                log.warn("[{}] Failed to process rpc response [{}]", response.getId(), response, e);
            }
        });
    }
}
