package net.weta.components.communication.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.io.OptionalDataException;
import java.net.SocketException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import net.weta.components.communication.messaging.Message;
import net.weta.components.communication.messaging.MessageQueue;
import net.weta.components.communication.server.TooManyRunningThreads;
import net.weta.components.communication.stream.IInput;
import net.weta.components.communication.stream.MessageSizeTooBigException;
import net.weta.components.communication.tcp.server.IMessageSender;
import net.weta.components.communication.util.PooledThreadExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:ingrid-ibus-6.3.0/lib/ingrid-communication-6.0.0.jar:net/weta/components/communication/tcp/MessageReaderThread.class */
public class MessageReaderThread extends Thread {
    protected static final Logger LOG = LogManager.getLogger((Class<?>) MessageReaderThread.class);
    protected final MessageQueue _messageQueue;
    protected final String _peerName;
    protected final IMessageSender _messageSender;
    protected final int _maxThreadCount;
    private final IInput _in;
    protected int _threadCount = 0;
    private Map<String, Future<?>> _futures = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ingrid-ibus-6.3.0/lib/ingrid-communication-6.0.0.jar:net/weta/components/communication/tcp/MessageReaderThread$WaitForAnswerRunnable.class */
    public class WaitForAnswerRunnable implements Runnable {
        Message message;
        String tracker;

