/*
 * Decompiled with CFR 0.152.
 */
package com.humuson.tms.trans.schedule.realtime;

import com.humuson.tms.adaptor.activemq.ActiveMQSender;
import com.humuson.tms.config.TmsCommonConfig;
import com.humuson.tms.config.lock.DistributeLock;
import com.humuson.tms.trans.module.TmsBatchTargetInfoFactory;
import com.humuson.tms.trans.module.batch.BatchDbTargetInfo;
import com.humuson.tms.trans.module.batch.TmsBatchTargetInfo;
import com.humuson.tms.trans.module.batch.confing.BatchTaskExecutorMonitor;
import com.humuson.tms.trans.repository.dao.RealtimeDao;
import com.humuson.tms.trans.repository.dao.SendInfoDao;
import com.humuson.tms.trans.repository.model.TmsSchdInfo;
import com.humuson.tms.trans.repository.model.TmsSendInfo;
import com.humuson.tms.util.Pair;
import com.humuson.tms.util.Pair3;
import com.humuson.tms.util.date.DateUtil;
import com.humuson.tms.util.json.JsonConvertUtil;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.commons.beanutils.BeanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.ObjectUtils;

public class RealtimeOnlyPushTargetScheduler {
    private static final Logger log = LoggerFactory.getLogger(RealtimeOnlyPushTargetScheduler.class);
    public static final String REALTIME_LOCK_PU = "REALTIME_LOCK_PU";
    public static final String REALTIME_C5 = "C5";
    private static final int MULTI_CREATE_BATCH_COUNT = 10;
    @Autowired
    TmsCommonConfig tmsConfig;
    @Autowired
    RealtimeDao realtimeDao;
    @Autowired
    SendInfoDao sendInfoDao;
    @Autowired
    TmsBatchTargetInfoFactory batchTargetInfoFactory;
    @Autowired
    BatchTaskExecutorMonitor batchTaskExecutorMonitor;
    @Autowired
    @Qualifier(value="activemq-sender")
    ActiveMQSender mqSender;
    @Value(value="${spring.activemq.queue.consumer}")
    private String CONSUMER_NAME;
    private static AtomicInteger targetPossibleCount = new AtomicInteger();
    Function<Pair<TmsSendInfo, TmsSchdInfo>, Pair3<TmsSendInfo, TmsSchdInfo, TmsBatchTargetInfo>> createBatchTargetFunction = p -> {
        try {
            return new Pair3(p.getFirst(), p.getSecond(), (Object)this.batchTargetInfoFactory.createBatchTargetInfo((TmsSendInfo)p.getFirst()));
        }
        catch (Exception e) {
            log.error("creating TmsBatchTargetInfo error. so return null, tmsSendInfo[{}]", p.getFirst(), (Object)e);
            return new Pair3(p.getFirst(), p.getSecond(), null);
        }
    };

    @Scheduled(fixedDelay=3000L)
    @DistributeLock(value="REALTIME_LOCK_PU")
    public void realtimeOnlyPushTargeting() {
        List<TmsSendInfo> tmsSendInfos = this.realtimeDao.selectRealtimePushSendInfo();
        targetPossibleCount.set(this.freePoolSize());
        HashMap<TmsSendInfo, TmsSchdInfo> tt = new HashMap<TmsSendInfo, TmsSchdInfo>();
        for (TmsSendInfo tmsSendInfo : tmsSendInfos) {
            if (targetPossibleCount.get() <= 0) {
                log.info("this DS engine[{}] Realtime active pool over. skip send info[{}] ", (Object)tmsSendInfo);
                continue;
            }
            TmsSchdInfo tmsSchdInfo = this.realtimeDao.selectMaxTmsSchdInfoOnlyPush(tmsSendInfo.getSEND_ID(), DateUtil.getWorkday(), "15");
            try {
                if (this.isNotExistSchdInfo(tmsSchdInfo)) continue;
                if (log.isDebugEnabled()) {
                    log.debug("realtime push data move catch. send_id[{}], schd_id[{}] job_status[15]", (Object)tmsSendInfo.getSEND_ID(), (Object)tmsSchdInfo.getSCHD_ID());
                }
                if (this.isMultiChannel(tmsSendInfo.getSEND_ID())) {
                    this.realtimeDao.updateSchdInfoJobStatus(tmsSchdInfo.getSCHD_ID(), "10");
                    continue;
                }
                tt.put(tmsSendInfo, tmsSchdInfo);
                if (tt.size() < 10) continue;
                this.multiTargetProcess(tt);
            }
            catch (Exception e) {
                log.error("sendInfo.send_id[{}] push move targeting in 'C5' error send_info[{}]", new Object[]{tmsSendInfo.getSEND_ID(), tmsSendInfo, e});
                this.updateSendInfoJobStatus(tmsSendInfo.getSEND_ID(), "31");
                tt.remove(tmsSendInfo);
            }
        }
        if (!ObjectUtils.isEmpty(tt)) {
            this.multiTargetProcess(tt);
        }
    }

