package com.humuson.rainboots.server.service.impl;

import com.humuson.rainboots.events.AbstractMsgEvent;
import com.humuson.rainboots.events.InitEvent;
import com.humuson.rainboots.events.MQTTInBoundMsgEvent;
import com.humuson.rainboots.events.mqtt.PublishEvent;
import com.humuson.rainboots.messaging.events.RainbootsEvent;
import com.humuson.rainboots.proto.messages.AbstractMqttMessage;
import com.humuson.rainboots.proto.messages.MqttConnectMessage;
import com.humuson.rainboots.proto.messages.MqttDisconnectMessage;
import com.humuson.rainboots.proto.messages.MqttPubAckMessage;
import com.humuson.rainboots.proto.messages.MqttPubCompMessage;
import com.humuson.rainboots.proto.messages.MqttPubRecMessage;
import com.humuson.rainboots.proto.messages.MqttPubRelMessage;
import com.humuson.rainboots.proto.messages.MqttPublishMessage;
import com.humuson.rainboots.proto.messages.MqttSubscribeMessage;
import com.humuson.rainboots.proto.messages.MqttUnsubscribeMessage;
import com.humuson.rainboots.proto.messages.PushProtos;
import com.humuson.rainboots.server.service.MqttProtocolProcessor;
import com.humuson.rainboots.server.service.RainbootsChannelService;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
@Qualifier("rainbootsService")
/* loaded from: input_file:com/humuson/rainboots/server/service/impl/RainbootsChannelServiceImpl.class */
public class RainbootsChannelServiceImpl implements RainbootsChannelService, EventHandler<RainbootsEvent> {
    static final Logger logger = LoggerFactory.getLogger(RainbootsChannelServiceImpl.class);

    @Value("${in.work.buffer.size}")
    private int ringbufferSize;
    private ExecutorService mqttProtocolExecutor;
    private RingBuffer<RainbootsEvent> rainbootsRingBuffer;
    private BatchEventProcessor<RainbootsEvent> eventProcessor;

    @Autowired
    private MqttProtocolProcessor mqttProcessor;

