package org.thingsboard.server.dao.util;

import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.util.AsyncTask;

/* loaded from: input_file:org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.class */
/**
 * Base class for executors that buffer submitted tasks in a bounded queue and
 * launch them from a fixed pool of dispatcher threads.
 *
 * <p>Behaviour visible in this class:
 * <ul>
 *   <li>{@link #submit} applies the per-tenant Cassandra query rate limit taken from the
 *       tenant profile and, if the task is not rate limited, puts it into the queue;</li>
 *   <li>each dispatcher thread takes tasks from the queue while {@link #concurrencyLevel}
 *       does not exceed the configured concurrency limit, otherwise it sleeps for the
 *       configured poll interval;</li>
 *   <li>a task that has already waited longer than the maximum wait time is failed with a
 *       {@link TimeoutException} instead of being executed; a launched task is bounded by
 *       the remaining part of that time;</li>
 *   <li>task results are propagated to the submitter's future on the callback executor.</li>
 * </ul>
 *
 * <p>Note for subclasses: {@link #getBufferName()} is invoked while this class is being
 * constructed (field initializer and constructor), i.e. before the subclass constructor
 * body has run, so it must not rely on subclass instance state.
 *
 * @param <T> task type
 * @param <F> future type returned to the submitter
 * @param <V> result type of a task
 */