        public WaitForAnswerRunnable(Message message) {
            this.message = null;
            this.tracker = null;
            this.message = message;
            this.tracker = message.getId();
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    if (MessageReaderThread.LOG.isDebugEnabled()) {
                        MessageReaderThread.LOG.debug("Send message [" + this.message.getId() + "] to messager queue. Thread [" + Thread.currentThread().getName() + "].");
                    }
                    Message messageEvent = MessageReaderThread.this._messageQueue.messageEvent(this.message);
                    if (messageEvent != null) {
                        messageEvent.setType("");
                        try {
                            if (MessageReaderThread.LOG.isDebugEnabled()) {
                                MessageReaderThread.LOG.debug("Got answer [" + messageEvent.getId() + "], type [" + messageEvent + "] for message [" + this.message.getId() + "]. Send message to peer [" + MessageReaderThread.this._peerName + "]");
                            }
                            MessageReaderThread.this._messageSender.sendMessage(MessageReaderThread.this._peerName, messageEvent);
                        } catch (IOException e) {
                            if (MessageReaderThread.LOG.isWarnEnabled()) {
                                MessageReaderThread.LOG.warn("can not send answer message to [" + MessageReaderThread.this._peerName + "]: " + e.getMessage());
                            }
                            if (MessageReaderThread.LOG.isDebugEnabled()) {
                                MessageReaderThread.LOG.debug("Stacktrace:", (Throwable) e);
                            }
                        }
                    }
                    Future future = (Future) MessageReaderThread.this._futures.remove(this.tracker);
                    if (MessageReaderThread.LOG.isDebugEnabled()) {
                        MessageReaderThread.LOG.debug("Remove future from future list by message [" + this.tracker + "]. Thread [" + Thread.currentThread().getName() + "].");
                    }
                    if (future != null && !future.isDone() && MessageReaderThread.LOG.isDebugEnabled()) {
                        MessageReaderThread.LOG.debug("Future [" + future.toString() + "] for message [" + this.tracker + "] is NOT done yet.");
                    }
                    this.message = null;
                    MessageReaderThread.this._threadCount--;
                } catch (Throwable th) {
                    if (MessageReaderThread.LOG.isInfoEnabled()) {
                        MessageReaderThread.LOG.info("Unexpected interruption of sending message [" + this.message + "] to peer [" + MessageReaderThread.this._peerName + "].", th);
                    }
                    Future future2 = (Future) MessageReaderThread.this._futures.remove(this.tracker);
                    if (MessageReaderThread.LOG.isDebugEnabled()) {
                        MessageReaderThread.LOG.debug("Remove future from future list by message [" + this.tracker + "]. Thread [" + Thread.currentThread().getName() + "].");
                    }
                    if (future2 != null && !future2.isDone() && MessageReaderThread.LOG.isDebugEnabled()) {
                        MessageReaderThread.LOG.debug("Future [" + future2.toString() + "] for message [" + this.tracker + "] is NOT done yet.");
                    }
                    this.message = null;
                    MessageReaderThread.this._threadCount--;
                }
            } catch (Throwable th2) {
                Future future3 = (Future) MessageReaderThread.this._futures.remove(this.tracker);
                if (MessageReaderThread.LOG.isDebugEnabled()) {
                    MessageReaderThread.LOG.debug("Remove future from future list by message [" + this.tracker + "]. Thread [" + Thread.currentThread().getName() + "].");
                }
                if (future3 != null && !future3.isDone() && MessageReaderThread.LOG.isDebugEnabled()) {
                    MessageReaderThread.LOG.debug("Future [" + future3.toString() + "] for message [" + this.tracker + "] is NOT done yet.");
                }
                this.message = null;
                MessageReaderThread.this._threadCount--;
                throw th2;
            }
        }
    }

    public MessageReaderThread(String str, IInput iInput, MessageQueue messageQueue, IMessageSender iMessageSender, int i) {
        this._peerName = str;
        this._in = iInput;
        this._messageQueue = messageQueue;
        this._messageSender = iMessageSender;
        this._maxThreadCount = i;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            long j = 0;
            long currentTimeMillis = System.currentTimeMillis();
            if (LOG.isInfoEnabled()) {
                LOG.info("start to read messages for peer: [" + this._peerName + "]");
            }
            while (!isInterrupted()) {
                Message message = (Message) this._in.readObject();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Read message [" + message.getId() + "] of type [" + message.getType() + "] and id [" + message + "] for client [" + this._peerName + "]");
                }
                if (LOG.isInfoEnabled()) {
                    j++;
                    if (j % 500 == 0) {
                        LOG.info("Number of messages for peer [" + this._peerName + "]: " + j + " (" + (((j * 1.0d) / (System.currentTimeMillis() - currentTimeMillis)) * 1000.0d * 60.0d) + " msg/min) since " + new Date(currentTimeMillis) + ". Number of running WaitForAnswerRunnable tasks: " + this._futures.size());
                    }
                }
                waitForAnswer(message);
            }
        } catch (EOFException e) {
            if (LOG.isInfoEnabled()) {
                LOG.info("connection shutdown by peer (EOFException, " + e.getMessage() + "): " + this._peerName);
            }
            if (this._messageSender != null) {
                this._messageSender.connect(this._peerName);
            }
        } catch (OptionalDataException e2) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Optional Data Exception: " + this._peerName, (Throwable) e2);
                LOG.error("There is no more data in the buffered part of the stream: " + e2.eof);
                LOG.error("The number of bytes of primitive data available to be read in the current buffer: " + e2.length);
            }
            if (this._messageSender != null) {
                this._messageSender.disconnect(this._peerName);
                this._messageSender.connect(this._peerName);
            }
        } catch (SocketException e3) {
            if (LOG.isInfoEnabled()) {
                LOG.info("connection shutdown by peer (SocketException, " + e3.getMessage() + "): " + this._peerName);
            }
            if (this._messageSender != null) {
                if (LOG.isInfoEnabled()) {
                    LOG.info("Try to reconnect to peer: " + this._peerName);
                }
                this._messageSender.connect(this._peerName);
            }
        } catch (MessageSizeTooBigException e4) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("MessageSizeTooBigException for peer: " + this._peerName, (Throwable) e4);
            }
            if (this._messageSender != null) {
                this._messageSender.disconnect(this._peerName);
                this._messageSender.connect(this._peerName);
            }
        } catch (Exception e5) {
            if (LOG.isErrorEnabled()) {
                LOG.error("error while consuming messages for peer: " + this._peerName, (Throwable) e5);
            }
            if (this._messageSender != null) {
                this._messageSender.disconnect(this._peerName);
                this._messageSender.connect(this._peerName);
            }
        }
    }

    private void waitForAnswer(Message message) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Thread with name " + getName() + " is waiting for answer of message [" + message.getId() + "] for client [" + this._peerName + "]");
        }
        if (this._threadCount >= this._maxThreadCount) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("message not handled because, max thread count reached: " + this._maxThreadCount);
            }
            throw new TooManyRunningThreads("No more threads available.");
        }
        WaitForAnswerRunnable waitForAnswerRunnable = new WaitForAnswerRunnable(message);
        Future<?> submit = PooledThreadExecutor.submit(waitForAnswerRunnable);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Executed WaitForAnswerRunnable [" + waitForAnswerRunnable.toString() + "] in future [" + submit.toString() + "] in thread [" + getName() + "] for message [" + message.getId() + "] for client [" + this._peerName + "]: " + waitForAnswerRunnable.toString());
        }
        if (!submit.isDone()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Future [" + submit.toString() + "] is not done. Place in future list for message [" + message.getId() + "].");
            }
            this._futures.put(message.getId(), submit);
            if (submit.isDone()) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Future [" + submit.toString() + "] already done after adding to future list. Remove from list for message [" + message.getId() + "].");
                }
                this._futures.remove(message.getId());
            }
        }
        this._threadCount++;
        if (LOG.isDebugEnabled()) {
            LOG.debug("current 'waitForMessage' thread count: [" + this._threadCount + "]");
        }
    }

    @Override // java.lang.Thread
    public synchronized void interrupt() {
        if (LOG.isInfoEnabled()) {
            LOG.info("Shutdown MessageReaderThread: " + this._peerName);
            LOG.info("Try cancel running tasks. Number of registered tasks: " + this._futures.size());
        }
        if (this._futures != null && !this._futures.isEmpty()) {
            for (String str : this._futures.keySet()) {
                Future<?> future = this._futures.get(str);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cancel future [" + future.toString() + "] for message [" + str + "].");
                }
                future.cancel(true);
            }
            this._futures.clear();
        }
        PooledThreadExecutor.purge();
        System.gc();
        super.interrupt();
    }
}