    @PostConstruct
    public void init() {
        logger.info("ringbufferSize :{}", Integer.valueOf(this.ringbufferSize));
        this.mqttProtocolExecutor = Executors.newFixedThreadPool(1);
        this.rainbootsRingBuffer = RingBuffer.createMultiProducer(RainbootsEvent.EVENT_FACTORY, this.ringbufferSize);
        this.eventProcessor = new BatchEventProcessor<>(this.rainbootsRingBuffer, this.rainbootsRingBuffer.newBarrier(new Sequence[0]), this);
        this.rainbootsRingBuffer.addGatingSequences(new Sequence[]{this.eventProcessor.getSequence()});
        this.mqttProtocolExecutor.submit((Runnable) this.eventProcessor);
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public int getConnectUserCount() {
        return this.mqttProcessor.getConnectUserCount();
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public int getActiveUserCount(String str) {
        return this.mqttProcessor.getActiveUserCount(str);
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public boolean isActiveChannel(String str) {
        return this.mqttProcessor.isActiveChannel(str);
    }

    private void disruptorPublish(AbstractMsgEvent abstractMsgEvent) {
        try {
            long tryNext = this.rainbootsRingBuffer.tryNext();
            ((RainbootsEvent) this.rainbootsRingBuffer.get(tryNext)).setEvent(abstractMsgEvent);
            this.rainbootsRingBuffer.publish(tryNext);
        } catch (Exception e) {
            logger.error("Exception in MqttServiceImpl.disruptorPublish : {}", e);
        }
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public void lostConnection(String str, ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.close();
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public void handelProtocolMessage(String str, AbstractMqttMessage abstractMqttMessage) {
        if (abstractMqttMessage instanceof MqttPubRecMessage) {
            this.mqttProcessor.processPubRec(str, ((MqttPubRecMessage) abstractMqttMessage).getMessageId().intValue());
        } else if (!(abstractMqttMessage instanceof MqttPubCompMessage)) {
            disruptorPublish(new MQTTInBoundMsgEvent(str, abstractMqttMessage));
        } else {
            this.mqttProcessor.processPubComp(str, ((MqttPubCompMessage) abstractMqttMessage).getMessageId().intValue());
        }
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public boolean publish(PushProtos.PushRequest.PushType pushType, String str, String str2, int i, String str3, String str4, int i2, long j, boolean z) {
        logger.info("publish {}, expiredTime : {}, requestId : {}", new Object[]{str3, Long.valueOf(j), str});
        if (str3 == null) {
            return false;
        }
        try {
            ByteBuffer put = ByteBuffer.allocate(str4.getBytes("UTF-8").length).put(str4.getBytes("UTF-8"));
            put.rewind();
            return this.mqttProcessor.publishNotify(pushType, str, str2, AbstractMqttMessage.QOSLevel.valuesCustom()[i], put, false, Integer.valueOf(i2), str3, j, z);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            logger.error("message Encoding error!!!", e);
            return false;
        }
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public void processStop() {
        if (this.mqttProcessor != null) {
            this.mqttProcessor.processStop();
        }
        if (this.mqttProcessor != null) {
            this.mqttProtocolExecutor.shutdownNow();
        }
        logger.info("processStop complete");
    }

    public void onEvent(RainbootsEvent rainbootsEvent, long j, boolean z) throws Exception {
        try {
            AbstractMsgEvent event = rainbootsEvent.getEvent();
            if (event instanceof PublishEvent) {
                PublishEvent publishEvent = (PublishEvent) event;
                this.mqttProcessor.publishNotify(publishEvent.getPublishType(), publishEvent.getRequestId(), publishEvent.getTopic(), publishEvent.getQosLevel(), publishEvent.getMessage(), publishEvent.isRetainFlag(), Integer.valueOf(publishEvent.getMessageId()), publishEvent.getClientId(), publishEvent.getExpiredTime(), false);
                return;
            }
            if (!(event instanceof MQTTInBoundMsgEvent)) {
                if (event instanceof InitEvent) {
                    logger.trace("MQTT InitEvent ");
                    return;
                }
                return;
            }
            AbstractMqttMessage message = ((MQTTInBoundMsgEvent) event).getMessage();
            if (message instanceof MqttConnectMessage) {
                logger.info("MQTT ConnectMessage onEvent clientId: {} ip : {}", ((MqttConnectMessage) message).getClientId(), ((MqttConnectMessage) message).getChannelHandlerContext().channel().remoteAddress().toString());
                this.mqttProcessor.processConnect((MqttConnectMessage) message);
                return;
            }
            if (message instanceof MqttPublishMessage) {
                String clientId = ((MQTTInBoundMsgEvent) event).getClientId();
                logger.info("MqttPublishMessage onEvent clientId: {} msgId:{}", clientId, ((MqttPublishMessage) message).getMessageId());
                this.mqttProcessor.processPublish(new PublishEvent((MqttPublishMessage) message, clientId));
                return;
            }
            if (message instanceof MqttDisconnectMessage) {
                message.getChannelHandlerContext().close();
                return;
            }
            if (message instanceof MqttUnsubscribeMessage) {
                String clientId2 = ((MQTTInBoundMsgEvent) event).getClientId();
                logger.debug("MQTT Unsubscribe onEvent clientId: {}", clientId2);
                MqttUnsubscribeMessage mqttUnsubscribeMessage = (MqttUnsubscribeMessage) message;
                this.mqttProcessor.processUnsubscribe(clientId2, mqttUnsubscribeMessage.getTopics(), mqttUnsubscribeMessage.getMessageId().intValue());
                return;
            }
            if (message instanceof MqttSubscribeMessage) {
                String clientId3 = ((MQTTInBoundMsgEvent) event).getClientId();
                logger.info("MQTT Subscribe onEvent clientId: {}", clientId3);
                this.mqttProcessor.processSubscribe((MqttSubscribeMessage) message, clientId3, true);
                return;
            }
            if (message instanceof MqttPubRelMessage) {
                String clientId4 = ((MQTTInBoundMsgEvent) event).getClientId();
                logger.debug("MQTT PubRel onEvent clientId: {}", clientId4);
                this.mqttProcessor.processPubRel(clientId4, ((MqttPubRelMessage) message).getMessageId().intValue());
                return;
            }
            if (message instanceof MqttPubRecMessage) {
                String clientId5 = ((MQTTInBoundMsgEvent) event).getClientId();
                logger.debug("MQTT PubRec onEvent clientId: {}", clientId5);
                this.mqttProcessor.processPubRec(clientId5, ((MqttPubRecMessage) message).getMessageId().intValue());
                return;
            }
            if (message instanceof MqttPubCompMessage) {
                String clientId6 = ((MQTTInBoundMsgEvent) event).getClientId();
                logger.debug("MQTT PubComp onEvent clientId: {}", clientId6);
                this.mqttProcessor.processPubComp(clientId6, ((MqttPubCompMessage) message).getMessageId().intValue());
                return;
            }
            if (!(message instanceof MqttPubAckMessage)) {
                logger.error("MQTT OnEvent Exeception: {}", message);
                throw new RuntimeException("Illegal message received " + message);
            }
            String clientId7 = ((MQTTInBoundMsgEvent) event).getClientId();
            logger.info("MQTT PubAck onEvent clientId: {}", clientId7);
            this.mqttProcessor.processPubAck(clientId7, ((MqttPubAckMessage) message).getMessageId().intValue());
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("OnEvent : {}", e);
        }
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public long getRingSize() {
        return this.rainbootsRingBuffer.getBufferSize() - this.rainbootsRingBuffer.remainingCapacity();
    }

    public void processSubscribe(MqttSubscribeMessage mqttSubscribeMessage, String str, boolean z) {
        this.mqttProcessor.processSubscribe(mqttSubscribeMessage, str, z);
    }

    @Override // com.humuson.rainboots.server.service.RainbootsChannelService
    public void processConnect(MqttConnectMessage mqttConnectMessage) {
        this.mqttProcessor.processConnect(mqttConnectMessage);
    }
}
