package org.thingsboard.server.dao.sql;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/thingsboard/server/dao/sql/TbSqlBlockingQueue.class */
/**
 * {@link TbSqlQueue} implementation that buffers submitted entities in an unbounded
 * {@link LinkedBlockingQueue} and hands them to a save callback in batches from a single
 * worker thread.
 *
 * <p>Every entity passed to {@link #add(Object)} is paired with a future. The future is
 * completed with {@code null} once the batch containing the entity has been handed to the
 * save callback without an exception, and completed exceptionally if the callback throws.
 *
 * @param <E> type of the queued entities
 */
public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
    private static final Logger log = LoggerFactory.getLogger(TbSqlBlockingQueue.class);

    private final BlockingQueue<TbSqlQueueElement<E>> queue = new LinkedBlockingQueue<>();
    // Counters are reset to zero every time the statistics line is printed.
    private final AtomicInteger addedCount = new AtomicInteger();
    private final AtomicInteger savedCount = new AtomicInteger();
    private final AtomicInteger failedCount = new AtomicInteger();
    private final TbSqlBlockingQueueParams params;
    private ExecutorService executor;
    private ScheduledLogExecutorComponent logExecutor;

    /**
     * Creates a queue that is not yet running; call {@link #init} to start the worker.
     *
     * @param tbSqlBlockingQueueParams log name, batch size, max delay and stats interval
     */
    public TbSqlBlockingQueue(TbSqlBlockingQueueParams tbSqlBlockingQueueParams) {
        this.params = tbSqlBlockingQueueParams;
    }

    /**
     * Starts the single worker thread that drains the queue and schedules the periodic
     * statistics log line.
     *
     * @param scheduledLogExecutorComponent scheduler used for the periodic statistics output
     * @param consumer callback that receives each batch of entities to save
     */
    @Override // org.thingsboard.server.dao.sql.TbSqlQueue
    public void init(ScheduledLogExecutorComponent scheduledLogExecutorComponent, Consumer<List<E>> consumer) {
        this.logExecutor = scheduledLogExecutorComponent;
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(() -> processQueue(consumer));
        scheduledLogExecutorComponent.scheduleAtFixedRate(
                () -> logStats(),
                this.params.getStatsPrintIntervalMs(),
                this.params.getStatsPrintIntervalMs(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the worker by interrupting it via {@link ExecutorService#shutdownNow()}.
     * Does nothing if {@link #init} was never called.
     */
    @Override // org.thingsboard.server.dao.sql.TbSqlQueue
    public void destroy() {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }

    /**
     * Enqueues an entity for a later batched save.
     *
     * @param e entity to save
     * @return future completed with {@code null} after the entity's batch was saved, or
     *         completed exceptionally if saving the batch failed
     */
    @Override // org.thingsboard.server.dao.sql.TbSqlQueue
    public ListenableFuture<Void> add(E e) {
        SettableFuture<Void> future = SettableFuture.create();
        this.queue.add(new TbSqlQueueElement<>(future, e));
        this.addedCount.incrementAndGet();
        return future;
    }

    /**
     * Worker loop: waits up to {@code maxDelay} ms for a first element, drains up to
     * {@code batchSize} elements in total, passes them to {@code consumer} and completes
     * their futures. Runs until the thread is interrupted.
     */
    private void processQueue(Consumer<List<E>> consumer) {
        String logName = this.params.getLogName();
        int batchSize = this.params.getBatchSize();
        long maxDelay = this.params.getMaxDelay();
        List<TbSqlQueueElement<E>> batch = new ArrayList<>(batchSize);
        while (!Thread.interrupted()) {
            try {
                long startTs = System.currentTimeMillis();
                TbSqlQueueElement<E> first = this.queue.poll(maxDelay, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue;
                }
                batch.add(first);
                this.queue.drainTo(batch, batchSize - 1);
                boolean fullBatch = batch.size() == batchSize;
                log.debug("[{}] Going to save {} entities", logName, batch.size());
                consumer.accept(batch.stream().map(TbSqlQueueElement::getEntity).collect(Collectors.toList()));
                batch.forEach(element -> element.getFuture().set(null));
                this.savedCount.addAndGet(batch.size());
                if (!fullBatch) {
                    // A partial batch means the queue was drained: wait out the rest of the
                    // maxDelay window so that more elements can accumulate.
                    long remainingDelay = maxDelay - (System.currentTimeMillis() - startTs);
                    if (remainingDelay > 0) {
                        Thread.sleep(remainingDelay);
                    }
                }
            } catch (InterruptedException e) {
                // An interrupt during the post-save sleep arrives after the batch's futures
                // were already completed; failPending() counts only futures it really failed,
                // so a successfully saved batch is no longer reported as failed.
                this.failedCount.addAndGet(failPending(batch, e));
                log.info("[{}] Queue polling was interrupted", logName);
                // Restore the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                this.failedCount.addAndGet(failPending(batch, e));
                log.error("[{}] Failed to save {} entities", logName, batch.size(), e);
            } finally {
                batch.clear();
            }
        }
    }

    /**
     * Completes exceptionally every future in {@code batch} that is not already completed.
     *
     * @return number of futures that were actually switched to the failed state
     */
    private int failPending(List<TbSqlQueueElement<E>> batch, Exception cause) {
        int failed = 0;
        for (TbSqlQueueElement<E> element : batch) {
            // setException returns false when the future was already completed.
            if (element.getFuture().setException(cause)) {
                failed++;
            }
        }
        return failed;
    }

    /** Logs the current queue size and the counters accumulated since the previous call, then resets the counters. */
    private void logStats() {
        // NOTE(review): the "Attributes" label is hard-coded although params.getLogName() is
        // available; left unchanged here because the message text is runtime output.
        log.info("Attributes queueSize [{}] totalAdded [{}] totalSaved [{}] totalFailed [{}]",
                this.queue.size(),
                this.addedCount.getAndSet(0),
                this.savedCount.getAndSet(0),
                this.failedCount.getAndSet(0));
    }
}
