/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.server.common.transport.service;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.protobuf.GeneratedMessageV3;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgDataType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.msg.tools.TbRateLimits;
import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.stats.MessagesStats;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.stats.StatsType;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.service.RpcRequestMetadata;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueMsg;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbTransportQueueFactory;

@Service
@ConditionalOnExpression(value="('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'")
public class DefaultTransportService
implements TransportService {
    private static final Logger log = LoggerFactory.getLogger(DefaultTransportService.class);
    @Value(value="${transport.rate_limits.enabled}")
    private boolean rateLimitEnabled;
    @Value(value="${transport.rate_limits.tenant}")
    private String perTenantLimitsConf;
    @Value(value="${transport.rate_limits.device}")
    private String perDevicesLimitsConf;
    @Value(value="${transport.sessions.inactivity_timeout}")
    private long sessionInactivityTimeout;
    @Value(value="${transport.sessions.report_timeout}")
    private long sessionReportTimeout;
    @Value(value="${transport.client_side_rpc.timeout:60000}")
    private long clientSideRpcTimeout;
    @Value(value="${queue.transport.poll_interval}")
    private int notificationsPollDuration;
    private final Gson gson = new Gson();
    private final TbTransportQueueFactory queueProvider;
    private final TbQueueProducerProvider producerProvider;
    private final PartitionService partitionService;
    private final TbServiceInfoProvider serviceInfoProvider;
    private final StatsFactory statsFactory;
    protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> transportApiRequestTemplate;
    protected TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> ruleEngineMsgProducer;
    protected TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> tbCoreMsgProducer;
    protected TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> transportNotificationsConsumer;
    protected MessagesStats ruleEngineProducerStats;
    protected MessagesStats tbCoreProducerStats;
    protected MessagesStats transportApiStats;
    protected ScheduledExecutorService schedulerExecutor;
    protected ExecutorService transportCallbackExecutor;
    private final ConcurrentMap<UUID, SessionMetaData> sessions = new ConcurrentHashMap<UUID, SessionMetaData>();
    private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<String, RpcRequestMetadata>();
    private final ConcurrentMap<TenantId, TbRateLimits> perTenantLimits = new ConcurrentHashMap<TenantId, TbRateLimits>();
    private final ConcurrentMap<DeviceId, TbRateLimits> perDeviceLimits = new ConcurrentHashMap<DeviceId, TbRateLimits>();
    private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor((ThreadFactory)ThingsBoardThreadFactory.forName((String)"transport-consumer"));
    private volatile boolean stopped = false;

    public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory) {
        this.serviceInfoProvider = serviceInfoProvider;
        this.queueProvider = queueProvider;
        this.producerProvider = producerProvider;
        this.partitionService = partitionService;
        this.statsFactory = statsFactory;
    }

    @PostConstruct
    public void init() {
        if (this.rateLimitEnabled) {
            new TbRateLimits(this.perTenantLimitsConf);
            new TbRateLimits(this.perDevicesLimitsConf);
        }
        this.ruleEngineProducerStats = this.statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer");
        this.tbCoreProducerStats = this.statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
        this.transportApiStats = this.statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
        this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)ThingsBoardThreadFactory.forName((String)"transport-scheduler"));
        this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
        this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int)this.sessionReportTimeout), this.sessionReportTimeout, TimeUnit.MILLISECONDS);
        this.transportApiRequestTemplate = this.queueProvider.createTransportApiRequestTemplate();
        this.transportApiRequestTemplate.setMessagesStats(this.transportApiStats);
        this.ruleEngineMsgProducer = this.producerProvider.getRuleEngineMsgProducer();
        this.tbCoreMsgProducer = this.producerProvider.getTbCoreMsgProducer();
        this.transportNotificationsConsumer = this.queueProvider.createTransportNotificationsConsumer();
        TopicPartitionInfo tpi = this.partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, this.serviceInfoProvider.getServiceId());
        this.transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
        this.transportApiRequestTemplate.init();
        this.mainConsumerExecutor.execute(() -> {
            while (!this.stopped) {
                try {
                    List records = this.transportNotificationsConsumer.poll((long)this.notificationsPollDuration);
                    if (records.size() == 0) continue;
                    records.forEach(record -> {
                        try {
                            this.processToTransportMsg((TransportProtos.ToTransportMsg)record.getValue());
                        }
                        catch (Throwable e) {
                            log.warn("Failed to process the notification.", e);
                        }
                    });
                    this.transportNotificationsConsumer.commit();
                }
                catch (Exception e) {
                    if (this.stopped) continue;
                    log.warn("Failed to obtain messages from queue.", (Throwable)e);
                    try {
                        Thread.sleep(this.notificationsPollDuration);
                    }
                    catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", (Throwable)e2);
                    }
                }
            }
        });
    }

    @PreDestroy
    public void destroy() {
        if (this.rateLimitEnabled) {
            this.perTenantLimits.clear();
            this.perDeviceLimits.clear();
        }
        this.stopped = true;
        if (this.transportNotificationsConsumer != null) {
            this.transportNotificationsConsumer.unsubscribe();
        }
        if (this.schedulerExecutor != null) {
            this.schedulerExecutor.shutdownNow();
        }
        if (this.transportCallbackExecutor != null) {
            this.transportCallbackExecutor.shutdownNow();
        }
        if (this.mainConsumerExecutor != null) {
            this.mainConsumerExecutor.shutdownNow();
        }
        if (this.transportApiRequestTemplate != null) {
            this.transportApiRequestTemplate.stop();
        }
    }

    @Override
    public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) {
        this.sessions.putIfAbsent(this.toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
    }

    @Override
    public TransportProtos.GetTenantRoutingInfoResponseMsg getRoutingInfo(TransportProtos.GetTenantRoutingInfoRequestMsg msg) {
        TbProtoQueueMsg protoMsg = new TbProtoQueueMsg(UUID.randomUUID(), (GeneratedMessageV3)TransportProtos.TransportApiRequestMsg.newBuilder().setGetTenantRoutingInfoRequestMsg(msg).build());
        try {
            TbProtoQueueMsg response = (TbProtoQueueMsg)this.transportApiRequestTemplate.send((TbQueueMsg)protoMsg).get();
            return ((TransportProtos.TransportApiResponseMsg)response.getValue()).getGetTenantRoutingInfoResponseMsg();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
        log.trace("Processing msg: {}", (Object)msg);
        TbProtoQueueMsg protoMsg = new TbProtoQueueMsg(UUID.randomUUID(), (GeneratedMessageV3)TransportProtos.TransportApiRequestMsg.newBuilder().setValidateTokenRequestMsg(msg).build());
        AsyncCallbackTemplate.withCallback((ListenableFuture)this.transportApiRequestTemplate.send((TbQueueMsg)protoMsg), response -> callback.onSuccess(((TransportProtos.TransportApiResponseMsg)response.getValue()).getValidateTokenResponseMsg()), callback::onError, (Executor)this.transportCallbackExecutor);
    }

    @Override
    public void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
        log.trace("Processing msg: {}", (Object)msg);
        TbProtoQueueMsg protoMsg = new TbProtoQueueMsg(UUID.randomUUID(), (GeneratedMessageV3)TransportProtos.TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build());
        AsyncCallbackTemplate.withCallback((ListenableFuture)this.transportApiRequestTemplate.send((TbQueueMsg)protoMsg), response -> callback.onSuccess(((TransportProtos.TransportApiResponseMsg)response.getValue()).getValidateTokenResponseMsg()), callback::onError, (Executor)this.transportCallbackExecutor);
    }

    @Override
    public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg, TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback) {
        log.trace("Processing msg: {}", (Object)msg);
        TbProtoQueueMsg protoMsg = new TbProtoQueueMsg(UUID.randomUUID(), (GeneratedMessageV3)TransportProtos.TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(msg).build());
        AsyncCallbackTemplate.withCallback((ListenableFuture)this.transportApiRequestTemplate.send((TbQueueMsg)protoMsg), response -> callback.onSuccess(((TransportProtos.TransportApiResponseMsg)response.getValue()).getGetOrCreateDeviceResponseMsg()), callback::onError, (Executor)this.transportCallbackExecutor);
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback) {
        if (log.isTraceEnabled()) {
            log.trace("[{}] Processing msg: {}", (Object)this.toSessionId(sessionInfo), (Object)msg);
        }
        this.sendToDeviceActor(sessionInfo, TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscriptionInfo(msg).build(), callback);
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            this.reportActivityInternal(sessionInfo);
            this.sendToDeviceActor(sessionInfo, TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSessionEvent(msg).build(), callback);
        }
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            this.reportActivityInternal(sessionInfo);
            TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
            DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
            MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), callback);
            for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
                TbMsgMetaData metaData = new TbMsgMetaData();
                metaData.putValue("deviceName", sessionInfo.getDeviceName());
                metaData.putValue("deviceType", sessionInfo.getDeviceType());
                metaData.putValue("ts", tsKv.getTs() + "");
                JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
                TbMsg tbMsg = TbMsg.newMsg((String)SessionMsgType.POST_TELEMETRY_REQUEST.name(), (EntityId)deviceId, (TbMsgMetaData)metaData, (String)this.gson.toJson((JsonElement)json));
                this.sendToRuleEngine(tenantId, tbMsg, packCallback);
            }
        }
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            this.reportActivityInternal(sessionInfo);
            TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
            DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
            JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
            TbMsgMetaData metaData = new TbMsgMetaData();
            metaData.putValue("deviceName", sessionInfo.getDeviceName());
            metaData.putValue("deviceType", sessionInfo.getDeviceType());
            TbMsg tbMsg = TbMsg.newMsg((String)SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), (EntityId)deviceId, (TbMsgMetaData)metaData, (String)this.gson.toJson((JsonElement)json));
            this.sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
        }
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            this.reportActivityInternal(sessionInfo);
            this.sendToDeviceActor(sessionInfo, TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setGetAttributes(msg).build(), callback);
        }
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            SessionMetaData sessionMetaData = this.reportActivityInternal(sessionInfo);
            sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
            this.sendToDeviceActor(sessionInfo, TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), callback);
        }
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            SessionMetaData sessionMetaData = this.reportActivityInternal(sessionInfo);
            sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
            this.sendToDeviceActor(sessionInfo, TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), callback);
        }
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            this.reportActivityInternal(sessionInfo);
            this.sendToDeviceActor(sessionInfo, TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), callback);
        }
    }

    private void processTimeout(String requestId) {
        RpcRequestMetadata data = this.toServerRpcPendingMap.remove(requestId);
        if (data != null) {
            SessionMetaData md = (SessionMetaData)this.sessions.get(data.getSessionId());
            if (md != null) {
                SessionMsgListener listener = md.getListener();
                this.transportCallbackExecutor.submit(() -> {
                    TransportProtos.ToServerRpcResponseMsg responseMsg = TransportProtos.ToServerRpcResponseMsg.newBuilder().setRequestId(data.getRequestId()).setError("timeout").build();
                    listener.onToServerRpcResponse(responseMsg);
                });
                if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
                    this.deregisterSession(md.getSessionInfo());
                }
            } else {
                log.debug("[{}] Missing session.", (Object)data.getSessionId());
            }
        }
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            this.reportActivityInternal(sessionInfo);
            UUID sessionId = this.toSessionId(sessionInfo);
            TenantId tenantId = this.getTenantId(sessionInfo);
            DeviceId deviceId = this.getDeviceId(sessionInfo);
            JsonObject json = new JsonObject();
            json.addProperty("method", msg.getMethodName());
            json.add("params", JsonUtils.parse(msg.getParams()));
            TbMsgMetaData metaData = new TbMsgMetaData();
            metaData.putValue("deviceName", sessionInfo.getDeviceName());
            metaData.putValue("deviceType", sessionInfo.getDeviceType());
            metaData.putValue("requestId", Integer.toString(msg.getRequestId()));
            metaData.putValue("serviceId", this.serviceInfoProvider.getServiceId());
            metaData.putValue("sessionId", sessionId.toString());
            TbMsg tbMsg = TbMsg.newMsg((String)SessionMsgType.TO_SERVER_RPC_REQUEST.name(), (EntityId)deviceId, (TbMsgMetaData)metaData, (TbMsgDataType)TbMsgDataType.JSON, (String)this.gson.toJson((JsonElement)json));
            this.sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback));
            String requestId = sessionId + "-" + msg.getRequestId();
            this.toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId()));
            this.schedulerExecutor.schedule(() -> this.processTimeout(requestId), this.clientSideRpcTimeout, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
        if (this.checkLimits(sessionInfo, msg, callback)) {
            this.reportActivityInternal(sessionInfo);
            this.sendToDeviceActor(sessionInfo, TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setClaimDevice(msg).build(), callback);
        }
    }

    @Override
    public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) {
        this.reportActivityInternal(sessionInfo);
    }

    private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
        UUID sessionId = this.toSessionId(sessionInfo);
        SessionMetaData sessionMetaData = (SessionMetaData)this.sessions.get(sessionId);
        if (sessionMetaData != null) {
            sessionMetaData.updateLastActivityTime();
        }
        return sessionMetaData;
    }

    private void checkInactivityAndReportActivity() {
        long expTime = System.currentTimeMillis() - this.sessionInactivityTimeout;
        this.sessions.forEach((uuid, sessionMD) -> {
            SessionMetaData gwMetaData;
            long lastActivityTime = sessionMD.getLastActivityTime();
            TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo();
            if (sessionInfo.getGwSessionIdMSB() > 0L && sessionInfo.getGwSessionIdLSB() > 0L && (gwMetaData = (SessionMetaData)this.sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB()))) != null) {
                lastActivityTime = Math.max(gwMetaData.getLastActivityTime(), lastActivityTime);
            }
            if (lastActivityTime < expTime) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Session has expired due to last activity time: {}", (Object)this.toSessionId(sessionInfo), (Object)lastActivityTime);
                }
                this.process(sessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
                this.sessions.remove(uuid);
                sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
            } else if (lastActivityTime > sessionMD.getLastReportedActivityTime()) {
                final long lastActivityTimeFinal = lastActivityTime;
                this.process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder().setAttributeSubscription(sessionMD.isSubscribedToAttributes()).setRpcSubscription(sessionMD.isSubscribedToRPC()).setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback<Void>(){

                    @Override
                    public void onSuccess(Void msg) {
                        sessionMD.setLastReportedActivityTime(lastActivityTimeFinal);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log.warn("[{}] Failed to report last activity time", (Object)uuid, (Object)e);
                    }
                });
            }
        });
    }

    @Override
    public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
        SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener);
        this.sessions.putIfAbsent(this.toSessionId(sessionInfo), currentSession);
        ScheduledFuture<?> executorFuture = this.schedulerExecutor.schedule(() -> {
            listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
            this.deregisterSession(sessionInfo);
        }, timeout, TimeUnit.MILLISECONDS);
        currentSession.setScheduledFuture(executorFuture);
    }

    @Override
    public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) {
        SessionMetaData currentSession = (SessionMetaData)this.sessions.get(this.toSessionId(sessionInfo));
        if (currentSession != null && currentSession.hasScheduledFuture()) {
            log.debug("Stopping scheduler to avoid resending response if request has been ack.");
            currentSession.getScheduledFuture().cancel(false);
        }
        this.sessions.remove(this.toSessionId(sessionInfo));
    }

    @Override
    public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback) {
        if (log.isTraceEnabled()) {
            log.trace("[{}] Processing msg: {}", (Object)this.toSessionId(sessionInfo), msg);
        }
        if (!this.rateLimitEnabled) {
            return true;
        }
        TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
        TbRateLimits rateLimits = this.perTenantLimits.computeIfAbsent(tenantId, id -> new TbRateLimits(this.perTenantLimitsConf));
        if (!rateLimits.tryConsume()) {
            if (callback != null) {
                callback.onError((Throwable)new TbRateLimitsException(EntityType.TENANT));
            }
            if (log.isTraceEnabled()) {
                log.trace("[{}][{}] Tenant level rate limit detected: {}", new Object[]{this.toSessionId(sessionInfo), tenantId, msg});
            }
            return false;
        }
        DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
        rateLimits = this.perDeviceLimits.computeIfAbsent(deviceId, id -> new TbRateLimits(this.perDevicesLimitsConf));
        if (!rateLimits.tryConsume()) {
            if (callback != null) {
                callback.onError((Throwable)new TbRateLimitsException(EntityType.DEVICE));
            }
            if (log.isTraceEnabled()) {
                log.trace("[{}][{}] Device level rate limit detected: {}", new Object[]{this.toSessionId(sessionInfo), deviceId, msg});
            }
            return false;
        }
        return true;
    }

    protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) {
        UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
        SessionMetaData md = (SessionMetaData)this.sessions.get(sessionId);
        if (md != null) {
            SessionMsgListener listener = md.getListener();
            this.transportCallbackExecutor.submit(() -> {
                if (toSessionMsg.hasGetAttributesResponse()) {
                    listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
                }
                if (toSessionMsg.hasAttributeUpdateNotification()) {
                    listener.onAttributeUpdate(toSessionMsg.getAttributeUpdateNotification());
                }
                if (toSessionMsg.hasSessionCloseNotification()) {
                    listener.onRemoteSessionCloseCommand(toSessionMsg.getSessionCloseNotification());
                }
                if (toSessionMsg.hasToDeviceRequest()) {
                    listener.onToDeviceRpcRequest(toSessionMsg.getToDeviceRequest());
                }
                if (toSessionMsg.hasToServerResponse()) {
                    String requestId = sessionId + "-" + toSessionMsg.getToServerResponse().getRequestId();
                    this.toServerRpcPendingMap.remove(requestId);
                    listener.onToServerRpcResponse(toSessionMsg.getToServerResponse());
                }
            });
            if (md.getSessionType() == TransportProtos.SessionType.SYNC) {
                this.deregisterSession(md.getSessionInfo());
            }
        } else {
            log.debug("[{}] Missing session.", (Object)sessionId);
        }
    }

    protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) {
        return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
    }

    protected UUID getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) {
        return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB());
    }

    protected TenantId getTenantId(TransportProtos.SessionInfoProto sessionInfo) {
        return new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB()));
    }

    protected DeviceId getDeviceId(TransportProtos.SessionInfoProto sessionInfo) {
        return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
    }

    public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) {
        return TransportProtos.SessionEventMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).setEvent(event).build();
    }

    protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
        TopicPartitionInfo tpi = this.partitionService.resolve(ServiceType.TB_CORE, this.getTenantId(sessionInfo), (EntityId)this.getDeviceId(sessionInfo));
        if (log.isTraceEnabled()) {
            log.trace("[{}][{}] Pushing to topic {} message {}", new Object[]{this.getTenantId(sessionInfo), this.getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg});
        }
        TransportTbQueueCallback transportTbQueueCallback = callback != null ? new TransportTbQueueCallback(callback) : null;
        this.tbCoreProducerStats.incrementTotal();
        StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, this.tbCoreProducerStats);
        this.tbCoreMsgProducer.send(tpi, (TbQueueMsg)new TbProtoQueueMsg(this.getRoutingKey(sessionInfo), (GeneratedMessageV3)TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), (TbQueueCallback)wrappedCallback);
    }

    protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
        TopicPartitionInfo tpi = this.partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tbMsg.getOriginator());
        if (log.isTraceEnabled()) {
            log.trace("[{}][{}] Pushing to topic {} message {}", new Object[]{tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg});
        }
        TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString((TbMsg)tbMsg)).setTenantIdMSB(tenantId.getId().getMostSignificantBits()).setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build();
        this.ruleEngineProducerStats.incrementTotal();
        StatsCallback wrappedCallback = new StatsCallback(callback, this.ruleEngineProducerStats);
        this.ruleEngineMsgProducer.send(tpi, (TbQueueMsg)new TbProtoQueueMsg(tbMsg.getId(), (GeneratedMessageV3)msg), (TbQueueCallback)wrappedCallback);
    }

    private class MsgPackCallback
    implements TbQueueCallback {
        private final AtomicInteger msgCount;
        private final TransportServiceCallback<Void> callback;

        public MsgPackCallback(Integer msgCount, TransportServiceCallback<Void> callback) {
            this.msgCount = new AtomicInteger(msgCount);
            this.callback = callback;
        }

        public void onSuccess(TbQueueMsgMetadata metadata) {
            if (this.msgCount.decrementAndGet() <= 0) {
                DefaultTransportService.this.transportCallbackExecutor.submit(() -> this.callback.onSuccess(null));
            }
        }

        public void onFailure(Throwable t) {
            this.callback.onError(t);
        }
    }

    private class StatsCallback
    implements TbQueueCallback {
        private final TbQueueCallback callback;
        private final MessagesStats stats;

        private StatsCallback(TbQueueCallback callback, MessagesStats stats) {
            this.callback = callback;
            this.stats = stats;
        }

        public void onSuccess(TbQueueMsgMetadata metadata) {
            this.stats.incrementSuccessful();
            if (this.callback != null) {
                this.callback.onSuccess(metadata);
            }
        }

        public void onFailure(Throwable t) {
            this.stats.incrementFailed();
            if (this.callback != null) {
                this.callback.onFailure(t);
            }
        }
    }

    private class TransportTbQueueCallback
    implements TbQueueCallback {
        private final TransportServiceCallback<Void> callback;

        private TransportTbQueueCallback(TransportServiceCallback<Void> callback) {
            this.callback = callback;
        }

        public void onSuccess(TbQueueMsgMetadata metadata) {
            DefaultTransportService.this.transportCallbackExecutor.submit(() -> this.callback.onSuccess(null));
        }

        public void onFailure(Throwable t) {
            DefaultTransportService.this.transportCallbackExecutor.submit(() -> this.callback.onError(t));
        }
    }
}

