package com.taobao.message.platform.eventlistener;

import com.taobao.message.common.inter.service.event.Event;
import com.taobao.message.common.inter.service.event.EventListener;
import com.taobao.message.platform.eventlistener.BaseSyncHandlerTask;
import com.taobao.message.sync.common.SingleTaskConsumer;
import com.taobao.message.sync.common.TaskContext;
import com.taobao.message.sync.common.TaskProvider;
import com.taobao.message.sync.sdk.worker.MergeTaskLinkedBlockingQueue;

/* loaded from: classes7.dex */
/**
 * Base event listener that converts accepted events into tasks, queues them, and exposes the
 * queue to a {@link SingleTaskConsumer} through the {@link TaskProvider} contract.
 *
 * <p>Subclasses decide which events are accepted ({@link #filterEvent(Event)}) and how an
 * accepted event is turned into a task ({@link #createTask(Event)}).
 *
 * <p>NOTE(review): the consumer is started from the constructor, i.e. before any subclass
 * constructor body has run. Whether that starts a background thread depends on
 * {@code SingleTaskConsumer.start()}, which is not visible here -- confirm before relying on
 * subclass state inside tasks.
 *
 * @param <T> the concrete task type produced by this listener
 */
public abstract class BaseDataSourceEventListener<T extends BaseSyncHandlerTask> implements EventListener, TaskProvider<T> {
    /** Pause, in milliseconds, applied before every task is executed. */
    private static final long CONSUME_DELAY_MILLIS = 100L;

    /** Pending tasks created from accepted events; drained through {@link #provide()}. */
    public MergeTaskLinkedBlockingQueue<T> queue = new MergeTaskLinkedBlockingQueue<>();

    /** Consumer that is handed this listener as its task provider. */
    public SingleTaskConsumer<T> taskConsumer = new DataSourceTaskConsume(this);

    /**
     * Consumer that pauses for {@link #CONSUME_DELAY_MILLIS} and then executes the task.
     *
     * <p>NOTE(review): the pause presumably gives the queue time to merge tasks -- confirm
     * against {@code MergeTaskLinkedBlockingQueue}.
     */
    public class DataSourceTaskConsume extends SingleTaskConsumer<T> {
        /**
         * @param taskProvider provider passed through to the {@link SingleTaskConsumer} constructor
         */
        public DataSourceTaskConsume(TaskProvider<T> taskProvider) {
            super(taskProvider);
        }

        /**
         * Waits for the configured delay and then runs the task.
         *
         * <p>If the wait is interrupted, the thread's interrupt status is restored and the task
         * is still executed, so no queued work is dropped.
         *
         * @param t           the task to run
         * @param taskContext context forwarded to {@code t.execute}
         */
        @Override // com.taobao.message.sync.common.SingleTaskConsumer
        public void consume(T t, TaskContext taskContext) {
            try {
                Thread.sleep(CONSUME_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Thread.sleep clears the interrupt status when it throws; set it again so the
                // code driving this consumer can observe the interruption request.
                Thread.currentThread().interrupt();
            }
            t.execute(taskContext);
        }
    }

    /** Creates the listener and starts its task consumer. */
    public BaseDataSourceEventListener() {
        this.taskConsumer.start();
    }

    /**
     * Builds the task for an event that passed {@link #filterEvent(Event)}.
     *
     * @param event the accepted event
     * @return the task to enqueue
     */
    public abstract T createTask(Event<?> event);

    /**
     * Decides whether an event should be turned into a task.
     *
     * @param event the incoming event
     * @return {@code true} to create and enqueue a task for the event, {@code false} to ignore it
     */
    public abstract boolean filterEvent(Event<?> event);

    /**
     * Enqueues a task for the event when {@link #filterEvent(Event)} accepts it; otherwise does
     * nothing.
     *
     * @param event the incoming event
     */
    @Override // com.taobao.message.common.inter.service.event.EventListener
    public void onEvent(Event<?> event) {
        if (filterEvent(event)) {
            this.queue.addTask(createTask(event));
        }
    }

    /**
     * Supplies the next task by popping it from {@link #queue}.
     *
     * @return whatever {@code queue.popTask()} returns
     */
    @Override // com.taobao.message.sync.common.TaskProvider
    public T provide() {
        return this.queue.popTask();
    }
}
