package org.thingsboard.mqtt;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/thingsboard/mqtt/MqttClientImpl.class */
/**
 * Netty-based implementation of {@link MqttClient}.
 *
 * <p>The client opens a channel through a {@link Bootstrap}, installs the MQTT codec, idle/ping
 * handlers and a {@code MqttChannelHandler} on its pipeline, and keeps the bookkeeping for
 * subscriptions and in-flight packets in the collections below. Those collections are shared with
 * other classes through the accessors at the bottom of this class.
 *
 * <p>NOTE(review): the bookkeeping collections are plain, unsynchronized collections; this class
 * does not add any locking around them. Confirm the intended threading model against the channel
 * handler before relying on concurrent use.
 */
public final class MqttClientImpl implements MqttClient {
    /** Port used by {@link #connect(String)}. */
    private static final int DEFAULT_PORT = 1883;

    /** Last packet identifier handed out before the counter wraps back to 1. */
    private static final int LAST_ISSUED_MESSAGE_ID = 65534;

    /** Message carried by every {@link ChannelClosedException} created by this class. */
    private static final String CHANNEL_CLOSED_MESSAGE = "Channel is closed!";

    /** Topics recorded as subscribed on the server; consulted before sending SUBSCRIBE/UNSUBSCRIBE. */
    private final Set<String> serverSubscriptions = new HashSet<>();
    /** UNSUBSCRIBE packets awaiting acknowledgement, keyed by message id. */
    private final IntObjectHashMap<MqttPendingUnsubscription> pendingServerUnsubscribes = new IntObjectHashMap<>();
    /** Incoming QoS 2 publishes in progress; only cleared here, populated elsewhere. */
    private final IntObjectHashMap<MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new IntObjectHashMap<>();
    /** Outgoing publishes with QoS above AT_MOST_ONCE that were sent, keyed by message id. */
    private final IntObjectHashMap<MqttPendingPublish> pendingPublishes = new IntObjectHashMap<>();
    /** Active subscriptions, keyed by the topic they were registered under. */
    private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create();
    /** SUBSCRIBE packets awaiting acknowledgement, keyed by message id. */
    private final IntObjectHashMap<MqttPendingSubscription> pendingSubscriptions = new IntObjectHashMap<>();
    /** Topics for which a SUBSCRIBE packet has been queued in {@link #pendingSubscriptions}. */
    private final Set<String> pendingSubscribeTopics = new HashSet<>();
    /** Reverse index of {@link #subscriptions}: handler to the subscriptions it serves. */
    private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscribtion = HashMultimap.create();
    /** Source of packet identifiers; see {@link #getNewMessageId()}. */
    private final AtomicInteger nextMessageId = new AtomicInteger(1);
    private final MqttClientConfig clientConfig;
    private final MqttHandler defaultHandler;
    private EventLoopGroup eventLoop;
    private volatile Channel channel;
    /** Set by {@link #disconnect()}; never reset by this class. */
    private volatile boolean disconnected;
    /** Set once a reconnect has been scheduled after a channel was closed. */
    private volatile boolean reconnect;
    private String host;
    private int port;
    private MqttClientCallback callback;

    /**
     * Builds the pipeline of every channel opened by the enclosing client.
     */
    public class MqttChannelInitializer extends ChannelInitializer<SocketChannel> {
        private final Promise<MqttConnectResult> connectFuture;
        private final String host;
        private final int port;
        private final SslContext sslContext;

        /**
         * @param promise    promise handed to the {@code MqttChannelHandler} of the new channel
         * @param str        remote host, passed to the SSL handler as peer host
         * @param i          remote port, passed to the SSL handler as peer port
         * @param sslContext SSL context to use, or {@code null} for a plain connection
         */
        public MqttChannelInitializer(Promise<MqttConnectResult> promise, String str, int i, SslContext sslContext) {
            this.connectFuture = promise;
            this.host = str;
            this.port = i;
            this.sslContext = sslContext;
        }

        /**
         * Installs, in order: the optional SSL handler, the MQTT decoder and encoder, the idle
         * state handler, the ping handler and the MQTT channel handler.
         */
        @Override
        public void initChannel(SocketChannel socketChannel) throws Exception {
            MqttClientConfig config = MqttClientImpl.this.clientConfig;
            if (this.sslContext != null) {
                socketChannel.pipeline().addLast(this.sslContext.newHandler(socketChannel.alloc(), this.host, this.port));
            }
            socketChannel.pipeline()
                    .addLast("mqttDecoder", new MqttDecoder(config.getMaxBytesInMessage()))
                    .addLast("mqttEncoder", MqttEncoder.INSTANCE)
                    .addLast("idleStateHandler", new IdleStateHandler(config.getTimeoutSeconds(), config.getTimeoutSeconds(), 0))
                    .addLast("mqttPingHandler", new MqttPingHandler(config.getTimeoutSeconds()))
                    .addLast("mqttHandler", new MqttChannelHandler(MqttClientImpl.this, this.connectFuture));
        }
    }

