/*
 * Decompiled with CFR 0.152.
 */
package org.thingsboard.mqtt;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.thingsboard.mqtt.MqttClientImpl;
import org.thingsboard.mqtt.MqttConnectResult;
import org.thingsboard.mqtt.MqttIncomingQos2Publish;
import org.thingsboard.mqtt.MqttPendingPublish;
import org.thingsboard.mqtt.MqttPendingSubscription;
import org.thingsboard.mqtt.MqttPendingUnsubscription;
import org.thingsboard.mqtt.MqttSubscription;

final class MqttChannelHandler
extends SimpleChannelInboundHandler<MqttMessage> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(MqttChannelHandler.class);
    private final MqttClientImpl client;
    private final Promise<MqttConnectResult> connectFuture;

    MqttChannelHandler(MqttClientImpl client, Promise<MqttConnectResult> connectFuture) {
        this.client = client;
        this.connectFuture = connectFuture;
    }

    protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) {
        if (msg.decoderResult().isSuccess()) {
            switch (msg.fixedHeader().messageType()) {
                case CONNACK: {
                    this.handleConack(ctx.channel(), (MqttConnAckMessage)msg);
                    break;
                }
                case SUBACK: {
                    this.handleSubAck((MqttSubAckMessage)msg);
                    break;
                }
                case PUBLISH: {
                    this.handlePublish(ctx.channel(), (MqttPublishMessage)msg);
                    break;
                }
                case UNSUBACK: {
                    this.handleUnsuback((MqttUnsubAckMessage)msg);
                    break;
                }
                case PUBACK: {
                    this.handlePuback((MqttPubAckMessage)msg);
                    break;
                }
                case PUBREC: {
                    this.handlePubrec(ctx.channel(), msg);
                    break;
                }
                case PUBREL: {
                    this.handlePubrel(ctx.channel(), msg);
                    break;
                }
                case PUBCOMP: {
                    this.handlePubcomp(msg);
                    break;
                }
                case DISCONNECT: {
                    this.handleDisconnect(msg);
                }
            }
        } else {
            log.error("[{}] Message decoding failed: {}", (Object)this.client.getClientConfig().getClientId(), (Object)msg.decoderResult().cause().getMessage());
            ctx.close();
        }
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(this.client.getClientConfig().getProtocolVersion().protocolName(), (int)this.client.getClientConfig().getProtocolVersion().protocolLevel(), this.client.getClientConfig().getUsername() != null, this.client.getClientConfig().getPassword() != null, this.client.getClientConfig().getLastWill() != null && this.client.getClientConfig().getLastWill().isRetain(), this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getQos().value() : 0, this.client.getClientConfig().getLastWill() != null, this.client.getClientConfig().isCleanSession(), this.client.getClientConfig().getTimeoutSeconds());
        MqttConnectPayload payload = new MqttConnectPayload(this.client.getClientConfig().getClientId(), this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getTopic() : null, this.client.getClientConfig().getLastWill() != null ? this.client.getClientConfig().getLastWill().getMessage().getBytes(CharsetUtil.UTF_8) : null, this.client.getClientConfig().getUsername(), this.client.getClientConfig().getPassword() != null ? this.client.getClientConfig().getPassword().getBytes(CharsetUtil.UTF_8) : null);
        log.debug("{} Sending CONNECT", (Object)this.client.getClientConfig().getOwnerId());
        ctx.channel().writeAndFlush((Object)new MqttConnectMessage(fixedHeader, variableHeader, payload));
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    ListenableFuture<Void> invokeHandlersForIncomingPublish(MqttPublishMessage message) {
        ListenableFuture future = Futures.immediateVoidFuture();
        AtomicBoolean handlerInvoked = new AtomicBoolean();
        try {
            for (MqttSubscription subscription : ImmutableSet.copyOf((Collection)this.client.getSubscriptions().values())) {
                if (!subscription.matches(message.variableHeader().topicName())) continue;
                future = Futures.transform((ListenableFuture)future, x -> {
                    if (subscription.isOnce() && subscription.isCalled()) {
                        return null;
                    }
                    message.payload().markReaderIndex();
                    subscription.setCalled(true);
                    subscription.getHandler().onMessage(message.variableHeader().topicName(), message.payload());
                    if (subscription.isOnce()) {
                        this.client.off(subscription.getTopic(), subscription.getHandler());
                    }
                    message.payload().resetReaderIndex();
                    handlerInvoked.set(true);
                    return null;
                }, (Executor)this.client.getHandlerExecutor());
            }
            future = Futures.transform((ListenableFuture)future, x -> {
                if (!handlerInvoked.get() && this.client.getDefaultHandler() != null) {
                    this.client.getDefaultHandler().onMessage(message.variableHeader().topicName(), message.payload());
                }
                return null;
            }, (Executor)this.client.getHandlerExecutor());
        }
        catch (Throwable throwable) {
            Futures.addCallback((ListenableFuture)future, (FutureCallback)new FutureCallback<Void>(message){
                final /* synthetic */ MqttPublishMessage val$message;
                {
                    this.val$message = mqttPublishMessage;
                }

                public void onSuccess(Void result) {
                    this.val$message.payload().release();
                }

                public void onFailure(Throwable t) {
                    this.val$message.payload().release();
                }
            }, (Executor)MoreExecutors.directExecutor());
            throw throwable;
        }
        Futures.addCallback((ListenableFuture)future, (FutureCallback)new /* invalid duplicate definition of identical inner class */, (Executor)MoreExecutors.directExecutor());
        return future;
    }

    private void handleConack(Channel channel, MqttConnAckMessage message) {
        log.debug("{} Handling CONNACK", (Object)this.client.getClientConfig().getOwnerId());
        switch (message.variableHeader().connectReturnCode()) {
            case CONNECTION_ACCEPTED: {
                this.connectFuture.setSuccess((Object)new MqttConnectResult(true, MqttConnectReturnCode.CONNECTION_ACCEPTED, channel.closeFuture()));
                this.client.getPendingSubscriptions().entrySet().stream().filter(e -> !((MqttPendingSubscription)e.getValue()).isSent()).forEach(e -> {
                    channel.write((Object)((MqttPendingSubscription)e.getValue()).getSubscribeMessage());
                    ((MqttPendingSubscription)e.getValue()).setSent(true);
                });
                this.client.getPendingPublishes().forEach((id, publish) -> {
                    if (publish.isSent()) {
                        return;
                    }
                    channel.write((Object)publish.getMessage());
                    publish.setSent(true);
                    if (publish.getQos() == MqttQoS.AT_MOST_ONCE) {
                        publish.getFuture().setSuccess(null);
                        this.client.getPendingPublishes().remove(publish.getMessageId());
                    }
                });
                channel.flush();
                if (!this.client.isReconnect()) break;
                this.client.onSuccessfulReconnect();
                break;
            }
            case CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD: 
            case CONNECTION_REFUSED_IDENTIFIER_REJECTED: 
            case CONNECTION_REFUSED_NOT_AUTHORIZED: 
            case CONNECTION_REFUSED_SERVER_UNAVAILABLE: 
            case CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION: {
                this.connectFuture.setSuccess((Object)new MqttConnectResult(false, message.variableHeader().connectReturnCode(), channel.closeFuture()));
                channel.close();
            }
        }
        if (this.client.getCallback() != null) {
            this.client.getCallback().onConnAck(message);
        }
    }

    private void handleSubAck(MqttSubAckMessage message) {
        MqttPendingSubscription pendingSubscription = (MqttPendingSubscription)this.client.getPendingSubscriptions().remove(message.variableHeader().messageId());
        if (pendingSubscription == null) {
            return;
        }
        pendingSubscription.onSubackReceived();
        for (MqttPendingSubscription.MqttPendingHandler handler : pendingSubscription.getHandlers()) {
            MqttSubscription subscription = new MqttSubscription(pendingSubscription.getTopic(), handler.handler(), handler.once());
            this.client.getSubscriptions().put((Object)pendingSubscription.getTopic(), (Object)subscription);
            this.client.getHandlerToSubscription().put((Object)handler.handler(), (Object)subscription);
        }
        this.client.getPendingSubscribeTopics().remove(pendingSubscription.getTopic());
        this.client.getServerSubscriptions().add(pendingSubscription.getTopic());
        if (!pendingSubscription.getFuture().isDone()) {
            pendingSubscription.getFuture().setSuccess(null);
        }
        if (this.client.getCallback() != null) {
            this.client.getCallback().onSubAck(message);
        }
    }

    private void handlePublish(Channel channel, MqttPublishMessage message) {
        switch (message.fixedHeader().qosLevel()) {
            case AT_MOST_ONCE: {
                this.invokeHandlersForIncomingPublish(message);
                break;
            }
            case AT_LEAST_ONCE: {
                ListenableFuture<Void> future = this.invokeHandlersForIncomingPublish(message);
                if (message.variableHeader().packetId() == -1) break;
                future.addListener(() -> {
                    MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
                    MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from((int)message.variableHeader().packetId());
                    channel.writeAndFlush((Object)new MqttPubAckMessage(fixedHeader, variableHeader));
                }, MoreExecutors.directExecutor());
                break;
            }
            case EXACTLY_ONCE: {
                if (message.variableHeader().packetId() == -1) break;
                MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
                MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from((int)message.variableHeader().packetId());
                MqttMessage pubrecMessage = new MqttMessage(fixedHeader, (Object)variableHeader);
                MqttIncomingQos2Publish incomingQos2Publish = new MqttIncomingQos2Publish(message);
                this.client.getQos2PendingIncomingPublishes().put(message.variableHeader().packetId(), incomingQos2Publish);
                channel.writeAndFlush((Object)pubrecMessage);
            }
        }
    }

    private void handleUnsuback(MqttUnsubAckMessage message) {
        MqttPendingUnsubscription unsubscription = (MqttPendingUnsubscription)this.client.getPendingServerUnsubscribes().get(message.variableHeader().messageId());
        if (unsubscription == null) {
            return;
        }
        unsubscription.onUnsubackReceived();
        this.client.getServerSubscriptions().remove(unsubscription.getTopic());
        unsubscription.getFuture().setSuccess(null);
        this.client.getPendingServerUnsubscribes().remove(message.variableHeader().messageId());
        if (this.client.getCallback() != null) {
            this.client.getCallback().onUnsubAck(message);
        }
    }

    private void handlePuback(MqttPubAckMessage message) {
        log.trace("{} Handling PUBACK", (Object)this.client.getClientConfig().getOwnerId());
        this.client.getPendingPublishes().computeIfPresent(message.variableHeader().messageId(), (__, pendingPublish) -> {
            pendingPublish.getFuture().setSuccess(null);
            pendingPublish.onPubackReceived();
            pendingPublish.getPayload().release();
            if (this.client.getCallback() != null) {
                this.client.getCallback().onPubAck(message);
            }
            return null;
        });
    }

    private void handlePubrec(Channel channel, MqttMessage message) {
        MqttPendingPublish pendingPublish = (MqttPendingPublish)this.client.getPendingPublishes().get(((MqttMessageIdVariableHeader)message.variableHeader()).messageId());
        pendingPublish.onPubackReceived();
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader)message.variableHeader();
        MqttMessage pubrelMessage = new MqttMessage(fixedHeader, (Object)variableHeader);
        channel.writeAndFlush((Object)pubrelMessage);
        pendingPublish.setPubrelMessage(pubrelMessage);
        pendingPublish.startPubrelRetransmissionTimer(this.client.getEventLoop().next(), this.client::sendAndFlushPacket);
    }

    private void handlePubrel(Channel channel, MqttMessage message) {
        ListenableFuture future = Futures.immediateVoidFuture();
        if (this.client.getQos2PendingIncomingPublishes().containsKey(((MqttMessageIdVariableHeader)message.variableHeader()).messageId())) {
            MqttIncomingQos2Publish incomingQos2Publish = (MqttIncomingQos2Publish)this.client.getQos2PendingIncomingPublishes().get(((MqttMessageIdVariableHeader)message.variableHeader()).messageId());
            future = this.invokeHandlersForIncomingPublish(incomingQos2Publish.getIncomingPublish());
            future = Futures.transform(future, x -> {
                this.client.getQos2PendingIncomingPublishes().remove(incomingQos2Publish.getIncomingPublish().variableHeader().packetId());
                return null;
            }, (Executor)MoreExecutors.directExecutor());
        }
        future.addListener(() -> {
            MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
            MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from((int)((MqttMessageIdVariableHeader)message.variableHeader()).messageId());
            channel.writeAndFlush((Object)new MqttMessage(fixedHeader, (Object)variableHeader));
        }, MoreExecutors.directExecutor());
    }

    private void handlePubcomp(MqttMessage message) {
        MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader)message.variableHeader();
        MqttPendingPublish pendingPublish = (MqttPendingPublish)this.client.getPendingPublishes().get(variableHeader.messageId());
        pendingPublish.getFuture().setSuccess(null);
        this.client.getPendingPublishes().remove(variableHeader.messageId());
        pendingPublish.getPayload().release();
        pendingPublish.onPubcompReceived();
    }

    private void handleDisconnect(MqttMessage message) {
        log.debug("{} Handling DISCONNECT", (Object)this.client.getClientConfig().getOwnerId());
        if (this.client.getCallback() != null) {
            this.client.getCallback().onDisconnect(message);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        try {
            if (cause instanceof IOException) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] IOException: ", (Object)this.client.getClientConfig().getOwnerId(), (Object)cause);
                } else {
                    log.info("[{}] IOException: {}", (Object)this.client.getClientConfig().getOwnerId(), (Object)cause.getMessage());
                }
            } else {
                log.warn("[{}] exceptionCaught", (Object)this.client.getClientConfig().getOwnerId(), (Object)cause);
            }
        }
        finally {
            ReferenceCountUtil.release((Object)cause);
        }
    }
}

