package org.thingsboard.server.service.transport;

import akka.actor.ActorRef;
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.BlockingBucket;
import io.github.bucket4j.Bucket4j;
import io.github.bucket4j.local.LocalBucket;
import io.github.bucket4j.local.LocalBucketBuilder;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.msg.cluster.ServerAddress;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
import org.thingsboard.server.kafka.TbKafkaSettings;
import org.thingsboard.server.kafka.TbNodeIdProvider;
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.install.DatabaseHelper;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;

@ConditionalOnProperty(prefix = "transport", value = {DatabaseHelper.TYPE}, havingValue = "remote")
@Service
/* loaded from: input_file:org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.class */
public class RemoteRuleEngineTransportService implements RuleEngineTransportService {
    private static final Logger log = LoggerFactory.getLogger(RemoteRuleEngineTransportService.class);

    @Value("${transport.remote.rule_engine.topic}")
    private String ruleEngineTopic;

    @Value("${transport.remote.notifications.topic}")
    private String notificationsTopic;

    @Value("${transport.remote.rule_engine.poll_interval}")
    private int pollDuration;

    @Value("${transport.remote.rule_engine.auto_commit_interval}")
    private int autoCommitInterval;

    @Value("${transport.remote.rule_engine.poll_records_pack_size}")
    private int pollRecordsPackSize;

    @Value("${transport.remote.rule_engine.max_poll_records_per_second}")
    private long pollRecordsPerSecond;

    @Value("${transport.remote.rule_engine.max_poll_records_per_minute}")
    private long pollRecordsPerMinute;

    @Autowired
    private TbKafkaSettings kafkaSettings;

    @Autowired
    private TbNodeIdProvider nodeIdProvider;

    @Autowired
    private ActorSystemContext actorContext;

    @Autowired
    private ClusterRoutingService routingService;

    @Autowired
    private ClusterRpcService rpcService;

    @Autowired
    private DataDecodingEncodingService encodingService;
    private TBKafkaConsumerTemplate<TransportProtos.ToRuleEngineMsg> ruleEngineConsumer;
    private TBKafkaProducerTemplate<TransportProtos.ToTransportMsg> notificationsProducer;
    private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
    private volatile boolean stopped = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/thingsboard/server/service/transport/RemoteRuleEngineTransportService$QueueCallbackAdaptor.class */
    public static class QueueCallbackAdaptor implements Callback {
        private final Runnable onSuccess;
        private final Consumer<Throwable> onFailure;