    /**
     * Creates a client with a freshly constructed {@link MqttClientConfig}.
     *
     * @param mqttHandler handler exposed through {@link #getDefaultHandler()}
     */
    public MqttClientImpl(MqttHandler mqttHandler) {
        this(new MqttClientConfig(), mqttHandler);
    }

    /**
     * Creates a client using the given configuration.
     *
     * @param mqttClientConfig configuration consulted for channel class, SSL, timeouts and reconnects
     * @param mqttHandler      handler exposed through {@link #getDefaultHandler()}
     */
    public MqttClientImpl(MqttClientConfig mqttClientConfig, MqttHandler mqttHandler) {
        this.clientConfig = mqttClientConfig;
        this.defaultHandler = mqttHandler;
    }

    /** Connects to {@code host} on port 1883. */
    @Override
    public Future<MqttConnectResult> connect(String host) {
        return connect(host, DEFAULT_PORT);
    }

    /** Connects to {@code host:port}; the attempt is not flagged as a reconnect. */
    @Override
    public Future<MqttConnectResult> connect(String host, int port) {
        return connect(host, port, false);
    }

    /**
     * Opens a new channel to {@code host:port}.
     *
     * <p>Creates a {@link NioEventLoopGroup} if no event loop was set. When the TCP connect fails
     * a retry is scheduled if the configuration allows it; if no retry is scheduled the returned
     * promise is failed with the connect cause so callers do not wait forever. When the channel
     * later closes while the client is not connected, the callback is notified, all bookkeeping
     * is cleared and a reconnect is scheduled if allowed.
     *
     * @param isReconnectAttempt forwarded to the retry scheduled after a failed connect
     * @return the promise handed to the new channel's {@code MqttChannelHandler}
     */
    private Future<MqttConnectResult> connect(String host, int port, boolean isReconnectAttempt) {
        if (this.eventLoop == null) {
            this.eventLoop = new NioEventLoopGroup();
        }
        this.host = host;
        this.port = port;
        DefaultPromise<MqttConnectResult> connectPromise = new DefaultPromise<>(this.eventLoop.next());
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.eventLoop);
        bootstrap.channel(this.clientConfig.getChannelClass());
        bootstrap.remoteAddress(host, port);
        bootstrap.handler(new MqttChannelInitializer(connectPromise, host, port, this.clientConfig.getSslContext()));

