package com.amplifyframework.datastore.syncengine;

import androidx.annotation.NonNull;
import com.amplifyframework.core.Action;
import com.amplifyframework.core.Amplify;
import com.amplifyframework.core.Consumer;
import com.amplifyframework.core.model.Model;
import com.amplifyframework.core.model.SerializedModel;
import com.amplifyframework.datastore.DataStoreException;
import com.amplifyframework.datastore.storage.LocalStorageAdapter;
import com.amplifyframework.datastore.storage.StorageItemChange;
import com.amplifyframework.logging.Logger;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Objects;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: classes3.dex */
public final class StorageObserver {
    private static final Logger LOG = Amplify.Logging.forNamespace("amplify:aws-datastore");
    private final LocalStorageAdapter localStorageAdapter;
    private final MutationOutbox mutationOutbox;
    private final CompositeDisposable ongoingOperationsDisposable;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.amplifyframework.datastore.syncengine.StorageObserver$1, reason: invalid class name */
    /* loaded from: classes3.dex */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$amplifyframework$datastore$storage$StorageItemChange$Type;

        static {
            int[] iArr = new int[StorageItemChange.Type.values().length];
            $SwitchMap$com$amplifyframework$datastore$storage$StorageItemChange$Type = iArr;
            try {
                iArr[StorageItemChange.Type.CREATE.ordinal()] = 1;
            } catch (NoSuchFieldError unused) {
            }
            try {
                $SwitchMap$com$amplifyframework$datastore$storage$StorageItemChange$Type[StorageItemChange.Type.UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError unused2) {
            }
            try {
                $SwitchMap$com$amplifyframework$datastore$storage$StorageItemChange$Type[StorageItemChange.Type.DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError unused3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StorageObserver(@NonNull LocalStorageAdapter localStorageAdapter, @NonNull MutationOutbox mutationOutbox) {
        Objects.requireNonNull(localStorageAdapter);
        this.localStorageAdapter = localStorageAdapter;
        Objects.requireNonNull(mutationOutbox);
        this.mutationOutbox = mutationOutbox;
        this.ongoingOperationsDisposable = new CompositeDisposable();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public /* synthetic */ void lambda$startObservingStorageChanges$0(Action action, final ObservableEmitter observableEmitter) throws Throwable {
        LocalStorageAdapter localStorageAdapter = this.localStorageAdapter;
        observableEmitter.getClass();
        localStorageAdapter.observe(new Consumer() { // from class: com.amplifyframework.datastore.syncengine.c2
            @Override // com.amplifyframework.core.Consumer
            public final void accept(Object obj) {
                ObservableEmitter.this.onNext((StorageItemChange) obj);
            }
        }, new Consumer() { // from class: com.amplifyframework.datastore.syncengine.d2
            @Override // com.amplifyframework.core.Consumer
            public final void accept(Object obj) {
                ObservableEmitter.this.onError((DataStoreException) obj);
            }
        }, new Action() { // from class: com.amplifyframework.datastore.syncengine.e2
            @Override // com.amplifyframework.core.Action
            public final void call() {
                ObservableEmitter.this.onComplete();
            }
        });
        action.call();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ void lambda$startObservingStorageChanges$1(Disposable disposable) throws Throwable {
        LOG.info("Now observing local storage. Local changes will be enqueued to mutation outbox.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ boolean lambda$startObservingStorageChanges$2(StorageItemChange storageItemChange) throws Throwable {
        return !StorageItemChange.Initiator.SYNC_ENGINE.equals(storageItemChange.initiator());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ void lambda$startObservingStorageChanges$3() throws Throwable {
        LOG.warn("Storage adapter subscription terminated with completion.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static /* synthetic */ void lambda$startObservingStorageChanges$4(Throwable th) throws Throwable {
        LOG.warn("Storage adapter subscription ended in error", th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PendingMutation<SerializedModel> toPendingMutation(StorageItemChange<? extends Model> storageItemChange) {
        int i2 = AnonymousClass1.$SwitchMap$com$amplifyframework$datastore$storage$StorageItemChange$Type[storageItemChange.type().ordinal()];
        if (i2 == 1) {
            return PendingMutation.creation(storageItemChange.patchItem(), storageItemChange.modelSchema());
        }
        if (i2 == 2) {
            return PendingMutation.update(storageItemChange.patchItem(), storageItemChange.modelSchema(), storageItemChange.predicate());
        }
        if (i2 == 3) {
            return PendingMutation.deletion(storageItemChange.patchItem(), storageItemChange.modelSchema(), storageItemChange.predicate());
        }
        throw new IllegalStateException("Unknown mutation type = " + storageItemChange.type());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startObservingStorageChanges(final Action action) {
        CompositeDisposable compositeDisposable = this.ongoingOperationsDisposable;
        Observable map = Observable.create(new ObservableOnSubscribe() { // from class: com.amplifyframework.datastore.syncengine.v1
            @Override // io.reactivex.rxjava3.core.ObservableOnSubscribe
            public final void subscribe(ObservableEmitter observableEmitter) {
                StorageObserver.this.lambda$startObservingStorageChanges$0(action, observableEmitter);
            }
        }).subscribeOn(Schedulers.single()).observeOn(Schedulers.single()).doOnSubscribe(new io.reactivex.rxjava3.functions.Consumer() { // from class: com.amplifyframework.datastore.syncengine.w1
            @Override // io.reactivex.rxjava3.functions.Consumer
            public final void accept(Object obj) {
                StorageObserver.lambda$startObservingStorageChanges$1((Disposable) obj);
            }
        }).filter(new Predicate() { // from class: com.amplifyframework.datastore.syncengine.x1
            @Override // io.reactivex.rxjava3.functions.Predicate
            public final boolean test(Object obj) {
                boolean lambda$startObservingStorageChanges$2;
                lambda$startObservingStorageChanges$2 = StorageObserver.lambda$startObservingStorageChanges$2((StorageItemChange) obj);
                return lambda$startObservingStorageChanges$2;
            }
        }).map(new Function() { // from class: com.amplifyframework.datastore.syncengine.y1
            @Override // io.reactivex.rxjava3.functions.Function
            public final Object apply(Object obj) {
                PendingMutation pendingMutation;
                pendingMutation = StorageObserver.this.toPendingMutation((StorageItemChange) obj);
                return pendingMutation;
            }
        });
        final MutationOutbox mutationOutbox = this.mutationOutbox;
        mutationOutbox.getClass();
        compositeDisposable.add(map.flatMapCompletable(new Function() { // from class: com.amplifyframework.datastore.syncengine.z1
            @Override // io.reactivex.rxjava3.functions.Function
            public final Object apply(Object obj) {
                return MutationOutbox.this.enqueue((PendingMutation) obj);
            }
        }).subscribe(new io.reactivex.rxjava3.functions.Action() { // from class: com.amplifyframework.datastore.syncengine.a2
            @Override // io.reactivex.rxjava3.functions.Action
            public final void run() {
                StorageObserver.lambda$startObservingStorageChanges$3();
            }
        }, new io.reactivex.rxjava3.functions.Consumer() { // from class: com.amplifyframework.datastore.syncengine.b2
            @Override // io.reactivex.rxjava3.functions.Consumer
            public final void accept(Object obj) {
                StorageObserver.lambda$startObservingStorageChanges$4((Throwable) obj);
            }
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopObservingStorageChanges() {
        this.ongoingOperationsDisposable.clear();
    }
}
