package org.thingsboard.server.service.cf.ctx.state;

import java.beans.ConstructorProperties;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.controller.RuleEngineController;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgHeaders;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.AbstractTbQueueTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats;

@Service
@ConditionalOnExpression("('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-rule-engine') && '${queue.type:null}'=='kafka'")
/* loaded from: input_file:org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.class */
public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldStateService {
    private static final Logger log = LoggerFactory.getLogger(KafkaCalculatedFieldStateService.class);
    private final TbRuleEngineQueueFactory queueFactory;
    private final PartitionService partitionService;

    @Value("${queue.calculated_fields.poll_interval:25}")
    private long pollInterval;
    private TbKafkaProducerTemplate<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>> stateProducer;
    private final AtomicInteger counter = new AtomicInteger();

    @Override // org.thingsboard.server.service.cf.CalculatedFieldStateService
    public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> partitionedQueueConsumerManager) {
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "CalculatedFieldStates");
        this.stateService = KafkaQueueStateService.builder().eventConsumer(partitionedQueueConsumerManager).stateConsumer(PartitionedQueueConsumerManager.create().queueKey(queueKey).topic(this.partitionService.getTopic(queueKey)).pollInterval(this.pollInterval).msgPackProcessor((list, tbQueueConsumer, queueConfig) -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                TbProtoQueueMsg tbProtoQueueMsg = (TbProtoQueueMsg) it.next();
                try {
                    if (tbProtoQueueMsg.getValue() != null) {
                        processRestoredState((TransportProtos.CalculatedFieldStateProto) tbProtoQueueMsg.getValue());
                    } else {
                        processRestoredState(getStateId(tbProtoQueueMsg.getHeaders()), null);
                    }
                } catch (Throwable th) {
                    log.error("Failed to process state message: {}", tbProtoQueueMsg, th);
                }
                int incrementAndGet = this.counter.incrementAndGet();
                if (incrementAndGet % RuleEngineController.DEFAULT_TIMEOUT == 0) {
                    log.info("Processed {} calculated field state msgs", Integer.valueOf(incrementAndGet));
                }
            }
        }).consumerCreator((queueConfig2, topicPartitionInfo) -> {
            return this.queueFactory.createCalculatedFieldStateConsumer();
        }).queueAdmin(this.queueFactory.getCalculatedFieldQueueAdmin()).consumerExecutor(partitionedQueueConsumerManager.getConsumerExecutor()).scheduler(partitionedQueueConsumerManager.getScheduler()).taskExecutor(partitionedQueueConsumerManager.getTaskExecutor()).build()).build();
        this.stateProducer = this.queueFactory.createCalculatedFieldStateProducer();
    }

    @Override // org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService
    protected void doPersist(CalculatedFieldEntityCtxId calculatedFieldEntityCtxId, TransportProtos.CalculatedFieldStateProto calculatedFieldStateProto, final TbCallback tbCallback) {
        TopicPartitionInfo resolve = this.partitionService.resolve(ServiceType.TB_RULE_ENGINE, "CalculatedFieldStates", calculatedFieldEntityCtxId.tenantId(), calculatedFieldEntityCtxId.entityId());
        TbProtoQueueMsg tbProtoQueueMsg = new TbProtoQueueMsg(calculatedFieldEntityCtxId.entityId().getId(), calculatedFieldStateProto);
        if (calculatedFieldStateProto == null) {
            putStateId(tbProtoQueueMsg.getHeaders(), calculatedFieldEntityCtxId);
        }
        this.stateProducer.send(resolve, calculatedFieldEntityCtxId.toKey(), tbProtoQueueMsg, new TbQueueCallback() { // from class: org.thingsboard.server.service.cf.ctx.state.KafkaCalculatedFieldStateService.1
            public void onSuccess(TbQueueMsgMetadata tbQueueMsgMetadata) {
                if (tbCallback != null) {
                    tbCallback.onSuccess();
                }
            }

            public void onFailure(Throwable th) {
                if (tbCallback != null) {
                    tbCallback.onFailure(th);
                }
            }
        });
    }

    @Override // org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService
    protected void doRemove(CalculatedFieldEntityCtxId calculatedFieldEntityCtxId, TbCallback tbCallback) {
        doPersist(calculatedFieldEntityCtxId, null, tbCallback);
    }

    private void putStateId(TbQueueMsgHeaders tbQueueMsgHeaders, CalculatedFieldEntityCtxId calculatedFieldEntityCtxId) {
        tbQueueMsgHeaders.put(TbRuleEngineConsumerStats.TENANT_ID_TAG, AbstractTbQueueTemplate.uuidToBytes(calculatedFieldEntityCtxId.tenantId().getId()));
        tbQueueMsgHeaders.put("cfId", AbstractTbQueueTemplate.uuidToBytes(calculatedFieldEntityCtxId.cfId().getId()));
        tbQueueMsgHeaders.put("entityId", AbstractTbQueueTemplate.uuidToBytes(calculatedFieldEntityCtxId.entityId().getId()));
        tbQueueMsgHeaders.put("entityType", AbstractTbQueueTemplate.stringToBytes(calculatedFieldEntityCtxId.entityId().getEntityType().name()));
    }

    private CalculatedFieldEntityCtxId getStateId(TbQueueMsgHeaders tbQueueMsgHeaders) {
        return new CalculatedFieldEntityCtxId(TenantId.fromUUID(AbstractTbQueueTemplate.bytesToUuid(tbQueueMsgHeaders.get(TbRuleEngineConsumerStats.TENANT_ID_TAG))), new CalculatedFieldId(AbstractTbQueueTemplate.bytesToUuid(tbQueueMsgHeaders.get("cfId"))), EntityIdFactory.getByTypeAndUuid(AbstractTbQueueTemplate.bytesToString(tbQueueMsgHeaders.get("entityType")), AbstractTbQueueTemplate.bytesToUuid(tbQueueMsgHeaders.get("entityId"))));
    }

    @Override // org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService, org.thingsboard.server.service.cf.CalculatedFieldStateService
    public void stop() {
        super.stop();
        this.stateProducer.stop();
    }

    @ConstructorProperties({"queueFactory", "partitionService"})
    public KafkaCalculatedFieldStateService(TbRuleEngineQueueFactory tbRuleEngineQueueFactory, PartitionService partitionService) {
        this.queueFactory = tbRuleEngineQueueFactory;
        this.partitionService = partitionService;
    }
}
