/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.provider;

import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import jakarta.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbEdgeQueueAdmin;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.DefaultTbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.settings.TasksQueueConfig;
import org.thingsboard.server.queue.settings.TbQueueCalculatedFieldSettings;
import org.thingsboard.server.queue.settings.TbQueueCoreSettings;
import org.thingsboard.server.queue.settings.TbQueueEdgeSettings;
import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings;

@Component
@ConditionalOnExpression(value="'${queue.type:null}'=='kafka' && '${service.type:null}'=='tb-core'")
public class KafkaTbCoreQueueFactory
implements TbCoreQueueFactory {
    // --- Collaborators and settings injected through the constructor ---
    private final TopicService topicService;
    private final TbKafkaSettings kafkaSettings;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final TbQueueCoreSettings coreSettings;
    private final TbQueueRuleEngineSettings ruleEngineSettings;
    private final TbQueueTransportApiSettings transportApiSettings;
    private final TbQueueRemoteJsInvokeSettings jsInvokeSettings;
    private final TbQueueVersionControlSettings vcSettings;
    private final TbKafkaConsumerStatsService consumerStatsService;
    private final TbQueueTransportNotificationSettings transportNotificationSettings;
    private final TbQueueEdgeSettings edgeSettings;
    private final TbQueueCalculatedFieldSettings calculatedFieldSettings;
    private final EdqsConfig edqsConfig;
    private final TasksQueueConfig tasksQueueConfig;
    // --- Queue admins; each one is created in the constructor as a TbKafkaAdmin
    // --- from the shared Kafka settings and one per-purpose config of TbKafkaTopicConfigs ---
    private final TbQueueAdmin coreAdmin;
    private final TbQueueAdmin ruleEngineAdmin;
    private final TbQueueAdmin jsExecutorRequestAdmin;
    private final TbQueueAdmin jsExecutorResponseAdmin;
    private final TbQueueAdmin transportApiRequestAdmin;
    private final TbQueueAdmin transportApiResponseAdmin;
    private final TbQueueAdmin notificationAdmin;
    private final TbQueueAdmin fwUpdatesAdmin;
    private final TbQueueAdmin vcAdmin;
    private final TbQueueAdmin housekeeperAdmin;
    private final TbQueueAdmin housekeeperReprocessingAdmin;
    // Declared as TbEdgeQueueAdmin because createEdgeEventMsgConsumer calls syncEdgeNotificationsOffsets on it.
    private final TbEdgeQueueAdmin edgeAdmin;
    private final TbQueueAdmin edgeEventAdmin;
    private final TbQueueAdmin cfAdmin;
    private final TbQueueAdmin edqsEventsAdmin;
    private final TbKafkaAdmin edqsRequestsAdmin;
    private final TbQueueAdmin tasksAdmin;
    // Counters appended to consumer client ids so repeated create* calls yield distinct ids.
    private final AtomicLong consumerCount = new AtomicLong();
    private final AtomicLong edgeConsumerCount = new AtomicLong();

    /**
     * Stores the injected collaborators and creates one {@link TbKafkaAdmin} per topic family,
     * each from the shared Kafka settings and the matching entry of {@code kafkaTopicConfigs}.
     */
    public KafkaTbCoreQueueFactory(TopicService topicService,
                                   TbKafkaSettings kafkaSettings,
                                   TbServiceInfoProvider serviceInfoProvider,
                                   TbQueueCoreSettings coreSettings,
                                   TbQueueRuleEngineSettings ruleEngineSettings,
                                   TbQueueTransportApiSettings transportApiSettings,
                                   TbQueueRemoteJsInvokeSettings jsInvokeSettings,
                                   TbQueueVersionControlSettings vcSettings,
                                   TbQueueEdgeSettings edgeSettings,
                                   TbKafkaConsumerStatsService consumerStatsService,
                                   TbQueueTransportNotificationSettings transportNotificationSettings,
                                   TbQueueCalculatedFieldSettings calculatedFieldSettings,
                                   EdqsConfig edqsConfig,
                                   TasksQueueConfig tasksQueueConfig,
                                   TbKafkaTopicConfigs kafkaTopicConfigs) {
        // Infrastructure services.
        this.topicService = topicService;
        this.kafkaSettings = kafkaSettings;
        this.serviceInfoProvider = serviceInfoProvider;
        this.consumerStatsService = consumerStatsService;

        // Per-queue settings.
        this.coreSettings = coreSettings;
        this.ruleEngineSettings = ruleEngineSettings;
        this.transportApiSettings = transportApiSettings;
        this.transportNotificationSettings = transportNotificationSettings;
        this.jsInvokeSettings = jsInvokeSettings;
        this.vcSettings = vcSettings;
        this.edgeSettings = edgeSettings;
        this.calculatedFieldSettings = calculatedFieldSettings;
        this.edqsConfig = edqsConfig;
        this.tasksQueueConfig = tasksQueueConfig;

        // Admins, one per topic family (creation order kept as-is).
        this.coreAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCoreConfigs());
        this.ruleEngineAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getRuleEngineConfigs());
        this.jsExecutorRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorRequestConfigs());
        this.jsExecutorResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorResponseConfigs());
        this.transportApiRequestAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiRequestConfigs());
        this.transportApiResponseAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiResponseConfigs());
        this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
        this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
        this.vcAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getVcConfigs());
        this.housekeeperAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperConfigs());
        this.housekeeperReprocessingAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getHousekeeperReprocessingConfigs());
        this.edgeAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeConfigs());
        this.edgeEventAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
        this.cfAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getCalculatedFieldConfigs());
        this.edqsEventsAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdqsEventsConfigs());
        this.edqsRequestsAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdqsRequestsConfigs());
        this.tasksAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTasksConfigs());
    }

    /**
     * Creates the producer for {@code ToTransportMsg} notifications.
     * The default topic is built from the transport notification settings and the
     * notification admin is attached.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> createTransportNotificationsMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToTransportMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-transport-notifications-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(transportNotificationSettings.getNotificationsTopic()))
                .admin(notificationAdmin)
                .build();
    }

    /**
     * Creates the producer used to send {@code ToRuleEngineMsg} messages.
     * <p>
     * The producer is bound to {@code ruleEngineAdmin}, the admin constructed from the
     * rule-engine topic configs, rather than to the core admin. The default topic is
     * still built from the core settings topic, exactly as before.
     *
     * @return a Kafka-backed producer for rule-engine messages
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createRuleEngineMsgProducer() {
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> producer = TbKafkaProducerTemplate.builder();
        producer.settings(kafkaSettings);
        producer.clientId("tb-core-rule-engine-" + serviceInfoProvider.getServiceId());
        producer.defaultTopic(topicService.buildTopicName(coreSettings.getTopic()));
        // Use the rule-engine admin (built from the rule-engine topic configs), not the core one.
        producer.admin(ruleEngineAdmin);
        return producer.build();
    }

    /**
     * Creates the producer for {@code ToRuleEngineNotificationMsg} messages.
     * The default topic is built from the rule-engine settings topic; the notification
     * admin is attached.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>> createRuleEngineNotificationsMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToRuleEngineNotificationMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-rule-engine-notifications-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(ruleEngineSettings.getTopic()))
                .admin(notificationAdmin)
                .build();
    }

    /**
     * Creates the producer for {@code ToCoreMsg} messages, defaulting to the core topic
     * and using the core admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createTbCoreMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToCoreMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-to-core-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(coreSettings.getTopic()))
                .admin(coreAdmin)
                .build();
    }

    /**
     * Creates the producer for {@code ToCoreNotificationMsg} messages.
     * The default topic is the full name of this service's TB_CORE notifications topic.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createTbCoreNotificationsMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-to-core-notifications-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName())
                .admin(notificationAdmin)
                .build();
    }

    /**
     * Creates a consumer of {@code ToCoreMsg} messages on the core topic.
     * Each call increments {@code consumerCount} and appends the new value to the client id.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> createToCoreMsgConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToCoreMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.buildTopicName(coreSettings.getTopic()))
                .clientId("tb-core-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet())
                .groupId(topicService.buildTopicName("tb-core-node"))
                // Payload bytes are parsed as a ToCoreMsg protobuf; key and headers pass through.
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(coreAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Creates a consumer of {@code ToCoreNotificationMsg} messages on this service's
     * TB_CORE notifications topic. The consumer group name includes the service id.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>> createToCoreNotificationsMsgConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToCoreNotificationMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.getNotificationsTopic(ServiceType.TB_CORE, serviceInfoProvider.getServiceId()).getFullTopicName())
                .clientId("tb-core-notifications-consumer-" + serviceInfoProvider.getServiceId())
                .groupId(topicService.buildTopicName("tb-core-notifications-node-" + serviceInfoProvider.getServiceId()))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToCoreNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(notificationAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Creates a consumer of {@code TransportApiRequestMsg} messages on the transport API
     * requests topic, using the transport API request admin.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> createTransportApiRequestConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.buildTopicName(transportApiSettings.getRequestsTopic()))
                .clientId("tb-core-transport-api-consumer-" + serviceInfoProvider.getServiceId())
                .groupId(topicService.buildTopicName("tb-core-transport-api-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.TransportApiRequestMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(transportApiRequestAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Creates the producer for {@code TransportApiResponseMsg} messages, defaulting to the
     * transport API responses topic and using the transport API response admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> createTransportApiResponseProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-transport-api-producer-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(transportApiSettings.getResponsesTopic()))
                .admin(transportApiResponseAdmin)
                .build();
    }

    /**
     * Creates the request/response template used for remote JS invocation and exposes it
     * as a Spring bean.
     * <p>
     * Requests go to the JS invoke request topic; responses are read from the response
     * topic suffixed with {@code "." + serviceId}. Response payloads are decoded as UTF-8
     * JSON into {@code RemoteJsResponse}, ignoring unknown fields. Neither topic name is
     * passed through {@code topicService.buildTopicName} here.
     */
    @Override
    @Bean
    public TbQueueRequestTemplate<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> createRemoteJsRequestTemplate() {
        // Request side: producer configuration (built later, when handed to the template).
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>> requests =
                TbKafkaProducerTemplate.<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>>builder()
                        .settings(kafkaSettings)
                        .clientId("producer-js-invoke-" + serviceInfoProvider.getServiceId())
                        .defaultTopic(jsInvokeSettings.getRequestTopic())
                        .admin(jsExecutorRequestAdmin);

        // Response side: consumer configuration with a JSON -> protobuf decoder.
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>> responses =
                TbKafkaConsumerTemplate.<TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>builder()
                        .settings(kafkaSettings)
                        .topic(jsInvokeSettings.getResponseTopic() + "." + serviceInfoProvider.getServiceId())
                        .clientId("js-" + serviceInfoProvider.getServiceId())
                        .groupId(topicService.buildTopicName("rule-engine-node-") + serviceInfoProvider.getServiceId())
                        .decoder(msg -> {
                            String json = new String(msg.getData(), StandardCharsets.UTF_8);
                            JsInvokeProtos.RemoteJsResponse.Builder parsed = JsInvokeProtos.RemoteJsResponse.newBuilder();
                            JsonFormat.parser().ignoringUnknownFields().merge(json, parsed);
                            return new TbProtoQueueMsg<>(msg.getKey(), parsed.build(), msg.getHeaders());
                        })
                        .admin(jsExecutorResponseAdmin)
                        .statsService(consumerStatsService);

        // Producer is built first, then the consumer, as the template is assembled.
        return DefaultTbQueueRequestTemplate.<TbProtoJsQueueMsg<JsInvokeProtos.RemoteJsRequest>, TbProtoQueueMsg<JsInvokeProtos.RemoteJsResponse>>builder()
                .queueAdmin(jsExecutorResponseAdmin)
                .requestTemplate(requests.build())
                .responseTemplate(responses.build())
                .maxPendingRequests(jsInvokeSettings.getMaxPendingRequests())
                .maxRequestTimeout(jsInvokeSettings.getMaxRequestsTimeout())
                .pollInterval(jsInvokeSettings.getResponsePollInterval())
                .build();
    }

    /**
     * Creates a consumer of {@code ToUsageStatsServiceMsg} messages on the usage stats
     * topic from the core settings, using the core admin.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()))
                .clientId("tb-core-us-consumer-" + serviceInfoProvider.getServiceId())
                .groupId(topicService.buildTopicName("tb-core-us-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(coreAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Creates a consumer of {@code ToOtaPackageStateServiceMsg} messages on the OTA package
     * topic from the core settings, using the firmware-updates admin.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.buildTopicName(coreSettings.getOtaPackageTopic()))
                .clientId("tb-core-ota-consumer-" + serviceInfoProvider.getServiceId())
                .groupId(topicService.buildTopicName("tb-core-ota-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToOtaPackageStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(fwUpdatesAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Creates the producer for {@code ToOtaPackageStateServiceMsg} messages, defaulting to
     * the OTA package topic and using the firmware-updates admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>> createToOtaPackageStateServiceMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToOtaPackageStateServiceMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-ota-producer-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(coreSettings.getOtaPackageTopic()))
                .admin(fwUpdatesAdmin)
                .build();
    }

    /**
     * Creates the producer for {@code ToUsageStatsServiceMsg} messages, defaulting to the
     * usage stats topic and using the core admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-us-producer-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(coreSettings.getUsageStatsTopic()))
                .admin(coreAdmin)
                .build();
    }

    /**
     * Creates the producer for {@code ToVersionControlServiceMsg} messages, defaulting to
     * the version control topic and using the version control admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>> createVersionControlMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToVersionControlServiceMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-vc-producer-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(vcSettings.getTopic()))
                .admin(vcAdmin)
                .build();
    }

    /**
     * Creates the producer for {@code ToHousekeeperServiceMsg} messages, defaulting to the
     * housekeeper topic and using the housekeeper admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgProducer() {
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> producer = TbKafkaProducerTemplate.builder();
        producer.settings(kafkaSettings);
        producer.clientId("tb-core-housekeeper-producer-" + serviceInfoProvider.getServiceId());
        producer.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperTopic()));
        producer.admin(housekeeperAdmin);
        return producer.build();
    }

    /**
     * Creates a consumer of {@code ToHousekeeperServiceMsg} messages on the housekeeper
     * topic, using the housekeeper admin.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperMsgConsumer() {
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> consumer = TbKafkaConsumerTemplate.builder();
        consumer.settings(kafkaSettings);
        consumer.topic(topicService.buildTopicName(coreSettings.getHousekeeperTopic()));
        consumer.clientId("tb-core-housekeeper-consumer-" + serviceInfoProvider.getServiceId());
        consumer.groupId(topicService.buildTopicName("tb-core-housekeeper-consumer"));
        consumer.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
        consumer.admin(housekeeperAdmin);
        consumer.statsService(consumerStatsService);
        return consumer.build();
    }

    /**
     * Creates the producer for {@code ToHousekeeperServiceMsg} messages sent to the
     * housekeeper reprocessing topic, using the housekeeper reprocessing admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperReprocessingMsgProducer() {
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> producer = TbKafkaProducerTemplate.builder();
        producer.settings(kafkaSettings);
        producer.clientId("tb-core-housekeeper-reprocessing-producer-" + serviceInfoProvider.getServiceId());
        producer.defaultTopic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic()));
        producer.admin(housekeeperReprocessingAdmin);
        return producer.build();
    }

    /**
     * Creates a consumer of {@code ToHousekeeperServiceMsg} messages on the housekeeper
     * reprocessing topic, using the housekeeper reprocessing admin.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> createHousekeeperReprocessingMsgConsumer() {
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> consumer = TbKafkaConsumerTemplate.builder();
        consumer.settings(kafkaSettings);
        consumer.topic(topicService.buildTopicName(coreSettings.getHousekeeperReprocessingTopic()));
        consumer.clientId("tb-core-housekeeper-reprocessing-consumer-" + serviceInfoProvider.getServiceId());
        consumer.groupId(topicService.buildTopicName("tb-core-housekeeper-reprocessing-consumer"));
        consumer.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToHousekeeperServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
        consumer.admin(housekeeperReprocessingAdmin);
        consumer.statsService(consumerStatsService);
        return consumer.build();
    }

    /**
     * Creates a consumer of {@code ToEdgeMsg} messages on the edge topic, using the edge admin.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> createEdgeMsgConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.buildTopicName(edgeSettings.getTopic()))
                .clientId("tb-core-edge-consumer-" + serviceInfoProvider.getServiceId())
                .groupId(topicService.buildTopicName("tb-core-edge-consumer"))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdgeMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(edgeAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Creates the producer for {@code ToEdgeMsg} messages, defaulting to the edge topic
     * and using the edge admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>> createEdgeMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-to-edge-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(edgeSettings.getTopic()))
                .admin(edgeAdmin)
                .build();
    }

    /**
     * Creates a consumer of {@code ToEdgeNotificationMsg} messages on this service's edge
     * notifications topic. The consumer group name includes the service id.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>> createToEdgeNotificationsMsgConsumer() {
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>>builder()
                .settings(kafkaSettings)
                .topic(topicService.getEdgeNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName())
                .clientId("tb-edge-notifications-consumer-" + serviceInfoProvider.getServiceId())
                .groupId(topicService.buildTopicName("tb-edge-notifications-node-" + serviceInfoProvider.getServiceId()))
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdgeNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(notificationAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Creates the producer for {@code ToEdgeNotificationMsg} messages, defaulting to this
     * service's edge notifications topic and using the notification admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>> createEdgeNotificationsMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeNotificationMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-to-edge-notifications-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.getEdgeNotificationsTopic(serviceInfoProvider.getServiceId()).getFullTopicName())
                .admin(notificationAdmin)
                .build();
    }

    /**
     * Creates a consumer of {@code ToEdgeEventNotificationMsg} messages for one edge.
     * <p>
     * The topic is resolved from the tenant and edge ids and doubles as the consumer group
     * id. Before the consumer is built, the edge admin is asked to sync offsets between the
     * {@code tb-core-edge-event-consumer} group name and that topic. Each call increments
     * {@code edgeConsumerCount} and appends the new value to the client id.
     *
     * @param tenantId tenant owning the edge
     * @param edgeId   edge whose event notifications are consumed
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> createEdgeEventMsgConsumer(TenantId tenantId, EdgeId edgeId) {
        String edgeTopic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId).getTopic();
        edgeAdmin.syncEdgeNotificationsOffsets(topicService.buildTopicName("tb-core-edge-event-consumer"), edgeTopic);
        return TbKafkaConsumerTemplate.<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>>builder()
                .settings(kafkaSettings)
                .topic(edgeTopic)
                .clientId("tb-core-edge-event-consumer-" + serviceInfoProvider.getServiceId() + "-" + edgeConsumerCount.incrementAndGet())
                .groupId(edgeTopic)
                .decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToEdgeEventNotificationMsg.parseFrom(msg.getData()), msg.getHeaders()))
                .admin(edgeEventAdmin)
                .statsService(consumerStatsService)
                .build();
    }

    /**
     * Creates the producer for {@code ToEdgeEventNotificationMsg} messages.
     * <p>
     * The default topic is built from the configured edge topic
     * ({@code edgeSettings.getTopic()}) instead of a hard-coded literal, so it follows
     * the queue configuration like the other producers of this factory.
     * NOTE(review): presumably callers send to per-edge topics explicitly and this value
     * is only a fallback - confirm against the callers.
     *
     * @return a Kafka-backed producer for edge event notifications
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> createEdgeEventMsgProducer() {
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToEdgeEventNotificationMsg>> producer = TbKafkaProducerTemplate.builder();
        producer.settings(kafkaSettings);
        producer.clientId("tb-core-edge-event-" + serviceInfoProvider.getServiceId());
        // Topic name comes from configuration rather than a literal baked into the code.
        producer.defaultTopic(topicService.buildTopicName(edgeSettings.getTopic()));
        producer.admin(edgeEventAdmin);
        return producer.build();
    }

    /**
     * Creates the producer for {@code ToCalculatedFieldMsg} messages, defaulting to the
     * calculated-field event topic and using the calculated-field admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> createToCalculatedFieldMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-to-calculated-field-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()))
                .admin(cfAdmin)
                .build();
    }

    /**
     * Creates the producer for {@code ToCalculatedFieldNotificationMsg} messages.
     * The default topic is built from the calculated-field event topic; the notification
     * admin is attached.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>> createToCalculatedFieldNotificationMsgProducer() {
        return TbKafkaProducerTemplate.<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldNotificationMsg>>builder()
                .settings(kafkaSettings)
                .clientId("tb-core-calculated-field-notifications-" + serviceInfoProvider.getServiceId())
                .defaultTopic(topicService.buildTopicName(calculatedFieldSettings.getEventTopic()))
                .admin(notificationAdmin)
                .build();
    }

    /**
     * Creates the producer for {@code ToEdqsMsg} events, defaulting to the EDQS events
     * topic and using the EDQS events admin.
     */
    @Override
    public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> createEdqsEventsProducer() {
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> producer = TbKafkaProducerTemplate.builder();
        producer.clientId("edqs-events-producer-" + serviceInfoProvider.getServiceId());
        producer.defaultTopic(topicService.buildTopicName(edqsConfig.getEventsTopic()));
        producer.settings(kafkaSettings);
        producer.admin(edqsEventsAdmin);
        return producer.build();
    }

    /**
     * Creates the request/response template for EDQS.
     * <p>
     * Requests are produced to the EDQS requests topic; responses are consumed from the
     * responses topic suffixed with {@code "." + serviceId}. The EDQS requests admin is
     * used for the producer, the consumer and the template itself. Limits and poll
     * interval come from {@code edqsConfig}.
     */
    @Override
    public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> createEdqsRequestTemplate() {
        // Request side.
        TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>> requests = TbKafkaProducerTemplate.builder();
        requests.settings(kafkaSettings);
        requests.clientId("edqs-request-" + serviceInfoProvider.getServiceId());
        requests.defaultTopic(topicService.buildTopicName(edqsConfig.getRequestsTopic()));
        requests.admin(edqsRequestsAdmin);

        // Response side.
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> responses = TbKafkaConsumerTemplate.builder();
        responses.settings(kafkaSettings);
        responses.topic(topicService.buildTopicName(edqsConfig.getResponsesTopic() + "." + serviceInfoProvider.getServiceId()));
        responses.clientId("tb-core-edqs-response-consumer-" + serviceInfoProvider.getServiceId());
        responses.groupId(topicService.buildTopicName("tb-core-edqs-response-consumer-" + serviceInfoProvider.getServiceId()));
        responses.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.FromEdqsMsg.parseFrom(msg.getData()), msg.getHeaders()));
        responses.admin(edqsRequestsAdmin);
        responses.statsService(consumerStatsService);

        // Template assembly: the producer is built first, then the consumer.
        DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder<TbProtoQueueMsg<TransportProtos.ToEdqsMsg>, TbProtoQueueMsg<TransportProtos.FromEdqsMsg>> template = DefaultTbQueueRequestTemplate.builder();
        template.queueAdmin(edqsRequestsAdmin);
        template.requestTemplate(requests.build());
        template.responseTemplate(responses.build());
        template.maxPendingRequests(edqsConfig.getMaxPendingRequests());
        template.maxRequestTimeout(edqsConfig.getMaxRequestTimeout());
        template.pollInterval(edqsConfig.getPollInterval());
        return template.build();
    }

    /**
     * Creates a consumer of {@code JobStatsMsg} messages on the tasks stats topic, using
     * the tasks admin.
     */
    @Override
    public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.JobStatsMsg>> createJobStatsConsumer() {
        TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<TransportProtos.JobStatsMsg>> consumer = TbKafkaConsumerTemplate.builder();
        consumer.settings(kafkaSettings);
        consumer.topic(topicService.buildTopicName(tasksQueueConfig.getStatsTopic()));
        consumer.clientId("job-stats-consumer-" + serviceInfoProvider.getServiceId());
        consumer.groupId(topicService.buildTopicName("job-stats-consumer-group"));
        consumer.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.JobStatsMsg.parseFrom(msg.getData()), msg.getHeaders()));
        consumer.admin(tasksAdmin);
        consumer.statsService(consumerStatsService);
        return consumer.build();
    }

    /**
     * Calls {@code destroy()} on every queue admin created by the constructor when the
     * bean is being disposed.
     * <p>
     * All seventeen admins are covered, including the housekeeper, housekeeper
     * reprocessing, edge, edge event, EDQS events, EDQS requests and tasks admins, so
     * that none of them is left undestroyed on shutdown. Admins are destroyed in
     * declaration order; an exception thrown by one {@code destroy()} call propagates
     * and stops the remaining calls.
     */
    @PreDestroy
    private void destroy() {
        TbQueueAdmin[] admins = {
                coreAdmin,
                ruleEngineAdmin,
                jsExecutorRequestAdmin,
                jsExecutorResponseAdmin,
                transportApiRequestAdmin,
                transportApiResponseAdmin,
                notificationAdmin,
                fwUpdatesAdmin,
                vcAdmin,
                cfAdmin,
                housekeeperAdmin,
                housekeeperReprocessingAdmin,
                edgeAdmin,
                edgeEventAdmin,
                edqsEventsAdmin,
                edqsRequestsAdmin,
                tasksAdmin
        };
        for (TbQueueAdmin admin : admins) {
            // Null guard kept for parity with the previous per-field checks.
            if (admin != null) {
                admin.destroy();
            }
        }
    }
}

