package org.thingsboard.server.service.edge.rpc;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.service.edge.EdgeContextComponent;

/* loaded from: input_file:org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.class */
/**
 * {@link EdgeGrpcSession} variant that receives edge event notifications from a queue consumer
 * (created through {@link TbCoreQueueFactory#createEdgeEventMsgConsumer}) and forwards them to the
 * edge as downlink message packs.
 *
 * <p>The consumer and its dedicated single-thread executor are created lazily on the first call to
 * {@link #processEdgeEvents()} and released by {@link #destroy()}. {@link #cleanUp()} removes the
 * per-edge topic and consumer group through a {@link TbKafkaAdmin}.
 */
public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
    private static final Logger log = LoggerFactory.getLogger(KafkaEdgeGrpcSession.class);
    private final TopicService topicService;
    private final TbCoreQueueFactory tbCoreQueueFactory;
    private final TbKafkaSettings kafkaSettings;
    private final TbKafkaTopicConfigs kafkaTopicConfigs;
    // Set while processHighPriorityEvents() is running; read by the consumer thread in processMsgs().
    private volatile boolean isHighPriorityProcessing;
    // Both are null until processEdgeEvents() has been called for the first time.
    private QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> consumer;
    private ExecutorService consumerExecutor;

    /**
     * Creates the session. The Kafka-specific collaborators are stored on this instance; every
     * other argument is passed unchanged to the {@link EdgeGrpcSession} constructor.
     *
     * @param edgeContextComponent     forwarded to the parent session
     * @param topicService             used by {@link #cleanUp()} to resolve the edge event topic
     * @param tbCoreQueueFactory       used to create the edge event consumer
     * @param tbKafkaSettings          settings for the {@link TbKafkaAdmin} built in {@link #cleanUp()}
     * @param tbKafkaTopicConfigs      source of the edge event topic configs for that admin
     * @param streamObserver           forwarded to the parent session
     * @param biConsumer               forwarded to the parent session
     * @param biConsumer2              forwarded to the parent session
     * @param scheduledExecutorService forwarded to the parent session
     * @param i                        forwarded to the parent session (meaning defined there)
     * @param i2                       forwarded to the parent session (meaning defined there)
     */
    public KafkaEdgeGrpcSession(EdgeContextComponent edgeContextComponent, TopicService topicService, TbCoreQueueFactory tbCoreQueueFactory, TbKafkaSettings tbKafkaSettings, TbKafkaTopicConfigs tbKafkaTopicConfigs, StreamObserver<ResponseMsg> streamObserver, BiConsumer<EdgeId, EdgeGrpcSession> biConsumer, BiConsumer<Edge, UUID> biConsumer2, ScheduledExecutorService scheduledExecutorService, int i, int i2) {
        super(edgeContextComponent, streamObserver, biConsumer, biConsumer2, scheduledExecutorService, i, i2);
        this.topicService = topicService;
        this.tbCoreQueueFactory = tbCoreQueueFactory;
        this.kafkaSettings = tbKafkaSettings;
        this.kafkaTopicConfigs = tbKafkaTopicConfigs;
    }

    /**
     * Handles one polled batch of edge event notifications.
     *
     * <p>When the edge is not connected, a sync is in progress, or high priority events are being
     * processed, the batch is left uncommitted: the method sleeps for the configured "no records"
     * interval and returns. Otherwise the messages are converted to a downlink pack and sent, and
     * the consumer offset is committed only when the send reports that it was not interrupted.
     *
     * @param list            messages returned by the current poll
     * @param tbQueueConsumer consumer the batch came from; committed after a successful send
     */
    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> list, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> tbQueueConsumer) {
        log.trace("[{}][{}] starting processing edge events", this.tenantId, this.sessionId);
        if (!isConnected() || isSyncInProgress() || this.isHighPriorityProcessing) {
            try {
                Thread.sleep(this.ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval());
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the code driving this thread instead of dropping it.
                Thread.currentThread().interrupt();
                log.trace("Failed to wait until the server has capacity to handle new requests", e);
            }
            log.trace("[{}][{}] edge is not connected or sync is not completed. Skipping iteration", this.tenantId, this.sessionId);
            return;
        }
        try {
            // A true result means the send task was interrupted, so the batch must not be committed.
            boolean interrupted = (Boolean) sendDownlinkMsgsPack(convertToDownlinkMsgsPack(
                    list.stream()
                            .map(msg -> ProtoUtils.fromProto(msg.getValue().getEdgeEventMsg()))
                            .collect(Collectors.toList()))).get();
            if (interrupted) {
                log.debug("[{}][{}][{}] Send downlink messages task was interrupted", this.tenantId, this.edge.getId(), this.sessionId);
            } else {
                tbQueueConsumer.commit();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Waiting on the send future was interrupted; restore the flag before returning.
                Thread.currentThread().interrupt();
            }
            log.error("[{}] Failed to process all downlink messages", this.sessionId, e);
        }
    }

    /**
     * Runs the parent class's edge event processing (not the queue consumer of this class).
     *
     * @return the future produced by {@code super.processEdgeEvents()}
     * @throws Exception declared by the overridden method
     */
    @Override // org.thingsboard.server.service.edge.rpc.EdgeGrpcSession
    public ListenableFuture<Boolean> migrateEdgeEvents() throws Exception {
        return super.processEdgeEvents();
    }

    /**
     * Lazily creates, subscribes and launches the edge event consumer. Subsequent calls are no-ops
     * once the consumer exists; the actual work happens on the consumer thread in
     * {@code processMsgs}.
     *
     * @return an already completed future holding {@link Boolean#FALSE}
     */
    @Override // org.thingsboard.server.service.edge.rpc.EdgeGrpcSession
    public ListenableFuture<Boolean> processEdgeEvents() {
        if (this.consumer == null) {
            this.consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-event-consumer"));
            // Explicit type witness: without it the builder chain cannot be typed to the message type.
            this.consumer = QueueConsumerManager.<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>>builder()
                    .name("TB Edge events")
                    .msgPackProcessor(this::processMsgs)
                    .pollInterval(this.ctx.getEdgeEventStorageSettings().getNoRecordsSleepInterval())
                    .consumerCreator(() -> this.tbCoreQueueFactory.createEdgeEventMsgConsumer(this.tenantId, this.edge.getId()))
                    .consumerExecutor(this.consumerExecutor)
                    .threadPrefix("edge-events")
                    .build();
            this.consumer.subscribe();
            this.consumer.launch();
        }
        return Futures.immediateFuture(Boolean.FALSE);
    }

    /**
     * Delegates to the parent implementation while flagging that high priority processing is in
     * progress, which makes {@code processMsgs} skip regular batches for the duration.
     */
    @Override // org.thingsboard.server.service.edge.rpc.EdgeGrpcSession
    public void processHighPriorityEvents() {
        this.isHighPriorityProcessing = true;
        try {
            super.processHighPriorityEvents();
        } finally {
            // Always clear the flag; otherwise a failure here would stall the consumer permanently.
            this.isHighPriorityProcessing = false;
        }
    }

    /**
     * Stops the consumer and shuts down its executor. Safe to call when
     * {@link #processEdgeEvents()} was never invoked, in which case there is nothing to release.
     */
    @Override // org.thingsboard.server.service.edge.rpc.EdgeGrpcSession
    public void destroy() {
        try {
            if (this.consumer != null) {
                this.consumer.stop();
            }
        } finally {
            // Shut the executor down even if stopping the consumer failed.
            if (this.consumerExecutor != null) {
                this.consumerExecutor.shutdown();
            }
        }
    }

    /**
     * Deletes the edge event notifications topic of this edge and the consumer group that carries
     * the same name as the topic.
     */
    @Override // org.thingsboard.server.service.edge.rpc.EdgeGrpcSession
    public void cleanUp() {
        String topic = this.topicService.buildEdgeEventNotificationsTopicPartitionInfo(this.tenantId, this.edge.getId()).getTopic();
        // NOTE(review): a new admin is created per call and never explicitly released here;
        // confirm whether TbKafkaAdmin needs to be closed after use.
        TbKafkaAdmin tbKafkaAdmin = new TbKafkaAdmin(this.kafkaSettings, this.kafkaTopicConfigs.getEdgeEventConfigs());
        tbKafkaAdmin.deleteTopic(topic);
        tbKafkaAdmin.deleteConsumerGroup(topic);
    }
}
