package com.amplifyframework.datastore.syncengine;

import androidx.annotation.NonNull;
import com.amplifyframework.AmplifyException;
import com.amplifyframework.api.ApiException;
import com.amplifyframework.api.graphql.GraphQLResponse;
import com.amplifyframework.api.graphql.SubscriptionType;
import com.amplifyframework.core.Action;
import com.amplifyframework.core.Amplify;
import com.amplifyframework.core.Consumer;
import com.amplifyframework.core.async.Cancelable;
import com.amplifyframework.core.category.CategoryType;
import com.amplifyframework.core.model.Model;
import com.amplifyframework.core.model.ModelProvider;
import com.amplifyframework.core.model.ModelSchema;
import com.amplifyframework.core.model.SchemaRegistry;
import com.amplifyframework.core.model.SerializedModel;
import com.amplifyframework.datastore.AmplifyDisposables;
import com.amplifyframework.datastore.DataStoreChannelEventName;
import com.amplifyframework.datastore.DataStoreException;
import com.amplifyframework.datastore.appsync.AppSync;
import com.amplifyframework.datastore.appsync.AppSyncExtensions;
import com.amplifyframework.datastore.appsync.ModelWithMetadata;
import com.amplifyframework.datastore.syncengine.SubscriptionEvent;
import com.amplifyframework.datastore.utils.ErrorInspector;
import com.amplifyframework.hub.HubChannel;
import com.amplifyframework.hub.HubEvent;
import com.amplifyframework.logging.Logger;
import com.amplifyframework.util.Empty;
import fq.a;
import io.reactivex.rxjava3.internal.operators.observable.d;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes.dex */
public final class SubscriptionProcessor {
    /** Logger for the DataStore category, registered under the "amplify:aws-datastore" namespace. */
    private static final Logger LOG = Amplify.Logging.logger(CategoryType.DATASTORE, "amplify:aws-datastore");
    /** Lower bound, in seconds, of the start-up timeout computed in the constructor. */
    private static final long NETWORK_OP_TIMEOUT_SECONDS = 60;
    /** Seconds multiplied by the model count when the constructor computes the start-up timeout. */
    private static final long TIMEOUT_SECONDS_PER_MODEL = 20;
    /** Number of seconds startSubscriptions() waits on its latch before reporting a timeout. */
    private final long adjustedTimeoutSeconds;
    /** Remote API handed to subscriptionObservable(...) to open the per-model subscriptions. */
    private final AppSync appSync;
    /**
     * Subject that startSubscriptions() re-creates and feeds with subscription events, and that
     * startDrainingMutationBuffer() reads from. It is not assigned in the constructor, so it is
     * null until startSubscriptions() has run.
     */
    private io.reactivex.rxjava3.subjects.e<SubscriptionEvent<? extends Model>> buffer;
    /** Receives each buffered event's model-with-metadata in mergeEvent(...). */
    private final Merger merger;
    /** Supplies the model names and schemas that subscriptions are created for. */
    private final ModelProvider modelProvider;
    /** Called with a subscription error that arrives after the start-up latch has already reached zero. */
    private final Consumer<Throwable> onFailure;
    /**
     * Container the processor adds its running Rx subscriptions to (via b(...)); stopAllSubscriptionActivity()
     * calls d() on it. NOTE(review): cq.a looks like an obfuscated RxJava CompositeDisposable - confirm.
     */
    private final cq.a ongoingOperationsDisposable;
    /** Source of the per-model predicate used to filter incoming subscription data. */
    private final QueryPredicateProvider queryPredicateProvider;
    /** Passed to SerializedModel.parseSerializedData(...) when re-parsing serialized models in mergeEvent(...). */
    private final SchemaRegistry schemaRegistry;

    /* renamed from: com.amplifyframework.datastore.syncengine.SubscriptionProcessor$1, reason: invalid class name */
    /* loaded from: classes.dex */
    /**
     * Compiler-generated lookup table backing the switches over {@link SubscriptionType} in this file.
     * It maps each constant's ordinal to a small case number:
     * ON_UPDATE = 1, ON_DELETE = 2, ON_CREATE = 3. Slots for any other constant stay 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by SubscriptionType.ordinal(); read by fromSubscriptionType and subscriptionMethodFor.
        static final /* synthetic */ int[] $SwitchMap$com$amplifyframework$api$graphql$SubscriptionType;