public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extends ListenableFuture<V>, V> implements BufferedRateExecutor<T, F> {
    private static final Logger log = LoggerFactory.getLogger(AbstractBufferedRateExecutor.class);
    public static final String CONCURRENCY_LEVEL = "currBuffer";
    private static final String NOT_CASSANDRA_STATEMENT_TASK = "Not Cassandra Statement Task";
    private static final String UNKNOWN_TENANT_NAME = "N/A";

    private final long maxWaitTime;
    private final long pollMs;
    private final BlockingQueue<AsyncTaskContext<T, V>> queue;
    private final ExecutorService dispatcherExecutor;
    private final ExecutorService callbackExecutor;
    private final int concurrencyLimit;
    private final int printQueriesFreq;
    protected final AtomicInteger concurrencyLevel;
    protected final BufferedRateExecutorStats stats;
    private final EntityService entityService;
    private final TbTenantProfileCache tenantProfileCache;
    private final boolean printTenantNames;
    private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<>();
    private final AtomicInteger printQueriesIdx = new AtomicInteger(0);
    // Plain HashMap: only touched from printStats().
    private final Map<TenantId, String> tenantNamesCache = new HashMap<>();
    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-timeout"));

    /**
     * Creates the executor and immediately starts the dispatcher threads.
     *
     * @param queueLimit         capacity of the task queue; {@link #submit} rejects tasks when it is full
     * @param concurrencyLimit   dispatchers stop taking tasks while {@link #concurrencyLevel} exceeds this value
     * @param maxWaitTime        maximum time in milliseconds, counted from submission, a task may take
     * @param dispatcherThreads  number of dispatcher threads (and dispatch loops) to start
     * @param callbackThreads    parallelism passed to the callback work-stealing pool
     * @param pollMs             sleep in milliseconds of a dispatcher while the concurrency limit is exceeded
     * @param printQueriesFreq   if positive, the query of every N-th dispatched task is logged at INFO level
     * @param statsFactory       factory used for the counters and the concurrency gauge
     * @param entityService      used to resolve tenant names in {@link #printStats()}
     * @param tenantProfileCache source of the per-tenant rate limit configuration
     * @param printTenantNames   whether {@link #printStats()} should include tenant names
     */
    public AbstractBufferedRateExecutor(int queueLimit, int concurrencyLimit, long maxWaitTime, int dispatcherThreads, int callbackThreads, long pollMs, int printQueriesFreq, StatsFactory statsFactory, EntityService entityService, TbTenantProfileCache tenantProfileCache, boolean printTenantNames) {
        this.maxWaitTime = maxWaitTime;
        this.pollMs = pollMs;
        this.concurrencyLimit = concurrencyLimit;
        this.printQueriesFreq = printQueriesFreq;
        this.queue = new LinkedBlockingDeque<>(queueLimit);
        this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-dispatcher"));
        this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, "nosql-" + getBufferName() + "-callback");
        this.stats = new BufferedRateExecutorStats(statsFactory);
        this.concurrencyLevel = (AtomicInteger) statsFactory.createGauge(StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL + getBufferName(), new AtomicInteger(0), new String[0]);
        this.entityService = entityService;
        this.tenantProfileCache = tenantProfileCache;
        this.printTenantNames = printTenantNames;
        for (int n = 0; n < dispatcherThreads; n++) {
            this.dispatcherExecutor.submit(this::dispatch);
        }
    }

    /**
     * Applies the tenant rate limit and enqueues the task.
     *
     * <p>The returned future fails with {@link TenantRateLimitException} when the tenant limit
     * is exhausted and with {@link IllegalStateException} when the queue is full. A task
     * without a tenant id is logged as invalid and enqueued without rate limiting.
     *
     * @param task task to execute
     * @return the future produced by {@link #wrap} for this task
     */
    @Override // org.thingsboard.server.dao.util.BufferedRateExecutor
    public F submit(T task) {
        SettableFuture<V> settableFuture = create();
        F result = wrap(task, settableFuture);

        TenantId tenantId = task.getTenantId();
        boolean rateLimited = false;
        if (tenantId == null) {
            // Must not reach perTenantLimits: ConcurrentHashMap rejects null keys with an NPE.
            log.info("Invalid task received: {}", task);
        } else if (!TenantId.SYS_TENANT_ID.equals(tenantId)) {
            DefaultTenantProfileConfiguration profileConfiguration = this.tenantProfileCache.get(tenantId).getDefaultProfileConfiguration();
            final String rateLimitsConfiguration = profileConfiguration == null
                    ? null
                    : profileConfiguration.getCassandraQueryTenantRateLimitsConfiguration();
            if (!StringUtils.isNotEmpty(rateLimitsConfiguration)) {
                // No limit configured (any more) for this tenant: drop a previously created limiter.
                this.perTenantLimits.remove(tenantId);
            } else if (!tenantId.isNullUid()) {
                TbRateLimits limits = this.perTenantLimits.computeIfAbsent(tenantId, id -> new TbRateLimits(rateLimitsConfiguration));
                if (!limits.tryConsume()) {
                    this.stats.incrementRateLimitedTenant(tenantId);
                    this.stats.getTotalRateLimited().increment();
                    settableFuture.setException(new TenantRateLimitException());
                    rateLimited = true;
                }
            }
        }

        if (!rateLimited) {
            try {
                this.stats.getTotalAdded().increment();
                this.queue.add(new AsyncTaskContext<>(UUID.randomUUID(), task, settableFuture, System.currentTimeMillis()));
            } catch (IllegalStateException e) {
                // Queue is at capacity.
                this.stats.getTotalRejected().increment();
                settableFuture.setException(e);
            }
        }
        return result;
    }

    /**
     * Shuts down the dispatcher, callback and timeout executors, interrupting their threads.
     */
    public void stop() {
        this.dispatcherExecutor.shutdownNow();
        this.callbackExecutor.shutdownNow();
        this.timeoutExecutor.shutdownNow();
    }

    /** Creates the future that will receive the result of a submitted task. */
    protected abstract SettableFuture<V> create();

    /** Converts the internal settable future into the future type handed to the submitter. */
    protected abstract F wrap(T t, SettableFuture<V> settableFuture);

    /** Starts the actual execution of a dequeued task. */
    protected abstract ListenableFuture<V> execute(AsyncTaskContext<T, V> asyncTaskContext);

    /** Name used in thread names and in the concurrency gauge name; called during construction. */
    public abstract String getBufferName();

    /**
     * Dispatcher loop: runs until the thread is interrupted.
     *
     * <p>While the concurrency level does not exceed the limit, takes the next task, and either
     * launches it (with the remaining wait time as timeout) or expires it. Otherwise sleeps
     * for {@code pollMs}.
     */
    private void dispatch() {
        log.info("Buffered rate executor thread started");
        while (!Thread.interrupted()) {
            AsyncTaskContext<T, V> taskCtx = null;
            // True while this iteration has incremented concurrencyLevel and nobody else
            // (callback or expiry branch) is responsible for decrementing it yet.
            boolean permitHeld = false;
            try {
                if (this.concurrencyLevel.get() > this.concurrencyLimit) {
                    Thread.sleep(this.pollMs);
                    continue;
                }
                taskCtx = this.queue.take();
                final AsyncTaskContext<T, V> current = taskCtx;
                if (this.printQueriesFreq > 0 && this.printQueriesIdx.incrementAndGet() >= this.printQueriesFreq) {
                    this.printQueriesIdx.set(0);
                    log.info("[{}] Cassandra query: {}", current.getId(), queryToString(current));
                }
                logTask("Processing", current);
                this.concurrencyLevel.incrementAndGet();
                permitHeld = true;
                long remainingMs = (current.getCreateTime() + this.maxWaitTime) - System.currentTimeMillis();
                if (remainingMs > 0) {
                    this.stats.getTotalLaunched().increment();
                    ListenableFuture<V> execution = Futures.withTimeout(execute(current), remainingMs, TimeUnit.MILLISECONDS, this.timeoutExecutor);
                    Futures.addCallback(execution, new FutureCallback<V>() {
                        @Override
                        public void onSuccess(@Nullable V value) {
                            logTask("Releasing", current);
                            stats.getTotalReleased().increment();
                            concurrencyLevel.decrementAndGet();
                            current.getFuture().set(value);
                        }

                        @Override
                        public void onFailure(Throwable th) {
                            if (th instanceof TimeoutException) {
                                logTask("Expired During Execution", current);
                            } else {
                                logTask("Failed", current);
                            }
                            stats.getTotalFailed().increment();
                            concurrencyLevel.decrementAndGet();
                            current.getFuture().setException(th);
                            log.debug("[{}] Failed to execute task: {}", current.getId(), current.getTask(), th);
                        }
                    }, this.callbackExecutor);
                    // From here on the callback releases the permit.
                    permitHeld = false;
                } else {
                    logTask("Expired Before Execution", current);
                    this.stats.getTotalExpired().increment();
                    this.concurrencyLevel.decrementAndGet();
                    permitHeld = false;
                    current.getFuture().setException(new TimeoutException());
                }
            } catch (InterruptedException e) {
                // take()/sleep() clear the interrupt flag when they throw; restore it and
                // leave the loop, otherwise the thread would block on take() again and
                // never terminate after stop().
                Thread.currentThread().interrupt();
                break;
            } catch (Throwable th) {
                if (taskCtx != null) {
                    log.debug("[{}] Failed to execute task: {}", taskCtx.getId(), taskCtx.getTask(), th);
                    this.stats.getTotalFailed().increment();
                    if (permitHeld) {
                        this.concurrencyLevel.decrementAndGet();
                    }
                    // Do not leave the submitter waiting forever; no-op if already completed.
                    taskCtx.getFuture().setException(th);
                } else {
                    log.debug("Failed to queue task:", th);
                }
            }
        }
        log.info("Buffered rate executor thread stopped");
    }

    /**
     * Logs a task lifecycle event: a short line at DEBUG level, or the full task (plus the
     * rendered query for Cassandra statement tasks) when TRACE is enabled.
     */
    private void logTask(String action, AsyncTaskContext<T, V> taskCtx) {
        if (log.isTraceEnabled()) {
            if (taskCtx.getTask() instanceof CassandraStatementTask) {
                log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, queryToString(taskCtx));
            } else {
                log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx);
            }
        } else {
            log.debug("[{}] {} task", taskCtx.getId(), action);
        }
    }

    /**
     * Renders the query of a task for logging.
     *
     * @return the query with bound values substituted, the raw prepared query if the
     *         substitution fails, or a fixed marker text when the task is not a
     *         {@link CassandraStatementTask} carrying a {@link BoundStatement}
     */
    private String queryToString(AsyncTaskContext<T, V> taskCtx) {
        AsyncTask task = taskCtx.getTask();
        // Callers on the query-printing path do not pre-check the task type.
        if (!(task instanceof CassandraStatementTask)) {
            return NOT_CASSANDRA_STATEMENT_TASK;
        }
        CassandraStatementTask statementTask = (CassandraStatementTask) task;
        if (!(statementTask.getStatement() instanceof BoundStatement)) {
            return NOT_CASSANDRA_STATEMENT_TASK;
        }
        BoundStatement statement = (BoundStatement) statementTask.getStatement();
        String query = statement.getPreparedStatement().getQuery();
        try {
            query = toStringWithValues(statement, ProtocolVersion.V5);
        } catch (Exception e) {
            log.warn("Can't convert to query with values", e);
        }
        return query;
    }

    /**
     * Substitutes the {@code ?} markers of the prepared query with the formatted bound values.
     *
     * <p>The query is scanned once, left to right, pairing the i-th {@code ?} with the i-th
     * variable definition. A variable whose raw value is {@code null} keeps its {@code ?}
     * but still consumes its marker, so the following values stay aligned; text that was
     * substituted is never scanned again, so a {@code ?} inside a value is left alone.
     */
    private static String toStringWithValues(BoundStatement boundStatement, ProtocolVersion protocolVersion) {
        CodecRegistry codecRegistry = boundStatement.codecRegistry();
        PreparedStatement preparedStatement = boundStatement.getPreparedStatement();
        String query = preparedStatement.getQuery();
        StringBuilder rendered = new StringBuilder(query.length() + 32);
        int cursor = 0;
        int index = 0;
        for (ColumnDefinition definition : preparedStatement.getVariableDefinitions()) {
            int marker = query.indexOf('?', cursor);
            if (marker < 0) {
                // No positional markers left; nothing more to substitute.
                break;
            }
            rendered.append(query, cursor, marker);
            if (boundStatement.getBytesUnsafe(index) != null) {
                TypeCodec<Object> codec = codecRegistry.codecFor(definition.getType());
                rendered.append(codec.format(codec.decode(boundStatement.getBytesUnsafe(index), protocolVersion)));
            } else {
                rendered.append('?');
            }
            cursor = marker + 1;
            index++;
        }
        rendered.append(query, cursor, query.length());
        return rendered.toString();
    }

    /** @return the number of tasks currently waiting in the queue */
    protected int getQueueSize() {
        return this.queue.size();
    }

    /**
     * Logs the executor counters and the per-tenant rate limit hits, then resets them.
     *
     * <p>The summary line is written only if the queue is non-empty, some tenant was rate
     * limited, tasks are in flight, or any counter is positive. Every tenant with a positive
     * rate limit counter gets its own line, optionally with the tenant name.
     */
    public void printStats() {
        int queueSize = getQueueSize();
        int rateLimitedTenants = (int) this.stats.getRateLimitedTenants().values().stream()
                .filter(counter -> counter.get() > 0)
                .count();
        boolean hasActivity = queueSize > 0
                || rateLimitedTenants > 0
                || this.concurrencyLevel.get() > 0
                || this.stats.getStatsCounters().stream().anyMatch(counter -> counter.get() > 0);
        if (hasActivity) {
            StringBuilder summary = new StringBuilder();
            summary.append("queueSize").append(" = [").append(queueSize).append("] ");
            this.stats.getStatsCounters().forEach(counter ->
                    summary.append(counter.getName()).append(" = [").append(counter.get()).append("] "));
            summary.append("totalRateLimitedTenants").append(" = [").append(rateLimitedTenants).append("] ");
            summary.append(CONCURRENCY_LEVEL).append(" = [").append(this.concurrencyLevel.get()).append("] ");
            this.stats.getStatsCounters().forEach(counter -> counter.clear());
            log.info("Permits {}", summary);
        }

        this.stats.getRateLimitedTenants().entrySet().stream()
                .filter(entry -> ((DefaultCounter) entry.getValue()).get() > 0)
                .forEach(entry -> {
                    TenantId tenantId = (TenantId) entry.getKey();
                    DefaultCounter counter = (DefaultCounter) entry.getValue();
                    int rateLimitedRequests = counter.get();
                    counter.clear();
                    if (this.printTenantNames) {
                        String tenantName = this.tenantNamesCache.computeIfAbsent(tenantId, this::fetchTenantName);
                        log.info("[{}][{}] Rate limited requests: {}", tenantId, tenantName, rateLimitedRequests);
                    } else {
                        log.info("[{}] Rate limited requests: {}", tenantId, rateLimitedRequests);
                    }
                });
    }

    /** Looks up the tenant name for logging; falls back to "N/A" if it is missing or the lookup fails. */
    private String fetchTenantName(TenantId tenantId) {
        try {
            return (String) this.entityService.fetchEntityName(TenantId.SYS_TENANT_ID, tenantId).orElse(UNKNOWN_TENANT_NAME);
        } catch (Exception e) {
            log.error("[{}] Failed to get tenant name", tenantId, e);
            return UNKNOWN_TENANT_NAME;
        }
    }
}
