/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.common.transport.service;

import com.google.common.util.concurrent.ListenableFuture;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.service.AbstractTransportService;
import org.thingsboard.server.common.transport.service.ToRuleEngineMsgEncoder;
import org.thingsboard.server.common.transport.service.ToTransportMsgResponseDecoder;
import org.thingsboard.server.common.transport.service.TransportApiRequestEncoder;
import org.thingsboard.server.common.transport.service.TransportApiResponseDecoder;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.kafka.AsyncCallbackTemplate;
import org.thingsboard.server.kafka.TBKafkaAdmin;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaDecoder;
import org.thingsboard.server.kafka.TbKafkaEncoder;
import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.kafka.TbNodeIdProvider;

@ConditionalOnExpression(value="'${transport.type:null}'=='null'")
@Service
public class RemoteTransportService
extends AbstractTransportService {
    private static final Logger log = LoggerFactory.getLogger(RemoteTransportService.class);
    @Value(value="${kafka.rule_engine.topic}")
    private String ruleEngineTopic;
    @Value(value="${kafka.notifications.topic}")
    private String notificationsTopic;
    @Value(value="${kafka.notifications.poll_interval}")
    private int notificationsPollDuration;
    @Value(value="${kafka.notifications.auto_commit_interval}")
    private int notificationsAutoCommitInterval;
    @Value(value="${kafka.transport_api.requests_topic}")
    private String transportApiRequestsTopic;
    @Value(value="${kafka.transport_api.responses_topic}")
    private String transportApiResponsesTopic;
    @Value(value="${kafka.transport_api.max_pending_requests}")
    private long maxPendingRequests;
    @Value(value="${kafka.transport_api.max_requests_timeout}")
    private long maxRequestsTimeout;
    @Value(value="${kafka.transport_api.response_poll_interval}")
    private int responsePollDuration;
    @Value(value="${kafka.transport_api.response_auto_commit_interval}")
    private int autoCommitInterval;
    @Autowired
    private TbKafkaSettings kafkaSettings;
    @Autowired
    private TbNodeIdProvider nodeIdProvider;
    private TbKafkaRequestTemplate<TransportProtos.TransportApiRequestMsg, TransportProtos.TransportApiResponseMsg> transportApiTemplate;
    private TBKafkaProducerTemplate<TransportProtos.ToRuleEngineMsg> ruleEngineProducer;
    private TBKafkaConsumerTemplate<TransportProtos.ToTransportMsg> mainConsumer;
    private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
    private volatile boolean stopped = false;

    @Override
    @PostConstruct
    public void init() {
        super.init();
        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder requestBuilder = TBKafkaProducerTemplate.builder();
        requestBuilder.settings(this.kafkaSettings);
        requestBuilder.clientId("producer-transport-api-request-" + this.nodeIdProvider.getNodeId());
        requestBuilder.defaultTopic(this.transportApiRequestsTopic);
        requestBuilder.encoder((TbKafkaEncoder)new TransportApiRequestEncoder());
        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder responseBuilder = TBKafkaConsumerTemplate.builder();
        responseBuilder.settings(this.kafkaSettings);
        responseBuilder.topic(this.transportApiResponsesTopic + "." + this.nodeIdProvider.getNodeId());
        responseBuilder.clientId("transport-api-client-" + this.nodeIdProvider.getNodeId());
        responseBuilder.groupId("transport-api-client");
        responseBuilder.autoCommit(true);
        responseBuilder.autoCommitIntervalMs(this.autoCommitInterval);
        responseBuilder.decoder((TbKafkaDecoder)new TransportApiResponseDecoder());
        TbKafkaRequestTemplate.TbKafkaRequestTemplateBuilder builder = TbKafkaRequestTemplate.builder();
        builder.requestTemplate(requestBuilder.build());
        builder.responseTemplate(responseBuilder.build());
        builder.maxPendingRequests(this.maxPendingRequests);
        builder.maxRequestTimeout(this.maxRequestsTimeout);
        builder.pollInterval((long)this.responsePollDuration);
        this.transportApiTemplate = builder.build();
        this.transportApiTemplate.init();
        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder();
        ruleEngineProducerBuilder.settings(this.kafkaSettings);
        ruleEngineProducerBuilder.clientId("producer-rule-engine-request-" + this.nodeIdProvider.getNodeId());
        ruleEngineProducerBuilder.defaultTopic(this.ruleEngineTopic);
        ruleEngineProducerBuilder.encoder((TbKafkaEncoder)new ToRuleEngineMsgEncoder());
        this.ruleEngineProducer = ruleEngineProducerBuilder.build();
        this.ruleEngineProducer.init();
        String notificationsTopicName = this.notificationsTopic + "." + this.nodeIdProvider.getNodeId();
        try {
            TBKafkaAdmin admin = new TBKafkaAdmin(this.kafkaSettings);
            CreateTopicsResult result = admin.createTopic(new NewTopic(notificationsTopicName, 1, 1));
            result.all().get();
        }
        catch (Exception e) {
            log.trace("Failed to create topic: {}", (Object)e.getMessage(), (Object)e);
        }
        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder mainConsumerBuilder = TBKafkaConsumerTemplate.builder();
        mainConsumerBuilder.settings(this.kafkaSettings);
        mainConsumerBuilder.topic(notificationsTopicName);
        mainConsumerBuilder.clientId("transport-" + this.nodeIdProvider.getNodeId());
        mainConsumerBuilder.groupId("transport");
        mainConsumerBuilder.autoCommit(true);
        mainConsumerBuilder.autoCommitIntervalMs(this.notificationsAutoCommitInterval);
        mainConsumerBuilder.decoder((TbKafkaDecoder)new ToTransportMsgResponseDecoder());
        this.mainConsumer = mainConsumerBuilder.build();
        this.mainConsumer.subscribe();
        this.mainConsumerExecutor.execute(() -> {
            while (!this.stopped) {
                try {
                    ConsumerRecords records = this.mainConsumer.poll(Duration.ofMillis(this.notificationsPollDuration));
                    records.forEach(record -> {
                        try {
                            TransportProtos.ToTransportMsg toTransportMsg = (TransportProtos.ToTransportMsg)this.mainConsumer.decode(record);
                            if (toTransportMsg.hasToDeviceSessionMsg()) {
                                this.processToTransportMsg(toTransportMsg.getToDeviceSessionMsg());
                            }
                        }
                        catch (Throwable e) {
                            log.warn("Failed to process the notification.", e);
                        }
                    });
                }
                catch (Exception e) {
                    log.warn("Failed to obtain messages from queue.", (Throwable)e);
                    try {
                        Thread.sleep(this.notificationsPollDuration);
                    }
                    catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", (Throwable)e2);
                    }
                }
            }
        });
    }

    @Override
    @PreDestroy
    public void destroy() {
        super.destroy();
        this.stopped = true;
        if (this.transportApiTemplate != null) {
            this.transportApiTemplate.stop();
        }
        if (this.mainConsumer != null) {
            this.mainConsumer.unsubscribe();
        }
        if (this.mainConsumerExecutor != null) {
            this.mainConsumerExecutor.shutdownNow();
        }
    }

    @Override
    public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
        log.trace("Processing msg: {}", (Object)msg);
        AsyncCallbackTemplate.withCallback((ListenableFuture)this.transportApiTemplate.post(msg.getToken(), (Object)TransportProtos.TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build()), response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, (Executor)this.transportCallbackExecutor);
    }

    @Override
    public void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
        log.trace("Processing msg: {}", (Object)msg);
        AsyncCallbackTemplate.withCallback((ListenableFuture)this.transportApiTemplate.post(msg.getHash(), (Object)TransportProtos.TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()), response -> callback.onSuccess(response.getValidateTokenResponseMsg()), callback::onError, (Executor)this.transportCallbackExecutor);
    }

    @Override
    public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback) {
        log.trace("Processing msg: {}", (Object)msg);
        AsyncCallbackTemplate.withCallback((ListenableFuture)this.transportApiTemplate.post(msg.getDeviceName(), (Object)TransportProtos.TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build()), response -> callback.onSuccess(response.getGetOrCreateDeviceResponseMsg()), callback::onError, (Executor)this.transportCallbackExecutor);
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
        if (log.isTraceEnabled()) {
            log.trace("[{}] Processing msg: {}", (Object)this.toId(sessionInfo), (Object)msg);
        }
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostTelemetry(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setPostAttributes(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void doProcess(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToServerRPCCallRequest(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    @Override
    protected void registerClaimingInfo(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
        TransportProtos.ToRuleEngineMsg toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setClaimDevice(msg).build()).build();
        this.send(sessionInfo, toRuleEngineMsg, callback);
    }

    private void send(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) {
        this.ruleEngineProducer.send(this.getRoutingKey(sessionInfo), (Object)toRuleEngineMsg, (metadata, exception) -> {
            if (callback != null) {
                if (exception == null) {
                    this.transportCallbackExecutor.submit(() -> callback.onSuccess(null));
                } else {
                    this.transportCallbackExecutor.submit(() -> callback.onError(exception));
                }
            }
        });
    }
}