        QueueCallbackAdaptor(Runnable runnable, Consumer<Throwable> consumer) {
            this.onSuccess = runnable;
            this.onFailure = consumer;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc == null) {
                if (this.onSuccess != null) {
                    this.onSuccess.run();
                }
            } else if (this.onFailure != null) {
                this.onFailure.accept(exc);
            }
        }
    }

    @PostConstruct
    public void init() {
        TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder builder = TBKafkaProducerTemplate.builder();
        builder.settings(this.kafkaSettings);
        builder.clientId("producer-transport-notification-" + this.nodeIdProvider.getNodeId());
        builder.encoder(new ToTransportMsgEncoder());
        this.notificationsProducer = builder.build();
        this.notificationsProducer.init();
        TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder builder2 = TBKafkaConsumerTemplate.builder();
        builder2.settings(this.kafkaSettings);
        builder2.topic(this.ruleEngineTopic);
        builder2.clientId("transport-" + this.nodeIdProvider.getNodeId());
        builder2.groupId("tb-node");
        builder2.autoCommit(true);
        builder2.autoCommitIntervalMs(this.autoCommitInterval);
        builder2.maxPollRecords(this.pollRecordsPackSize);
        builder2.decoder(new ToRuleEngineMsgDecoder());
        this.ruleEngineConsumer = builder2.build();
        this.ruleEngineConsumer.subscribe();
    }

    @EventListener({ApplicationReadyEvent.class})
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        log.info("Received application ready event. Starting polling for events.");
        LocalBucketBuilder builder = Bucket4j.builder();
        builder.addLimit(Bandwidth.simple(this.pollRecordsPerSecond, Duration.ofSeconds(1L)));
        builder.addLimit(Bandwidth.simple(this.pollRecordsPerMinute, Duration.ofMinutes(1L)));
        LocalBucket build = builder.build();
        BlockingBucket asScheduler = build.asScheduler();
        this.mainConsumerExecutor.execute(() -> {
            while (!this.stopped) {
                try {
                    ConsumerRecords poll = this.ruleEngineConsumer.poll(Duration.ofMillis(this.pollDuration));
                    int count = poll.count();
                    if (count > 0) {
                        while (!asScheduler.tryConsume(count, TimeUnit.SECONDS.toNanos(5L))) {
                            log.info("Rule Engine consumer is busy. Required tokens: [{}]. Available tokens: [{}].", Integer.valueOf(count), Long.valueOf(build.getAvailableTokens()));
                            Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
                        }
                        log.trace("Processing {} records", Integer.valueOf(count));
                    }
                    poll.forEach(consumerRecord -> {
                        try {
                            TransportProtos.ToRuleEngineMsg toRuleEngineMsg = (TransportProtos.ToRuleEngineMsg) this.ruleEngineConsumer.decode(consumerRecord);
                            log.trace("Forwarding message to rule engine {}", toRuleEngineMsg);
                            if (toRuleEngineMsg.hasToDeviceActorMsg()) {
                                forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg());
                            }
                        } catch (Throwable th) {
                            log.warn("Failed to process the notification.", th);
                        }
                    });
                } catch (Exception e) {
                    log.warn("Failed to obtain messages from queue.", e);
                    try {
                        Thread.sleep(this.pollDuration);
                    } catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", e2);
                    }
                }
            }
        });
    }

    @Override // org.thingsboard.server.service.transport.RuleEngineTransportService
    public void process(String str, TransportProtos.DeviceActorToTransportMsg deviceActorToTransportMsg) {
        process(str, deviceActorToTransportMsg, null, null);
    }

    @Override // org.thingsboard.server.service.transport.RuleEngineTransportService
    public void process(String str, TransportProtos.DeviceActorToTransportMsg deviceActorToTransportMsg, Runnable runnable, Consumer<Throwable> consumer) {
        String str2 = this.notificationsTopic + "." + str;
        UUID uuid = new UUID(deviceActorToTransportMsg.getSessionIdMSB(), deviceActorToTransportMsg.getSessionIdLSB());
        TransportProtos.ToTransportMsg build = TransportProtos.ToTransportMsg.newBuilder().setToDeviceSessionMsg(deviceActorToTransportMsg).build();
        log.trace("[{}][{}] Pushing session data to topic: {}", new Object[]{str2, uuid, build});
        this.notificationsProducer.send(str2, uuid.toString(), build, new QueueCallbackAdaptor(runnable, consumer));
    }

    private void forwardToDeviceActor(TransportProtos.TransportToDeviceActorMsg transportToDeviceActorMsg) {
        TransportToDeviceActorMsgWrapper transportToDeviceActorMsgWrapper = new TransportToDeviceActorMsgWrapper(transportToDeviceActorMsg);
        Optional<ServerAddress> resolveById = this.routingService.resolveById(transportToDeviceActorMsgWrapper.getDeviceId());
        if (resolveById.isPresent()) {
            log.trace("[{}] Pushing message to remote server: {}", resolveById.get(), transportToDeviceActorMsg);
            this.rpcService.tell(this.encodingService.convertToProtoDataMessage(resolveById.get(), transportToDeviceActorMsgWrapper));
        } else {
            log.trace("Pushing message to local server: {}", transportToDeviceActorMsg);
            this.actorContext.getAppActor().tell(transportToDeviceActorMsgWrapper, ActorRef.noSender());
        }
    }

    @PreDestroy
    public void destroy() {
        this.stopped = true;
        if (this.ruleEngineConsumer != null) {
            this.ruleEngineConsumer.unsubscribe();
        }
        if (this.mainConsumerExecutor != null) {
            this.mainConsumerExecutor.shutdownNow();
        }
    }
}