        // The listener reads the captured ChannelFuture rather than its own argument so that
        // channel() is available without casting the listener type.
        ChannelFuture connectAttempt = bootstrap.connect();
        connectAttempt.addListener(attemptDone -> {
            if (!connectAttempt.isSuccess()) {
                if (!scheduleConnectIfRequired(host, port, isReconnectAttempt)) {
                    // Nothing else will ever complete this promise.
                    connectPromise.tryFailure(connectAttempt.cause());
                }
                return;
            }
            this.channel = connectAttempt.channel();
            this.channel.closeFuture().addListener(channelClosed -> {
                if (isConnected()) {
                    return;
                }
                ChannelClosedException channelClosedException = new ChannelClosedException(CHANNEL_CLOSED_MESSAGE);
                if (this.callback != null) {
                    this.callback.connectionLost(channelClosedException);
                }
                clearSessionState();
                scheduleConnectIfRequired(host, port, true);
            });
        });
        return connectPromise;
    }

    /** Empties every subscription and in-flight bookkeeping collection. */
    private void clearSessionState() {
        this.pendingSubscriptions.clear();
        this.serverSubscriptions.clear();
        this.subscriptions.clear();
        this.pendingServerUnsubscribes.clear();
        this.qos2PendingIncomingPublishes.clear();
        this.pendingPublishes.clear();
        this.pendingSubscribeTopics.clear();
        this.handlerToSubscribtion.clear();
    }

    /**
     * Schedules a connect attempt after the configured reconnect delay (in seconds), unless
     * reconnects are disabled in the configuration or {@link #disconnect()} was called.
     *
     * <p>The scheduled task re-checks the disconnected flag so that a {@code disconnect()} issued
     * during the delay cancels the attempt.
     *
     * @param isReconnectAttempt when {@code true}, marks the client as reconnecting
     * @return {@code true} if an attempt was scheduled
     */
    private boolean scheduleConnectIfRequired(String host, int port, boolean isReconnectAttempt) {
        if (!this.clientConfig.isReconnect() || this.disconnected) {
            return false;
        }
        if (isReconnectAttempt) {
            this.reconnect = true;
        }
        this.eventLoop.schedule(() -> {
            if (!this.disconnected) {
                connect(host, port, isReconnectAttempt);
            }
        }, this.clientConfig.getReconnectDelay(), TimeUnit.SECONDS);
        return true;
    }

    /**
     * @return {@code true} when {@link #disconnect()} has not been called and the current channel
     *         exists and is active
     */
    @Override
    public boolean isConnected() {
        Channel current = this.channel;
        return !this.disconnected && current != null && current.isActive();
    }

    /**
     * Connects again to the host and port of the last connect call.
     *
     * @throws IllegalStateException if no connect call has recorded a host yet
     */
    @Override
    public Future<MqttConnectResult> reconnect() {
        if (this.host == null) {
            throw new IllegalStateException("Cannot reconnect. Call connect() first");
        }
        return connect(this.host, this.port);
    }

    @Override
    public EventLoopGroup getEventLoop() {
        return this.eventLoop;
    }

    @Override
    public void setEventLoop(EventLoopGroup eventLoopGroup) {
        this.eventLoop = eventLoopGroup;
    }

    /** Subscribes {@code handler} to {@code topic} with QoS {@code AT_MOST_ONCE}. */
    @Override
    public Future<Void> on(String topic, MqttHandler handler) {
        return on(topic, handler, MqttQoS.AT_MOST_ONCE);
    }

    /** Subscribes {@code handler} to {@code topic} with the given QoS. */
    @Override
    public Future<Void> on(String topic, MqttHandler handler, MqttQoS qos) {
        return createSubscription(topic, handler, false, qos);
    }

    /** Like {@link #on(String, MqttHandler)} but registers the subscription with the once flag set. */
    @Override
    public Future<Void> once(String topic, MqttHandler handler) {
        return once(topic, handler, MqttQoS.AT_MOST_ONCE);
    }

    /** Like {@link #on(String, MqttHandler, MqttQoS)} but registers the subscription with the once flag set. */
    @Override
    public Future<Void> once(String topic, MqttHandler handler, MqttQoS qos) {
        return createSubscription(topic, handler, true, qos);
    }

    /**
     * Removes the subscriptions of {@code handler} registered under {@code topic}.
     *
     * <p>Only the subscriptions actually removed for this topic are dropped from the handler
     * index, so subscriptions of the same handler on other topics stay tracked and can still be
     * removed later. An UNSUBSCRIBE is sent if the topic is left without subscriptions.
     *
     * @return a future completed as described in {@link #checkSubscribtions(String, Promise)}
     */
    @Override
    public Future<Void> off(String topic, MqttHandler handler) {
        DefaultPromise<Void> promise = new DefaultPromise<>(this.eventLoop.next());
        // Iterate over a snapshot: the handler index is modified inside the loop.
        for (MqttSubscription subscription : ImmutableSet.copyOf(this.handlerToSubscribtion.get(handler))) {
            if (this.subscriptions.remove(topic, subscription)) {
                this.handlerToSubscribtion.remove(handler, subscription);
            }
        }
        checkSubscribtions(topic, promise);
        return promise;
    }

    /**
     * Removes every subscription registered under {@code topic}, from both the topic map and the
     * handler index, then sends an UNSUBSCRIBE if the server is recorded as subscribed.
     *
     * @return a future completed as described in {@link #checkSubscribtions(String, Promise)}
     */
    @Override
    public Future<Void> off(String topic) {
        DefaultPromise<Void> promise = new DefaultPromise<>(this.eventLoop.next());
        // Iterate over a snapshot: the topic map is modified inside the loop.
        for (MqttSubscription subscription : ImmutableSet.copyOf(this.subscriptions.get(topic))) {
            this.subscriptions.remove(topic, subscription);
            this.handlerToSubscribtion.remove(subscription.getHandler(), subscription);
        }
        checkSubscribtions(topic, promise);
        return promise;
    }

    /** Publishes with QoS {@code AT_MOST_ONCE} and the retain flag cleared. */
    @Override
    public Future<Void> publish(String topic, ByteBuf payload) {
        return publish(topic, payload, MqttQoS.AT_MOST_ONCE, false);
    }

    /** Publishes with the given QoS and the retain flag cleared. */
    @Override
    public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos) {
        return publish(topic, payload, qos, false);
    }

    /** Publishes with QoS {@code AT_MOST_ONCE} and the given retain flag. */
    @Override
    public Future<Void> publish(String topic, ByteBuf payload, boolean retain) {
        return publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
    }

    /**
     * Sends a PUBLISH packet.
     *
     * <p>The payload is retained once for the pending-publish record. If the packet cannot be
     * sent (no channel yet, or the write future already carries a cause) the returned future is
     * failed and that extra reference is released again. For QoS {@code AT_MOST_ONCE} the future
     * succeeds right after the write is issued; for higher QoS the publish is stored in the
     * pending map and its retransmission timer is started.
     *
     * @return a future tracking the publish
     */
    @Override
    public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos, boolean retain) {
        DefaultPromise<Void> promise = new DefaultPromise<>(this.eventLoop.next());
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, getNewMessageId().messageId());
        MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
        MqttPendingPublish pendingPublish = new MqttPendingPublish(variableHeader.packetId(), promise, payload.retain(), message, qos);

        ChannelFuture writeFuture = sendAndFlushPacket(message);
        if (writeFuture == null) {
            // No channel has been opened yet: fail instead of leaving the future pending forever.
            payload.release();
            promise.setFailure(new ChannelClosedException(CHANNEL_CLOSED_MESSAGE));
            return promise;
        }
        pendingPublish.setSent(true);
        if (writeFuture.cause() != null) {
            // The pending record is discarded, so give back the reference retained for it.
            payload.release();
            promise.setFailure(writeFuture.cause());
            return promise;
        }
        if (pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) {
            pendingPublish.getFuture().setSuccess(null);
        } else {
            this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish);
            pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
        }
        return promise;
    }

    @Override
    public MqttClientConfig getClientConfig() {
        return this.clientConfig;
    }

    /**
     * Marks the client as disconnected, sends a DISCONNECT packet on the current channel and
     * closes that same channel once the write completes. Does nothing more when no channel exists.
     */
    @Override
    public void disconnect() {
        this.disconnected = true;
        Channel current = this.channel;
        if (current == null) {
            return;
        }
        MqttMessage disconnectMessage =
                new MqttMessage(new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0));
        ChannelFuture writeFuture = sendAndFlushPacket(disconnectMessage);
        if (writeFuture == null) {
            current.close();
            return;
        }
        // Close the channel captured above, not whatever the field points to by then.
        writeFuture.addListener(writeDone -> {
            current.close();
        });
    }

    @Override
    public void setCallback(MqttClientCallback mqttClientCallback) {
        this.callback = mqttClientCallback;
    }

    /** @return whether a reconnect has been scheduled after a channel was closed */
    public boolean isReconnect() {
        return this.reconnect;
    }

    /** Forwards a successful-reconnect notification to the callback, if one is set. */
    public void onSuccessfulReconnect() {
        if (this.callback != null) {
            this.callback.onSuccessfulReconnect();
        }
    }

    /**
     * Writes and flushes {@code obj} on the current channel.
     *
     * @return {@code null} if no channel exists, a failed future carrying a
     *         {@link ChannelClosedException} if the channel is not active, otherwise the write future
     */
    public ChannelFuture sendAndFlushPacket(Object obj) {
        Channel current = this.channel;
        if (current == null) {
            return null;
        }
        if (!current.isActive()) {
            return current.newFailedFuture(new ChannelClosedException(CHANNEL_CLOSED_MESSAGE));
        }
        return current.writeAndFlush(obj);
    }

    /**
     * Returns the next packet identifier, cycling through 1..65534.
     *
     * <p>The advance-and-wrap is a single atomic update, so concurrent callers can never push the
     * counter past the wrap point.
     */
    private MqttMessageIdVariableHeader getNewMessageId() {
        int messageId = this.nextMessageId.getAndUpdate(
                current -> current >= LAST_ISSUED_MESSAGE_ID ? 1 : current + 1);
        return MqttMessageIdVariableHeader.from(messageId);
    }

    /**
     * Registers {@code handler} for {@code topic}.
     *
     * <ul>
     *   <li>If a SUBSCRIBE for the topic is still pending, the handler is attached to it and its
     *       future is returned.</li>
     *   <li>If the server is already recorded as subscribed, the subscription is stored locally
     *       and an already-succeeded future is returned.</li>
     *   <li>Otherwise a SUBSCRIBE packet is queued, sent, and its retransmit timer started.</li>
     * </ul>
     *
     * @param once flag stored on the subscription / pending handler
     * @param qos  QoS requested for the topic in the SUBSCRIBE payload
     */
    private Future<Void> createSubscription(String topic, MqttHandler handler, boolean once, MqttQoS qos) {
        if (this.pendingSubscribeTopics.contains(topic)) {
            Optional<MqttPendingSubscription> pending = this.pendingSubscriptions.values().stream()
                    .filter(candidate -> candidate.getTopic().equals(topic))
                    .findAny();
            if (pending.isPresent()) {
                pending.get().addHandler(handler, once);
                return pending.get().getFuture();
            }
        }
        if (this.serverSubscriptions.contains(topic)) {
            MqttSubscription subscription = new MqttSubscription(topic, handler, once);
            this.subscriptions.put(topic, subscription);
            this.handlerToSubscribtion.put(handler, subscription);
            return this.channel.newSucceededFuture();
        }
        DefaultPromise<Void> promise = new DefaultPromise<>(this.eventLoop.next());
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttTopicSubscription topicSubscription = new MqttTopicSubscription(topic, qos);
        MqttMessageIdVariableHeader messageId = getNewMessageId();
        MqttSubscribeMessage subscribeMessage =
                new MqttSubscribeMessage(fixedHeader, messageId, new MqttSubscribePayload(Collections.singletonList(topicSubscription)));
        MqttPendingSubscription pendingSubscription = new MqttPendingSubscription(promise, topic, subscribeMessage);
        pendingSubscription.addHandler(handler, once);
        this.pendingSubscriptions.put(messageId.messageId(), pendingSubscription);
        this.pendingSubscribeTopics.add(topic);
        pendingSubscription.setSent(sendAndFlushPacket(subscribeMessage) != null);
        pendingSubscription.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
        return promise;
    }

    /**
     * Completes {@code promise} immediately when the topic still has local subscriptions or the
     * server is not recorded as subscribed; otherwise queues and sends an UNSUBSCRIBE packet that
     * carries the promise.
     */
    private void checkSubscribtions(String topic, Promise<Void> promise) {
        boolean stillInUse = !this.subscriptions.get(topic).isEmpty();
        if (stillInUse || !this.serverSubscriptions.contains(topic)) {
            promise.setSuccess(null);
            return;
        }
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttMessageIdVariableHeader messageId = getNewMessageId();
        MqttUnsubscribeMessage unsubscribeMessage =
                new MqttUnsubscribeMessage(fixedHeader, messageId, new MqttUnsubscribePayload(Collections.singletonList(topic)));
        MqttPendingUnsubscription pendingUnsubscription = new MqttPendingUnsubscription(promise, topic, unsubscribeMessage);
        this.pendingServerUnsubscribes.put(messageId.messageId(), pendingUnsubscription);
        pendingUnsubscription.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
        sendAndFlushPacket(unsubscribeMessage);
    }

    /** @return the live map of pending SUBSCRIBE packets, keyed by message id */
    public IntObjectHashMap<MqttPendingSubscription> getPendingSubscriptions() {
        return this.pendingSubscriptions;
    }

    /** @return the live topic-to-subscription map */
    public HashMultimap<String, MqttSubscription> getSubscriptions() {
        return this.subscriptions;
    }

    /** @return the live set of topics with a pending SUBSCRIBE */
    public Set<String> getPendingSubscribeTopics() {
        return this.pendingSubscribeTopics;
    }

    /** @return the live handler-to-subscription index */
    public HashMultimap<MqttHandler, MqttSubscription> getHandlerToSubscribtion() {
        return this.handlerToSubscribtion;
    }

    /** @return the live set of topics recorded as subscribed on the server */
    public Set<String> getServerSubscriptions() {
        return this.serverSubscriptions;
    }

    /** @return the live map of pending UNSUBSCRIBE packets, keyed by message id */
    public IntObjectHashMap<MqttPendingUnsubscription> getPendingServerUnsubscribes() {
        return this.pendingServerUnsubscribes;
    }

    /** @return the live map of pending outgoing publishes, keyed by message id */
    public IntObjectHashMap<MqttPendingPublish> getPendingPublishes() {
        return this.pendingPublishes;
    }

    /** @return the live map of incoming QoS 2 publishes in progress */
    public IntObjectHashMap<MqttIncomingQos2Publish> getQos2PendingIncomingPublishes() {
        return this.qos2PendingIncomingPublishes;
    }

    /** @return the handler supplied at construction time */
    public MqttHandler getDefaultHandler() {
        return this.defaultHandler;
    }
}
