package org.thingsboard.server.common.transport.service;

import com.google.common.util.concurrent.ListenableFuture;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.kafka.AsyncCallbackTemplate;
import org.thingsboard.server.kafka.TBKafkaAdmin;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.kafka.TbNodeIdProvider;

@ConditionalOnExpression("'${transport.type:null}'=='null'")
@Service
/* loaded from: input_file:org/thingsboard/server/common/transport/service/RemoteTransportService.class */
public class RemoteTransportService extends AbstractTransportService {
    private static final Logger log = LoggerFactory.getLogger(RemoteTransportService.class);

    @Value("${kafka.rule_engine.topic}")
    private String ruleEngineTopic;

    @Value("${kafka.notifications.topic}")
    private String notificationsTopic;

    @Value("${kafka.notifications.poll_interval}")
    private int notificationsPollDuration;

    @Value("${kafka.notifications.auto_commit_interval}")
    private int notificationsAutoCommitInterval;

    @Value("${kafka.transport_api.requests_topic}")
    private String transportApiRequestsTopic;

    @Value("${kafka.transport_api.responses_topic}")
    private String transportApiResponsesTopic;

    @Value("${kafka.transport_api.max_pending_requests}")
    private long maxPendingRequests;

    @Value("${kafka.transport_api.max_requests_timeout}")
    private long maxRequestsTimeout;

    @Value("${kafka.transport_api.response_poll_interval}")
    private int responsePollDuration;

    @Value("${kafka.transport_api.response_auto_commit_interval}")
    private int autoCommitInterval;

    @Autowired
    private TbKafkaSettings kafkaSettings;

    @Autowired
    private TbNodeIdProvider nodeIdProvider;
    private TbKafkaRequestTemplate<TransportProtos.TransportApiRequestMsg, TransportProtos.TransportApiResponseMsg> transportApiTemplate;
    private TBKafkaProducerTemplate<TransportProtos.ToRuleEngineMsg> ruleEngineProducer;
    private TBKafkaConsumerTemplate<TransportProtos.ToTransportMsg> mainConsumer;
    private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
    private volatile boolean stopped = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/thingsboard/server/common/transport/service/RemoteTransportService$TransportCallbackAdaptor.class */
    public static class TransportCallbackAdaptor implements Callback {
        private final TransportServiceCallback<Void> callback;

        TransportCallbackAdaptor(TransportServiceCallback<Void> transportServiceCallback) {
            this.callback = transportServiceCallback;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc == null) {
                if (this.callback != null) {
                    this.callback.onSuccess(null);
                }
            } else if (this.callback != null) {
                this.callback.onError(exc);
            }
        }
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    @PostConstruct
    public void init() {
        super.init();
        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder builder = TBKafkaProducerTemplate.builder();
        builder.settings(this.kafkaSettings);
        builder.clientId("producer-transport-api-request-" + this.nodeIdProvider.getNodeId());
        builder.defaultTopic(this.transportApiRequestsTopic);
        builder.encoder(new TransportApiRequestEncoder());
        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder builder2 = TBKafkaConsumerTemplate.builder();
        builder2.settings(this.kafkaSettings);
        builder2.topic(this.transportApiResponsesTopic + "." + this.nodeIdProvider.getNodeId());
        builder2.clientId("transport-api-client-" + this.nodeIdProvider.getNodeId());
        builder2.groupId("transport-api-client");
        builder2.autoCommit(true);
        builder2.autoCommitIntervalMs(this.autoCommitInterval);
        builder2.decoder(new TransportApiResponseDecoder());
        TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder builder3 = TbKafkaRequestTemplate.builder();
        builder3.requestTemplate(builder.build());
        builder3.responseTemplate(builder2.build());
        builder3.maxPendingRequests(this.maxPendingRequests);
        builder3.maxRequestTimeout(this.maxRequestsTimeout);
        builder3.pollInterval(this.responsePollDuration);
        this.transportApiTemplate = builder3.build();
        this.transportApiTemplate.init();
        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder builder4 = TBKafkaProducerTemplate.builder();
        builder4.settings(this.kafkaSettings);
        builder4.clientId("producer-rule-engine-request-" + this.nodeIdProvider.getNodeId());
        builder4.defaultTopic(this.ruleEngineTopic);
        builder4.encoder(new ToRuleEngineMsgEncoder());
        this.ruleEngineProducer = builder4.build();
        this.ruleEngineProducer.init();
        String str = this.notificationsTopic + "." + this.nodeIdProvider.getNodeId();
        try {
            new TBKafkaAdmin(this.kafkaSettings).createTopic(new NewTopic(str, 1, (short) 1)).all().get();
        } catch (Exception e) {
            log.trace("Failed to create topic: {}", e.getMessage(), e);
        }
        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder builder5 = TBKafkaConsumerTemplate.builder();
        builder5.settings(this.kafkaSettings);
        builder5.topic(str);
        builder5.clientId("transport-" + this.nodeIdProvider.getNodeId());
        builder5.groupId("transport");
        builder5.autoCommit(true);
        builder5.autoCommitIntervalMs(this.notificationsAutoCommitInterval);
        builder5.decoder(new ToTransportMsgResponseDecoder());
        this.mainConsumer = builder5.build();
        this.mainConsumer.subscribe();
        this.mainConsumerExecutor.execute(() -> {
            while (!this.stopped) {
                try {
                    this.mainConsumer.poll(Duration.ofMillis(this.notificationsPollDuration)).forEach(consumerRecord -> {
                        try {
                            TransportProtos.ToTransportMsg toTransportMsg = (TransportProtos.ToTransportMsg) this.mainConsumer.decode(consumerRecord);
                            if (toTransportMsg.hasToDeviceSessionMsg()) {
                                processToTransportMsg(toTransportMsg.getToDeviceSessionMsg());
                            }
                        } catch (Throwable th) {
                            log.warn("Failed to process the notification.", th);
                        }
                    });
                } catch (Exception e2) {
                    log.warn("Failed to obtain messages from queue.", e2);
                    try {
                        Thread.sleep(this.notificationsPollDuration);
                    } catch (InterruptedException e3) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", e3);
                    }
                }
            }
        });
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    @PreDestroy
    public void destroy() {
        super.destroy();
        this.stopped = true;
        if (this.transportApiTemplate != null) {
            this.transportApiTemplate.stop();
        }
        if (this.mainConsumer != null) {
            this.mainConsumer.unsubscribe();
        }
        if (this.mainConsumerExecutor != null) {
            this.mainConsumerExecutor.shutdownNow();
        }
    }

