package org.springframework.integration.redis.inbound;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.redis.event.RedisExceptionEvent;
import org.springframework.integration.support.channel.ChannelResolverUtils;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.jmx.export.annotation.ManagedMetric;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;

/**
 * Message-driven endpoint that repeatedly pops entries from a Redis list and sends each one
 * downstream as a {@link Message}.
 *
 * <p>The list is bound once, in the constructor, to the supplied queue name. A
 * {@link ListenerTask} submitted to the configured {@link Executor} performs the pops (with a
 * timeout) for as long as the endpoint is active. When a pop fails while the endpoint is active,
 * the exception is logged, published as a {@link RedisExceptionEvent} (if a publisher is
 * available) and the listener thread sleeps for the recovery interval before trying again.
 */
@IntegrationManagedResource
@ManagedResource
public class RedisQueueMessageDrivenEndpoint extends MessageProducerSupport implements ApplicationEventPublisherAware, BeanClassLoaderAware {

    /** Default timeout of a single pop operation, in milliseconds. */
    public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;

    /** Default pause after a failed pop before the next attempt, in milliseconds. */
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000;

    private final BoundListOperations<String, byte[]> boundListOperations;

    private ApplicationEventPublisher applicationEventPublisher;

    private Executor taskExecutor;

    private RedisSerializer<?> serializer;

    private boolean serializerExplicitlySet;

    private boolean expectMessage = false;

    private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;

    private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

    private boolean rightPop = true;

    private volatile boolean listening;

    private volatile Runnable stopCallback;

