package org.thingsboard.server.service.housekeeper;

import jakarta.annotation.PreDestroy;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.service.ContextAwareActor;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.housekeeper.HousekeeperConfig;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;

@TbCoreComponent
@Service
/* loaded from: input_file:org/thingsboard/server/service/housekeeper/HousekeeperReprocessingService.class */
public class HousekeeperReprocessingService {
    private static final Logger log = LoggerFactory.getLogger(HousekeeperReprocessingService.class);
    private final HousekeeperConfig config;
    private final HousekeeperService housekeeperService;
    private final QueueConsumerManager<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> consumer;
    private final TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> producer;
    private final TopicPartitionInfo submitTpi;
    private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("housekeeper-reprocessing-consumer"));

    public HousekeeperReprocessingService(HousekeeperConfig housekeeperConfig, @Lazy HousekeeperService housekeeperService, TbCoreQueueFactory tbCoreQueueFactory) {
        this.config = housekeeperConfig;
        this.housekeeperService = housekeeperService;
        QueueConsumerManager.QueueConsumerManagerBuilder pollInterval = QueueConsumerManager.builder().name("Housekeeper reprocessing").msgPackProcessor(this::processMsgs).pollInterval(housekeeperConfig.getPollInterval());
        Objects.requireNonNull(tbCoreQueueFactory);
        this.consumer = pollInterval.consumerCreator(tbCoreQueueFactory::createHousekeeperReprocessingMsgConsumer).consumerExecutor(this.consumerExecutor).build();
        this.producer = tbCoreQueueFactory.createHousekeeperReprocessingMsgProducer();
        this.submitTpi = TopicPartitionInfo.builder().topic(this.producer.getDefaultTopic()).build();
    }

    @AfterStartUp(order = 11)
    public void afterStartUp() {
        this.consumer.subscribe();
        this.consumer.launch();
    }

    private void processMsgs(List<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> list, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg>> tbQueueConsumer) throws Exception {
        Thread.sleep(this.config.getTaskReprocessingDelay());
        for (TbProtoQueueMsg<TransportProtos.ToHousekeeperServiceMsg> tbProtoQueueMsg : list) {
            log.trace("Reprocessing task: {}", tbProtoQueueMsg);
            try {
                this.housekeeperService.processTask((TransportProtos.ToHousekeeperServiceMsg) tbProtoQueueMsg.getValue());
            } catch (InterruptedException e) {
                return;
            } catch (Throwable th) {
                log.error("Unexpected error during message reprocessing [{}]", tbProtoQueueMsg, th);
                submitForReprocessing((TransportProtos.ToHousekeeperServiceMsg) tbProtoQueueMsg.getValue(), th);
            }
        }
        tbQueueConsumer.commit();
    }

    public void submitForReprocessing(TransportProtos.ToHousekeeperServiceMsg toHousekeeperServiceMsg, Throwable th) {
        TransportProtos.HousekeeperTaskProto task = toHousekeeperServiceMsg.getTask();
        LinkedHashSet linkedHashSet = new LinkedHashSet((Collection) task.getErrorsList());
        linkedHashSet.add(StringUtils.truncate(ExceptionUtils.getStackTrace(th), ContextAwareActor.ENTITY_PACK_LIMIT));
        TransportProtos.ToHousekeeperServiceMsg build = toHousekeeperServiceMsg.toBuilder().setTask(task.toBuilder().setAttempt(task.getAttempt() + 1).clearErrors().addAllErrors(linkedHashSet).build()).build();
        log.trace("Submitting for reprocessing: {}", build);
        this.producer.send(this.submitTpi, new TbProtoQueueMsg(UUID.randomUUID(), build), (TbQueueCallback) null);
    }

    @PreDestroy
    private void stop() {
        this.consumer.stop();
        this.consumerExecutor.shutdownNow();
        log.info("Stopped Housekeeper reprocessing service");
    }
}