    @Override // org.thingsboard.server.common.transport.TransportService
    public void process(TransportProtos.ValidateDeviceTokenRequestMsg validateDeviceTokenRequestMsg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> transportServiceCallback) {
        log.trace("Processing msg: {}", validateDeviceTokenRequestMsg);
        ListenableFuture post = this.transportApiTemplate.post(validateDeviceTokenRequestMsg.getToken(), TransportProtos.TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(validateDeviceTokenRequestMsg).build());
        Consumer consumer = transportApiResponseMsg -> {
            transportServiceCallback.onSuccess(transportApiResponseMsg.getValidateTokenResponseMsg());
        };
        transportServiceCallback.getClass();
        AsyncCallbackTemplate.withCallback(post, consumer, transportServiceCallback::onError, this.transportCallbackExecutor);
    }

    @Override // org.thingsboard.server.common.transport.TransportService
    public void process(TransportProtos.ValidateDeviceX509CertRequestMsg validateDeviceX509CertRequestMsg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> transportServiceCallback) {
        log.trace("Processing msg: {}", validateDeviceX509CertRequestMsg);
        ListenableFuture post = this.transportApiTemplate.post(validateDeviceX509CertRequestMsg.getHash(), TransportProtos.TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(validateDeviceX509CertRequestMsg).build());
        Consumer consumer = transportApiResponseMsg -> {
            transportServiceCallback.onSuccess(transportApiResponseMsg.getValidateTokenResponseMsg());
        };
        transportServiceCallback.getClass();
        AsyncCallbackTemplate.withCallback(post, consumer, transportServiceCallback::onError, this.transportCallbackExecutor);
    }

    @Override // org.thingsboard.server.common.transport.TransportService
    public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceFromGatewayRequestMsg, TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> transportServiceCallback) {
        log.trace("Processing msg: {}", getOrCreateDeviceFromGatewayRequestMsg);
        ListenableFuture post = this.transportApiTemplate.post(getOrCreateDeviceFromGatewayRequestMsg.getDeviceName(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(getOrCreateDeviceFromGatewayRequestMsg).build());
        Consumer consumer = transportApiResponseMsg -> {
            transportServiceCallback.onSuccess(transportApiResponseMsg.getGetOrCreateDeviceResponseMsg());
        };
        transportServiceCallback.getClass();
        AsyncCallbackTemplate.withCallback(post, consumer, transportServiceCallback::onError, this.transportCallbackExecutor);
    }

    @Override // org.thingsboard.server.common.transport.TransportService
    public void process(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.SubscriptionInfoProto subscriptionInfoProto, TransportServiceCallback<Void> transportServiceCallback) {
        if (log.isTraceEnabled()) {
            log.trace("[{}] Processing msg: {}", toId(sessionInfoProto), subscriptionInfoProto);
        }
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setSubscriptionInfo(subscriptionInfoProto).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.SessionEventMsg sessionEventMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setSessionEvent(sessionEventMsg).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.PostTelemetryMsg postTelemetryMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setPostTelemetry(postTelemetryMsg).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.PostAttributeMsg postAttributeMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setPostAttributes(postAttributeMsg).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.GetAttributeRequestMsg getAttributeRequestMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setGetAttributes(getAttributeRequestMsg).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.SubscribeToAttributeUpdatesMsg subscribeToAttributeUpdatesMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setSubscribeToAttributes(subscribeToAttributeUpdatesMsg).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.SubscribeToRPCMsg subscribeToRPCMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setSubscribeToRPC(subscribeToRPCMsg).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.ToDeviceRpcResponseMsg toDeviceRpcResponseMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setToDeviceRPCCallResponse(toDeviceRpcResponseMsg).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.ToServerRpcRequestMsg toServerRpcRequestMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setToServerRPCCallRequest(toServerRpcRequestMsg).build()).build(), transportServiceCallback);
    }

    @Override // org.thingsboard.server.common.transport.service.AbstractTransportService
    protected void registerClaimingInfo(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.ClaimDeviceMsg claimDeviceMsg, TransportServiceCallback<Void> transportServiceCallback) {
        send(sessionInfoProto, TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfoProto).setClaimDevice(claimDeviceMsg).build()).build(), transportServiceCallback);
    }

    private void send(TransportProtos.SessionInfoProto sessionInfoProto, TransportProtos.ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> transportServiceCallback) {
        this.ruleEngineProducer.send(getRoutingKey(sessionInfoProto), toRuleEngineMsg, new TransportCallbackAdaptor(transportServiceCallback));
    }
}
