/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.queue.memory;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.memory.InMemoryStorage;

/**
 * {@link TbQueueConsumer} implementation that reads messages from an {@link InMemoryStorage}.
 *
 * <p>The consumer is bound to a single topic at construction time. After one of the
 * {@code subscribe} methods has been called, {@link #poll(long)} asks the storage for the
 * messages of every subscribed partition (looked up by full topic name) and returns them.
 *
 * <p>The {@code partitions}, {@code stopped} and {@code subscribed} fields are {@code volatile},
 * so writes made by one thread are visible to others; no further synchronization is performed
 * by this class.
 *
 * @param <T> type of the messages handed out by this consumer
 */
public class InMemoryTbQueueConsumer<T extends TbQueueMsg>
implements TbQueueConsumer<T> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(InMemoryTbQueueConsumer.class);
    /** Backing storage the messages are read from. */
    private final InMemoryStorage storage;
    /** Partitions currently subscribed to; {@code null} until a {@code subscribe} method is called. */
    private volatile Set<TopicPartitionInfo> partitions;
    /** Set by {@link #stop()} and {@link #unsubscribe()}; never reset by this class. */
    private volatile boolean stopped;
    /** {@code true} between a {@code subscribe} call and {@link #unsubscribe()}. */
    private volatile boolean subscribed;
    /** Topic this consumer was created for. */
    private final String topic;

    /**
     * Creates a consumer that is neither subscribed nor stopped.
     *
     * @param storage storage to read messages from
     * @param topic   topic this consumer is bound to
     */
    public InMemoryTbQueueConsumer(InMemoryStorage storage, String topic) {
        this.storage = storage;
        this.topic = topic;
        this.stopped = false;
    }

    /**
     * @return the topic passed to the constructor
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Subscribes to a single partition built from this consumer's topic
     * ({@code new TopicPartitionInfo(topic, null, null, true)}).
     */
    public void subscribe() {
        // partitions is written before subscribed so that poll() never observes
        // subscribed == true together with the previous partition set.
        this.partitions = Collections.singleton(new TopicPartitionInfo(this.topic, null, null, true));
        this.subscribed = true;
    }

    /**
     * Subscribes to the given partitions. The set is stored as-is (no defensive copy).
     *
     * @param partitions partitions to poll; a {@code null} value is treated as "no partitions"
     */
    public void subscribe(Set<TopicPartitionInfo> partitions) {
        this.partitions = partitions;
        this.subscribed = true;
    }

    /**
     * Marks the consumer as stopped. This only affects {@link #isStopped()} and silences the
     * error logging of interruptions in {@link #poll(long)}.
     */
    public void stop() {
        this.stopped = true;
    }

    /**
     * Marks the consumer as stopped and no longer subscribed, so that subsequent
     * {@link #poll(long)} calls return an empty list immediately.
     */
    public void unsubscribe() {
        this.stopped = true;
        this.subscribed = false;
    }

    /**
     * Collects the messages currently available for all subscribed partitions.
     *
     * <p>If the consumer is not subscribed, an empty list is returned immediately. If it is
     * subscribed but no messages were found, the calling thread sleeps for
     * {@code durationInMillis} before an empty list is returned.
     *
     * @param durationInMillis time to sleep, in milliseconds, when no messages were found
     * @return the messages found, or an empty list
     */
    public List<T> poll(long durationInMillis) {
        if (!this.subscribed) {
            return Collections.emptyList();
        }
        // Read the volatile field once so the null check and the iteration see the same set.
        Set<TopicPartitionInfo> currentPartitions = this.partitions;
        if (currentPartitions != null) {
            List<T> messages = currentPartitions.stream()
                    .map(this::fetchMessages)
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());
            if (!messages.isEmpty()) {
                return messages;
            }
        }
        try {
            Thread.sleep(durationInMillis);
        }
        catch (InterruptedException e) {
            // NOTE(review): the interrupt status is not restored here, matching the existing
            // behavior; restoring it would make a caller's subsequent poll() fail fast.
            if (!this.stopped) {
                log.error("Failed to sleep.", e);
            }
        }
        return Collections.emptyList();
    }

    /**
     * Reads the messages of one partition from the storage.
     *
     * @param tpi partition whose full topic name is used for the lookup
     * @return the messages returned by the storage, or an empty list if the lookup was interrupted
     */
    @SuppressWarnings("unchecked")
    private List<T> fetchMessages(TopicPartitionInfo tpi) {
        try {
            // Unchecked: the storage is not parameterized by this consumer's message type.
            return (List<T>) this.storage.get(tpi.getFullTopicName());
        }
        catch (InterruptedException e) {
            // An interruption after stop()/unsubscribe() is expected and therefore not logged.
            if (!this.stopped) {
                log.error("Queue was interrupted.", e);
            }
            return Collections.emptyList();
        }
    }

    /**
     * No-op for this implementation.
     */
    public void commit() {
    }

    /**
     * @return {@code true} once {@link #stop()} or {@link #unsubscribe()} has been called
     */
    public boolean isStopped() {
        return this.stopped;
    }

    /**
     * @return the full topic names of the subscribed partitions; an empty list if no
     *         partitions have been set yet
     */
    public List<String> getFullTopicNames() {
        Set<TopicPartitionInfo> currentPartitions = this.partitions;
        if (currentPartitions == null) {
            return Collections.emptyList();
        }
        return currentPartitions.stream()
                .map(TopicPartitionInfo::getFullTopicName)
                .collect(Collectors.toList());
    }
}

