package com.humuson.rainboots.server.service;

import com.humuson.rainboots.context.RainbootsContext;
import com.humuson.rainboots.datastore.DataStore;
import com.humuson.rainboots.events.AbstractMsgEvent;
import com.humuson.rainboots.events.MQTTOutBoundMsgEvent;
import com.humuson.rainboots.events.mqtt.PublishEvent;
import com.humuson.rainboots.events.mqtt.RepublishEvent;
import com.humuson.rainboots.messaging.events.RainbootsEvent;
import com.humuson.rainboots.proto.messages.AbstractMqttMessage;
import com.humuson.rainboots.proto.messages.FeedbackProtos;
import com.humuson.rainboots.proto.messages.MqttConnAckMessage;
import com.humuson.rainboots.proto.messages.MqttConnectMessage;
import com.humuson.rainboots.proto.messages.MqttPubAckMessage;
import com.humuson.rainboots.proto.messages.MqttPubCompMessage;
import com.humuson.rainboots.proto.messages.MqttPubRecMessage;
import com.humuson.rainboots.proto.messages.MqttPubRelMessage;
import com.humuson.rainboots.proto.messages.MqttPublishMessage;
import com.humuson.rainboots.proto.messages.MqttSubAckMessage;
import com.humuson.rainboots.proto.messages.MqttSubscribeMessage;
import com.humuson.rainboots.proto.messages.MqttUnsubAckMessage;
import com.humuson.rainboots.proto.messages.PushProtos;
import com.humuson.rainboots.server.handler.RainbootsIdleHandler;
import com.humuson.rainboots.server.metrics.RainbootsMetrics;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.ConcurrentSet;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Component
@Qualifier("mqttProcessor")
@Service
/* loaded from: input_file:com/humuson/rainboots/server/service/MqttProtocolProcessor.class */
public class MqttProtocolProcessor implements EventHandler<RainbootsEvent> {
    private static final int MAX_USER_ID_SIZE = 23;
    private static final int MQTT_CUR_VERSION = 3;

    @Autowired
    private DataStore dataStore;

    @Value("${out.work.buffer.size}")
    private int outboundEntrySize;
    private ExecutorService outboundExecutor;
    private RingBuffer<RainbootsEvent> outboundRingBuffer;
    private BatchEventProcessor<RainbootsEvent> eventProcessor;

    @Value("${republish.flag}")
    private Boolean isRePublishFlag;

    @Value("${print.disconnect.count}")
    private int disconnectCount;

    @Value("${rainboots.server.id}")
    private String serverId;

