package org.thingsboard.server.queue.kafka;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate;

/* loaded from: input_file:org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.class */
/**
 * Queue consumer template backed by a {@link KafkaConsumer} with {@code String} keys and
 * {@code byte[]} values.
 *
 * <p>Raw {@link ConsumerRecord}s returned by the consumer are wrapped in a {@code KafkaTbQueueMsg}
 * and converted to {@code T} by the configured {@link TbKafkaDecoder}. Instances are created
 * through {@link #builder()}.
 *
 * @param <T> the decoded queue message type
 */
public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQueueConsumerTemplate<ConsumerRecord<String, byte[]>, T> {
    private static final Logger log = LoggerFactory.getLogger(TbKafkaConsumerTemplate.class);
    private final TbQueueAdmin admin;
    private final KafkaConsumer<String, byte[]> consumer;
    private final TbKafkaDecoder<T> decoder;
    private final TbKafkaConsumerStatsService statsService;
    private final String groupId;

    /* loaded from: input_file:org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate$TbKafkaConsumerTemplateBuilder.class */
    /**
     * Fluent builder that collects the constructor arguments of {@link TbKafkaConsumerTemplate}.
     * Every setter stores its argument and returns this builder; no validation is done here.
     *
     * @param <T> the decoded queue message type of the template being built
     */
    public static class TbKafkaConsumerTemplateBuilder<T extends TbQueueMsg> {
        private TbKafkaSettings settings;
        private TbKafkaDecoder<T> decoder;
        private String clientId;
        private String groupId;
        private String topic;
        private TbQueueAdmin admin;
        private TbKafkaConsumerStatsService statsService;

        TbKafkaConsumerTemplateBuilder() {
        }

        /** Sets the settings object used to produce the base consumer properties. */
        public TbKafkaConsumerTemplateBuilder<T> settings(TbKafkaSettings settings) {
            this.settings = settings;
            return this;
        }

        /** Sets the decoder that converts wrapped Kafka records into {@code T}. */
        public TbKafkaConsumerTemplateBuilder<T> decoder(TbKafkaDecoder<T> decoder) {
            this.decoder = decoder;
            return this;
        }

        /** Sets the value stored under the {@code client.id} consumer property. */
        public TbKafkaConsumerTemplateBuilder<T> clientId(String clientId) {
            this.clientId = clientId;
            return this;
        }

        /** Sets the value stored under the {@code group.id} consumer property; may be {@code null}. */
        public TbKafkaConsumerTemplateBuilder<T> groupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        /** Sets the topic passed to the parent template and to the settings object. */
        public TbKafkaConsumerTemplateBuilder<T> topic(String topic) {
            this.topic = topic;
            return this;
        }

        /** Sets the admin used to create missing topics on subscribe. */
        public TbKafkaConsumerTemplateBuilder<T> admin(TbQueueAdmin admin) {
            this.admin = admin;
            return this;
        }

        /** Sets the optional stats service the consumer group is registered with; may be {@code null}. */
        public TbKafkaConsumerTemplateBuilder<T> statsService(TbKafkaConsumerStatsService statsService) {
            this.statsService = statsService;
            return this;
        }

        /**
         * Creates the consumer template from the values collected so far.
         *
         * @return a new {@link TbKafkaConsumerTemplate}
         */
        public TbKafkaConsumerTemplate<T> build() {
            return new TbKafkaConsumerTemplate<>(settings, decoder, clientId, groupId, topic, admin, statsService);
        }

        @Override
        public String toString() {
            return "TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder(settings=" + settings
                    + ", decoder=" + decoder
                    + ", clientId=" + clientId
                    + ", groupId=" + groupId
                    + ", topic=" + topic
                    + ", admin=" + admin
                    + ", statsService=" + statsService + ")";
        }
    }

    /**
     * Builds the consumer properties, registers the group with the stats service (when one is
     * supplied) and creates the underlying {@link KafkaConsumer}.
     *
     * @param settings     source of the base consumer properties for {@code topic}
     * @param decoder      converts wrapped records into {@code T}
     * @param clientId     value for the {@code client.id} property
     * @param groupId      value for the {@code group.id} property; the property is left unset when {@code null}
     * @param topic        topic handed to the parent template and to {@code settings}
     * @param admin        used to create missing topics on subscribe
     * @param statsService optional stats service; skipped when {@code null}
     */
    private TbKafkaConsumerTemplate(TbKafkaSettings settings, TbKafkaDecoder<T> decoder, String clientId, String groupId, String topic, TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) {
        super(topic);
        Properties consumerProps = settings.toConsumerProps(topic);
        consumerProps.put("client.id", clientId);
        if (groupId != null) {
            consumerProps.put("group.id", groupId);
        }
        this.statsService = statsService;
        this.groupId = groupId;
        if (statsService != null) {
            statsService.registerClientGroup(groupId);
        }
        this.admin = admin;
        this.consumer = new KafkaConsumer<>(consumerProps);
        this.decoder = decoder;
    }

    /**
     * Subscribes the consumer to the given topics, asking the admin to create each one first.
     * An empty list unsubscribes the consumer instead.
     *
     * @param list topic names to subscribe to
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doSubscribe(List<String> list) {
        if (list.isEmpty()) {
            log.info("unsubscribe due to empty topic list");
            consumer.unsubscribe();
            return;
        }
        // Binding the method reference throws NullPointerException up front if admin is null.
        list.forEach(admin::createTopicIfNotExists);
        consumer.subscribe(list);
    }

    /**
     * Polls the consumer once, waiting at most {@code j} milliseconds, and returns the fetched
     * records as a flat list.
     *
     * @param j maximum poll duration in milliseconds
     * @return the polled records in iteration order, or an empty list when nothing was fetched
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected List<ConsumerRecord<String, byte[]>> doPoll(long j) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        log.trace("poll topic {} maxDuration {}", getTopic(), j);
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(j));
        stopWatch.stop();
        log.trace("poll topic {} took {}ms", getTopic(), stopWatch.getTotalTimeMillis());
        if (records.isEmpty()) {
            return Collections.emptyList();
        }
        // Presize to the exact batch size so the list never has to grow while copying.
        List<ConsumerRecord<String, byte[]>> batch = new ArrayList<>(records.count());
        records.forEach(batch::add);
        return batch;
    }

    /**
     * Wraps the record in a {@code KafkaTbQueueMsg} and passes it to the configured decoder.
     *
     * @param consumerRecord the raw Kafka record
     * @return the decoded message
     * @throws IOException if the decoder fails
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    public T decode(ConsumerRecord<String, byte[]> consumerRecord) throws IOException {
        return decoder.decode(new KafkaTbQueueMsg(consumerRecord));
    }

    /** Delegates to {@link KafkaConsumer#commitSync()}. */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doCommit() {
        consumer.commitSync();
    }

    /**
     * Unsubscribes and closes the consumer, then unregisters the group from the stats service
     * (when one was supplied).
     *
     * <p>The steps are chained with {@code finally} so that a failure in an earlier step does not
     * skip the later ones: the consumer is still closed if unsubscribing throws, and the group is
     * still unregistered if closing throws.
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    protected void doUnsubscribe() {
        try {
            if (consumer != null) {
                try {
                    consumer.unsubscribe();
                } finally {
                    consumer.close();
                }
            }
        } finally {
            if (statsService != null) {
                statsService.unregisterClientGroup(groupId);
            }
        }
    }

    /**
     * Reports whether this consumer supports long polling.
     *
     * @return always {@code true}
     */
    @Override // org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate
    public boolean isLongPollingSupported() {
        return true;
    }

    /**
     * Creates an empty builder for a {@link TbKafkaConsumerTemplate}.
     *
     * @param <T> the decoded queue message type
     * @return a new builder
     */
    public static <T extends TbQueueMsg> TbKafkaConsumerTemplateBuilder<T> builder() {
        return new TbKafkaConsumerTemplateBuilder<>();
    }
}
