package blackboard.platform.messagequeue.impl.activemq;

import blackboard.data.user.User;
import blackboard.persist.Id;
import blackboard.platform.context.ContextManagerFactory;
import blackboard.platform.context.impl.ContextImpl;
import blackboard.platform.log.LogServiceFactory;
import blackboard.platform.messagequeue.MessageQueueException;
import blackboard.platform.messagequeue.MessageQueueHandler;
import blackboard.util.StringUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

/* loaded from: input_file:blackboard/platform/messagequeue/impl/activemq/ActiveMQMessageQueueConsumer.class */
public class ActiveMQMessageQueueConsumer {
    private static final int DEFAULT_RETRIES = 5;
    private final String _queueIdentifier;
    private final ActiveMQConnectionPool _connectionPool;
    private final MessageQueueHandler _handler;
    private final int _threadCount;
    private final boolean _exclusive;
    private final Map<String, String> _config;
    private final List<ConsumerThread> _consumers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:blackboard/platform/messagequeue/impl/activemq/ActiveMQMessageQueueConsumer$ConsumerThread.class */
    public static class ConsumerThread extends Thread implements Runnable {
        private final String _queueIdentifier;
        private final MessageQueueHandler _handler;
        private final ActiveMQConnectionPool _connectionPool;
        private boolean _alive;
        private int _numRetries;

        public ConsumerThread(String str, String str2, MessageQueueHandler messageQueueHandler, boolean z, ActiveMQConnectionPool activeMQConnectionPool, Map<String, String> map) {
            super(str);
            setDaemon(true);
            this._queueIdentifier = z ? str2 + "?consumer.exclusive=true" : str2;
            this._handler = messageQueueHandler;
            this._connectionPool = activeMQConnectionPool;
            this._alive = true;
            String str3 = map.get("retryCount");
            try {
                this._numRetries = StringUtil.isEmpty(str3) ? 5 : Integer.parseInt(str3);
            } catch (NumberFormatException e) {
                LogServiceFactory.getInstance().logError("retryCount isn't a number, using the default value", e);
                this._numRetries = 5;
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Connection connection = null;
            Session session = null;
            MessageConsumer messageConsumer = null;
            try {
                try {
                    connection = this._connectionPool.get();
                    session = connection.createSession(true, 0);
                    messageConsumer = session.createConsumer(session.createQueue(this._queueIdentifier));
                } catch (Exception e) {
                    LogServiceFactory.getInstance().logError(String.format("Error starting message handler for queue: %s", this._queueIdentifier), e);
                    this._alive = false;
                }
                while (this._alive) {
                    try {
                        MapMessage receive = messageConsumer.receive(500L);
                        if (receive != null) {
                            if (receive instanceof MapMessage) {
                                MapMessage mapMessage = receive;
                                try {
                                    try {
                                        setContext(mapMessage);
                                        this._handler.onMessage(new ActiveMQMessageQueueMessage(mapMessage));
                                        session.commit();
                                        purgeContext();
                                    } catch (Throwable th) {
                                        purgeContext();
                                        throw th;
                                    }
                                } catch (Exception e2) {
                                    int intProperty = mapMessage.getIntProperty("retryCount");
                                    if (intProperty < this._numRetries) {
                                        LogServiceFactory.getInstance().logDebug("Message processing failed, putting back on the end of the queue", e2);
                                        MapMessage createMapMessage = session.createMapMessage();
                                        ActiveMQMessageQueueMessage.prepareToRetryJMS(mapMessage, createMapMessage);
                                        MessageProducer createProducer = session.createProducer(session.createQueue(this._queueIdentifier));
                                        createProducer.send(createMapMessage);
                                        createProducer.close();
                                    } else {
                                        LogServiceFactory.getInstance().logError(String.format("Could not process message after trying %d times", Integer.valueOf(intProperty)), e2);
                                    }
                                    session.commit();
                                    purgeContext();
                                }
                            } else {
                                LogServiceFactory.getInstance().logError(String.format("Received an unexpected message from queue: %s", this._queueIdentifier));
                                session.commit();
                            }
                        }
                    } catch (JMSException e3) {
                        LogServiceFactory.getInstance().logError(String.format("Error receiving message from queue: %s", this._queueIdentifier), e3);
                        this._alive = false;
                    }
                }
                if (messageConsumer != null) {
                    try {
                        messageConsumer.close();
                    } catch (Exception e4) {
                        LogServiceFactory.getInstance().logError(String.format("Error stopping message queue handler for queue: %s", this._queueIdentifier), e4);
                        return;
                    }
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    this._connectionPool.release(connection);
                }
            } catch (Throwable th2) {
                if (messageConsumer != null) {
                    try {
                        messageConsumer.close();
                    } catch (Exception e5) {
                        LogServiceFactory.getInstance().logError(String.format("Error stopping message queue handler for queue: %s", this._queueIdentifier), e5);
                        throw th2;
                    }
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    this._connectionPool.release(connection);
                }
                throw th2;
            }
        }

        private void setContext(Message message) throws Exception {
            ContextImpl contextImpl = (ContextImpl) ContextManagerFactory.getInstance().getContext();
            String stringProperty = message.getStringProperty(ActiveMQMessageQueueMessage.BB_USER_PROPERTY);
            if (StringUtil.notEmpty(stringProperty)) {
                contextImpl.setUserId(Id.generateId(User.DATA_TYPE, stringProperty));
            }
        }

        private void purgeContext() {
            ContextManagerFactory.getInstance().purgeContext();
        }

        public void stopGracefully() {
            this._alive = false;
        }
    }

    public ActiveMQMessageQueueConsumer(String str, MessageQueueHandler messageQueueHandler, ActiveMQConnectionPool activeMQConnectionPool, int i, boolean z, Map<String, String> map) {
        this._queueIdentifier = str;
        this._connectionPool = activeMQConnectionPool;
        this._handler = messageQueueHandler;
        this._threadCount = i;
        this._exclusive = z;
        this._consumers = new ArrayList(i);
        this._config = map;
    }

    public void start() throws MessageQueueException {
        for (int i = 0; i < this._threadCount; i++) {
            try {
                ConsumerThread consumerThread = new ConsumerThread(String.format("MessageQueueHandler-%s-%d", this._queueIdentifier, Integer.valueOf(i)), this._queueIdentifier, this._handler, this._exclusive, this._connectionPool, this._config);
                this._consumers.add(consumerThread);
                consumerThread.start();
            } catch (Exception e) {
                stop();
                throw new MessageQueueException("Couldn't register MessageQueueHandler", e);
            }
        }
    }

    public void stop() {
        Iterator<ConsumerThread> it = this._consumers.iterator();
        while (it.hasNext()) {
            it.next().stopGracefully();
        }
        Iterator<ConsumerThread> it2 = this._consumers.iterator();
        while (it2.hasNext()) {
            try {
                it2.next().join();
            } catch (InterruptedException e) {
                LogServiceFactory.getInstance().logDebug("Interrupted while trying to wait for MessageQueue consumer thread to finish");
            }
        }
    }
}