        static {
            int[] iArr = new int[SubscriptionType.values().length];
            $SwitchMap$com$amplifyframework$api$graphql$SubscriptionType = iArr;
            // Each entry is filled in its own try block so that a constant missing at run time
            // (NoSuchFieldError) leaves only that slot unset instead of failing class initialisation.
            try {
                iArr[SubscriptionType.ON_UPDATE.ordinal()] = 1;
            } catch (NoSuchFieldError unused) {
            }
            try {
                $SwitchMap$com$amplifyframework$api$graphql$SubscriptionType[SubscriptionType.ON_DELETE.ordinal()] = 2;
            } catch (NoSuchFieldError unused2) {
            }
            try {
                $SwitchMap$com$amplifyframework$api$graphql$SubscriptionType[SubscriptionType.ON_CREATE.ordinal()] = 3;
            } catch (NoSuchFieldError unused3) {
            }
        }
    }

    /* loaded from: classes.dex */
    /**
     * First step of the staged builder returned by {@code builder()}: supplies the {@link AppSync}
     * client and advances to {@link ModelProviderStep}.
     */
    public interface AppSyncStep {
        /**
         * @param appSync the AppSync client to use
         * @return the next builder step
         */
        @NonNull
        ModelProviderStep appSync(@NonNull AppSync appSync);
    }

    /* loaded from: classes.dex */
    /** Final step of the staged builder: creates the {@link SubscriptionProcessor}. */
    public interface BuildStep {
        /**
         * @return a new subscription processor
         */
        @NonNull
        SubscriptionProcessor build();
    }

    /* loaded from: classes.dex */
    /**
     * Staged builder for {@link SubscriptionProcessor}. Each setter rejects {@code null}, stores the
     * value and returns this same object typed as the next step, so the steps must be supplied in the
     * order AppSync, ModelProvider, SchemaRegistry, Merger, QueryPredicateProvider, OnFailure, build.
     */
    public static final class Builder implements AppSyncStep, ModelProviderStep, SchemaRegistryStep, MergerStep, QueryPredicateProviderStep, OnFailureStep, BuildStep {
        // Fields are listed in the order in which the steps populate them.
        private AppSync appSync;
        private ModelProvider modelProvider;
        private SchemaRegistry schemaRegistry;
        private Merger merger;
        private QueryPredicateProvider queryPredicateProvider;
        private Consumer<Throwable> onFailure;

        /** Step 1: stores the AppSync client; throws NullPointerException when it is null. */
        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.AppSyncStep
        @NonNull
        public ModelProviderStep appSync(@NonNull AppSync appSync) {
            this.appSync = Objects.requireNonNull(appSync);
            return this;
        }

        /** Step 2: stores the model provider; throws NullPointerException when it is null. */
        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.ModelProviderStep
        @NonNull
        public SchemaRegistryStep modelProvider(@NonNull ModelProvider modelProvider) {
            this.modelProvider = Objects.requireNonNull(modelProvider);
            return this;
        }

        /** Step 3: stores the schema registry; throws NullPointerException when it is null. */
        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.SchemaRegistryStep
        @NonNull
        public MergerStep schemaRegistry(@NonNull SchemaRegistry schemaRegistry) {
            this.schemaRegistry = Objects.requireNonNull(schemaRegistry);
            return this;
        }

        /** Step 4: stores the merger; throws NullPointerException when it is null. */
        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.MergerStep
        @NonNull
        public QueryPredicateProviderStep merger(@NonNull Merger merger) {
            this.merger = Objects.requireNonNull(merger);
            return this;
        }

        /** Step 5: stores the query predicate provider; throws NullPointerException when it is null. */
        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.QueryPredicateProviderStep
        @NonNull
        public OnFailureStep queryPredicateProvider(QueryPredicateProvider queryPredicateProvider) {
            this.queryPredicateProvider = Objects.requireNonNull(queryPredicateProvider);
            return this;
        }

        /** Step 6: stores the failure callback; throws NullPointerException when it is null. */
        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.OnFailureStep
        @NonNull
        public BuildStep onFailure(Consumer<Throwable> consumer) {
            this.onFailure = Objects.requireNonNull(consumer);
            return this;
        }

        /** Final step: creates the processor from the values collected so far. */
        @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.BuildStep
        @NonNull
        public SubscriptionProcessor build() {
            return new SubscriptionProcessor(this, null);
        }
    }

    /* loaded from: classes.dex */
    /** Builder step that supplies the {@link Merger} and advances to {@link QueryPredicateProviderStep}. */
    public interface MergerStep {
        /**
         * @param merger the merger to use
         * @return the next builder step
         */
        @NonNull
        QueryPredicateProviderStep merger(@NonNull Merger merger);
    }

    /* loaded from: classes.dex */
    /** Builder step that supplies the {@link ModelProvider} and advances to {@link SchemaRegistryStep}. */
    public interface ModelProviderStep {
        /**
         * @param modelProvider the model provider to use
         * @return the next builder step
         */
        @NonNull
        SchemaRegistryStep modelProvider(@NonNull ModelProvider modelProvider);
    }

    /* loaded from: classes.dex */
    /** Builder step that supplies the failure callback and advances to {@link BuildStep}. */
    public interface OnFailureStep {
        /**
         * @param consumer callback that receives failures reported by the processor
         * @return the next builder step
         */
        @NonNull
        BuildStep onFailure(Consumer<Throwable> consumer);
    }

    /* loaded from: classes.dex */
    /** Builder step that supplies the {@link QueryPredicateProvider} and advances to {@link OnFailureStep}. */
    public interface QueryPredicateProviderStep {
        /**
         * @param queryPredicateProvider the provider of per-model query predicates
         * @return the next builder step
         */
        @NonNull
        OnFailureStep queryPredicateProvider(QueryPredicateProvider queryPredicateProvider);
    }

    /* loaded from: classes.dex */
    /** Builder step that supplies the {@link SchemaRegistry} and advances to {@link MergerStep}. */
    public interface SchemaRegistryStep {
        /**
         * @param schemaRegistry the schema registry to use
         * @return the next builder step
         */
        @NonNull
        MergerStep schemaRegistry(@NonNull SchemaRegistry schemaRegistry);
    }

    /* loaded from: classes.dex */
    /**
     * Common shape of the AppSync onCreate / onUpdate / onDelete calls, so that
     * subscriptionMethodFor(...) can hand back any of them behind one type.
     */
    public interface SubscriptionMethod {
        /**
         * Opens a subscription for one model schema.
         *
         * @param modelSchema schema of the model to subscribe to
         * @param consumer receives a String when the subscription starts (used in this file as the subscription id)
         * @param consumer2 receives each GraphQL response delivered on the subscription
         * @param consumer3 receives a subscription failure
         * @param action invoked when the subscription completes
         * @param <T> model type carried by the responses
         * @return handle that cancels the subscription
         */
        <T extends Model> Cancelable subscribe(@NonNull ModelSchema modelSchema, @NonNull Consumer<String> consumer, @NonNull Consumer<GraphQLResponse<ModelWithMetadata<T>>> consumer2, @NonNull Consumer<DataStoreException> consumer3, @NonNull Action action);
    }

    /* JADX WARN: Type inference failed for: r5v2, types: [java.lang.Object, cq.a] */
    private SubscriptionProcessor(Builder builder) {
        this.appSync = builder.appSync;
        this.modelProvider = builder.modelProvider;
        this.merger = builder.merger;
        this.queryPredicateProvider = builder.queryPredicateProvider;
        this.onFailure = builder.onFailure;
        this.schemaRegistry = builder.schemaRegistry;
        this.ongoingOperationsDisposable = new Object();
        this.adjustedTimeoutSeconds = Math.max(NETWORK_OP_TIMEOUT_SECONDS, Math.max(r0.models().size(), r0.modelSchemas().size()) * TIMEOUT_SECONDS_PER_MODEL);
    }

    /**
     * Synthetic accessor that lets {@link Builder#build()} reach the private constructor.
     * The second argument is ignored; the builder passes {@code null}.
     */
    public /* synthetic */ SubscriptionProcessor(Builder builder, AnonymousClass1 anonymousClass1) {
        this(builder);
    }

    /**
     * Starts the staged builder for a {@link SubscriptionProcessor}.
     *
     * @return the first builder step, which expects the AppSync client
     */
    public static AppSyncStep builder() {
        return new Builder();
    }

    /**
     * Translates a GraphQL subscription type into the matching subscription event type.
     *
     * @param subscriptionType the API-level subscription type
     * @return UPDATE, DELETE or CREATE for ON_UPDATE, ON_DELETE or ON_CREATE respectively
     * @throws IllegalArgumentException for any other subscription type
     */
    private static SubscriptionEvent.Type fromSubscriptionType(SubscriptionType subscriptionType) {
        switch (subscriptionType) {
            case ON_UPDATE:
                return SubscriptionEvent.Type.UPDATE;
            case ON_DELETE:
                return SubscriptionEvent.Type.DELETE;
            case ON_CREATE:
                return SubscriptionEvent.Type.CREATE;
            default:
                throw new IllegalArgumentException("Unknown subscription type: " + subscriptionType);
        }
    }

    /**
     * Tells whether a failure is a GraphQL response error whose first error carries the given
     * AppSync error type in its extensions.
     *
     * @param dataStoreException the failure to inspect
     * @param appSyncErrorType the error type to look for
     * @return {@code true} only when the exception is a {@code GraphQLResponseException}, it has at
     *         least one error, that first error has non-empty extensions, and the error type parsed
     *         from those extensions equals {@code appSyncErrorType}
     */
    private boolean isExceptionType(DataStoreException dataStoreException, AppSyncExtensions.AppSyncErrorType appSyncErrorType) {
        if (!(dataStoreException instanceof DataStoreException.GraphQLResponseException)) {
            return false;
        }
        java.util.List<GraphQLResponse.Error> errors = ((DataStoreException.GraphQLResponseException) dataStoreException).getErrors();
        // A response exception without any error entries cannot match; previously get(0) threw here.
        if (errors == null || errors.isEmpty()) {
            return false;
        }
        GraphQLResponse.Error firstError = errors.get(0);
        if (firstError == null || Empty.check(firstError.getExtensions())) {
            return false;
        }
        return appSyncErrorType.equals(new AppSyncExtensions(firstError.getExtensions()).getErrorType());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs, at info level, that processing of the subscription data buffer is starting.
     *
     * @param bVar unused
     */
    public static /* synthetic */ void lambda$startDrainingMutationBuffer$10(cq.b bVar) throws Throwable {
        LOG.info("Starting processing subscription data buffer.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs, at warn level, that reading the subscriptions buffer failed.
     *
     * @param th2 the failure, attached to the log entry
     */
    public static /* synthetic */ void lambda$startDrainingMutationBuffer$11(Throwable th2) throws Throwable {
        LOG.warn("Reading subscriptions buffer has failed.", th2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Logs, at warn level, that reading from the subscriptions buffer has completed. */
    public static /* synthetic */ void lambda$startDrainingMutationBuffer$12() throws Throwable {
        LOG.warn("Reading from subscriptions buffer is completed.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs, at info level, that processing of subscription events is starting.
     *
     * @param bVar unused
     */
    public static /* synthetic */ void lambda$startSubscriptions$0(cq.b bVar) throws Throwable {
        LOG.info("Starting processing subscription events.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs, at warn level, that reading subscription events failed.
     *
     * @param th2 the failure, attached to the log entry
     */
    public static /* synthetic */ void lambda$startSubscriptions$1(Throwable th2) throws Throwable {
        LOG.warn("Reading subscription events has failed.", th2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Logs, at warn level, that reading subscription events has completed. */
    public static /* synthetic */ void lambda$startSubscriptions$2() throws Throwable {
        LOG.warn("Reading subscription events is completed.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Start callback of a single subscription: logs the start, remembers the subscription id and
     * counts the start-up latch down by one.
     *
     * @param subscriptionType type of the subscription that started
     * @param modelSchema schema of the subscribed model
     * @param atomicReference holder that receives the subscription id
     * @param abortableCountDownLatch start-up latch to count down
     * @param str the subscription id
     */
    public static /* synthetic */ void lambda$subscriptionObservable$3(SubscriptionType subscriptionType, ModelSchema modelSchema, AtomicReference atomicReference, AbortableCountDownLatch abortableCountDownLatch, String str) {
        final String typeName = subscriptionType.name();
        final String modelName = modelSchema.getName();
        LOG.debug("Subscription started for " + typeName + " " + modelName + " subscriptionId: " + str);
        atomicReference.set(str);
        abortableCountDownLatch.countDown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Error callback of a single subscription.
     *
     * <p>Authorization failures and "operation disabled" errors are tolerated: the latch is counted
     * down and a warning is logged. Any other failure aborts the start-up latch while it is still
     * above zero, and is otherwise forwarded to the {@code onFailure} callback.
     *
     * @param abortableCountDownLatch start-up latch shared by all subscriptions
     * @param subscriptionType type of the failing subscription
     * @param modelSchema schema of the subscribed model
     * @param dataStoreException the failure reported for the subscription
     */
    public /* synthetic */ void lambda$subscriptionObservable$4(AbortableCountDownLatch abortableCountDownLatch, SubscriptionType subscriptionType, ModelSchema modelSchema, DataStoreException dataStoreException) {
        final boolean unauthorized = ErrorInspector.contains(dataStoreException, (Class<? extends Throwable>) ApiException.ApiAuthException.class)
                || isExceptionType(dataStoreException, AppSyncExtensions.AppSyncErrorType.UNAUTHORIZED);
        if (unauthorized) {
            abortableCountDownLatch.countDown();
            LOG.warn("Unauthorized failure:" + subscriptionType.name() + " " + modelSchema.getName());
            return;
        }

        if (isExceptionType(dataStoreException, AppSyncExtensions.AppSyncErrorType.OPERATION_DISABLED)) {
            abortableCountDownLatch.countDown();
            LOG.warn("Operation disabled:" + subscriptionType.name() + " " + modelSchema.getName());
            return;
        }

        // Unexpected failure: fail start-up while it is still pending, otherwise report it.
        if (abortableCountDownLatch.getCount() > 0) {
            abortableCountDownLatch.abort(dataStoreException);
        } else {
            this.onFailure.accept(dataStoreException);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Completion callback of a single subscription: logs the subscription id held in the reference
     * and then calls {@code b()} on the emitter.
     *
     * @param atomicReference holder of the subscription id
     * @param nVar emitter of the observable that wraps the subscription
     */
    public static /* synthetic */ void lambda$subscriptionObservable$5(AtomicReference atomicReference, bq.n nVar) {
        final String subscriptionId = (String) atomicReference.get();
        LOG.debug("Subscription completed:" + subscriptionId);
        final d.a emitter = (d.a) nVar;
        emitter.b();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Body of the observable created in subscriptionObservable(...): opens one AppSync subscription
     * and wires its callbacks to the emitter.
     *
     * <p>NOTE(review): {@code SubscriptionType.this} and {@code bq.n.this} below are how the
     * decompiler rendered captured variables ({@code subscriptionType} and {@code nVar}); they are
     * not valid qualified-this expressions.
     *
     * @param appSync API used to open the subscription
     * @param subscriptionType which kind of subscription to open
     * @param modelSchema schema of the model to subscribe to
     * @param abortableCountDownLatch start-up latch shared by all subscriptions
     * @param nVar emitter of the enclosing observable
     * @throws Throwable if no subscription method exists for the type, or subscribing fails
     */
    public void lambda$subscriptionObservable$6(AppSync appSync, final SubscriptionType subscriptionType, final ModelSchema modelSchema, final AbortableCountDownLatch abortableCountDownLatch, final bq.n nVar) throws Throwable {
        SubscriptionMethod subscriptionMethodFor = subscriptionMethodFor(appSync, subscriptionType);
        // Filled by the start callback and read again by the completion callback.
        final AtomicReference atomicReference = new AtomicReference();
        // Start callback: log, remember the subscription id, count the latch down.
        Consumer<String> consumer = new Consumer() { // from class: com.amplifyframework.datastore.syncengine.j3
            @Override // com.amplifyframework.core.Consumer
            public final void accept(Object obj) {
                SubscriptionProcessor.lambda$subscriptionObservable$3(SubscriptionType.this, modelSchema, atomicReference, abortableCountDownLatch, (String) obj);
            }
        };
        Objects.requireNonNull(nVar);
        // The Cancelable returned by subscribe(...) is wrapped and set on the emitter via eq.a.set.
        eq.a.set((d.a) nVar, AmplifyDisposables.fromCancelable(subscriptionMethodFor.subscribe(modelSchema, consumer, new Consumer() { // from class: com.amplifyframework.datastore.syncengine.k3
            // Data callback: hand each GraphQL response to the emitter.
            @Override // com.amplifyframework.core.Consumer
            public final void accept(Object obj) {
                ((d.a) bq.n.this).d((GraphQLResponse) obj);
            }
        }, new Consumer() { // from class: com.amplifyframework.datastore.syncengine.r2
            // Error callback: tolerate, abort start-up, or report (see lambda$subscriptionObservable$4).
            @Override // com.amplifyframework.core.Consumer
            public final void accept(Object obj) {
                SubscriptionProcessor.this.lambda$subscriptionObservable$4(abortableCountDownLatch, subscriptionType, modelSchema, (DataStoreException) obj);
            }
        }, new Action() { // from class: com.amplifyframework.datastore.syncengine.s2
            // Completion callback: log and signal the emitter.
            @Override // com.amplifyframework.core.Action
            public final void call() {
                SubscriptionProcessor.lambda$subscriptionObservable$5(atomicReference, nVar);
            }
        })));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs, at warn level, an error raised on a remote subscription.
     *
     * @param subscriptionType type of the failing subscription
     * @param modelSchema schema of the subscribed model
     * @param th2 the error, attached to the log entry
     */
    public static /* synthetic */ void lambda$subscriptionObservable$7(SubscriptionType subscriptionType, ModelSchema modelSchema, Throwable th2) throws Throwable {
        final String description = "An error occurred on the remote " + subscriptionType.name() + " subscription for model " + modelSchema.getName();
        LOG.warn(description, th2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Filter applied to incoming subscription data: evaluates the predicate registered for the
     * model's name against the received model.
     *
     * @param modelSchema schema whose name selects the predicate
     * @param modelWithMetadata the received model and its metadata
     * @return the result of evaluating the predicate on the model
     */
    public /* synthetic */ boolean lambda$subscriptionObservable$8(ModelSchema modelSchema, ModelWithMetadata modelWithMetadata) throws Throwable {
        final String modelName = modelSchema.getName();
        return this.queryPredicateProvider
                .getPredicate(modelName)
                .evaluate(modelWithMetadata.getModel());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps received subscription data in a {@link SubscriptionEvent}.
     *
     * @param subscriptionType type of the subscription the data arrived on; converted with fromSubscriptionType
     * @param modelSchema schema stored on the event
     * @param modelWithMetadata the received model and its metadata
     * @return the built event
     * @throws IllegalArgumentException if the subscription type has no matching event type
     */
    public static /* synthetic */ SubscriptionEvent lambda$subscriptionObservable$9(SubscriptionType subscriptionType, ModelSchema modelSchema, ModelWithMetadata modelWithMetadata) throws Throwable {
        return SubscriptionEvent.builder().type(fromSubscriptionType(subscriptionType)).modelWithMetadata(modelWithMetadata).modelSchema(modelSchema).build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Hands the payload of a subscription event to the merger.
     *
     * <p>When the payload is a {@link SerializedModel}, its serialized data is first re-parsed with
     * {@code SerializedModel.parseSerializedData} using the event's schema and the schema registry,
     * and a rebuilt model (with the original sync metadata) is merged instead.
     *
     * @param subscriptionEvent the event to merge
     * @return whatever the merger returns for the merge operation
     */
    public bq.a mergeEvent(SubscriptionEvent<? extends Model> subscriptionEvent) {
        final ModelWithMetadata<? extends Model> incoming = subscriptionEvent.modelWithMetadata();
        if (incoming.getModel() instanceof SerializedModel) {
            final SerializedModel received = (SerializedModel) incoming.getModel();
            return this.merger.merge(new ModelWithMetadata(
                    SerializedModel.builder()
                            .modelSchema(subscriptionEvent.modelSchema())
                            .serializedData(SerializedModel.parseSerializedData(
                                    received.getSerializedData(),
                                    subscriptionEvent.modelSchema().getName(),
                                    this.schemaRegistry))
                            .build(),
                    incoming.getSyncMetadata()));
        }
        return this.merger.merge(incoming);
    }

    /**
     * Selects the AppSync call that opens a subscription of the given type.
     *
     * @param appSync API whose onUpdate / onDelete / onCreate method is wrapped; must not be null
     *                for a supported type
     * @param subscriptionType the kind of subscription wanted
     * @return a {@link SubscriptionMethod} that delegates to the matching AppSync method
     * @throws DataStoreException if the subscription type is not ON_UPDATE, ON_DELETE or ON_CREATE
     */
    public static SubscriptionMethod subscriptionMethodFor(final AppSync appSync, SubscriptionType subscriptionType) throws DataStoreException {
        // The decompiled delegates referred to the captured client as `AppSync.this`, which is not a
        // valid qualified-this here; they now use the captured `appSync` parameter.
        switch (subscriptionType) {
            case ON_UPDATE:
                Objects.requireNonNull(appSync);
                return new SubscriptionMethod() { // from class: com.amplifyframework.datastore.syncengine.q2
                    @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.SubscriptionMethod
                    public final Cancelable subscribe(ModelSchema modelSchema, Consumer consumer, Consumer consumer2, Consumer consumer3, Action action) {
                        return appSync.onUpdate(modelSchema, consumer, consumer2, consumer3, action);
                    }
                };
            case ON_DELETE:
                Objects.requireNonNull(appSync);
                return new SubscriptionMethod() { // from class: com.amplifyframework.datastore.syncengine.b3
                    @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.SubscriptionMethod
                    public final Cancelable subscribe(ModelSchema modelSchema, Consumer consumer, Consumer consumer2, Consumer consumer3, Action action) {
                        return appSync.onDelete(modelSchema, consumer, consumer2, consumer3, action);
                    }
                };
            case ON_CREATE:
                Objects.requireNonNull(appSync);
                return new SubscriptionMethod() { // from class: com.amplifyframework.datastore.syncengine.d3
                    @Override // com.amplifyframework.datastore.syncengine.SubscriptionProcessor.SubscriptionMethod
                    public final Cancelable subscribe(ModelSchema modelSchema, Consumer consumer, Consumer consumer2, Consumer consumer3, Action action) {
                        return appSync.onCreate(modelSchema, consumer, consumer2, consumer3, action);
                    }
                };
            default:
                throw new DataStoreException("Failed to establish a model subscription.", "Was a new subscription type created?");
        }
    }

    /* JADX WARN: Type inference failed for: r10v3, types: [java.lang.Object, dq.e] */
    /**
     * Builds the observable for one (model schema, subscription type) pair.
     *
     * <p>Stages, in order: an observable whose body opens the AppSync subscription
     * (lambda$subscriptionObservable$6); an error-logging hook (lambda$subscriptionObservable$7);
     * a mapping step whose function the decompiler lost; a filter using the model's query predicate
     * (lambda$subscriptionObservable$8); and a mapping step that wraps the data in a
     * {@link SubscriptionEvent} (lambda$subscriptionObservable$9).
     *
     * @param appSync API used to open the subscription
     * @param subscriptionType which kind of subscription to open
     * @param abortableCountDownLatch start-up latch shared by all subscriptions
     * @param modelSchema schema of the model to subscribe to
     * @return observable of subscription events for this pair
     */
    private <T extends Model> bq.m<SubscriptionEvent<? extends Model>> subscriptionObservable(final AppSync appSync, final SubscriptionType subscriptionType, final AbortableCountDownLatch<DataStoreException> abortableCountDownLatch, final ModelSchema modelSchema) {
        // NOTE(review): d / g / l / r are obfuscated RxJava operator classes; they look like
        // create / doOnEach / filter / map respectively - confirm against the mapping file.
        io.reactivex.rxjava3.internal.operators.observable.g gVar = new io.reactivex.rxjava3.internal.operators.observable.g(new io.reactivex.rxjava3.internal.operators.observable.d(new bq.o() { // from class: com.amplifyframework.datastore.syncengine.e3
            @Override // bq.o
            public final void a(d.a aVar) {
                SubscriptionProcessor.this.lambda$subscriptionObservable$6(appSync, subscriptionType, modelSchema, abortableCountDownLatch, aVar);
            }
        }), fq.a.f41274d, new dq.d() { // from class: com.amplifyframework.datastore.syncengine.f3
            @Override // dq.d
            public final void accept(Object obj) {
                SubscriptionProcessor.lambda$subscriptionObservable$7(SubscriptionType.this, modelSchema, (Throwable) obj);
            }
        }, fq.a.f41273c);
        // NOTE(review): presumably a shared RxJava scheduler, applied through g(...) and e(...) - confirm.
        io.reactivex.rxjava3.internal.schedulers.c cVar = jq.a.f43748c;
        // NOTE(review): the `new Object()` passed to the inner `r` is a decompiler placeholder for a
        // dq.e function (see the JADX warning above); presumably it is unwrapResponse - confirm.
        return new io.reactivex.rxjava3.internal.operators.observable.r(new io.reactivex.rxjava3.internal.operators.observable.l(new io.reactivex.rxjava3.internal.operators.observable.r(gVar.g(cVar).e(cVar), new Object()), new dq.f() { // from class: com.amplifyframework.datastore.syncengine.h3
            @Override // dq.f
            public final boolean test(Object obj) {
                boolean lambda$subscriptionObservable$8;
                lambda$subscriptionObservable$8 = SubscriptionProcessor.this.lambda$subscriptionObservable$8(modelSchema, (ModelWithMetadata) obj);
                return lambda$subscriptionObservable$8;
            }
        }), new dq.e() { // from class: com.amplifyframework.datastore.syncengine.i3
            @Override // dq.e
            public final Object apply(Object obj) {
                SubscriptionEvent lambda$subscriptionObservable$9;
                lambda$subscriptionObservable$9 = SubscriptionProcessor.lambda$subscriptionObservable$9(SubscriptionType.this, modelSchema, (ModelWithMetadata) obj);
                return lambda$subscriptionObservable$9;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extracts the payload from a subscription response.
     *
     * @param graphQLResponse the response received on a subscription
     * @param <T> model type carried by the response
     * @return the response data
     * @throws DataStoreException if the response reports errors, or otherwise if it carries no data
     */
    public static <T extends Model> ModelWithMetadata<T> unwrapResponse(GraphQLResponse<? extends ModelWithMetadata<T>> graphQLResponse) throws DataStoreException {
        // Errors take precedence over the missing-data check.
        if (graphQLResponse.hasErrors()) {
            throw new DataStoreException(
                    String.format("Errors on subscription: %s", graphQLResponse.getErrors()),
                    AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION);
        }
        if (!graphQLResponse.hasData()) {
            throw new DataStoreException("Empty data received on subscription.", AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION);
        }
        return graphQLResponse.getData();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r1v2, types: [java.lang.Object, dq.d] */
    /* JADX WARN: Type inference failed for: r2v0, types: [java.lang.Object, dq.d] */
    /* JADX WARN: Type inference failed for: r2v2, types: [java.lang.Object, dq.a] */
    /* JADX WARN: Type inference failed for: r2v3, types: [bq.c, cq.b, java.util.concurrent.atomic.AtomicReference] */
    /**
     * Starts consuming the event buffer: every buffered {@link SubscriptionEvent} is passed to
     * mergeEvent(...), and the resulting subscription is registered with
     * {@code ongoingOperationsDisposable} so that stopAllSubscriptionActivity() can end it.
     *
     * <p>The buffer is only assigned by startSubscriptions(); if that has not run, the
     * {@code eVar.getClass()} null check below throws a NullPointerException.
     *
     * <p>NOTE(review): the {@code ??} declarations and the bare {@code new Object()} arguments are
     * decompiler placeholders for lambdas and observer objects it could not type (see the JADX
     * warnings above). They are presumably the lambda$startDrainingMutationBuffer$10/11/12 logging
     * hooks - confirm before relying on this method.
     */
    public void startDrainingMutationBuffer() {
        cq.a aVar = this.ongoingOperationsDisposable;
        io.reactivex.rxjava3.subjects.e<SubscriptionEvent<? extends Model>> eVar = this.buffer;
        ?? obj = new Object();
        a.C1063a c1063a = fq.a.f41273c;
        // Decompiled form of a null check on the buffer.
        eVar.getClass();
        // Wrap the buffer with the first hook, then map each event to its merge operation.
        io.reactivex.rxjava3.internal.operators.completable.j e10 = new io.reactivex.rxjava3.internal.operators.observable.n(new io.reactivex.rxjava3.internal.operators.observable.h(eVar, obj, c1063a), new dq.e() { // from class: com.amplifyframework.datastore.syncengine.u2
            @Override // dq.e
            public final Object apply(Object obj2) {
                bq.a mergeEvent;
                mergeEvent = SubscriptionProcessor.this.mergeEvent((SubscriptionEvent) obj2);
                return mergeEvent;
            }
        }).f(new Object()).e(new Object());
        // Subscribe with the observer and track it for later disposal.
        ?? atomicReference = new AtomicReference();
        e10.a(atomicReference);
        aVar.b(atomicReference);
    }

    /* JADX WARN: Type inference failed for: r0v11, types: [java.lang.Object, dq.d] */
    /* JADX WARN: Type inference failed for: r0v12, types: [java.lang.Object, dq.a] */
    /* JADX WARN: Type inference failed for: r3v3, types: [java.lang.Object, dq.d] */
    /**
     * Opens one subscription per model schema and subscription type, forwards every received event
     * into a fresh buffer, and blocks until all subscriptions have reported in or the timeout
     * ({@code adjustedTimeoutSeconds}) elapses.
     *
     * <p>On success a SUBSCRIPTIONS_ESTABLISHED event is published on the DataStore hub channel.
     * If the waiting thread is interrupted, a warning is logged, the thread's interrupt flag is
     * restored so callers can observe the cancellation, and the method returns without publishing.
     *
     * <p>NOTE(review): the latch is sized from {@code modelNames()} while the loop iterates
     * {@code modelSchemas()}; if the two ever differ in size the latch count will not match the
     * number of subscriptions - confirm they are always equal.
     *
     * <p>NOTE(review): the {@code ??} declarations and bare {@code new Object()} arguments are
     * decompiler placeholders for lambdas it could not type (see the JADX warnings above); they are
     * presumably the lambda$startSubscriptions$0/1/2 logging hooks - confirm.
     *
     * @throws DataStoreException if the subscriptions do not all start within the timeout
     */
    public synchronized void startSubscriptions() throws DataStoreException {
        AbortableCountDownLatch<DataStoreException> abortableCountDownLatch = new AbortableCountDownLatch<>(this.modelProvider.modelNames().size() * SubscriptionType.values().length);
        this.buffer = new io.reactivex.rxjava3.subjects.e<>(bq.f.f10631c);
        // One observable per (schema, subscription type) pair.
        HashSet hashSet = new HashSet();
        for (ModelSchema modelSchema : this.modelProvider.modelSchemas().values()) {
            for (SubscriptionType subscriptionType : SubscriptionType.values()) {
                hashSet.add(subscriptionObservable(this.appSync, subscriptionType, abortableCountDownLatch, modelSchema));
            }
        }
        cq.a aVar = this.ongoingOperationsDisposable;
        // NOTE(review): p(...).d(...) looks like merging the set of observables into one - confirm.
        bq.m<R> d10 = new io.reactivex.rxjava3.internal.operators.observable.p(hashSet).d(fq.a.f41271a);
        io.reactivex.rxjava3.internal.schedulers.c cVar = jq.a.f43748c;
        io.reactivex.rxjava3.internal.operators.observable.s e10 = d10.g(cVar).e(cVar);
        ?? obj = new Object();
        a.C1063a c1063a = fq.a.f41273c;
        io.reactivex.rxjava3.internal.operators.observable.h hVar = new io.reactivex.rxjava3.internal.operators.observable.h(e10, obj, c1063a);
        ?? obj2 = new Object();
        a.b bVar = fq.a.f41274d;
        io.reactivex.rxjava3.internal.operators.observable.g gVar = new io.reactivex.rxjava3.internal.operators.observable.g(new io.reactivex.rxjava3.internal.operators.observable.g(hVar, bVar, obj2, c1063a), bVar, bVar, new Object());
        // Observer that forwards each event into the buffer; c3 and v0 wrap the same buffer
        // (presumably for the error and completion signals - confirm).
        final io.reactivex.rxjava3.subjects.e<SubscriptionEvent<? extends Model>> eVar = this.buffer;
        Objects.requireNonNull(eVar);
        dq.d dVar = new dq.d() { // from class: com.amplifyframework.datastore.syncengine.a3
            @Override // dq.d
            public final void accept(Object obj3) {
                io.reactivex.rxjava3.subjects.e.this.b((SubscriptionEvent) obj3);
            }
        };
        io.reactivex.rxjava3.subjects.e<SubscriptionEvent<? extends Model>> eVar2 = this.buffer;
        Objects.requireNonNull(eVar2);
        c3 c3Var = new c3(eVar2);
        io.reactivex.rxjava3.subjects.e<SubscriptionEvent<? extends Model>> eVar3 = this.buffer;
        Objects.requireNonNull(eVar3);
        io.reactivex.rxjava3.internal.observers.j jVar = new io.reactivex.rxjava3.internal.observers.j(dVar, c3Var, new androidx.fragment.app.v0(eVar3));
        gVar.c(jVar);
        aVar.b(jVar);
        try {
            LOG.debug("Waiting for subscriptions to start.");
            if (!abortableCountDownLatch.abortableAwait(this.adjustedTimeoutSeconds, TimeUnit.SECONDS)) {
                throw new DataStoreException("Timed out waiting for subscription processor to start.", "Retry");
            }
            Amplify.Hub.publish(HubChannel.DATASTORE, HubEvent.create(DataStoreChannelEventName.SUBSCRIPTIONS_ESTABLISHED));
            LOG.info(String.format(Locale.US, "Started subscription processor for models: %s of types %s.", this.modelProvider.modelNames(), Arrays.toString(SubscriptionType.values())));
        } catch (InterruptedException interrupted) {
            // Restore the flag so code further up the stack can see the thread was interrupted.
            Thread.currentThread().interrupt();
            LOG.warn("Subscription operations were interrupted during setup.");
        }
    }

    /**
     * Stops everything the processor has running by calling {@code d()} on the container of
     * ongoing operations, logging before and after.
     */
    public synchronized void stopAllSubscriptionActivity() {
        LOG.info("Stopping subscription processor.");
        this.ongoingOperationsDisposable.d();
        LOG.info("Stopped subscription processor.");
    }
}
