/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.provider;

import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import jakarta.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbEdgeQueueAdmin;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;

@Component
@ConditionalOnExpression(value="'${queue.type:null}'=='kafka' && '${service.type:null}'=='monolith'")
public class KafkaMonolithQueueFactory
implements TbCoreQueueFactory,
TbRuleEngineQueueFactory,
TbVersionControlQueueFactory {
    // Collaborators and settings objects; all are assigned from constructor arguments.
    private final TopicService topicService;
    private final TbKafkaSettings kafkaSettings;
    private final KafkaAdmin kafkaAdmin;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final TbQueueCoreSettings coreSettings;
    private final TbQueueRuleEngineSettings ruleEngineSettings;
    private final TbQueueTransportApiSettings transportApiSettings;
    private final TbQueueTransportNotificationSettings transportNotificationSettings;
    private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
    private final TbQueueVersionControlSettings vcSettings;
    private final TbQueueEdgeSettings edgeSettings;
    private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
    private final TbKafkaConsumerStatsService consumerStatsService;
    private final EdqsConfig edqsConfig;
    private final TasksQueueConfig tasksQueueConfig;
    // Queue admins; each one is a TbKafkaAdmin created in the constructor from the
    // shared Kafka settings plus the matching section of TbKafkaTopicConfigs.
    private final TbQueueAdmin coreAdmin;
    private final TbKafkaAdmin ruleEngineAdmin;
    private final TbQueueAdmin jsExecutorRequestAdmin;
    private final TbQueueAdmin jsExecutorResponseAdmin;
    private final TbQueueAdmin transportApiRequestAdmin;
    private final TbQueueAdmin transportApiResponseAdmin;
    private final TbQueueAdmin notificationAdmin;
    private final TbQueueAdmin fwUpdatesAdmin;
    private final TbQueueAdmin vcAdmin;
    private final TbQueueAdmin housekeeperAdmin;
    private final TbQueueAdmin housekeeperReprocessingAdmin;
    private final TbEdgeQueueAdmin edgeAdmin;
    private final TbQueueAdmin edgeEventAdmin;
    private final TbQueueAdmin cfAdmin;
    private final TbQueueAdmin cfStateAdmin;
    private final TbQueueAdmin edqsEventsAdmin;
    private final TbKafkaAdmin edqsRequestsAdmin;
    private final TbQueueAdmin tasksAdmin;
    // Monotonic counters appended to consumer client ids so that several consumers
    // created by this factory instance get distinct ids.
    private final AtomicLong consumerCount = new AtomicLong();
    private final AtomicLong edgeConsumerCount = new AtomicLong();

    /**
     * Stores the injected collaborators and settings and creates one {@link TbKafkaAdmin}
     * per topic-configuration section exposed by {@code kafkaTopicConfigs}.
     *
     * @param topicService                  used by the factory methods to build topic and group names
     * @param kafkaSettings                 shared Kafka settings passed to every admin, producer and consumer
     * @param kafkaAdmin                    admin used to sync consumer-group offsets for rule engine consumers
     * @param serviceInfoProvider           source of the service id embedded in client ids
     * @param coreSettings                  core queue settings
     * @param ruleEngineSettings            rule engine queue settings
     * @param transportApiSettings          transport API queue settings
     * @param transportNotificationSettings transport notification queue settings
     * @param jsInvokeSettings              remote JS invoke queue settings
     * @param vcSettings                    version control queue settings
     * @param edgeSettings                  edge queue settings
     * @param calculatedFieldSettings       calculated field queue settings
     * @param consumerStatsService          stats service attached to created consumers
     * @param kafkaTopicConfigs             per-topic configuration sections; only read here, not stored
     * @param edqsConfig                    EDQS configuration
     * @param tasksQueueConfig              tasks queue configuration
     */
    public KafkaMonolithQueueFactory(TopicService topicService,
                                     TbKafkaSettings kafkaSettings,
                                     KafkaAdmin kafkaAdmin,
                                     TbServiceInfoProvider serviceInfoProvider,
                                     TbQueueCoreSettings coreSettings,
                                     TbQueueRuleEngineSettings ruleEngineSettings,
                                     TbQueueTransportApiSettings transportApiSettings,
                                     TbQueueTransportNotificationSettings transportNotificationSettings,
                                     TbQueueRemoteJsInvokeSettings jsInvokeSettings,
                                     TbQueueVersionControlSettings vcSettings,
                                     TbQueueEdgeSettings edgeSettings,
                                     TbQueueCalculatedFieldSettings calculatedFieldSettings,
                                     TbKafkaConsumerStatsService consumerStatsService,
                                     TbKafkaTopicConfigs kafkaTopicConfigs,
                                     EdqsConfig edqsConfig,
                                     TasksQueueConfig tasksQueueConfig) {
        // Plain collaborators and settings, in field declaration order.
        this.topicService = topicService;
        this.kafkaSettings = kafkaSettings;
        this.kafkaAdmin = kafkaAdmin;
        this.serviceInfoProvider = serviceInfoProvider;
        this.coreSettings = coreSettings;
        this.ruleEngineSettings = ruleEngineSettings;
        this.transportApiSettings = transportApiSettings;
        this.transportNotificationSettings = transportNotificationSettings;
        this.jsInvokeSettings = jsInvokeSettings;
        this.vcSettings = vcSettings;
        this.edgeSettings = edgeSettings;
        this.calculatedFieldSettings = calculatedFieldSettings;
        this.consumerStatsService = consumerStatsService;
        this.edqsConfig = edqsConfig;
        this.tasksQueueConfig = tasksQueueConfig;

        // One Kafka admin per topic-configuration section.
        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
        this.jsExecutorRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorRequestConfigs());
        this.jsExecutorResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorResponseConfigs());
        this.transportApiRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiRequestConfigs());
        this.transportApiResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiResponseConfigs());
        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
        this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
        this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
        this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
        this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
        this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
        this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
        this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
        this.cfStateAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldStateConfigs());
        this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdqsEventsConfigs());
        this.edqsRequestsAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdqsRequestsConfigs());
        this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs());
    }

    /**
     * Creates the Kafka producer for messages sent to transports.
     * <p>
     * The default topic is the configured transport notifications topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the notifications admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToTransportMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-transport-notifications-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.transportNotificationSettings.getNotificationsTopic()))
                .admin(this.notificationAdmin)
                .build();
    }

    /**
     * Creates the Kafka producer for messages sent to the rule engine.
     * <p>
     * The default topic is the configured rule engine topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the rule engine admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-rule-engine-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.ruleEngineSettings.getTopic()))
                .admin(this.ruleEngineAdmin)
                .build();
    }

    /**
     * Creates the Kafka producer for rule engine notification messages.
     * <p>
     * The default topic is built from the rule engine settings topic, while topic management
     * is delegated to the notifications admin.
     * NOTE(review): the default topic is the main rule engine topic rather than a notifications
     * topic - presumably callers always pass an explicit destination; confirm before changing.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-rule-engine-notifications-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.ruleEngineSettings.getTopic()))
                .admin(this.notificationAdmin)
                .build();
    }

    /**
     * Creates the Kafka producer for messages sent to the core service.
     * <p>
     * The default topic is the configured core topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the core admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToCoreMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-core-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.coreSettings.getTopic()))
                .admin(this.coreAdmin)
                .build();
    }

    /**
     * Creates the Kafka producer for core notification messages.
     * <p>
     * The default topic is the full name of the {@link ServiceType#TB_CORE} notifications topic
     * resolved for this service id; topics are managed by the notifications admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-core-notifications-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.getNotificationsTopic(ServiceType.TB_CORE, this.serviceInfoProvider.getServiceId()).getFullTopicName())
                .admin(this.notificationAdmin)
                .build();
    }

    /**
     * Creates the Kafka consumer for version control service messages.
     * <p>
     * Subscribes to the configured version control topic using the consumer group
     * {@code "monolith-vc-node"} (both passed through {@code topicService.buildTopicName(...)})
     * and decodes each record payload as {@link TransportProtos.ToVersionControlServiceMsg}.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createToVersionControlMsgConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(this.vcSettings.getTopic()))
                .clientId("monolith-vc-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-vc-node"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToVersionControlServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.vcAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Unsupported overload: this factory only creates rule engine consumers through
     * {@link #createToRuleEngineMsgConsumer(Queue, Integer)}.
     *
     * @param configuration queue configuration (ignored)
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration) {
        final String reason = "Rule engine consumer should use a partitionId";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Creates a Kafka consumer for one partition of a rule engine queue.
     * <p>
     * Before the consumer is built, offsets are synced via {@code kafkaAdmin.syncOffsets(...)}
     * from the group id built without a partition to the partition-specific group id.
     * The client id carries a per-factory counter so repeated calls get distinct ids.
     *
     * @param configuration queue whose name, tenant id and topic are used
     * @param partitionId   partition used to build the consumer group id and to sync offsets
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(Queue configuration, Integer partitionId) {
        String queueName = configuration.getName();
        String groupId = this.topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);
        String partitionlessGroupId = this.topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null);
        this.kafkaAdmin.syncOffsets(partitionlessGroupId, groupId, partitionId);
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(configuration.getTopic()))
                .clientId("re-" + queueName + "-consumer-" + this.serviceInfoProvider.getServiceId() + "-" + this.consumerCount.incrementAndGet())
                .groupId(groupId)
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.ruleEngineAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka consumer for rule engine notification messages.
     * <p>
     * Subscribes to the {@link ServiceType#TB_RULE_ENGINE} notifications topic resolved for this
     * service id; the consumer group id also embeds the service id.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createToRuleEngineNotificationsMsgConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, this.serviceInfoProvider.getServiceId()).getFullTopicName())
                .clientId("monolith-rule-engine-notifications-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-rule-engine-notifications-consumer-" + this.serviceInfoProvider.getServiceId()))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToRuleEngineNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.notificationAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates a Kafka consumer for core service messages.
     * <p>
     * Subscribes to the configured core topic with the consumer group
     * {@code "monolith-core-consumer"}. The client id carries a per-factory counter so
     * repeated calls get distinct ids.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createToCoreMsgConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToCoreMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(this.coreSettings.getTopic()))
                .clientId("monolith-core-consumer-" + this.serviceInfoProvider.getServiceId() + "-" + this.consumerCount.incrementAndGet())
                .groupId(this.topicService.buildTopicName("monolith-core-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.coreAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka consumer for core notification messages.
     * <p>
     * Subscribes to the {@link ServiceType#TB_CORE} notifications topic resolved for this
     * service id; the consumer group id also embeds the service id.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.getNotificationsTopic(ServiceType.TB_CORE, this.serviceInfoProvider.getServiceId()).getFullTopicName())
                .clientId("monolith-core-notifications-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-core-notifications-consumer-" + this.serviceInfoProvider.getServiceId()))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.notificationAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka consumer for transport API requests.
     * <p>
     * Subscribes to the configured transport API requests topic with the consumer group
     * {@code "monolith-transport-api-consumer"}; topics are managed by the transport API
     * request admin.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(this.transportApiSettings.getRequestsTopic()))
                .clientId("monolith-transport-api-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-transport-api-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.transportApiRequestAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka producer for transport API responses.
     * <p>
     * The default topic is the configured transport API responses topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the transport API
     * response admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiResponseProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-transport-api-producer-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.transportApiSettings.getResponsesTopic()))
                .admin(this.transportApiResponseAdmin)
                .build();
    }

    /**
     * Creates (and exposes as a Spring bean) the request/response template used to invoke
     * the remote JS executor over Kafka.
     * <p>
     * Requests are produced to the configured JS request topic; responses are consumed from the
     * configured JS response topic suffixed with {@code "." + serviceId}. Both topic names are
     * passed through {@code topicService.buildTopicName(...)}, like every other topic and the
     * consumer group id built by this factory. Response payloads are parsed as JSON into
     * {@link JsInvokeProtos.RemoteJsResponse}, ignoring unknown fields.
     *
     * @return a request template configured with the JS invoke limits (max pending requests,
     *         max request timeout, response poll interval)
     */
    @Override
    @Bean
    public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requestBuilder = TbKafkaProducerTemplate.builder();
        requestBuilder.settings(this.kafkaSettings);
        requestBuilder.clientId("producer-js-invoke-" + this.serviceInfoProvider.getServiceId());
        // Go through buildTopicName so the request topic is named like all other topics here.
        requestBuilder.defaultTopic(this.topicService.buildTopicName(this.jsInvokeSettings.getRequestTopic()));
        requestBuilder.admin(this.jsExecutorRequestAdmin);

        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responseBuilder = TbKafkaConsumerTemplate.builder();
        responseBuilder.settings(this.kafkaSettings);
        // Per-service response topic, also built via buildTopicName for consistency.
        responseBuilder.topic(this.topicService.buildTopicName(this.jsInvokeSettings.getResponseTopic() + "." + this.serviceInfoProvider.getServiceId()));
        responseBuilder.clientId("js-" + this.serviceInfoProvider.getServiceId());
        responseBuilder.groupId(this.topicService.buildTopicName("rule-engine-node-") + this.serviceInfoProvider.getServiceId());
        responseBuilder.decoder(msg -> {
            // Responses arrive as UTF-8 JSON rather than binary protobuf.
            JsInvokeProtos.RemoteJsResponse.Builder response = JsInvokeProtos.RemoteJsResponse.newBuilder();
            JsonFormat.parser().ignoringUnknownFields().merge(new String(msg.getData(), StandardCharsets.UTF_8), response);
            return new TbProtoQueueMsg<>(msg.getKey(), response.build(), msg.getHeaders());
        });
        responseBuilder.statsService(this.consumerStatsService);
        responseBuilder.admin(this.jsExecutorResponseAdmin);

        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> templateBuilder = DefaultTbQueueRequestTemplate.builder();
        templateBuilder.queueAdmin(this.jsExecutorResponseAdmin);
        templateBuilder.requestTemplate(requestBuilder.build());
        templateBuilder.responseTemplate(responseBuilder.build());
        templateBuilder.maxPendingRequests(this.jsInvokeSettings.getMaxPendingRequests());
        templateBuilder.maxRequestTimeout(this.jsInvokeSettings.getMaxRequestsTimeout());
        templateBuilder.pollInterval(this.jsInvokeSettings.getResponsePollInterval());
        return templateBuilder.build();
    }

    /**
     * Creates the Kafka consumer for usage statistics service messages.
     * <p>
     * Subscribes to the configured usage stats topic with the consumer group
     * {@code "monolith-us-consumer"}; topics are managed by the core admin.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(this.coreSettings.getUsageStatsTopic()))
                .clientId("monolith-us-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-us-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.coreAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka consumer for OTA package state service messages.
     * <p>
     * Subscribes to the configured OTA package topic with the consumer group
     * {@code "monolith-ota-consumer"}; topics are managed by the firmware updates admin.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(this.coreSettings.getOtaPackageTopic()))
                .clientId("monolith-ota-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-ota-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.fwUpdatesAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka producer for OTA package state service messages.
     * <p>
     * The default topic is the configured OTA package topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the firmware updates admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-ota-producer-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.coreSettings.getOtaPackageTopic()))
                .admin(this.fwUpdatesAdmin)
                .build();
    }

    /**
     * Creates the Kafka producer for usage statistics service messages.
     * <p>
     * The default topic is the configured usage stats topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the core admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-us-producer-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.coreSettings.getUsageStatsTopic()))
                .admin(this.coreAdmin)
                .build();
    }

    /**
     * Creates the Kafka producer for version control service messages.
     * <p>
     * The default topic is the configured version control topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the version control admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createVersionControlMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-vc-producer-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.vcSettings.getTopic()))
                .admin(this.vcAdmin)
                .build();
    }

    /**
     * Creates the Kafka producer for housekeeper service messages.
     * <p>
     * The default topic is the configured housekeeper topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the housekeeper admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
        // The type witness is required: in a call chain the builder's type argument
        // cannot be inferred from the method's return type.
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-housekeeper-producer-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.coreSettings.getHousekeeperTopic()))
                .admin(this.housekeeperAdmin)
                .build();
    }

    /**
     * Creates the Kafka consumer for housekeeper service messages.
     * <p>
     * Subscribes to the configured housekeeper topic with the consumer group
     * {@code "monolith-housekeeper-consumer"}; topics are managed by the housekeeper admin.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgConsumer() {
        // The type witness is required: in a call chain the builder's type argument
        // cannot be inferred from the method's return type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(this.coreSettings.getHousekeeperTopic()))
                .clientId("monolith-housekeeper-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-housekeeper-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.housekeeperAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka producer for housekeeper reprocessing messages.
     * <p>
     * The default topic is the configured housekeeper reprocessing topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the housekeeper
     * reprocessing admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperReprocessingMsgProducer() {
        // The type witness is required: in a call chain the builder's type argument
        // cannot be inferred from the method's return type.
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-housekeeper-reprocessing-producer-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.coreSettings.getHousekeeperReprocessingTopic()))
                .admin(this.housekeeperReprocessingAdmin)
                .build();
    }

    /**
     * Creates the Kafka consumer for housekeeper reprocessing messages.
     * <p>
     * Subscribes to the configured housekeeper reprocessing topic with the consumer group
     * {@code "monolith-housekeeper-reprocessing-consumer"}; topics are managed by the
     * housekeeper reprocessing admin.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperReprocessingMsgConsumer() {
        // The type witness is required: in a call chain the builder's type argument
        // cannot be inferred from the method's return type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(this.coreSettings.getHousekeeperReprocessingTopic()))
                .clientId("monolith-housekeeper-reprocessing-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-housekeeper-reprocessing-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.housekeeperReprocessingAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka consumer for edge messages.
     * <p>
     * Subscribes to the configured edge topic with the consumer group
     * {@code "monolith-edge-consumer"}; topics are managed by the edge admin.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> createEdgeMsgConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.buildTopicName(this.edgeSettings.getTopic()))
                .clientId("monolith-edge-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-edge-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdgeMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin((TbQueueAdmin) this.edgeAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka producer for edge messages.
     * <p>
     * The default topic is the configured edge topic passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the edge admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> createEdgeMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-to-edge-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName(this.edgeSettings.getTopic()))
                .admin((TbQueueAdmin) this.edgeAdmin)
                .build();
    }

    /**
     * Creates the Kafka consumer for edge notification messages.
     * <p>
     * Subscribes to the edge notifications topic resolved for this service id; the consumer
     * group id also embeds the service id. Topics are managed by the notifications admin.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>> createToEdgeNotificationsMsgConsumer() {
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(this.topicService.getEdgeNotificationsTopic(this.serviceInfoProvider.getServiceId()).getFullTopicName())
                .clientId("monolith-edge-notifications-consumer-" + this.serviceInfoProvider.getServiceId())
                .groupId(this.topicService.buildTopicName("monolith-edge-notifications-consumer-" + this.serviceInfoProvider.getServiceId()))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdgeNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.notificationAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka producer for edge notification messages.
     * <p>
     * The default topic is the full name of the edge notifications topic resolved for this
     * service id; topics are managed by the notifications admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>> createEdgeNotificationsMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-to-edge-notifications-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.getEdgeNotificationsTopic(this.serviceInfoProvider.getServiceId()).getFullTopicName())
                .admin(this.notificationAdmin)
                .build();
    }

    /**
     * Creates a Kafka consumer for the edge event notifications of a single edge.
     * <p>
     * The topic is resolved from the tenant and edge ids and is also used as the consumer
     * group id. Before the consumer is built, {@code edgeAdmin.syncEdgeNotificationsOffsets(...)}
     * is called with the {@code "monolith-edge-event-consumer"} group name and that topic.
     * The client id carries a per-factory edge counter so repeated calls get distinct ids.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge whose event topic is consumed
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
        String topic = this.topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
        this.edgeAdmin.syncEdgeNotificationsOffsets(this.topicService.buildTopicName("monolith-edge-event-consumer"), topic);
        // Explicit type witness replaces the raw TbProtoQueueMsg element type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>>builder()
                .settings(this.kafkaSettings)
                .topic(topic)
                .clientId("monolith-to-edge-event-consumer-" + this.serviceInfoProvider.getServiceId() + "-" + this.edgeConsumerCount.incrementAndGet())
                .groupId(topic)
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(this.edgeEventAdmin)
                .statsService(this.consumerStatsService)
                .build();
    }

    /**
     * Creates the Kafka producer for edge event notification messages.
     * <p>
     * The default topic is the literal {@code "edge-events"} passed through
     * {@code topicService.buildTopicName(...)}; topics are managed by the edge event admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> createEdgeEventMsgProducer() {
        // Explicit type witness keeps the builder fully typed (no raw builder, no unchecked conversion).
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>>builder()
                .settings(this.kafkaSettings)
                .clientId("monolith-to-edge-event-" + this.serviceInfoProvider.getServiceId())
                .defaultTopic(this.topicService.buildTopicName("edge-events"))
                .admin(this.edgeEventAdmin)
                .build();
    }

    /**
     * Builds a Kafka consumer for {@code ToCalculatedFieldMsg} messages bound to one partition.
     * <p>
     * The consumer group id is produced by {@code topicService.buildConsumerGroupId} from the prefix
     * {@code "cf-"}, the tenant (falling back to {@code TenantId.SYS_TENANT_ID} when the partition
     * info carries none), the queue name {@code "CalculatedFields"} and the partition number. The
     * topic is built from the event topic of {@code calculatedFieldSettings}. Each call consumes a
     * new value of {@code consumerCount} for the client id suffix.
     *
     * @param tpi topic partition info; must be non-null and must carry a partition
     * @return a newly built consumer template
     * @throws IllegalArgumentException if {@code tpi} is {@code null} or has no partition
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgConsumer(TopicPartitionInfo tpi) {
        if (tpi == null) {
            throw new IllegalArgumentException("TopicPartitionInfo is required.");
        }
        String queueName = "CalculatedFields";
        TenantId tenantId = tpi.getTenantId().orElse(TenantId.SYS_TENANT_ID);
        Integer partitionId = tpi.getPartition().orElseThrow(() -> new IllegalArgumentException("PartitionId is required."));
        String groupId = topicService.buildConsumerGroupId("cf-", tenantId, queueName, partitionId);
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()))
                .clientId("cf-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet())
                .groupId(groupId)
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCalculatedFieldMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(cfAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Exposes the queue admin held in {@code cfAdmin}, the same instance this factory passes to the
     * calculated-field message consumer and producer builders.
     *
     * @return the {@code cfAdmin} field
     */
    @Override
    public TbQueueAdmin getCalculatedFieldQueueAdmin() {
        return cfAdmin;
    }

    /**
     * Builds the Kafka producer for {@code ToCalculatedFieldMsg} messages.
     * <p>
     * The client id is {@code "monolith-calculated-field-"} followed by this service's id. The
     * default topic is built from the event topic of {@code calculatedFieldSettings}, and
     * {@code cfAdmin} is supplied as the queue admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>>builder()
                .settings(kafkaSettings)
                .clientId("monolith-calculated-field-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()))
                .admin(cfAdmin)
                .build();
    }

    /**
     * Builds the Kafka consumer for {@code ToCalculatedFieldNotificationMsg} messages addressed to
     * this service instance.
     * <p>
     * The topic is the full name of the calculated-field notifications topic that
     * {@code topicService} resolves for this service id. Client id and group id both derive from
     * {@code "monolith-calculated-field-notifications-consumer-"} plus the service id (the group id
     * is additionally passed through {@code topicService.buildTopicName}).
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.getCalculatedFieldNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName())
                .clientId("monolith-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId())
                .groupId(topicService.buildTopicName("monolith-calculated-field-notifications-consumer-" + serviceInfoProvider.getServiceId()))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCalculatedFieldNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(notificationAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Builds the Kafka producer for {@code ToCalculatedFieldNotificationMsg} messages.
     * <p>
     * The client id is {@code "monolith-calculated-field-notifications-"} followed by this service's
     * id, and {@code notificationAdmin} is supplied as the queue admin.
     * <p>
     * NOTE(review): the default topic is built from the calculated-field <em>event</em> topic, while
     * {@code createToCalculatedFieldNotificationMsgConsumer()} reads the per-service notifications
     * topic. Left unchanged here; confirm whether the default topic is ever used for sending.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>>builder()
                .settings(kafkaSettings)
                .clientId("monolith-calculated-field-notifications-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()))
                .admin(notificationAdmin)
                .build();
    }

    /**
     * Builds a Kafka consumer for {@code CalculatedFieldStateProto} messages.
     * <p>
     * The consumer is configured with {@code readFromBeginning(true)} and {@code stopWhenRead(true)}
     * on the topic built from the state topic of {@code calculatedFieldSettings}, and is given a
     * {@code null} group id. Each call consumes a new value of {@code consumerCount} for the client
     * id suffix.
     * <p>
     * A record whose payload is {@code null} is decoded into a message with a {@code null} value
     * instead of being parsed.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>> createCalculatedFieldStateConsumer() {
        // Explicit type witness: without it the chained builder() infers its bound and the result
        // does not match the declared return type.
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>>builder()
                .settings(kafkaSettings)
                .topic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()))
                .readFromBeginning(true)
                .stopWhenRead(true)
                .clientId("monolith-calculated-field-state-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet())
                .groupId(null)
                .decoder(msg -> new TbProtoQueueMsg<>(
                        msg.getKey(),
                        msg.getData() != null ? TransportProtos.CalculatedFieldStateProto.parseFrom(msg.getData()) : null,
                        msg.getHeaders()))
                .admin(cfStateAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Builds the Kafka producer for {@code CalculatedFieldStateProto} messages.
     * <p>
     * The client id is {@code "monolith-calculated-field-state-"} followed by this service's id and
     * {@code cfStateAdmin} is supplied as the queue admin. The default topic is built from the
     * <em>state</em> topic of {@code calculatedFieldSettings}, i.e. the same topic that
     * {@code createCalculatedFieldStateConsumer()} reads; it previously pointed at the event topic.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>> createCalculatedFieldStateProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>>builder()
                .settings(kafkaSettings)
                .clientId("monolith-calculated-field-state-" + serviceInfoProvider.getServiceId())
                // State messages default to the state topic, not the event topic.
                .defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getStateTopic()))
                .admin(cfStateAdmin)
                .build();
    }

    /**
     * Builds the Kafka producer for {@code ToEdqsMsg} event messages.
     * <p>
     * The client id is {@code "edqs-events-producer-"} followed by this service's id. The default
     * topic is built from the events topic of {@code edqsConfig}, and {@code edqsEventsAdmin} is
     * supplied as the queue admin.
     *
     * @return a newly built producer template
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> createEdqsEventsProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>>builder()
                .clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic()))
                .settings(kafkaSettings)
                .admin(edqsEventsAdmin)
                .build();
    }

    /**
     * Builds the request/response template used to talk to EDQS over Kafka.
     * <p>
     * Requests are produced to the topic built from {@code edqsConfig.getRequestsTopic()}. Responses
     * are consumed from a topic built from {@code edqsConfig.getResponsesTopic()} with
     * {@code "." + serviceId} appended. Producer, consumer and the template itself all use
     * {@code edqsRequestsAdmin}. Max pending requests, max request timeout and poll interval are
     * taken from {@code edqsConfig}.
     *
     * @return a newly built request template
     */
    @Override
    public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> createEdqsRequestTemplate() {
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> requestProducer =
                TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>>builder()
                        .settings(kafkaSettings)
                        .clientId("edqs-request-" + serviceInfoProvider.getServiceId())
                        .defaultTopic(topicService.buildTopicName(edqsConfig.getRequestsTopic()))
                        .admin(edqsRequestsAdmin);

        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> responseConsumer =
                TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.FromEdqsMsg>>builder()
                        .settings(kafkaSettings)
                        .topic(topicService.buildTopicName(edqsConfig.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()))
                        .clientId("monolith-edqs-response-consumer-" + serviceInfoProvider.getServiceId())
                        .groupId(topicService.buildTopicName("monolith-edqs-response-consumer-" + serviceInfoProvider.getServiceId()))
                        .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.FromEdqsMsg.parseFrom(msg.getData()), msg.getHeaders()))
                        .admin(edqsRequestsAdmin)
                        .statsService(consumerStatsService);

        // The producer and consumer are built here, in that order, as in the original call chain.
        return DefaultTbQueueRequestTemplate.<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.FromEdqsMsg>>builder()
                .queueAdmin(edqsRequestsAdmin)
                .requestTemplate(requestProducer.build())
                .responseTemplate(responseConsumer.build())
                .maxPendingRequests(edqsConfig.getMaxPendingRequests())
                .maxRequestTimeout(edqsConfig.getMaxRequestTimeout())
                .pollInterval(edqsConfig.getPollInterval())
                .build();
    }

    /**
     * Builds the Kafka consumer for {@code JobStatsMsg} messages.
     * <p>
     * The topic is built from the stats topic of {@code tasksQueueConfig}. The client id is
     * {@code "job-stats-consumer-"} followed by this service's id, and the group id is built from
     * the fixed name {@code "job-stats-consumer-group"} (it does not include the service id).
     * {@code tasksAdmin} is supplied as the queue admin.
     *
     * @return a newly built consumer template
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.JobStatsMsg>> createJobStatsConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.JobStatsMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic()))
                .clientId("job-stats-consumer-" + serviceInfoProvider.getServiceId())
                .groupId(topicService.buildTopicName("job-stats-consumer-group"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.JobStatsMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(tasksAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * {@code @PreDestroy} callback: calls {@code destroy()} on each queue admin listed below, in
     * this order, skipping any field that is {@code null}.
     * <p>
     * NOTE(review): {@code edgeEventAdmin}, {@code cfStateAdmin}, {@code edqsEventsAdmin},
     * {@code edqsRequestsAdmin} and {@code tasksAdmin} are handed to builders in this class but are
     * not destroyed here - confirm whether that is intentional.
     */
    @PreDestroy
    private void destroy() {
        if (coreAdmin != null) { coreAdmin.destroy(); }
        if (ruleEngineAdmin != null) { ruleEngineAdmin.destroy(); }
        if (jsExecutorRequestAdmin != null) { jsExecutorRequestAdmin.destroy(); }
        if (jsExecutorResponseAdmin != null) { jsExecutorResponseAdmin.destroy(); }
        if (transportApiRequestAdmin != null) { transportApiRequestAdmin.destroy(); }
        if (transportApiResponseAdmin != null) { transportApiResponseAdmin.destroy(); }
        if (notificationAdmin != null) { notificationAdmin.destroy(); }
        if (fwUpdatesAdmin != null) { fwUpdatesAdmin.destroy(); }
        if (vcAdmin != null) { vcAdmin.destroy(); }
        if (edgeAdmin != null) { edgeAdmin.destroy(); }
        if (cfAdmin != null) { cfAdmin.destroy(); }
    }
}

