package com.cyrillrx.tracker.consumer;

import com.cyrillrx.logger.Logger;
import com.cyrillrx.tracker.event.TrackEvent;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;

/* loaded from: classes.dex */
/**
 * Event consumer that takes events one at a time from a {@link BlockingQueue} and hands each
 * of them to {@link #doConsume(TrackEvent)}.
 *
 * <p>An optional retry queue can be supplied: when it is present, an event whose processing
 * throws is added to that queue instead of the exception being propagated.
 */
public abstract class StreamingConsumer extends EventConsumer<BlockingQueue<TrackEvent>> {
    private static final String TAG = "StreamingConsumer";

    /** Receives events whose consumption failed; {@code null} when no retry queue was supplied. */
    protected Queue<TrackEvent> retryQueue;

    /**
     * Creates a consumer without a retry queue; failures in {@link #doConsume(TrackEvent)} are
     * rethrown from {@link #consume()}.
     *
     * @param blockingQueue the queue the events are taken from
     */
    public StreamingConsumer(BlockingQueue<TrackEvent> blockingQueue) {
        this(blockingQueue, null);
    }

    /**
     * Creates a consumer with an optional retry queue.
     *
     * @param blockingQueue the queue the events are taken from
     * @param queue the queue that receives events whose consumption failed, or {@code null} to
     *     have failures rethrown instead
     */
    public StreamingConsumer(BlockingQueue<TrackEvent> blockingQueue, Queue<TrackEvent> queue) {
        super(blockingQueue);
        this.retryQueue = queue;
    }

    /**
     * Blocks until an event is available, then passes it to {@link #doConsume(TrackEvent)}.
     *
     * <p>Taking {@code STOP_EVENT} clears the {@code running} flag. If the thread is interrupted
     * while waiting, the flag is cleared as well and the thread's interrupt status is restored
     * so that callers further up the stack can observe the interruption.
     *
     * <p>If {@code doConsume} throws and a retry queue is set, the event is added to the retry
     * queue; without a retry queue the exception is logged and rethrown.
     */
    @Override
    protected void consume() {
        try {
            final TrackEvent trackEvent = this.events.take();
            if (EventConsumer.STOP_EVENT.equals(trackEvent)) {
                this.running = false;
            }
            // NOTE(review): the stop event is still handed to doConsume() below (and may end up
            // in the retry queue on failure) — confirm this is intended before changing it.
            try {
                doConsume(trackEvent);
            } catch (Exception e) {
                if (this.retryQueue == null) {
                    Logger.error(TAG, "Error while consuming the event without a retry queue. Rethrowing exception", e);
                    throw e;
                }
                Logger.info(TAG, "Error while consuming. Adding event to the retry queue.", e);
                this.retryQueue.add(trackEvent);
            }
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt status; set it again so the
            // interruption is not silently lost.
            Thread.currentThread().interrupt();
            this.running = false;
        }
    }

    /**
     * Processes a single event taken from the queue.
     *
     * <p>Any exception thrown here is handled by {@link #consume()}: the event goes to the retry
     * queue when one is set, otherwise the exception is rethrown.
     *
     * @param trackEvent the event to process
     */
    protected abstract void doConsume(TrackEvent trackEvent);
}
