package com.tudou.utils.intercom;

import com.tudou.utils.asynlog.AsynWriterService;
import com.tudou.utils.client.HTTPLongClient;
import com.tudou.utils.intercom.InnerMsg;
import com.tudou.utils.thread.MyThreadFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.rubyeye.xmemcached.MemcachedClient;
import org.apache.log4j.Logger;

/* loaded from: classes.dex */
public abstract class Intercom2<T extends InnerMsg> {
    private static final int DEFAULT_FAIL_DEAL_THREAD_SIZE = 1;
    private static final int DEFAULT_HTTP_CONNECTTIMEOUT = 2000;
    private static final int DEFAULT_HTTP_SOTIMEOUT = 1000;
    private static final int DEFAULT_THREAD_SIZE = 2;
    private static final Logger log = Logger.getLogger(Intercom.class);
    private String addr;
    private String appName;
    private AsynWriterService<T> asynWriterService;
    private ExecutorService es;
    private HTTPLongClient httpLongClient;
    private ArrayBlockingQueue<T> receiveMsgFailQueue;
    private ArrayBlockingQueue<T> receiveMsgQueue;
    private ArrayBlockingQueue<T> sendMsgFailQueue;
    private ArrayBlockingQueue<T> sendMsgQueue;

    /* loaded from: classes.dex */
    class DealFailReceiveMsgThread implements Runnable {
        DealFailReceiveMsgThread() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                InnerMsg innerMsg = null;
                try {
                    innerMsg = (InnerMsg) Intercom2.this.receiveMsgFailQueue.poll(1L, TimeUnit.HOURS);
                } catch (InterruptedException e) {
                    Intercom2.log.error(Thread.currentThread().getName() + " DealFailReceiveMsgThread is interrupted", e);
                }
                if (innerMsg != null) {
                    try {
                        Intercom2.this.receive(innerMsg);
                    } catch (Exception e2) {
                        Intercom2.this.addFailReceiveMsg(innerMsg);
                        Intercom2.log.error(" DealFailReceiveMsgThread error:receiveMsg=" + innerMsg.toJson(), e2);
                        try {
                            Thread.currentThread();
                            Thread.sleep(MemcachedClient.DEFAULT_HEAL_SESSION_INTERVAL);
                        } catch (InterruptedException e3) {
                            Intercom2.log.error(Thread.currentThread().getName() + " DealFailReceiveMsgThread is interrupted", e3);
                        }
                    }
                }
            }
        }
    }

    /* loaded from: classes.dex */
    class DealFailSendMsgThread implements Runnable {
        DealFailSendMsgThread() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            InnerMsg innerMsg;
            while (true) {
                try {
                    innerMsg = (InnerMsg) Intercom2.this.sendMsgFailQueue.poll(1L, TimeUnit.HOURS);
                } catch (InterruptedException e) {
                    Intercom2.log.error(Thread.currentThread().getName() + " DealFailSendMsgThread is interrupted", e);
                    innerMsg = null;
                }
                if (innerMsg != null) {
                    Intercom2.this.send(innerMsg);
                }
            }
        }
    }

    /* loaded from: classes.dex */
    class ReceiveMsgThread implements Runnable {
        ReceiveMsgThread() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                InnerMsg innerMsg = null;
                try {
                    innerMsg = (InnerMsg) Intercom2.this.receiveMsgQueue.poll(1L, TimeUnit.HOURS);
                } catch (InterruptedException e) {
                    Intercom2.log.error(Thread.currentThread().getName() + " ReceiveMsgThread is interrupted", e);
                }
                if (innerMsg != null) {
                    try {
                        Intercom2.this.receive(innerMsg);
                    } catch (Exception e2) {
                        Intercom2.this.addFailReceiveMsg(innerMsg);
                        Intercom2.log.error(" deal receive msg error:receiveMsg=" + innerMsg.toJson(), e2);
                    }
                }
            }
        }
    }

    /* loaded from: classes.dex */
    class SendMsgThread implements Runnable {
        SendMsgThread() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            InnerMsg innerMsg;
            while (true) {
                try {
                    innerMsg = (InnerMsg) Intercom2.this.sendMsgQueue.poll(1L, TimeUnit.HOURS);
                } catch (InterruptedException e) {
                    Intercom2.log.error(Thread.currentThread().getName() + " SendMsgThread is interrupted", e);
                    innerMsg = null;
                }
                if (innerMsg != null) {
                    Intercom2.this.send(innerMsg);
                }
            }
        }
    }

    public Intercom2(int i, int i2, int i3, String str, String str2, int i4, int i5, int i6) {
        this.es = null;
        this.sendMsgQueue = null;
        this.receiveMsgQueue = null;
        this.receiveMsgFailQueue = null;
        this.sendMsgFailQueue = null;
        this.asynWriterService = null;
        this.addr = str;
        this.appName = str2;
        this.sendMsgQueue = new ArrayBlockingQueue<>(i, true);
        this.receiveMsgQueue = new ArrayBlockingQueue<>(i, true);
        this.sendMsgFailQueue = new ArrayBlockingQueue<>(10000, true);
        this.receiveMsgFailQueue = new ArrayBlockingQueue<>(10000, true);
        this.httpLongClient = HTTPLongClient.newInstance(i2, i3);
        this.asynWriterService = new AsynWriterService<>();
        if (i4 <= 0 || i5 <= 0) {
            throw new RuntimeException("sendThreadNum and receiveThreadNum must be more than 0.");
        }
        this.es = Executors.newFixedThreadPool(i4 + i5 + (i6 * 2), new MyThreadFactory("InnerCommunication"));
        for (int i7 = 0; i7 < i6; i7++) {
            this.es.submit(new DealFailSendMsgThread());
            this.es.submit(new DealFailReceiveMsgThread());
        }
        for (int i8 = 0; i8 < i4; i8++) {
            this.es.submit(new SendMsgThread());
        }
        for (int i9 = 0; i9 < i5; i9++) {
            this.es.submit(new ReceiveMsgThread());
        }
    }

    public Intercom2(int i, String str, String str2) {
        this(i, 2000, 1000, str, str2, 2, 2, 1);
    }

    public void addFailReceiveMsg(T t) {
        if (t != null) {
            this.receiveMsgFailQueue.offer(t);
            if (log.isInfoEnabled()) {
                log.info("addFailReceiveMsg size=" + this.receiveMsgFailQueue.size());
            }
        }
    }

    public void addFailSendMsg(T t) {
        if (t != null) {
            this.asynWriterService.write((AsynWriterService<T>) t);
            this.sendMsgFailQueue.offer(t);
            if (log.isInfoEnabled()) {
                log.info("addFailSendMsg size=" + this.sendMsgFailQueue.size());
            }
        }
    }

    public void addReceiveMsg(T t) {
        if (t != null) {
            this.receiveMsgQueue.offer(t);
            if (log.isInfoEnabled()) {
                log.info("receiveMsgQueue size=" + this.receiveMsgQueue.size());
            }
        }
    }

    public void addSendMsg(T t) {
        if (t != null) {
            this.asynWriterService.write((AsynWriterService<T>) t);
            this.sendMsgQueue.offer(t);
            if (log.isInfoEnabled()) {
                log.info("sendMsgQueue size=" + this.sendMsgQueue.size());
            }
        }
    }

    public int getReceiveFailMsgQueueSize() {
        return this.receiveMsgFailQueue.size();
    }

    public int getReceiveMsgQueueSize() {
        return this.receiveMsgQueue.size();
    }

    public int getSendMsgFailQueueSize() {
        return this.sendMsgFailQueue.size();
    }

    public int getSendMsgQueueSize() {
        return this.sendMsgQueue.size();
    }

    public abstract void receive(T t);

    public void send(T t) {
        String json = t.toJson();
        if (log.isDebugEnabled()) {
            log.debug("InnerCommunication send  msg. msg=" + json);
        }
        HashMap hashMap = new HashMap();
        hashMap.put("app", this.appName);
        hashMap.put("msg", json);
        try {
            this.httpLongClient.geturlcontent(this.addr, false, (Map<String, Object>) hashMap, "utf8");
        } catch (Exception e) {
            addFailSendMsg(t);
            log.error("SendMsgThread error, msg=" + json, e);
            try {
                Thread.currentThread();
                Thread.sleep(MemcachedClient.DEFAULT_HEAL_SESSION_INTERVAL);
            } catch (InterruptedException e2) {
                log.error(Thread.currentThread().getName() + " SendMsgThread is interrupted", e2);
            }
        }
    }
}
