/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.mqtt;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.mqtt.ChannelClosedException;
import org.thingsboard.mqtt.MaxRetransmissionsReachedException;
import org.thingsboard.mqtt.MqttChannelHandler;
import org.thingsboard.mqtt.MqttClient;
import org.thingsboard.mqtt.MqttClientCallback;
import org.thingsboard.mqtt.MqttClientConfig;
import org.thingsboard.mqtt.MqttConnectResult;
import org.thingsboard.mqtt.MqttHandler;
import org.thingsboard.mqtt.MqttIncomingQos2Publish;
import org.thingsboard.mqtt.MqttPendingPublish;
import org.thingsboard.mqtt.MqttPendingSubscription;
import org.thingsboard.mqtt.MqttPendingUnsubscription;
import org.thingsboard.mqtt.MqttPingHandler;
import org.thingsboard.mqtt.MqttSubscription;
import org.thingsboard.mqtt.PendingOperation;
import org.thingsboard.mqtt.ReconnectStrategy;
import org.thingsboard.mqtt.ReconnectStrategyExponential;

final class MqttClientImpl
implements MqttClient {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MqttClientImpl.class);
    // Topics consulted by createSubscription()/checkSubscriptions() to decide whether a
    // SUBSCRIBE/UNSUBSCRIBE packet has to be sent. Cleared when the channel closes.
    // NOTE(review): never added to in this file - presumably populated by MqttChannelHandler; confirm.
    private final Set<String> serverSubscriptions = new HashSet<String>();
    // In-flight UNSUBSCRIBE operations, keyed by MQTT message id.
    private final ConcurrentMap<Integer, MqttPendingUnsubscription> pendingServerUnsubscribes = new ConcurrentHashMap<Integer, MqttPendingUnsubscription>();
    // Only cleared (on channel close) in this file; exposed through its getter.
    private final ConcurrentMap<Integer, MqttIncomingQos2Publish> qos2PendingIncomingPublishes = new ConcurrentHashMap<Integer, MqttIncomingQos2Publish>();
    // In-flight PUBLISH operations, keyed by MQTT message id.
    private final ConcurrentMap<Integer, MqttPendingPublish> pendingPublishes = new ConcurrentHashMap<Integer, MqttPendingPublish>();
    // Topic -> local subscriptions registered for that topic.
    private final HashMultimap<String, MqttSubscription> subscriptions = HashMultimap.create();
    // In-flight SUBSCRIBE operations, keyed by MQTT message id.
    private final ConcurrentMap<Integer, MqttPendingSubscription> pendingSubscriptions = new ConcurrentHashMap<Integer, MqttPendingSubscription>();
    // Topics for which a SUBSCRIBE has been queued by createSubscription().
    private final Set<String> pendingSubscribeTopics = new HashSet<String>();
    // Handler -> subscriptions that use it; the reverse index of 'subscriptions'.
    private final HashMultimap<MqttHandler, MqttSubscription> handlerToSubscription = HashMultimap.create();
    // Source of message ids for getNewMessageId(); starts at 1.
    private final AtomicInteger nextMessageId = new AtomicInteger(1);
    private final MqttClientConfig clientConfig;
    private final MqttHandler defaultHandler;
    // Supplies the delay (in seconds) used by scheduleConnectIfRequired().
    private final ReconnectStrategy reconnectStrategy;
    // Created lazily in connect() when not injected through setEventLoop().
    private EventLoopGroup eventLoop;
    // Most recently connected channel; null until the first successful connect.
    private volatile Channel channel;
    // Set to true by disconnect(); suppresses reconnect scheduling and makes isConnected() false.
    private volatile boolean disconnected = false;
    // Set to true by scheduleConnectIfRequired() when the scheduled attempt is a reconnect.
    private volatile boolean reconnect = false;
    // Target of the last connect() call; reused by reconnect().
    private String host;
    private int port;
    // Optional listener notified of connection loss and successful reconnects.
    private MqttClientCallback callback;
    private final ListeningExecutor handlerExecutor;
    // Seconds disconnect() waits before force-closing a channel that is still open.
    private static final int DISCONNECT_FALLBACK_DELAY_SECS = 1;

    /**
     * Creates a client with a freshly constructed {@link MqttClientConfig}.
     *
     * @param defaultHandler  handler stored as this client's default handler
     * @param handlerExecutor executor exposed through {@link #getHandlerExecutor()}
     */
    public MqttClientImpl(MqttHandler defaultHandler, ListeningExecutor handlerExecutor) {
        this(new MqttClientConfig(), defaultHandler, handlerExecutor);
    }

    /**
     * Creates a client with the supplied configuration. The reconnect strategy is an
     * exponential one seeded with the configuration's reconnect delay.
     *
     * @param clientConfig    client configuration; must not be {@code null}
     * @param defaultHandler  handler stored as this client's default handler
     * @param handlerExecutor executor exposed through {@link #getHandlerExecutor()}
     */
    public MqttClientImpl(MqttClientConfig clientConfig, MqttHandler defaultHandler, ListeningExecutor handlerExecutor) {
        this.reconnectStrategy = new ReconnectStrategyExponential(clientConfig.getReconnectDelay());
        this.handlerExecutor = handlerExecutor;
        this.defaultHandler = defaultHandler;
        this.clientConfig = clientConfig;
    }

    /**
     * Connects to {@code host} on port 1883.
     *
     * @param host server host name or address
     * @return promise for the MQTT connect result
     */
    @Override
    public Promise<MqttConnectResult> connect(String host) {
        final int defaultPort = 1883;
        return connect(host, defaultPort);
    }

    /**
     * Connects to {@code host:port} as a first-time (non-reconnect) attempt.
     *
     * @param host server host name or address
     * @param port server port
     * @return promise for the MQTT connect result
     */
    @Override
    public Promise<MqttConnectResult> connect(String host, int port) {
        final boolean isReconnect = false;
        return connect(host, port, isReconnect);
    }

    /**
     * Opens a new channel to {@code host:port} and remembers the target for {@link #reconnect()}.
     * An event loop group is created on first use if none was injected.
     *
     * <p>When the TCP connect succeeds, the channel is stored and a close listener is attached.
     * When it fails, a retry is scheduled if the configuration allows it; the returned promise is
     * not completed by this method in that case.
     *
     * @param host      server host name or address
     * @param port      server port
     * @param reconnect whether this attempt is a reconnect; forwarded to the retry scheduling
     * @return the promise handed to the channel's MQTT handler
     */
    private Promise<MqttConnectResult> connect(String host, int port, boolean reconnect) {
        log.trace("[{}] Connecting to server, isReconnect - {}", this.channel != null ? this.channel.id() : "UNKNOWN", reconnect);
        if (this.eventLoop == null) {
            this.eventLoop = new NioEventLoopGroup();
        }
        this.host = host;
        this.port = port;
        Promise<MqttConnectResult> connectFuture = new DefaultPromise<>(this.eventLoop.next());
        Bootstrap bootstrap = new Bootstrap()
                .group(this.eventLoop)
                .channel(this.clientConfig.getChannelClass())
                .remoteAddress(host, port)
                .handler(new MqttChannelInitializer(connectFuture, host, port, this.clientConfig.getSslContext()));
        bootstrap.connect().addListener((ChannelFutureListener) attempt -> {
            if (!attempt.isSuccess()) {
                log.debug("[{}][{}] Connect failed, trying reconnect!", host, port);
                this.scheduleConnectIfRequired(host, port, reconnect);
                return;
            }
            this.channel = attempt.channel();
            log.debug("[{}][{}] Connected successfully {}!", host, port, this.channel.id());
            this.channel.closeFuture().addListener((ChannelFutureListener) closed -> this.handleChannelClosed(host, port));
        });
        return connectFuture;
    }

    /**
     * Reacts to a channel's close future firing. Does nothing if the client currently reports
     * itself as connected; otherwise notifies the callback, fails and drops all pending
     * operations, clears all subscription state and schedules a reconnect if required.
     */
    private void handleChannelClosed(String host, int port) {
        if (this.isConnected()) {
            return;
        }
        log.debug("[{}][{}] Channel is closed {}!", host, port, this.channel.id());
        ChannelClosedException closedCause = new ChannelClosedException("Channel is closed!");
        if (this.callback != null) {
            this.callback.connectionLost(closedCause);
        }
        this.pendingSubscriptions.forEach((messageId, pending) -> pending.onChannelClosed());
        this.pendingSubscriptions.clear();
        this.serverSubscriptions.clear();
        this.subscriptions.clear();
        this.pendingServerUnsubscribes.forEach((messageId, pending) -> pending.onChannelClosed());
        this.pendingServerUnsubscribes.clear();
        this.qos2PendingIncomingPublishes.clear();
        this.pendingPublishes.forEach((messageId, pending) -> pending.onChannelClosed());
        this.pendingPublishes.clear();
        this.pendingSubscribeTopics.clear();
        this.handlerToSubscription.clear();
        this.scheduleConnectIfRequired(host, port, true);
    }

    /**
     * Schedules another connect attempt on the event loop, provided reconnecting is enabled in
     * the configuration and {@link #disconnect()} has not been called.
     *
     * @param host      server host name or address
     * @param port      server port
     * @param reconnect whether the scheduled attempt is a reconnect; when {@code true} the
     *                  client's reconnect flag is raised
     */
    private void scheduleConnectIfRequired(String host, int port, boolean reconnect) {
        log.trace("[{}][{}][{}] Scheduling connect to server, isReconnect - {}", host, port, this.channel != null ? this.channel.id() : "UNKNOWN", reconnect);
        if (!this.clientConfig.isReconnect() || this.disconnected) {
            return;
        }
        if (reconnect) {
            this.reconnect = true;
        }
        final long delaySeconds = this.reconnectStrategy.getNextReconnectDelay();
        log.debug("[{}][{}][{}] Scheduling reconnect in [{}] sec", host, port, this.channel != null ? this.channel.id() : "UNKNOWN", delaySeconds);
        this.eventLoop.schedule(() -> this.connect(host, port, reconnect), delaySeconds, TimeUnit.SECONDS);
    }

    /**
     * @return {@code true} when {@link #disconnect()} has not been called and the current
     *         channel exists and is active
     */
    @Override
    public boolean isConnected() {
        if (this.disconnected) {
            return false;
        }
        final Channel current = this.channel;
        return current != null && current.isActive();
    }

    /**
     * Connects again to the host and port of the last {@code connect} call.
     *
     * @return promise for the MQTT connect result
     * @throws IllegalStateException if no host has been recorded yet
     */
    @Override
    public Promise<MqttConnectResult> reconnect() {
        log.trace("[{}] Reconnecting to server, isReconnect - {}", this.channel != null ? this.channel.id() : "UNKNOWN", this.reconnect);
        final String lastHost = this.host;
        if (lastHost == null) {
            throw new IllegalStateException("Cannot reconnect. Call connect() first");
        }
        return connect(lastHost, this.port);
    }

    /**
     * @return the event loop group currently used by this client; {@code null} until one is
     *         injected or created by a connect call
     */
    @Override
    public EventLoopGroup getEventLoop() {
        return this.eventLoop;
    }

    /**
     * Replaces the event loop group used for subsequent connects, promises and scheduled tasks.
     *
     * @param eventLoop the group to use
     */
    @Override
    public void setEventLoop(EventLoopGroup eventLoop) {
        this.eventLoop = eventLoop;
    }

    /**
     * Registers {@code handler} for {@code topic} with QoS {@code AT_MOST_ONCE}.
     *
     * @return future of the subscription
     */
    @Override
    public Future<Void> on(String topic, MqttHandler handler) {
        return on(topic, handler, MqttQoS.AT_MOST_ONCE);
    }

    /**
     * Registers {@code handler} for {@code topic} with the given QoS as a regular
     * (not one-shot) subscription.
     *
     * @return future of the subscription
     */
    @Override
    public Future<Void> on(String topic, MqttHandler handler, MqttQoS qos) {
        final boolean once = false;
        return createSubscription(topic, handler, once, qos);
    }

    /**
     * Registers {@code handler} for {@code topic} as a one-shot subscription with QoS
     * {@code AT_MOST_ONCE}.
     *
     * @return future of the subscription
     */
    @Override
    public Future<Void> once(String topic, MqttHandler handler) {
        return once(topic, handler, MqttQoS.AT_MOST_ONCE);
    }

    /**
     * Registers {@code handler} for {@code topic} as a one-shot subscription with the given QoS.
     *
     * @return future of the subscription
     */
    @Override
    public Future<Void> once(String topic, MqttHandler handler, MqttQoS qos) {
        final boolean once = true;
        return createSubscription(topic, handler, once, qos);
    }

    /**
     * Removes {@code handler}'s subscriptions for {@code topic} and, if no local subscription
     * for the topic remains, asks the server to unsubscribe.
     *
     * <p>Only the handler's subscriptions that were actually registered under {@code topic} are
     * dropped from the handler index. Subscriptions the same handler holds on other topics stay
     * tracked, so a later {@code off(otherTopic, handler)} can still find and remove them.
     *
     * @param topic   topic to stop handling
     * @param handler handler to detach from the topic
     * @return future completed by the unsubscribe flow, or immediately when no UNSUBSCRIBE
     *         needs to be sent
     */
    @Override
    public Future<Void> off(String topic, MqttHandler handler) {
        log.trace("[{}] Unsubscribing from {}", this.channel != null ? this.channel.id() : "UNKNOWN", topic);
        Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
        // The multimap's get() view writes through, so removeIf prunes the handler index in
        // place; an entry is pruned only if it was really removed from this topic's set.
        this.handlerToSubscription.get(handler)
                .removeIf(subscription -> this.subscriptions.remove(topic, subscription));
        this.checkSubscriptions(topic, future);
        return future;
    }

    /**
     * Removes every local subscription for {@code topic} and, if none remains, asks the server
     * to unsubscribe.
     *
     * @param topic topic to stop handling
     * @return future completed by the unsubscribe flow, or immediately when no UNSUBSCRIBE
     *         needs to be sent
     */
    @Override
    public Future<Void> off(String topic) {
        log.trace("[{}] Unsubscribing from {}", this.channel != null ? this.channel.id() : "UNKNOWN", topic);
        Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
        // Iterate over a snapshot because the live multimap is modified inside the loop.
        Set<MqttSubscription> snapshot = ImmutableSet.copyOf(this.subscriptions.get(topic));
        for (MqttSubscription topicSubscription : snapshot) {
            MqttHandler owner = topicSubscription.getHandler();
            for (MqttSubscription ownedSubscription : this.handlerToSubscription.get(owner)) {
                this.subscriptions.remove(topic, ownedSubscription);
            }
            this.handlerToSubscription.remove(owner, topicSubscription);
        }
        this.checkSubscriptions(topic, future);
        return future;
    }

    /**
     * Publishes {@code payload} to {@code topic} with QoS {@code AT_MOST_ONCE}, not retained.
     *
     * @return future of the publish operation
     */
    @Override
    public Future<Void> publish(String topic, ByteBuf payload) {
        final boolean retain = false;
        return publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
    }

    /**
     * Publishes {@code payload} to {@code topic} with the given QoS, not retained.
     *
     * @return future of the publish operation
     */
    @Override
    public Future<Void> publish(String topic, ByteBuf payload, MqttQoS qos) {
        final boolean retain = false;
        return publish(topic, payload, qos, retain);
    }

    /**
     * Publishes {@code payload} to {@code topic} with QoS {@code AT_MOST_ONCE} and the given
     * retain flag.
     *
     * @return future of the publish operation
     */
    @Override
    public Future<Void> publish(String topic, ByteBuf payload, boolean retain) {
        return publish(topic, payload, MqttQoS.AT_MOST_ONCE, retain);
    }

    /**
     * Publishes {@code payload} to {@code topic}.
     *
     * <p>The message is registered as a pending publish under a new message id and then written
     * to the channel. The pending entry holds its own retained reference to {@code payload}.
     *
     * <p>If no channel has been established yet, nothing is written. The pending entry is then
     * removed, the reference retained here is released again, and the returned future is failed
     * with {@link ChannelClosedException}, so it does not stay incomplete. The caller's own
     * reference to {@code payload} is left untouched in that case.
     *
     * @param topic   topic to publish to
     * @param payload message body
     * @param qos     quality of service for the PUBLISH packet
     * @param retain  retain flag for the PUBLISH packet
     * @return future of the publish operation
     */
    @Override
    public Future<Void> publish(final String topic, ByteBuf payload, MqttQoS qos, boolean retain) {
        log.trace("[{}] Publishing message to {}", this.channel != null ? this.channel.id() : "UNKNOWN", topic);
        Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retain, 0);
        final MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, this.getNewMessageId().messageId());
        MqttPublishMessage message = new MqttPublishMessage(fixedHeader, variableHeader, payload);
        MqttPendingPublish pendingPublish = MqttPendingPublish.builder()
                .messageId(variableHeader.packetId())
                .future(future)
                .payload(payload.retain())
                .message(message)
                .qos(qos)
                .ownerId(this.clientConfig.getOwnerId())
                .retransmissionConfig(this.clientConfig.getRetransmissionConfig())
                .pendingOperation(new PendingOperation() {

                    @Override
                    public boolean isCancelled() {
                        // The operation counts as cancelled once it is no longer tracked.
                        return !MqttClientImpl.this.pendingPublishes.containsKey(variableHeader.packetId());
                    }

                    @Override
                    public void onMaxRetransmissionAttemptsReached() {
                        // Returning null from the remapping function removes the entry.
                        MqttClientImpl.this.pendingPublishes.computeIfPresent(variableHeader.packetId(), (id, pending) -> {
                            String reason = "Unable to deliver publish message due to max retransmission attempts (%s) being reached for client '%s' on topic '%s' (message ID: %d)".formatted(MqttClientImpl.this.clientConfig.getRetransmissionConfig().maxAttempts(), MqttClientImpl.this.clientConfig.getClientId(), topic, variableHeader.packetId());
                            pending.getFuture().tryFailure(new MaxRetransmissionsReachedException(reason));
                            pending.getPayload().release();
                            return null;
                        });
                    }
                })
                .build();
        this.pendingPublishes.put(pendingPublish.getMessageId(), pendingPublish);
        ChannelFuture channelFuture = this.sendAndFlushPacket(message);
        if (channelFuture != null) {
            channelFuture.addListener(result -> this.lambda$publish$6(pendingPublish, future, result));
        } else if (this.pendingPublishes.remove(pendingPublish.getMessageId(), pendingPublish)) {
            // No channel: undo the retain() taken above and report the failure. The two-argument
            // remove makes sure this runs only if nobody else already dropped the entry.
            pendingPublish.getPayload().release();
            future.tryFailure(new ChannelClosedException("Channel is closed!"));
        }
        return future;
    }

    /**
     * Marks the client as disconnected and, if a channel exists, sends an MQTT DISCONNECT and
     * closes the channel once the write completes. A fallback task force-closes the channel if
     * it is still open after {@code DISCONNECT_FALLBACK_DELAY_SECS} seconds. Repeated calls are
     * no-ops.
     */
    @Override
    public void disconnect() {
        if (this.disconnected) {
            return;
        }
        this.disconnected = true;
        log.trace("[{}] Disconnecting from server", this.channel != null ? this.channel.id() : "UNKNOWN");
        if (this.channel == null) {
            return;
        }
        MqttFixedHeader disconnectHeader = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage disconnectMessage = new MqttMessage(disconnectHeader);
        ChannelFutureListener closeAfterWrite = written -> written.channel().close();
        this.sendAndFlushPacket(disconnectMessage).addListener(closeAfterWrite);
        this.eventLoop.schedule(() -> {
            if (this.channel.isOpen()) {
                log.trace("[{}] Channel still open after {} second; forcing close now", this.channel.id(), DISCONNECT_FALLBACK_DELAY_SECS);
                this.channel.close();
            }
        }, DISCONNECT_FALLBACK_DELAY_SECS, TimeUnit.SECONDS);
    }

    /**
     * Forwards a successful-reconnect notification to the registered callback, if there is one.
     */
    public void onSuccessfulReconnect() {
        final MqttClientCallback listener = this.callback;
        if (listener == null) {
            return;
        }
        listener.onSuccessfulReconnect();
    }

    /**
     * Writes and flushes {@code message} on the current channel.
     *
     * @param message packet to send
     * @return {@code null} when no channel exists yet; a failed future carrying
     *         {@link ChannelClosedException} when the channel is not active; otherwise the
     *         future of the write
     */
    ChannelFuture sendAndFlushPacket(Object message) {
        final Channel current = this.channel;
        if (current == null) {
            return null;
        }
        if (!current.isActive()) {
            return current.newFailedFuture(new ChannelClosedException("Channel is closed!"));
        }
        log.trace("[{}] Sending message {}", current.id(), message);
        return current.writeAndFlush(message);
    }

    /**
     * Hands out the next MQTT message id. The counter is reset to 1 whenever it has reached
     * 65535, so ids cycle through 1..65534. Reset and increment happen under the counter's
     * monitor so they are seen as one step by concurrent callers.
     *
     * @return variable header wrapping the allocated id
     */
    private MqttMessageIdVariableHeader getNewMessageId() {
        final int wrapAt = 65535;
        final int allocated;
        synchronized (this.nextMessageId) {
            this.nextMessageId.compareAndSet(wrapAt, 1);
            allocated = this.nextMessageId.getAndIncrement();
        }
        return MqttMessageIdVariableHeader.from(allocated);
    }

    /**
     * Registers {@code handler} for {@code topic}, sending a SUBSCRIBE only when needed.
     *
     * <ul>
     *   <li>If a SUBSCRIBE for the topic is already pending, the handler is attached to that
     *       pending subscription and its future is returned.</li>
     *   <li>If the topic is already in the server-subscription set, the handler is registered
     *       locally and an already-succeeded future is returned.</li>
     *   <li>Otherwise a new SUBSCRIBE is queued, sent, and its retransmit timer started.</li>
     * </ul>
     *
     * @param topic   topic filter to subscribe to
     * @param handler handler to invoke for the topic
     * @param once    whether the handler is a one-shot handler
     * @param qos     requested QoS for the subscription
     * @return future of the subscription
     */
    private Future<Void> createSubscription(final String topic, MqttHandler handler, boolean once, MqttQoS qos) {
        log.trace("[{}] Creating subscription to {}", this.channel != null ? this.channel.id() : "UNKNOWN", topic);
        if (this.pendingSubscribeTopics.contains(topic)) {
            Optional<MqttPendingSubscription> inFlight = this.pendingSubscriptions.values().stream()
                    .filter(candidate -> candidate.getTopic().equals(topic))
                    .findAny();
            if (inFlight.isPresent()) {
                MqttPendingSubscription existing = inFlight.get();
                existing.addHandler(handler, once);
                return existing.getFuture();
            }
        }
        if (this.serverSubscriptions.contains(topic)) {
            MqttSubscription localSubscription = new MqttSubscription(topic, handler, once);
            this.subscriptions.put(topic, localSubscription);
            this.handlerToSubscription.put(handler, localSubscription);
            return this.channel.newSucceededFuture();
        }
        Promise<Void> future = new DefaultPromise<>(this.eventLoop.next());
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttTopicSubscription topicSubscription = new MqttTopicSubscription(topic, qos);
        final MqttMessageIdVariableHeader variableHeader = this.getNewMessageId();
        MqttSubscribePayload payload = new MqttSubscribePayload(Collections.singletonList(topicSubscription));
        MqttSubscribeMessage message = new MqttSubscribeMessage(fixedHeader, variableHeader, payload);
        MqttPendingSubscription pendingSubscription = MqttPendingSubscription.builder()
                .future(future)
                .topic(topic)
                .handlers(Sets.newHashSet(new MqttPendingSubscription.MqttPendingHandler(handler, once)))
                .subscribeMessage(message)
                .ownerId(this.clientConfig.getOwnerId())
                .retransmissionConfig(this.clientConfig.getRetransmissionConfig())
                .pendingOperation(new PendingOperation() {

                    @Override
                    public boolean isCancelled() {
                        // The operation counts as cancelled once it is no longer tracked.
                        return !MqttClientImpl.this.pendingSubscriptions.containsKey(variableHeader.messageId());
                    }

                    @Override
                    public void onMaxRetransmissionAttemptsReached() {
                        // Returning null from the remapping function removes the entry.
                        MqttClientImpl.this.pendingSubscriptions.computeIfPresent(variableHeader.messageId(), (id, pending) -> {
                            String reason = "Unable to deliver subscribe message due to max retransmission attempts (%s) being reached for client '%s' on topic '%s' (message ID: %d)".formatted(MqttClientImpl.this.clientConfig.getRetransmissionConfig().maxAttempts(), MqttClientImpl.this.clientConfig.getClientId(), topic, variableHeader.messageId());
                            pending.getFuture().tryFailure(new MaxRetransmissionsReachedException(reason));
                            return null;
                        });
                    }
                })
                .build();
        this.pendingSubscriptions.put(variableHeader.messageId(), pendingSubscription);
        this.pendingSubscribeTopics.add(topic);
        ChannelFuture writeFuture = this.sendAndFlushPacket(message);
        pendingSubscription.setSent(writeFuture != null);
        pendingSubscription.startRetransmitTimer(this.eventLoop.next(), this::sendAndFlushPacket);
        return future;
    }

    /**
     * Sends an UNSUBSCRIBE for {@code topic} when no local subscription for it remains and the
     * topic is in the server-subscription set; otherwise completes {@code promise} right away.
     *
     * @param topic   topic that was just modified
     * @param promise promise to complete immediately, or to hand to the pending unsubscription
     */
    private void checkSubscriptions(final String topic, Promise<Void> promise) {
        boolean stillHandledLocally = this.subscriptions.containsKey(topic) && !this.subscriptions.get(topic).isEmpty();
        if (stillHandledLocally || !this.serverSubscriptions.contains(topic)) {
            promise.setSuccess(null);
            return;
        }
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        final MqttMessageIdVariableHeader variableHeader = this.getNewMessageId();
        MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Collections.singletonList(topic));
        MqttUnsubscribeMessage message = new MqttUnsubscribeMessage(fixedHeader, variableHeader, payload);
        MqttPendingUnsubscription pendingUnsubscription = MqttPendingUnsubscription.builder()
                .future(promise)
                .topic(topic)
                .unsubscribeMessage(message)
                .ownerId(this.clientConfig.getOwnerId())
                .retransmissionConfig(this.clientConfig.getRetransmissionConfig())
                .pendingOperation(new PendingOperation() {

                    @Override
                    public boolean isCancelled() {
                        // The operation counts as cancelled once it is no longer tracked.
                        return !MqttClientImpl.this.pendingServerUnsubscribes.containsKey(variableHeader.messageId());
                    }

                    @Override
                    public void onMaxRetransmissionAttemptsReached() {
                        // Returning null from the remapping function removes the entry.
                        MqttClientImpl.this.pendingServerUnsubscribes.computeIfPresent(variableHeader.messageId(), (id, pending) -> {
                            String reason = "Unable to deliver unsubscribe message due to max retransmission attempts (%s) being reached for client '%s' on topic '%s' (message ID: %d)".formatted(MqttClientImpl.this.clientConfig.getRetransmissionConfig().maxAttempts(), MqttClientImpl.this.clientConfig.getClientId(), topic, variableHeader.messageId());
                            pending.getFuture().tryFailure(new MaxRetransmissionsReachedException(reason));
                            return null;
                        });
                    }
                })
                .build();
        this.pendingServerUnsubscribes.put(variableHeader.messageId(), pendingUnsubscription);
        pendingUnsubscription.startRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
        this.sendAndFlushPacket(message);
    }

    /** @return the live, mutable set of topics tracked as subscribed on the server */
    @Generated
    Set<String> getServerSubscriptions() {
        return this.serverSubscriptions;
    }

    /** @return the live map of in-flight UNSUBSCRIBE operations, keyed by message id */
    @Generated
    ConcurrentMap<Integer, MqttPendingUnsubscription> getPendingServerUnsubscribes() {
        return this.pendingServerUnsubscribes;
    }

    /** @return the live map of incoming QoS 2 publishes that are still pending */
    @Generated
    ConcurrentMap<Integer, MqttIncomingQos2Publish> getQos2PendingIncomingPublishes() {
        return this.qos2PendingIncomingPublishes;
    }

    /** @return the live map of in-flight PUBLISH operations, keyed by message id */
    @Generated
    ConcurrentMap<Integer, MqttPendingPublish> getPendingPublishes() {
        return this.pendingPublishes;
    }

    /** @return the live topic-to-subscription multimap */
    @Generated
    HashMultimap<String, MqttSubscription> getSubscriptions() {
        return this.subscriptions;
    }

    /** @return the live map of in-flight SUBSCRIBE operations, keyed by message id */
    @Generated
    ConcurrentMap<Integer, MqttPendingSubscription> getPendingSubscriptions() {
        return this.pendingSubscriptions;
    }

    /** @return the live, mutable set of topics that have a SUBSCRIBE queued */
    @Generated
    Set<String> getPendingSubscribeTopics() {
        return this.pendingSubscribeTopics;
    }

    /** @return the live handler-to-subscription multimap */
    @Generated
    HashMultimap<MqttHandler, MqttSubscription> getHandlerToSubscription() {
        return this.handlerToSubscription;
    }

    /** @return the configuration this client was constructed with */
    @Override
    @Generated
    public MqttClientConfig getClientConfig() {
        return this.clientConfig;
    }

    /** @return the default handler supplied at construction */
    @Generated
    MqttHandler getDefaultHandler() {
        return this.defaultHandler;
    }

    /** @return {@code true} once a reconnect attempt has been scheduled */
    @Generated
    public boolean isReconnect() {
        return this.reconnect;
    }

    /** @return the registered callback, or {@code null} if none was set */
    @Generated
    public MqttClientCallback getCallback() {
        return this.callback;
    }

    /**
     * Registers the callback notified of connection loss and successful reconnects.
     *
     * @param callback the callback; may be {@code null} to disable notifications
     */
    @Override
    @Generated
    public void setCallback(MqttClientCallback callback) {
        this.callback = callback;
    }

    /** @return the handler executor supplied at construction */
    @Override
    @Generated
    public ListeningExecutor getHandlerExecutor() {
        return this.handlerExecutor;
    }

    /**
     * Write-completion listener body for {@code publish}: marks the pending publish as sent and
     * then, depending on the outcome, fails the future, completes a QoS 0 publish, or starts
     * the retransmission timer for higher QoS levels.
     *
     * @param pendingPublish the pending publish the write belongs to
     * @param future         the future returned to the caller of {@code publish}
     * @param result         the completed write future
     */
    private /* synthetic */ void lambda$publish$6(MqttPendingPublish pendingPublish, Promise future, Future result) throws Exception {
        pendingPublish.setSent(true);
        final Throwable writeFailure = result.cause();
        if (writeFailure != null) {
            this.pendingPublishes.remove(pendingPublish.getMessageId());
            future.setFailure(writeFailure);
            return;
        }
        if (!pendingPublish.isSent()) {
            this.pendingPublishes.remove(pendingPublish.getMessageId());
            return;
        }
        if (pendingPublish.getQos() == MqttQoS.AT_MOST_ONCE) {
            // Completed here, directly after the write succeeds.
            this.pendingPublishes.remove(pendingPublish.getMessageId());
            pendingPublish.getFuture().setSuccess(null);
            return;
        }
        pendingPublish.startPublishRetransmissionTimer(this.eventLoop.next(), this::sendAndFlushPacket);
    }

    private class MqttChannelInitializer
    extends ChannelInitializer<SocketChannel> {
        private final Promise<MqttConnectResult> connectFuture;
        private final String host;
        private final int port;
        private final SslContext sslContext;

        public MqttChannelInitializer(Promise<MqttConnectResult> connectFuture, String host, int port, SslContext sslContext) {
            this.connectFuture = connectFuture;
            this.host = host;
            this.port = port;
            this.sslContext = sslContext;
        }

        protected void initChannel(SocketChannel ch) throws Exception {
            if (this.sslContext != null) {
                ch.pipeline().addLast(new ChannelHandler[]{this.sslContext.newHandler(ch.alloc(), this.host, this.port)});
            }
            ch.pipeline().addLast("mqttDecoder", (ChannelHandler)new MqttDecoder(MqttClientImpl.this.clientConfig.getMaxBytesInMessage()));
            ch.pipeline().addLast("mqttEncoder", (ChannelHandler)MqttEncoder.INSTANCE);
            ch.pipeline().addLast("idleStateHandler", (ChannelHandler)new IdleStateHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds(), MqttClientImpl.this.clientConfig.getTimeoutSeconds(), 0));
            ch.pipeline().addLast("mqttPingHandler", (ChannelHandler)new MqttPingHandler(MqttClientImpl.this.clientConfig.getTimeoutSeconds()));
            ch.pipeline().addLast("mqttHandler", (ChannelHandler)new MqttChannelHandler(MqttClientImpl.this, this.connectFuture));
        }
    }
}

