/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.edqs;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueHandler;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.PartitionedQueueResponseTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsExecutors;
import org.thingsboard.server.queue.edqs.EdqsQueueFactory;
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;

@Component
@KafkaEdqsComponent
public class KafkaEdqsQueueFactory
implements EdqsQueueFactory {
    private final TbKafkaSettings kafkaSettings;
    private final TbKafkaAdmin edqsEventsAdmin;
    private final TbKafkaAdmin edqsRequestsAdmin;
    private final TbKafkaAdmin edqsStateAdmin;
    private final EdqsConfig edqsConfig;
    private final EdqsExecutors edqsExecutors;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final TbKafkaConsumerStatsService consumerStatsService;
    private final TopicService topicService;
    private final StatsFactory statsFactory;
    private final AtomicInteger consumerCounter = new AtomicInteger();

    public KafkaEdqsQueueFactory(TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs topicConfigs, EdqsConfig edqsConfig, EdqsExecutors edqsExecutors, TbServiceInfoProvider serviceInfoProvider, TbKafkaConsumerStatsService consumerStatsService, TopicService topicService, StatsFactory statsFactory) {
        this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsEventsConfigs());
        this.edqsRequestsAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsRequestsConfigs());
        this.edqsStateAdmin = new TbKafkaAdmin(kafkaSettings, topicConfigs.getEdqsStateConfigs());
        this.kafkaSettings = kafkaSettings;
        this.edqsConfig = edqsConfig;
        this.edqsExecutors = edqsExecutors;
        this.serviceInfoProvider = serviceInfoProvider;
        this.consumerStatsService = consumerStatsService;
        this.topicService = topicService;
        this.statsFactory = statsFactory;
    }

    public TbKafkaConsumerTemplate<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> createEdqsEventsConsumer() {
        return this.createEdqsMsgConsumer(this.edqsConfig.getEventsTopic(), "edqs-events-" + this.consumerCounter.getAndIncrement() + "-consumer-" + this.serviceInfoProvider.getServiceId(), null, false, this.edqsEventsAdmin);
    }

    public TbKafkaConsumerTemplate<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> createEdqsEventsToBackupConsumer() {
        return this.createEdqsMsgConsumer(this.edqsConfig.getEventsTopic(), "edqs-events-to-backup-consumer-" + this.serviceInfoProvider.getServiceId(), "edqs-events-to-backup-consumer-group", false, this.edqsEventsAdmin);
    }

    public TbKafkaConsumerTemplate<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> createEdqsStateConsumer() {
        return this.createEdqsMsgConsumer(this.edqsConfig.getStateTopic(), "edqs-state-" + this.consumerCounter.getAndIncrement() + "-consumer-" + this.serviceInfoProvider.getServiceId(), null, true, this.edqsStateAdmin);
    }

    public TbKafkaConsumerTemplate<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> createEdqsMsgConsumer(String topic, String clientId, String group, boolean readFullAndStop, TbKafkaAdmin admin) {
        return TbKafkaConsumerTemplate.builder().settings(this.kafkaSettings).topic(this.topicService.buildTopicName(topic)).readFromBeginning(readFullAndStop).stopWhenRead(readFullAndStop).clientId(clientId).groupId(this.topicService.buildTopicName(group)).decoder(msg -> new TbProtoQueueMsg<TransportProtos.ToEdqsMsg>(msg.getKey(), TransportProtos.ToEdqsMsg.parseFrom((byte[])msg.getData()), msg.getHeaders())).admin(admin).statsService(this.consumerStatsService).build();
    }

    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> createEdqsStateProducer() {
        return TbKafkaProducerTemplate.builder().clientId("edqs-state-producer-" + this.serviceInfoProvider.getServiceId()).defaultTopic(this.topicService.buildTopicName(this.edqsConfig.getStateTopic())).settings(this.kafkaSettings).admin(this.edqsStateAdmin).build();
    }

    @Override
    public PartitionedQueueResponseTemplate<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> createEdqsResponseTemplate(TbQueueHandler<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> handler) {
        TbKafkaProducerTemplate responseProducer = TbKafkaProducerTemplate.builder().settings(this.kafkaSettings).clientId("edqs-response-producer-" + this.serviceInfoProvider.getServiceId()).defaultTopic(this.topicService.buildTopicName(this.edqsConfig.getResponsesTopic())).admin(this.edqsRequestsAdmin).build();
        return PartitionedQueueResponseTemplate.builder().key("edqs").handler(handler).requestsTopic(this.topicService.buildTopicName(this.edqsConfig.getRequestsTopic())).consumerCreator(tpi -> this.createEdqsMsgConsumer(this.edqsConfig.getRequestsTopic(), "edqs-requests-consumer-" + this.serviceInfoProvider.getServiceId() + "-" + String.valueOf(tpi.getPartition().orElse(999)), "edqs-requests-consumer-group", false, this.edqsRequestsAdmin)).responseProducer(responseProducer).pollInterval(this.edqsConfig.getPollInterval()).requestTimeout(this.edqsConfig.getMaxRequestTimeout()).maxPendingRequests(this.edqsConfig.getMaxPendingRequests()).consumerExecutor(this.edqsExecutors.getConsumersExecutor()).callbackExecutor((ExecutorService)this.edqsExecutors.getRequestExecutor()).consumerTaskExecutor(this.edqsExecutors.getConsumerTaskExecutor()).stats(this.statsFactory.createMessagesStats(StatsType.EDQS.getName())).build();
    }

    @Override
    public TbKafkaAdmin getEdqsQueueAdmin() {
        return this.edqsEventsAdmin;
    }
}