    private int freePoolSize() {
        return this.batchTaskExecutorMonitor.freePoolsize(REALTIME_C5, 20);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void multiTargetProcess(Map<TmsSendInfo, TmsSchdInfo> targetInfo) {
        int WORKERS = targetInfo.size();
        ExecutorService executorService = Executors.newFixedThreadPool(WORKERS);
        ExecutorCompletionService<Pair3<TmsSendInfo, TmsSchdInfo, TmsBatchTargetInfo>> concurrentTaskService = new ExecutorCompletionService<Pair3<TmsSendInfo, TmsSchdInfo, TmsBatchTargetInfo>>(executorService);
        TmsSendInfo tmsSendInfoResult = null;
        TmsSchdInfo tmsSchdInfoResult = null;
        TmsBatchTargetInfo tmsBatchTargetInfo = null;
        try {
            Iterator<TmsSendInfo> tmsSendInfoIter = targetInfo.keySet().iterator();
            try {
                while (tmsSendInfoIter.hasNext()) {
                    TmsSendInfo tmsSendInfoOrg = tmsSendInfoIter.next();
                    final TmsSendInfo tmsSendInfo = new TmsSendInfo();
                    final TmsSchdInfo tmsSchdInfo = new TmsSchdInfo();
                    BeanUtils.copyProperties((Object)tmsSendInfo, (Object)tmsSendInfoOrg);
                    BeanUtils.copyProperties((Object)tmsSchdInfo, (Object)targetInfo.get(tmsSendInfoOrg));
                    concurrentTaskService.submit(new Callable<Pair3<TmsSendInfo, TmsSchdInfo, TmsBatchTargetInfo>>(){

                        @Override
                        public Pair3<TmsSendInfo, TmsSchdInfo, TmsBatchTargetInfo> call() throws Exception {
                            return RealtimeOnlyPushTargetScheduler.this.createBatchTargetFunction.apply((Pair<TmsSendInfo, TmsSchdInfo>)new Pair((Object)tmsSendInfo, (Object)tmsSchdInfo));
                        }
                    });
                }
            }
            finally {
                targetInfo.clear();
            }
            for (int j = 0; j < WORKERS; ++j) {
                block18: {
                    try {
                        Future result = concurrentTaskService.take();
                        Pair3 pair3 = (Pair3)result.get();
                        tmsSendInfoResult = (TmsSendInfo)pair3.getFirst();
                        tmsSchdInfoResult = (TmsSchdInfo)pair3.getSecond();
                        tmsBatchTargetInfo = (TmsBatchTargetInfo)pair3.getThird();
                        if (ObjectUtils.isEmpty((Object)tmsBatchTargetInfo)) {
                            log.error("realtime[C5-PUSH] failed create target batch info. skip.. schd_info[{}] ", (Object)tmsSchdInfoResult);
                        }
                        break block18;
                    }
                    catch (Exception e) {
                        log.error("error get target count worker job in realtime. schdInfo[{}] so skip", (Throwable)e);
                    }
                    continue;
                }
                BatchDbTargetInfo dbTargetInfo = (BatchDbTargetInfo)tmsBatchTargetInfo.getREAD_TARGET_INFO();
                if (ObjectUtils.isEmpty(dbTargetInfo.getUPDATE_QUERY())) {
                    log.error("not existed update query send_info[id={}] for realtime push move targeting in 'C5'. so updated job_status=31 and skip.. send_info[{}]", (Object)tmsSendInfoResult.getSEND_ID(), (Object)tmsSendInfoResult);
                    this.updateSendInfoJobStatus(tmsSendInfoResult.getSEND_ID(), "31");
                    continue;
                }
                if (dbTargetInfo.getTARGET_COUNT() == 0L) {
                    if (!log.isDebugEnabled()) continue;
                    log.debug("real time push channel[C5-PU send_id={}] target count=0. so this target Ignored", (Object)tmsSendInfoResult.getSEND_ID());
                    continue;
                }
                targetPossibleCount.decrementAndGet();
                this.updateJobstatusSchd(tmsSchdInfoResult, dbTargetInfo.getTARGET_COUNT());
                try {
                    this.mqSender.send(this.CONSUMER_NAME, this.message(tmsSendInfoResult));
                    if (!log.isInfoEnabled()) continue;
                    log.info("realtime push target[{}] sent mq[{}] send_id[{}]/schd_id[{}]", new Object[]{dbTargetInfo.getTARGET_COUNT(), this.CONSUMER_NAME, tmsSendInfoResult.getSEND_ID(), tmsSchdInfoResult.getSCHD_ID()});
                    continue;
                }
                catch (Exception e) {
                    log.error("realtime[C5] push batch executor error. skip.. target batch info[{}]", (Object)tmsBatchTargetInfo, (Object)e);
                }
            }
        }
        catch (Exception e) {
            log.error("realtime[C5] push batch process error. schd_info[{}] ", tmsSchdInfoResult, (Object)e);
        }
        finally {
            executorService.shutdown();
        }
    }

    private String message(TmsSendInfo tmsSendInfoResult) throws Exception {
        return JsonConvertUtil.beanToJsonString((Object)tmsSendInfoResult);
    }

    private void updateJobstatusSchd(TmsSchdInfo tmsSchdInfo, long targetCount) {
        log.info("real time push[send_id={}, schd_id={}] data move start. target count[{}] update schd job_status[20] ", new Object[]{tmsSchdInfo.getSEND_ID(), tmsSchdInfo.getSCHD_ID(), targetCount});
        this.realtimeDao.updateSchdInfoJobStatus(tmsSchdInfo.getSCHD_ID(), "25");
    }

    private TmsSchdInfo selectSchdInfo(int sendId) {
        return this.realtimeDao.selectMaxTmsSchdInfoOnlyPush(sendId, DateUtil.getWorkday(), "15");
    }

    private boolean isNotExistSchdInfo(TmsSchdInfo tmsSchdInfo) {
        return ObjectUtils.isEmpty((Object)tmsSchdInfo);
    }

    private void updateSendInfoJobStatus(long sendId, String jobStatus) {
        this.realtimeDao.updateSendInfoJobStatus(sendId, jobStatus);
    }

    protected boolean isMultiChannel(long sendId) {
        return "T".equalsIgnoreCase(this.sendInfoDao.selectTmsCampInfo(sendId).getCAMP_TYPE());
    }
}

