/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.service.cf.ctx.state;

import com.google.protobuf.GeneratedMessageV3;
import java.beans.ConstructorProperties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgHeaders;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.common.AbstractTbQueueTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.PartitionedQueueConsumerManager;
import org.thingsboard.server.queue.common.state.KafkaQueueStateService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.kafka.TbKafkaProducerTemplate;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.service.cf.AbstractCalculatedFieldStateService;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;

/**
 * Kafka-backed implementation of {@link AbstractCalculatedFieldStateService}.
 *
 * <p>States are published with a Kafka producer to the {@code CalculatedFieldStates} queue and read
 * back through a partitioned consumer that hands every record to {@code processRestoredState}.
 * A record without a value represents a removal: in that case the state id is carried in the
 * message headers instead of the payload.
 *
 * <p>The bean is only created when {@code service.type} is {@code monolith} or
 * {@code tb-rule-engine} and {@code queue.type} is {@code kafka}.
 */
@Service
@ConditionalOnExpression(value="('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-rule-engine') && '${queue.type:null}'=='kafka'")
public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldStateService {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaCalculatedFieldStateService.class);

    /** Name of the queue that stores calculated-field states. */
    private static final String CF_STATES_QUEUE = "CalculatedFieldStates";
    /** A progress line is logged every time this many state messages have been processed. */
    private static final int PROGRESS_LOG_INTERVAL = 10000;

    // Header keys used to carry the state id when a message has no payload.
    private static final String TENANT_ID_HEADER = "tenantId";
    private static final String CF_ID_HEADER = "cfId";
    private static final String ENTITY_ID_HEADER = "entityId";
    private static final String ENTITY_TYPE_HEADER = "entityType";

    private final TbRuleEngineQueueFactory queueFactory;
    private final PartitionService partitionService;
    @Value(value="${queue.calculated_fields.poll_interval:25}")
    private long pollInterval;
    @Value(value="${queue.calculated_fields.pack_processing_timeout:60000}")
    private long packProcessingTimeout;
    /** Created in {@link #init}; {@code null} until then. */
    private TbKafkaProducerTemplate<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>> stateProducer;
    /** Number of successfully processed state messages; used only for progress logging. */
    private final AtomicInteger counter = new AtomicInteger();

    /**
     * Builds the state consumer, wires it together with the given event consumer into the
     * {@code stateService}, and creates the state producer.
     *
     * <p>The state consumer shares the consumer executor, scheduler and task executor of
     * {@code eventConsumer}.
     *
     * @param eventConsumer consumer manager for calculated-field events
     */
    @Override
    public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>> eventConsumer) {
        QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, CF_STATES_QUEUE);
        PartitionedQueueConsumerManager<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>> stateConsumer =
                PartitionedQueueConsumerManager.<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>>create()
                .queueKey(queueKey)
                .topic(this.partitionService.getTopic(queueKey))
                .pollInterval(this.pollInterval)
                .msgPackProcessor((msgs, consumer, consumerKey, config) -> {
                    // One count per message; each callback releases exactly one count on completion.
                    final CountDownLatch completionLatch = new CountDownLatch(msgs.size());
                    for (final TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto> msg : msgs) {
                        TbCallback callback = new TbCallback() {

                            @Override
                            public void onSuccess() {
                                int processedMsgCount = counter.incrementAndGet();
                                if (processedMsgCount % PROGRESS_LOG_INTERVAL == 0) {
                                    log.info("Processed {} CF state messages", processedMsgCount);
                                }
                                completionLatch.countDown();
                            }

                            @Override
                            public void onFailure(Throwable t) {
                                log.error("Failed to process CF state message: {}", msg, t);
                                completionLatch.countDown();
                            }
                        };
                        try {
                            if (msg.getValue() != null) {
                                this.processRestoredState(msg.getValue(), callback);
                            } else {
                                // No payload: the state id was stored in the headers by doPersist.
                                this.processRestoredState(this.getStateId(msg.getHeaders()), null, callback);
                            }
                        } catch (Throwable t) {
                            // Route synchronous failures through the callback so the latch is released.
                            callback.onFailure(t);
                        }
                    }
                    boolean success = completionLatch.await(this.packProcessingTimeout, TimeUnit.MILLISECONDS);
                    if (!success) {
                        log.error("Timeout to process CF state messages pack of size {}", msgs.size());
                    }
                })
                .consumerCreator((queueConfig, tpi) -> this.queueFactory.createCalculatedFieldStateConsumer())
                .queueAdmin(this.queueFactory.getCalculatedFieldQueueAdmin())
                .consumerExecutor(eventConsumer.getConsumerExecutor())
                .scheduler(eventConsumer.getScheduler())
                .taskExecutor(eventConsumer.getTaskExecutor())
                .build();
        // NOTE(review): type-argument order (event message, state message) is assumed from the
        // builder's eventConsumer/stateConsumer setters - confirm against KafkaQueueStateService.
        this.stateService = KafkaQueueStateService
                .<TbProtoQueueMsg<TransportProtos.ToCalculatedFieldMsg>, TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>>builder()
                .eventConsumer(eventConsumer)
                .stateConsumer(stateConsumer)
                .build();
        this.stateProducer = (TbKafkaProducerTemplate<TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto>>) this.queueFactory.createCalculatedFieldStateProducer();
    }

    /**
     * Publishes the state (or, when {@code stateMsgProto} is {@code null}, a payload-less message
     * whose headers carry the state id) to the partition resolved for the state's tenant and entity.
     *
     * <p>The outcome reported by the producer is forwarded to {@code callback}: a failed send is
     * logged and passed to {@link TbCallback#onFailure(Throwable)} instead of being reported as a
     * success. The callback is therefore completed from the producer's send callback, which may
     * happen after this method has returned.
     * NOTE(review): confirm no caller relies on the callback completing synchronously.
     *
     * @param stateId       identifier of the state; its key is used as the message key
     * @param stateMsgProto serialized state, or {@code null} for a removal
     * @param callback      notified with the result of the send
     */
    @Override
    protected void doPersist(final CalculatedFieldEntityCtxId stateId, TransportProtos.CalculatedFieldStateProto stateMsgProto, final TbCallback callback) {
        TopicPartitionInfo tpi = this.partitionService.resolve(ServiceType.TB_RULE_ENGINE, CF_STATES_QUEUE, stateId.tenantId(), stateId.entityId());
        TbProtoQueueMsg<TransportProtos.CalculatedFieldStateProto> msg = new TbProtoQueueMsg<>(stateId.entityId().getId(), stateMsgProto);
        if (stateMsgProto == null) {
            // Without a payload the consumer can only recover the state id from the headers.
            this.putStateId(msg.getHeaders(), stateId);
        }
        this.stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() {

            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                callback.onSuccess();
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("Failed to send state message: {}", stateId, t);
                callback.onFailure(t);
            }
        });
    }

    /**
     * Removes a state by publishing a payload-less message for {@code stateId}.
     *
     * @param stateId  identifier of the state to remove
     * @param callback notified with the result of the send
     */
    @Override
    protected void doRemove(CalculatedFieldEntityCtxId stateId, TbCallback callback) {
        this.doPersist(stateId, null, callback);
    }

    /** Writes tenant id, calculated-field id, entity id and entity type of {@code stateId} into {@code headers}. */
    private void putStateId(TbQueueMsgHeaders headers, CalculatedFieldEntityCtxId stateId) {
        headers.put(TENANT_ID_HEADER, AbstractTbQueueTemplate.uuidToBytes(stateId.tenantId().getId()));
        headers.put(CF_ID_HEADER, AbstractTbQueueTemplate.uuidToBytes(stateId.cfId().getId()));
        headers.put(ENTITY_ID_HEADER, AbstractTbQueueTemplate.uuidToBytes(stateId.entityId().getId()));
        headers.put(ENTITY_TYPE_HEADER, AbstractTbQueueTemplate.stringToBytes(stateId.entityId().getEntityType().name()));
    }

    /** Rebuilds the state id from the headers written by {@link #putStateId}. */
    private CalculatedFieldEntityCtxId getStateId(TbQueueMsgHeaders headers) {
        TenantId tenantId = TenantId.fromUUID(AbstractTbQueueTemplate.bytesToUuid(headers.get(TENANT_ID_HEADER)));
        CalculatedFieldId cfId = new CalculatedFieldId(AbstractTbQueueTemplate.bytesToUuid(headers.get(CF_ID_HEADER)));
        EntityId entityId = EntityIdFactory.getByTypeAndUuid(
                AbstractTbQueueTemplate.bytesToString(headers.get(ENTITY_TYPE_HEADER)),
                AbstractTbQueueTemplate.bytesToUuid(headers.get(ENTITY_ID_HEADER)));
        return new CalculatedFieldEntityCtxId(tenantId, cfId, entityId);
    }

    /**
     * Stops the parent service and then the state producer.
     *
     * <p>The producer is stopped even if the parent's {@code stop()} throws, and is skipped when
     * {@link #init} was never called (the producer is only created there).
     */
    @Override
    public void stop() {
        try {
            super.stop();
        } finally {
            if (this.stateProducer != null) {
                this.stateProducer.stop();
            }
        }
    }

    @ConstructorProperties(value={"queueFactory", "partitionService"})
    @Generated
    public KafkaCalculatedFieldStateService(TbRuleEngineQueueFactory queueFactory, PartitionService partitionService) {
        this.queueFactory = queueFactory;
        this.partitionService = partitionService;
    }
}

