/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.rpc;

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RpcId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.Rpc;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.dao.rpc.RpcService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;

@Service
@TbRuleEngineComponent
public class DefaultTbRuleEngineRpcService
implements TbRuleEngineDeviceRpcService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultTbRuleEngineRpcService.class);
    private final PartitionService partitionService;
    private final TbClusterService clusterService;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final RpcService rpcService;
    private final ConcurrentMap<UUID, Consumer<FromDeviceRpcResponse>> toDeviceRpcRequests = new ConcurrentHashMap<UUID, Consumer<FromDeviceRpcResponse>>();
    private Optional<TbCoreDeviceRpcService> tbCoreRpcService;
    private ScheduledExecutorService scheduler;
    private String serviceId;

    public DefaultTbRuleEngineRpcService(PartitionService partitionService, TbClusterService clusterService, TbServiceInfoProvider serviceInfoProvider, RpcService rpcService) {
        this.partitionService = partitionService;
        this.clusterService = clusterService;
        this.serviceInfoProvider = serviceInfoProvider;
        this.rpcService = rpcService;
    }

    @Autowired(required=false)
    public void setTbCoreRpcService(Optional<TbCoreDeviceRpcService> tbCoreRpcService) {
        this.tbCoreRpcService = tbCoreRpcService;
    }

    @PostConstruct
    public void initExecutor() {
        this.scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor((String)"rule-engine-rpc-scheduler");
        this.serviceId = this.serviceInfoProvider.getServiceId();
    }

    @PreDestroy
    public void shutdownExecutor() {
        if (this.scheduler != null) {
            this.scheduler.shutdownNow();
        }
    }

    public void sendRpcReplyToDevice(String serviceId, UUID sessionId, int requestId, String body) {
        if (serviceId == null || serviceId.isEmpty()) {
            log.trace("sendRpcReplyToDevice: skipping message without serviceId [{}], sessionId[{}], requestId[{}], body[{}]", new Object[]{serviceId, sessionId, requestId, body});
            return;
        }
        TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder().setRequestId(requestId).setPayload(body).build();
        TransportProtos.ToTransportMsg msg = TransportProtos.ToTransportMsg.newBuilder().setSessionIdMSB(sessionId.getMostSignificantBits()).setSessionIdLSB(sessionId.getLeastSignificantBits()).setToServerResponse(responseMsg).build();
        this.clusterService.pushNotificationToTransport(serviceId, msg, null);
    }

    public void sendRpcRequestToDevice(RuleEngineDeviceRpcRequest src, Consumer<RuleEngineDeviceRpcResponse> consumer) {
        ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getRetries(), src.getAdditionalInfo());
        this.forwardRpcRequestToDeviceActor(request, response -> {
            String originServiceId = src.getOriginServiceId();
            if (src.isRestApiCall() && originServiceId != null) {
                this.sendRpcResponseToTbCore(originServiceId, (FromDeviceRpcResponse)response);
            }
            consumer.accept(RuleEngineDeviceRpcResponse.builder().deviceId(src.getDeviceId()).requestId(src.getRequestId()).error(response.getError()).response(response.getResponse()).build());
        });
    }

    public void sendRestApiCallReply(String serviceId, UUID requestId, TbMsg tbMsg) {
        TransportProtos.RestApiCallResponseMsgProto msg = TransportProtos.RestApiCallResponseMsgProto.newBuilder().setRequestIdMSB(requestId.getMostSignificantBits()).setRequestIdLSB(requestId.getLeastSignificantBits()).setResponseProto(TbMsg.toProto((TbMsg)tbMsg)).build();
        this.clusterService.pushNotificationToCore(serviceId, msg, null);
    }

    public Rpc findRpcById(TenantId tenantId, RpcId id) {
        return this.rpcService.findById(tenantId, id);
    }

    @Override
    public void processRpcResponseFromDevice(FromDeviceRpcResponse response) {
        log.trace("[{}] Received response to server-side RPC request from Core RPC Service", (Object)response.getId());
        UUID requestId = response.getId();
        Consumer consumer = (Consumer)this.toDeviceRpcRequests.remove(requestId);
        if (consumer != null) {
            this.scheduler.submit(() -> consumer.accept(response));
        } else {
            log.trace("[{}] Unknown or stale rpc response received [{}]", (Object)requestId, (Object)response);
        }
    }

    private void forwardRpcRequestToDeviceActor(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer) {
        log.trace("[{}][{}] Processing local rpc call to device actor [{}]", new Object[]{request.getTenantId(), request.getId(), request.getDeviceId()});
        UUID requestId = request.getId();
        this.toDeviceRpcRequests.put(requestId, responseConsumer);
        this.sendRpcRequestToDevice(request);
        this.scheduleTimeout(request, requestId);
    }

    private void sendRpcRequestToDevice(ToDeviceRpcRequest msg) {
        TopicPartitionInfo tpi = this.partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), (EntityId)msg.getDeviceId());
        ToDeviceRpcRequestActorMsg rpcMsg = new ToDeviceRpcRequestActorMsg(this.serviceId, msg);
        if (tpi.isMyPartition()) {
            log.trace("[{}] Forwarding msg {} to device actor!", (Object)msg.getDeviceId(), (Object)msg);
            if (this.tbCoreRpcService.isPresent()) {
                this.tbCoreRpcService.get().forwardRpcRequestToDeviceActor(rpcMsg);
            } else {
                log.warn("Failed to find tbCoreRpcService for local service. Possible duplication of serviceIds.");
            }
        } else {
            log.trace("[{}] Forwarding msg {} to queue actor!", (Object)msg.getDeviceId(), (Object)msg);
            this.clusterService.pushMsgToCore((ToDeviceActorNotificationMsg)rpcMsg, null);
        }
    }

    private void sendRpcResponseToTbCore(String originServiceId, FromDeviceRpcResponse response) {
        if (this.serviceId.equals(originServiceId)) {
            if (this.tbCoreRpcService.isPresent()) {
                this.tbCoreRpcService.get().processRpcResponseFromRuleEngine(response);
            } else {
                log.warn("Failed to find tbCoreRpcService for local service. Possible duplication of serviceIds.");
            }
        } else {
            this.clusterService.pushNotificationToCore(originServiceId, response, null);
        }
    }

    private void scheduleTimeout(ToDeviceRpcRequest request, UUID requestId) {
        long timeout = Math.max(0L, request.getExpirationTime() - System.currentTimeMillis()) + TimeUnit.SECONDS.toMillis(1L);
        log.trace("[{}] processing the request: [{}]", (Object)this.hashCode(), (Object)requestId);
        this.scheduler.schedule(() -> {
            log.trace("[{}] timeout the request: [{}]", (Object)this.hashCode(), (Object)requestId);
            Consumer consumer = (Consumer)this.toDeviceRpcRequests.remove(requestId);
            if (consumer != null) {
                this.scheduler.submit(() -> consumer.accept(new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT)));
            }
        }, timeout, TimeUnit.MILLISECONDS);
    }
}

