package org.springframework.integration.aggregator;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.DiscardingMessageHandler;
import org.springframework.integration.handler.MessageTriggerAction;
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-6.2.4.jar:org/springframework/integration/aggregator/BarrierMessageHandler.class */
public class BarrierMessageHandler extends AbstractReplyProducingMessageHandler implements MessageTriggerAction, DiscardingMessageHandler {
    private final Map<Object, SynchronousQueue<Message<?>>> suspensions;
    private final Map<Object, Thread> inProcess;
    private final long requestTimeout;
    private final long triggerTimeout;
    private final CorrelationStrategy correlationStrategy;
    private final MessageGroupProcessor messageGroupProcessor;
    private String discardChannelName;
    private MessageChannel discardChannel;

    public BarrierMessageHandler(long j) {
        this(j, j);
    }

    public BarrierMessageHandler(long j, MessageGroupProcessor messageGroupProcessor) {
        this(j, j, messageGroupProcessor);
    }

    public BarrierMessageHandler(long j, CorrelationStrategy correlationStrategy) {
        this(j, j, correlationStrategy);
    }

    public BarrierMessageHandler(long j, MessageGroupProcessor messageGroupProcessor, CorrelationStrategy correlationStrategy) {
        this(j, j, messageGroupProcessor, correlationStrategy);
    }

    public BarrierMessageHandler(long j, long j2) {
        this(j, j2, new DefaultAggregatingMessageGroupProcessor());
    }

    public BarrierMessageHandler(long j, long j2, MessageGroupProcessor messageGroupProcessor) {
        this(j, j2, messageGroupProcessor, null);
    }

    public BarrierMessageHandler(long j, long j2, CorrelationStrategy correlationStrategy) {
        this(j, j2, new DefaultAggregatingMessageGroupProcessor(), correlationStrategy);
    }

    public BarrierMessageHandler(long j, long j2, MessageGroupProcessor messageGroupProcessor, CorrelationStrategy correlationStrategy) {
        this.suspensions = new ConcurrentHashMap();
        this.inProcess = new ConcurrentHashMap();
        Assert.notNull(messageGroupProcessor, "'messageGroupProcessor' cannot be null");
        this.messageGroupProcessor = messageGroupProcessor;
        this.correlationStrategy = correlationStrategy == null ? new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
        this.requestTimeout = j;
        this.triggerTimeout = j2;
    }

    public void setDiscardChannelName(String str) {
        this.discardChannelName = str;
    }

    public void setDiscardChannel(MessageChannel messageChannel) {
        this.discardChannel = messageChannel;
    }

    @Override // org.springframework.integration.handler.DiscardingMessageHandler
    public MessageChannel getDiscardChannel() {
        String str = this.discardChannelName;
        if (str != null) {
            this.discardChannel = getChannelResolver().resolveDestination(str);
            this.discardChannelName = null;
        }
        return this.discardChannel;
    }

    @Override // org.springframework.integration.handler.MessageHandlerSupport, org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "barrier";
    }

    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler, org.springframework.integration.handler.MessageHandlerSupport, org.springframework.integration.IntegrationPattern
    public IntegrationPatternType getIntegrationPatternType() {
        return IntegrationPatternType.barrier;
    }

    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler
    protected Object handleRequestMessage(Message<?> message) {
        Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
        if (correlationKey == null) {
            throw new MessagingException(message, "Correlation Strategy returned null");
        }
        Thread putIfAbsent = this.inProcess.putIfAbsent(correlationKey, Thread.currentThread());
        if (putIfAbsent != null) {
            throw new MessagingException(message, "Correlation key (" + correlationKey + ") is already in use by " + putIfAbsent.getName());
        }
        try {
            try {
                Message<?> poll = createOrObtainQueue(correlationKey).poll(this.requestTimeout, TimeUnit.MILLISECONDS);
                if (poll == null) {
                    this.inProcess.remove(correlationKey);
                    this.suspensions.remove(correlationKey);
                    return null;
                }
                Object processRelease = processRelease(correlationKey, message, poll);
                this.inProcess.remove(correlationKey);
                this.suspensions.remove(correlationKey);
                return processRelease;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new MessageHandlingException(message, "Interrupted while waiting for release in the [" + this + "]", e);
            }
        } catch (Throwable th) {
            this.inProcess.remove(correlationKey);
            this.suspensions.remove(correlationKey);
            throw th;
        }
    }

    private Object processRelease(Object obj, Message<?> message, Message<?> message2) {
        this.suspensions.remove(obj);
        if (message2.getPayload() instanceof Throwable) {
            throw new MessagingException(message, "Releasing flow returned a throwable", (Throwable) message2.getPayload());
        }
        return buildResult(obj, message, message2);
    }

    protected Object buildResult(Object obj, Message<?> message, Message<?> message2) {
        SimpleMessageGroup simpleMessageGroup = new SimpleMessageGroup(obj);
        simpleMessageGroup.add(message);
        simpleMessageGroup.add(message2);
        return this.messageGroupProcessor.processMessageGroup(simpleMessageGroup);
    }

    private SynchronousQueue<Message<?>> createOrObtainQueue(Object obj) {
        SynchronousQueue<Message<?>> synchronousQueue = new SynchronousQueue<>();
        SynchronousQueue<Message<?>> putIfAbsent = this.suspensions.putIfAbsent(obj, synchronousQueue);
        if (putIfAbsent != null) {
            synchronousQueue = putIfAbsent;
        }
        return synchronousQueue;
    }

    @Override // org.springframework.integration.handler.MessageTriggerAction
    public void trigger(Message<?> message) {
        Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
        if (correlationKey == null) {
            throw new MessagingException(message, "Correlation Strategy returned null");
        }
        try {
            if (!createOrObtainQueue(correlationKey).offer(message, this.triggerTimeout, TimeUnit.MILLISECONDS)) {
                this.logger.error("Suspending thread timed out or did not arrive within timeout for: " + message);
                this.suspensions.remove(correlationKey);
                MessageChannel discardChannel = getDiscardChannel();
                if (discardChannel != null) {
                    this.messagingTemplate.send((MessagingTemplate) discardChannel, message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.logger.error("Interrupted while waiting for the suspending thread for: " + message);
            this.suspensions.remove(correlationKey);
        }
    }
}
