/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.common.consumer;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;

public class TbQueueConsumerTask<M extends TbQueueMsg> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TbQueueConsumerTask.class);
    private final Object key;
    private volatile TbQueueConsumer<M> consumer;
    private volatile Supplier<TbQueueConsumer<M>> consumerSupplier;
    private final Runnable callback;
    private Future<?> task;

    public TbQueueConsumerTask(Object key, Supplier<TbQueueConsumer<M>> consumerSupplier, Runnable callback) {
        this.key = key;
        this.consumer = null;
        this.consumerSupplier = consumerSupplier;
        this.callback = callback;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TbQueueConsumer<M> getConsumer() {
        if (this.consumer == null) {
            TbQueueConsumerTask tbQueueConsumerTask = this;
            synchronized (tbQueueConsumerTask) {
                if (this.consumer == null) {
                    Objects.requireNonNull(this.consumerSupplier, "consumerSupplier for key [" + String.valueOf(this.key) + "] is null");
                    this.consumer = this.consumerSupplier.get();
                    Objects.requireNonNull(this.consumer, "consumer for key [" + String.valueOf(this.key) + "] is null");
                    this.consumerSupplier = null;
                }
            }
        }
        return this.consumer;
    }

    public void subscribe(Set<TopicPartitionInfo> partitions) {
        log.trace("[{}] Subscribing to partitions: {}", this.key, partitions);
        this.getConsumer().subscribe(partitions);
    }

    public void initiateStop() {
        log.debug("[{}] Initiating stop", this.key);
        this.getConsumer().stop();
    }

    public void awaitCompletion() {
        this.awaitCompletion(30);
    }

    public void awaitCompletion(int timeoutSec) {
        log.trace("[{}] Awaiting finish", this.key);
        if (this.isRunning()) {
            try {
                if (timeoutSec > 0) {
                    this.task.get(timeoutSec, TimeUnit.SECONDS);
                } else {
                    this.task.get();
                }
                log.trace("[{}] Awaited finish", this.key);
            }
            catch (Exception e) {
                log.warn("[{}] Failed to await for consumer to stop (timeout {} sec)", new Object[]{this.key, timeoutSec, e});
            }
            this.task = null;
        }
    }

    public boolean isRunning() {
        return this.task != null;
    }

    @Generated
    public Object getKey() {
        return this.key;
    }

    @Generated
    public Runnable getCallback() {
        return this.callback;
    }

    @Generated
    public void setTask(Future<?> task) {
        this.task = task;
    }
}