    @Autowired
    private RainbootsMetrics serverMetrics;
    static final Logger logger = LoggerFactory.getLogger(MqttProtocolProcessor.class);
    static final ConcurrentSet<String> denyClients = new ConcurrentSet<>();
    private final int MAX_ELAPSED_TIME = 100;
    final ConcurrentHashMap<String, Channel> mapChannels = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Integer> topicChannels = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, Integer> badClients = new ConcurrentHashMap<>();
    private final ChannelFutureListener channelRemover = new ChannelFutureListener() { // from class: com.humuson.rainboots.server.service.MqttProtocolProcessor.1
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            String str = (String) channelFuture.channel().attr(RainbootsContext.CLIENT_ID_KEY).get();
            String str2 = (String) channelFuture.channel().attr(RainbootsContext.TOPIC_KEY).get();
            MqttProtocolProcessor.this.mapChannels.remove(str, channelFuture.channel());
            if (MqttProtocolProcessor.logger.isDebugEnabled()) {
                MqttProtocolProcessor.logger.debug("CLOSED : {} topic :{} ip: {}", new Object[]{str, str2, channelFuture.channel().remoteAddress().toString()});
            }
            if (str2 == null) {
                MqttProtocolProcessor.this.serverMetrics.increamentDisconnectChannel();
                MqttProtocolProcessor.logger.info("CLOSED=>id:{} ip:{} topic is null", new Object[]{str, str2, channelFuture.channel().remoteAddress().toString()});
                return;
            }
            synchronized (MqttProtocolProcessor.this.topicChannels) {
                Integer num = MqttProtocolProcessor.this.topicChannels.get(str2);
                if (num == null || num.intValue() == 0) {
                    MqttProtocolProcessor.this.topicChannels.put(str2, 0);
                } else {
                    MqttProtocolProcessor.this.topicChannels.put(str2, Integer.valueOf(num.intValue() - 1));
                }
            }
            MqttProtocolProcessor.this.serverMetrics.increamentDisconnectChannel(str2);
        }
    };

    @PostConstruct
    public void init() {
        logger.info("outboundEntrySize :{}", Integer.valueOf(this.outboundEntrySize));
        this.outboundExecutor = Executors.newFixedThreadPool(1);
        this.outboundRingBuffer = RingBuffer.createMultiProducer(RainbootsEvent.EVENT_FACTORY, this.outboundEntrySize);
        this.eventProcessor = new BatchEventProcessor<>(this.outboundRingBuffer, this.outboundRingBuffer.newBarrier(new Sequence[0]), this);
        this.outboundRingBuffer.addGatingSequences(new Sequence[]{this.eventProcessor.getSequence()});
        this.outboundExecutor.submit((Runnable) this.eventProcessor);
    }

    public int getConnectUserCount() {
        return this.mapChannels.size();
    }

    public int getActiveUserCount(String str) {
        Integer num = this.topicChannels.get(str);
        if (num == null) {
            return 0;
        }
        return num.intValue();
    }

    public boolean isActiveChannel(String str) {
        if (this.mapChannels.get(str) == null) {
            return false;
        }
        return this.mapChannels.get(str).isActive();
    }

    public void onEvent(RainbootsEvent rainbootsEvent, long j, boolean z) throws Exception {
        try {
            AbstractMsgEvent event = rainbootsEvent.getEvent();
            if (event instanceof MQTTOutBoundMsgEvent) {
                MQTTOutBoundMsgEvent mQTTOutBoundMsgEvent = (MQTTOutBoundMsgEvent) event;
                Channel channel = mQTTOutBoundMsgEvent.getChannel();
                AbstractMqttMessage message = mQTTOutBoundMsgEvent.getMessage();
                if (channel != null) {
                    logger.info("OnEvent: {}", message.toString());
                    channel.writeAndFlush(message);
                } else if (message instanceof MqttPublishMessage) {
                    logger.error("channel is null [msg type :{}, clientId:{}]", Byte.valueOf(message.getMessageType()), ((MqttPublishMessage) message).getClientId());
                } else {
                    logger.error("channel is null [msg type :{}]", Byte.valueOf(message.getMessageType()));
                }
            }
        } catch (Exception e) {
            logger.error("MQTTProcessor onEvent : {}", e);
        }
    }

    public void handelProtocolMessage(String str, AbstractMqttMessage abstractMqttMessage) {
        logger.debug("handelProtocolMessage clientId : {}", str);
        Channel channel = this.mapChannels.get(str);
        if (channel != null) {
            disruptorPublish(new MQTTOutBoundMsgEvent(channel, abstractMqttMessage));
        }
    }

    private boolean disruptorPublish(AbstractMsgEvent abstractMsgEvent) {
        try {
            long tryNext = this.outboundRingBuffer.tryNext();
            ((RainbootsEvent) this.outboundRingBuffer.get(tryNext)).setEvent(abstractMsgEvent);
            this.outboundRingBuffer.publish(tryNext);
            return true;
        } catch (Exception e) {
            logger.error("Exception in MqttProcessor.disruptorPublish :{}", e);
            return false;
        }
    }

    public void processStop() {
        logger.info("disconnect: {}, badClients.size:{}", Integer.valueOf(this.disconnectCount), Integer.valueOf(this.badClients.size()));
        for (Map.Entry<String, Integer> entry : this.badClients.entrySet()) {
            if (entry.getValue().intValue() >= this.disconnectCount) {
                logger.info("clientId : {}. disconnectCount:{}", entry.getKey(), entry.getValue());
            }
        }
        this.outboundExecutor.shutdownNow();
        logger.info("MQTTProcessor Down");
    }

    public void processConnect(MqttConnectMessage mqttConnectMessage) {
        if (logger.isDebugEnabled()) {
            logger.debug("process Connect message");
        }
        long currentTimeMillis = System.currentTimeMillis();
        String clientId = mqttConnectMessage.getClientId();
        ChannelHandlerContext channelHandlerContext = mqttConnectMessage.getChannelHandlerContext();
        if (channelHandlerContext == null) {
            logger.error("client id : {} channel is null", clientId);
            return;
        }
        if (mqttConnectMessage.getProtocolVersion() != 3) {
            channelHandlerContext.writeAndFlush(new MqttConnAckMessage((byte) 1));
            logger.error("bad MQTT protocol : {}", clientId);
            channelHandlerContext.close();
            return;
        }
        if (clientId == null || clientId.length() > MAX_USER_ID_SIZE) {
            channelHandlerContext.writeAndFlush(new MqttConnAckMessage((byte) 2));
            logger.error("REJECTED UserId {}, lenth : {}", clientId, Integer.valueOf(clientId == null ? 0 : clientId.length()));
            channelHandlerContext.close();
            return;
        }
        boolean z = false;
        String str = null;
        if (mqttConnectMessage.isUserFlag()) {
            if (mqttConnectMessage.isPasswordFlag()) {
                str = mqttConnectMessage.getPassword();
                z = this.dataStore.isTopic(str);
            }
            mqttConnectMessage.getUserName();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("mapSize : {}", Integer.valueOf(this.mapChannels.size()));
        }
        if (this.mapChannels.containsKey(clientId) && !denyClients.contains(clientId)) {
            Channel channel = this.mapChannels.get(clientId);
            if (channelHandlerContext.channel().remoteAddress().toString().substring(0, channel.remoteAddress().toString().indexOf(DataStore.DELIM)).equals(channel.remoteAddress().toString().substring(0, channel.remoteAddress().toString().indexOf(DataStore.DELIM)))) {
                Integer num = this.badClients.get(clientId);
                if (num == null) {
                    num = 0;
                }
                this.badClients.put(clientId, Integer.valueOf(num.intValue() + 1));
            } else {
                logger.debug("old cId:{} ip:{}", clientId, channel.remoteAddress().toString());
                logger.debug("cur cId:{} ip:{}", clientId, channelHandlerContext.channel().remoteAddress().toString());
                channel.close();
            }
        }
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        if (!z) {
            logger.info("{} is invalid topic", str);
            channelHandlerContext.writeAndFlush(new MqttConnAckMessage((byte) 4));
            channelHandlerContext.close();
            return;
        }
        channelHandlerContext.channel().attr(RainbootsContext.TOPIC_KEY).set(str);
        channelHandlerContext.channel().attr(RainbootsContext.CLEAN_SESSION_KEY).set(mqttConnectMessage.isCleanSession() ? "true" : "false");
        channelHandlerContext.channel().attr(RainbootsContext.CLIENT_ID_KEY).set(clientId);
        List<Integer> subsMessageList = this.dataStore.getSubsMessageList(str, clientId, this.serverId);
        long currentTimeMillis3 = (System.currentTimeMillis() - currentTimeMillis) - currentTimeMillis2;
        if (channelHandlerContext.pipeline().names().contains(RainbootsContext.IDLE_HANDLER)) {
            channelHandlerContext.pipeline().remove(RainbootsContext.IDLE_HANDLER);
        }
        channelHandlerContext.pipeline().addFirst(RainbootsContext.IDLE_HANDLER, new RainbootsIdleHandler(0, 0, Math.round(mqttConnectMessage.getKeepAlive() * 1.5f)));
        long currentTimeMillis4 = ((System.currentTimeMillis() - currentTimeMillis3) - currentTimeMillis) - currentTimeMillis2;
        if (!denyClients.contains(clientId)) {
            addActiveChannel(channelHandlerContext.channel(), str, clientId);
        }
        channelHandlerContext.writeAndFlush(new MqttConnAckMessage((byte) 0));
        this.serverMetrics.increamentChannel(str);
        long currentTimeMillis5 = (((System.currentTimeMillis() - currentTimeMillis4) - currentTimeMillis) - currentTimeMillis3) - currentTimeMillis2;
        if (subsMessageList.size() > 0) {
            logger.info("unReadMessageKeyList cnt:{}", Integer.valueOf(subsMessageList.size()));
            if (this.isRePublishFlag.booleanValue() || this.isRePublishFlag == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("client : {}, REPUBLISH message size : {}", clientId, Integer.valueOf(subsMessageList.size()));
                }
                processRepublish(new RepublishEvent(clientId, subsMessageList));
            } else {
                removeOldMessage(str, clientId, subsMessageList);
            }
        }
        long currentTimeMillis6 = System.currentTimeMillis() - currentTimeMillis;
        long currentTimeMillis7 = ((((System.currentTimeMillis() - currentTimeMillis2) - currentTimeMillis3) - currentTimeMillis4) - currentTimeMillis5) - currentTimeMillis;
        if (currentTimeMillis6 > 200) {
            logger.info("CONNECT clientId:{}, elapsedTime:{} oldSsClose:{}, setAct:{}, pipeCh:{}, wCack:{}, rePub:{}", new Object[]{clientId, Long.valueOf(currentTimeMillis6), Long.valueOf(currentTimeMillis2), Long.valueOf(currentTimeMillis3), Long.valueOf(currentTimeMillis4), Long.valueOf(currentTimeMillis5), Long.valueOf(currentTimeMillis7)});
        }
    }

    private void removeOldMessage(String str, String str2, List<Integer> list) {
        Iterator<Integer> it = list.iterator();
        while (it.hasNext()) {
            this.dataStore.removeMessage(str, str2, it.next().intValue());
        }
    }

    public void processPublish(PublishEvent publishEvent) {
        logger.info("PUBLISH {}", publishEvent.toString());
        AbstractMqttMessage.QOSLevel qosLevel = publishEvent.getQosLevel();
        String clientId = publishEvent.getClientId();
        int messageId = publishEvent.getMessageId();
        Channel channel = this.mapChannels.get(clientId);
        if (qosLevel == AbstractMqttMessage.QOSLevel.LEAST_ONCE) {
            channel.writeAndFlush(new MqttPubAckMessage(messageId));
        } else if (qosLevel == AbstractMqttMessage.QOSLevel.EXACTLY_ONCE) {
            MqttPubRecMessage mqttPubRecMessage = new MqttPubRecMessage();
            mqttPubRecMessage.setMessageId(Integer.valueOf(messageId));
            channel.writeAndFlush(mqttPubRecMessage);
        }
    }

    public void processRepublish(RepublishEvent republishEvent) {
        long j;
        String clientId = republishEvent.getClientId();
        if (clientId == null) {
            logger.debug("client id is null");
            return;
        }
        String str = (String) this.mapChannels.get(clientId).attr(RainbootsContext.TOPIC_KEY).get();
        for (Integer num : republishEvent.getRepublishList()) {
            try {
                Map<String, String> messageMap = this.dataStore.getMessageMap(str, clientId, num.intValue());
                String str2 = messageMap.get("status");
                try {
                    j = Long.parseLong(messageMap.get(DataStore.FIELD_MESSAGE_EXPIRED_TIME));
                } catch (Exception e) {
                    j = 0;
                }
                if (System.currentTimeMillis() > j || j == 0) {
                    logger.info("processRepublish Expired [clientId:{} msgId:{}]", clientId, num);
                    this.dataStore.addFeedback(str, clientId, num.intValue(), FeedbackProtos.FeedbackResponse.MessageResultType.TIMEOUT_VALUE);
                } else if (AbstractMqttMessage.MessageStatus.PUBLISH.name().equals(str2)) {
                    ByteBuffer wrap = ByteBuffer.wrap(messageMap.get(DataStore.FIELD_MESSAGE_CONTENT).getBytes());
                    MqttPublishMessage mqttPublishMessage = new MqttPublishMessage();
                    mqttPublishMessage.setRetainFlag(false);
                    mqttPublishMessage.setTopicName(str);
                    mqttPublishMessage.setQosLevel(AbstractMqttMessage.QOSLevel.EXACTLY_ONCE);
                    mqttPublishMessage.setPayload(wrap);
                    mqttPublishMessage.setMessageId(num);
                    disruptorPublish(new MQTTOutBoundMsgEvent(this.mapChannels.get(clientId), mqttPublishMessage));
                } else if (AbstractMqttMessage.MessageStatus.PUB_REC.name().equals(str2) || AbstractMqttMessage.MessageStatus.PUB_REL.name().equals(str2)) {
                    processPubRec(clientId, num.intValue());
                } else {
                    this.dataStore.addFeedback(str, clientId, num.intValue(), FeedbackProtos.FeedbackResponse.MessageResultType.UNACTIVED_TOKEN_VALUE);
                }
            } catch (Exception e2) {
                logger.error("republish error", e2);
                logger.error("remove republish messageId :{}", num);
                this.dataStore.addFeedback(str, clientId, num.intValue(), FeedbackProtos.FeedbackResponse.MessageResultType.TIMEOUT_VALUE);
            }
        }
    }

    public boolean publishNotify(PushProtos.PushRequest.PushType pushType, String str, String str2, AbstractMqttMessage.QOSLevel qOSLevel, ByteBuffer byteBuffer, boolean z, Integer num, String str3, long j, boolean z2) {
        logger.info("PUBLISH => topic : {}, client Id : {}, msgId:{}", new Object[]{str2, str3, num});
        MqttPublishMessage mqttPublishMessage = new MqttPublishMessage();
        mqttPublishMessage.setRetainFlag(z);
        mqttPublishMessage.setTopicName(str2);
        mqttPublishMessage.setQosLevel(qOSLevel);
        mqttPublishMessage.setPayload(byteBuffer);
        if (qOSLevel != AbstractMqttMessage.QOSLevel.MOST_ONCE) {
            mqttPublishMessage.setMessageId(num);
        }
        mqttPublishMessage.setUnActivePublish(z2);
        mqttPublishMessage.setPushType(pushType);
        mqttPublishMessage.setClientId(str3);
        mqttPublishMessage.setRequestId(str);
        mqttPublishMessage.setExpiredTime(j);
        Channel channel = this.mapChannels.get(str3);
        if (channel == null && !z2) {
            logger.error(" clientId :{} channel is null", str3);
            return false;
        }
        this.serverMetrics.increamentPub(str2);
        if (qOSLevel != AbstractMqttMessage.QOSLevel.MOST_ONCE) {
            this.dataStore.setMessage(mqttPublishMessage.getPushType(), mqttPublishMessage.getTopicName(), str3, str, num.intValue(), new String(byteBuffer.array()), AbstractMqttMessage.MessageStatus.PUBLISH.name(), System.currentTimeMillis(), j);
        }
        return disruptorPublish(new MQTTOutBoundMsgEvent(channel, mqttPublishMessage));
    }

    public void processPubRel(String str, int i) {
        Channel channel = this.mapChannels.get(str);
        String str2 = (String) channel.attr(RainbootsContext.TOPIC_KEY).get();
        logger.info("PUBREL => topic :{}, clientId:{}, messageId: {}", new Object[]{str2, str, Integer.valueOf(i)});
        if (this.dataStore.setMessageStatus(str2, str, i, AbstractMqttMessage.MessageStatus.PUB_COMP.name())) {
            channel.writeAndFlush(new MqttPubCompMessage(i));
        } else {
            logger.info("PUBREL Expired => topic :{}, clientId:{}, messageId: {}", new Object[]{str2, str, Integer.valueOf(i)});
            this.dataStore.addFeedback(str2, str, i, FeedbackProtos.FeedbackResponse.MessageResultType.TIMEOUT_VALUE);
        }
    }

    public void processPubRec(String str, int i) {
        Channel channel = this.mapChannels.get(str);
        String str2 = (String) channel.attr(RainbootsContext.TOPIC_KEY).get();
        logger.info("PUBREC => topic : {}, client Id : {}, msgId:{}", new Object[]{str2, str, Integer.valueOf(i)});
        if (this.dataStore.setMessageStatus(str2, str, i, AbstractMqttMessage.MessageStatus.PUB_REL.name())) {
            channel.writeAndFlush(new MqttPubRelMessage(i));
        } else {
            logger.error("PUBREC Expired => topic : {}, client Id : {}, msgId:{}", new Object[]{str2, str, Integer.valueOf(i)});
            this.dataStore.addFeedback(str2, str, i, FeedbackProtos.FeedbackResponse.MessageResultType.TIMEOUT_VALUE);
        }
    }

    public void processPubComp(String str, int i) {
        if (!this.mapChannels.containsKey(str)) {
            logger.error("processPubComp clientId:{} channel is null", str);
            return;
        }
        String str2 = (String) this.mapChannels.get(str).attr(RainbootsContext.TOPIC_KEY).get();
        logger.info("PUBCOMP => topic : {}, client Id : {}, msgId:{}", new Object[]{str2, str, Integer.valueOf(i)});
        this.serverMetrics.increamentPubComp(str2);
        this.dataStore.addFeedback(str2, str, i, FeedbackProtos.FeedbackResponse.MessageResultType.DELIVERED_VALUE);
    }

    public void processPubAck(String str, int i) {
        String str2 = (String) this.mapChannels.get(str).attr(RainbootsContext.TOPIC_KEY).get();
        if (StringUtils.isEmpty(str2)) {
            logger.error("processPubAck topic is null clientId:{}", str);
            return;
        }
        logger.info("PUBACK => topic : {}, client Id : {}, msgId:{}", new Object[]{str2, str, Integer.valueOf(i)});
        this.serverMetrics.increamentPubComp(str2);
        this.dataStore.addFeedback(str2, str, i, FeedbackProtos.FeedbackResponse.MessageResultType.DELIVERED_VALUE);
    }

    public void processSubscribe(MqttSubscribeMessage mqttSubscribeMessage, String str, boolean z) {
        long currentTimeMillis = System.currentTimeMillis();
        MqttSubAckMessage mqttSubAckMessage = new MqttSubAckMessage();
        mqttSubAckMessage.setMessageId(mqttSubscribeMessage.getMessageId());
        mqttSubAckMessage.addQosLevel(AbstractMqttMessage.QOSLevel.MOST_ONCE);
        mqttSubAckMessage.addQosLevel(AbstractMqttMessage.QOSLevel.LEAST_ONCE);
        mqttSubAckMessage.addQosLevel(AbstractMqttMessage.QOSLevel.EXACTLY_ONCE);
        logger.debug("replying with SubAct to MSG ID {} qos size : {}", mqttSubAckMessage.getMessageId(), Integer.valueOf(mqttSubAckMessage.getQosLevels().size()));
        Channel channel = this.mapChannels.get(str);
        if (channel != null) {
            channel.writeAndFlush(mqttSubAckMessage);
        } else {
            logger.error("channel is null clinet id : {}", str);
        }
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        if (currentTimeMillis2 > 100) {
            logger.info("SUBSCRIBE => client Id : {}, elapsedTime :{}", str, Long.valueOf(currentTimeMillis2));
        } else {
            logger.debug("SUBSCRIBE => client Id : {}, elapsedTime :{}", str, Long.valueOf(currentTimeMillis2));
        }
    }

    public void processUnsubscribe(String str, List<String> list, int i) {
        logger.info("UNSUBSCRIBE clientId:{}, topics:{}, messageId:{}", new Object[]{str, list, Integer.valueOf(i)});
        Channel channel = this.mapChannels.get(str);
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            this.dataStore.removeSubscriber(it.next(), str);
        }
        MqttUnsubAckMessage mqttUnsubAckMessage = new MqttUnsubAckMessage();
        mqttUnsubAckMessage.setMessageId(Integer.valueOf(i));
        logger.debug("replying with UnsubAck to MSG ID {0}", Integer.valueOf(i));
        channel.writeAndFlush(mqttUnsubAckMessage);
    }

    private void addActiveChannel(Channel channel, String str, String str2) {
        Integer valueOf;
        synchronized (this.topicChannels) {
            Integer num = this.topicChannels.get(str);
            if (num == null) {
                num = 0;
            }
            ConcurrentHashMap<String, Integer> concurrentHashMap = this.topicChannels;
            valueOf = Integer.valueOf(num.intValue() + 1);
            concurrentHashMap.put(str, valueOf);
        }
        this.mapChannels.put(str2, channel);
        channel.closeFuture().addListener(this.channelRemover);
        this.dataStore.setActiveSubscriber(str, str2, true, this.serverId);
        logger.debug("addActiveChannel() topic :{}, client :{}, topic ch count :{}, mapChannelSize :{}", new Object[]{str, str2, valueOf, Integer.valueOf(this.mapChannels.size())});
    }

    public void setDenyClientIds(@Value("${deny.client.ids}") String str) {
        for (String str2 : str.split(",")) {
            denyClients.add(str2);
        }
    }
}