    /**
     * Create an endpoint bound to the given Redis list.
     *
     * @param queueName the key of the Redis list to pop from; must contain text
     * @param connectionFactory the connection factory used to reach Redis; must not be null
     */
    public RedisQueueMessageDrivenEndpoint(String queueName, RedisConnectionFactory connectionFactory) {
        Assert.hasText(queueName, "'queueName' is required");
        Assert.notNull(connectionFactory, "'connectionFactory' must not be null");
        RedisTemplate<String, byte[]> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);
        // Values are handled as raw bytes here; deserialization is done by this endpoint.
        template.setEnableDefaultSerializer(false);
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        this.boundListOperations = template.boundListOps(queueName);
    }

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    /**
     * Install a {@link JdkSerializationRedisSerializer} backed by the given class loader, unless
     * a serializer has already been supplied through {@link #setSerializer(RedisSerializer)}.
     *
     * @param classLoader the class loader handed to the default serializer
     */
    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {
        if (!this.serializerExplicitlySet) {
            this.serializer = new JdkSerializationRedisSerializer(classLoader);
        }
    }

    /**
     * Set the serializer used to deserialize popped values. Calling this method (even with
     * {@code null}) prevents {@link #setBeanClassLoader(ClassLoader)} from installing a default.
     * With a {@code null} serializer and {@code expectMessage == false}, the raw {@code byte[]}
     * becomes the message payload.
     *
     * @param serializer the serializer to use, or {@code null} for raw byte payloads
     */
    public void setSerializer(RedisSerializer<?> serializer) {
        this.serializer = serializer;
        this.serializerExplicitlySet = true;
    }

    /**
     * Set whether each popped value is expected to deserialize to a whole {@link Message}
     * rather than to a payload only. Requires a serializer (checked in {@link #onInit()}).
     *
     * @param expectMessage {@code true} if list entries are serialized messages
     */
    public void setExpectMessage(boolean expectMessage) {
        this.expectMessage = expectMessage;
    }

    /**
     * Set the timeout passed to each pop operation.
     *
     * @param receiveTimeout the timeout in milliseconds; must be {@code >= 0}
     */
    public void setReceiveTimeout(long receiveTimeout) {
        Assert.isTrue(receiveTimeout >= 0, "'receiveTimeout' must be >= 0.");
        this.receiveTimeout = receiveTimeout;
    }

    /**
     * Set the executor on which the listener task runs. If none is set, a
     * {@link SimpleAsyncTaskExecutor} is created in {@link #onInit()}.
     *
     * @param taskExecutor the executor to use
     */
    public void setTaskExecutor(Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    /**
     * Set how long the listener sleeps after a failed pop before trying again. Values
     * {@code <= 0} disable the sleep.
     *
     * @param recoveryInterval the pause in milliseconds
     */
    public void setRecoveryInterval(long recoveryInterval) {
        this.recoveryInterval = recoveryInterval;
    }

    /**
     * Choose which end of the list is popped.
     *
     * @param rightPop {@code true} (the default) to pop from the right, {@code false} to pop
     * from the left
     */
    public void setRightPop(boolean rightPop) {
        this.rightPop = rightPop;
    }

    /**
     * Validate the configuration and prepare the task executor: create a default one if none was
     * supplied and, when a bean factory is available, wrap it in an
     * {@link ErrorHandlingTaskExecutor} unless it already is one.
     */
    @Override
    public void onInit() {
        super.onInit();
        if (this.expectMessage) {
            Assert.notNull(this.serializer, "'serializer' has to be provided where 'expectMessage == true'.");
        }
        if (this.taskExecutor == null) {
            String beanName = getComponentName();
            String threadNamePrefix = (beanName == null ? "" : beanName + "-") + getComponentType();
            this.taskExecutor = new SimpleAsyncTaskExecutor(threadNamePrefix);
        }
        BeanFactory beanFactory = getBeanFactory();
        if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && beanFactory != null) {
            MessagePublishingErrorHandler errorHandler =
                    new MessagePublishingErrorHandler(ChannelResolverUtils.getChannelResolver(beanFactory));
            errorHandler.setDefaultErrorChannel(getErrorChannel());
            this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler);
        }
    }

    @Override
    public String getComponentType() {
        return "redis:queue-inbound-channel-adapter";
    }

    /**
     * Pop one value and, if it yields a message, send it. If the endpoint stopped listening
     * between the pop and the send, the raw value is pushed back onto the same end of the list
     * it was popped from instead of being sent.
     *
     * @throws MessagingException if {@code expectMessage} is set and the value cannot be
     * deserialized to a {@link Message}
     */
    private void popMessageAndSend() {
        byte[] value = popForValue();
        if (value == null) {
            return;
        }
        Message<?> message = toMessage(value);
        if (message == null) {
            return;
        }
        if (this.listening) {
            sendMessage(message);
        }
        else if (this.rightPop) {
            this.boundListOperations.rightPush(value);
        }
        else {
            this.boundListOperations.leftPush(value);
        }
    }

    /**
     * Convert a popped value to a message, or return {@code null} when deserialization
     * produces nothing.
     */
    private Message<?> toMessage(byte[] value) {
        if (this.expectMessage) {
            try {
                // The cast stays inside the try so a non-Message result is reported the same way
                // as any other deserialization failure.
                return (Message<?>) this.serializer.deserialize(value);
            }
            catch (Exception ex) {
                throw new MessagingException("Deserialization of Message failed.", ex);
            }
        }
        // The deserialized payload can be any type, so it must not be held in a byte[] variable.
        Object payload = value;
        if (this.serializer != null) {
            payload = this.serializer.deserialize(value);
        }
        return payload == null ? null : getMessageBuilderFactory().withPayload(payload).build();
    }

    /**
     * Pop a single value from the configured end of the list, waiting up to the receive timeout.
     *
     * @return the popped bytes, or {@code null} if nothing was returned or the pop failed
     */
    private byte[] popForValue() {
        try {
            if (this.rightPop) {
                return this.boundListOperations.rightPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
            }
            return this.boundListOperations.leftPop(this.receiveTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception ex) {
            this.listening = false;
            if (isActive()) {
                this.logger.error(ex, "Failed to execute listening task. Will attempt to resubmit in " + this.recoveryInterval + " milliseconds.");
                publishException(ex);
                sleepBeforeRecoveryAttempt();
            }
            else {
                // Failures while the endpoint is not active are only reported at debug level.
                this.logger.debug(() -> "Failed to execute listening task. " + ex.getClass() + ": " + ex.getMessage());
            }
            return null;
        }
    }

    @Override
    protected void doStart() {
        restart();
    }

    /** Sleep for the recovery interval (if positive), restoring the interrupt flag if interrupted. */
    private void sleepBeforeRecoveryAttempt() {
        if (this.recoveryInterval <= 0) {
            return;
        }
        try {
            Thread.sleep(this.recoveryInterval);
        }
        catch (InterruptedException ex) {
            this.logger.debug("Thread interrupted while sleeping the recovery interval");
            Thread.currentThread().interrupt();
        }
    }

    /** Publish the failure as a {@link RedisExceptionEvent}, or log at debug level if no publisher is set. */
    private void publishException(Exception exc) {
        if (this.applicationEventPublisher == null) {
            this.logger.debug(() -> "No application event publisher for exception: " + exc.getMessage());
            return;
        }
        this.applicationEventPublisher.publishEvent(new RedisExceptionEvent(this, exc));
    }

    /** Submit a fresh {@link ListenerTask} to the task executor. */
    private void restart() {
        this.taskExecutor.execute(new ListenerTask());
    }

    /**
     * Stop the endpoint and remember the callback so the listener task can run it once its
     * loop has ended.
     *
     * @param callback the callback to run after the listener task has finished
     */
    @Override
    protected void doStop(Runnable callback) {
        this.stopCallback = callback;
        doStop();
    }

    @Override
    public void doStop() {
        super.doStop();
        this.listening = false;
    }

    /**
     * @return {@code true} while the listener task is in its pop loop and the last pop has not
     * failed
     */
    public boolean isListening() {
        return this.listening;
    }

    /**
     * @return the current size of the bound list, or {@code 0} when the size is reported as
     * {@code null}
     */
    @ManagedMetric
    public long getQueueSize() {
        Long size = this.boundListOperations.size();
        return size == null ? 0L : size;
    }

    /** Delete the bound list key. */
    @ManagedOperation
    public void clearQueue() {
        this.boundListOperations.getOperations().delete(this.boundListOperations.getKey());
    }

    /**
     * Long-lived task that pops and sends messages for as long as the endpoint is active.
     */
    public class ListenerTask implements SchedulingAwareRunnable {

        ListenerTask() {
        }

        @Override
        public boolean isLongLived() {
            return true;
        }

        @Override
        public void run() {
            try {
                while (RedisQueueMessageDrivenEndpoint.this.isActive()) {
                    RedisQueueMessageDrivenEndpoint.this.listening = true;
                    RedisQueueMessageDrivenEndpoint.this.popMessageAndSend();
                }
            }
            finally {
                // Runs exactly once per task, after the loop has ended normally or by exception.
                if (RedisQueueMessageDrivenEndpoint.this.isActive()) {
                    // The loop was left by an exception while the endpoint is still active:
                    // hand over to a new task so listening continues.
                    RedisQueueMessageDrivenEndpoint.this.restart();
                }
                else {
                    Runnable callback = RedisQueueMessageDrivenEndpoint.this.stopCallback;
                    if (callback != null) {
                        callback.run();
                        RedisQueueMessageDrivenEndpoint.this.stopCallback = null;
                    }
                }
            }
        }
    }
}
