package dev.ragnarok.fenrir.realtime;

import android.content.Context;
import dev.ragnarok.fenrir.Includes;
import dev.ragnarok.fenrir.api.interfaces.INetworker;
import dev.ragnarok.fenrir.api.model.VKApiMessage;
import dev.ragnarok.fenrir.api.model.longpoll.AddMessageUpdate;
import dev.ragnarok.fenrir.db.interfaces.IStorages;
import dev.ragnarok.fenrir.domain.IMessagesRepository;
import dev.ragnarok.fenrir.domain.IOwnersRepository;
import dev.ragnarok.fenrir.domain.Repository;
import dev.ragnarok.fenrir.longpoll.LongPollNotificationHelper;
import dev.ragnarok.fenrir.model.Message;
import dev.ragnarok.fenrir.model.Peer;
import dev.ragnarok.fenrir.push.NotificationScheduler;
import dev.ragnarok.fenrir.realtime.TmpResult;
import dev.ragnarok.fenrir.util.Logger;
import dev.ragnarok.fenrir.util.Pair;
import dev.ragnarok.fenrir.util.PersistentLogger;
import dev.ragnarok.fenrir.util.VKOwnIds;
import dev.ragnarok.fenrir.util.coroutines.CoroutinesUtils;
import dev.ragnarok.fenrir.util.coroutines.CoroutinesUtils$myEmit$1;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.intrinsics.CoroutineSingletons;
import kotlin.coroutines.jvm.internal.ContinuationImpl;
import kotlin.coroutines.jvm.internal.DebugMetadata;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Reflection;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.Dispatchers;
import kotlinx.coroutines.channels.BufferOverflow;
import kotlinx.coroutines.flow.Flow;
import kotlinx.coroutines.flow.FlowCollector;
import kotlinx.coroutines.flow.FlowKt;
import kotlinx.coroutines.flow.MutableSharedFlow;
import kotlinx.coroutines.flow.SafeFlow;
import kotlinx.coroutines.flow.SharedFlow;
import kotlinx.coroutines.flow.SharedFlowKt;
import kotlinx.coroutines.flow.internal.CombineKt$zipImpl$$inlined$unsafeFlow$1;
import kotlinx.coroutines.scheduling.DefaultIoScheduler;
import kotlinx.coroutines.scheduling.DefaultScheduler;

/* compiled from: RealtimeMessagesProcessor.kt */
/* loaded from: classes2.dex */
public final class RealtimeMessagesProcessor implements IRealtimeMessagesProcessor {
    // Singleton-style holder for the helper methods declared in the nested Companion class.
    public static final Companion Companion = new Companion(null);
    // Source of the ids returned by both process(...) overloads.
    private static final AtomicInteger ID_GENERATOR = new AtomicInteger();
    // Tag passed to Logger for every message written by this class.
    private static final String TAG = "RealtimeMessagesProcessor";
    // Context handed to LongPollNotificationHelper when a notification is shown.
    private final Context app;
    // Entry taken from the queue by prepareForStartFirst(); set back to null by resetCurrent().
    // Written only while holding stateLock.
    private volatile Entry current;
    // Not read in the visible part of this class — presumably used by the generated lambda classes.
    private final IMessagesRepository messagesInteractor;
    // Not read in the visible part of this class — presumably used by the generated lambda classes.
    private final INetworker networker;
    // Registered interceptors: key is the id passed to registerNotificationsInterceptor,
    // value is the pair compared against (account id, peer id) in isNotificationIntercepted.
    // NOTE(review): accessed without synchronization — confirm all callers share one thread.
    private final HashMap<Long, Pair<Long, Long>> notificationsInterceptors;
    // Not read in the visible part of this class — presumably used by the generated lambda classes.
    private final IOwnersRepository ownersRepository;
    // Flow that receives every finished TmpResult (see onResultReceived) and is exposed by observeResults().
    private final MutableSharedFlow<TmpResult> publishSubject;
    // Pending entries in arrival order; every access in this class happens under stateLock.
    private final List<Entry> queue;
    // Storage facade used to look up missing chats and owners.
    private final IStorages repositories;
    // Monitor guarding queue and current.
    private final Object stateLock;

    /* compiled from: RealtimeMessagesProcessor.kt */
    /* loaded from: classes2.dex */
    /** Stateless helpers that extract ids from a {@link TmpResult}. */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }

        /**
         * Collects the peer ids of all group-chat messages in {@code result}.
         *
         * @param result processed batch; must not be null
         * @return the set of group-chat peer ids, or {@code null} when no message
         *         with a non-null DTO belongs to a group chat
         */
        public final Set<Long> getChatIds$app_fenrir_kateRelease(TmpResult result) {
            Intrinsics.checkNotNullParameter(result, "result");
            Set<Long> groupChatPeers = null;
            for (TmpResult.Msg msg : result.getData()) {
                VKApiMessage dto = msg.getDto();
                if (dto == null || !Peer.Companion.isGroupChat(dto.getPeer_id())) {
                    continue;
                }
                // Allocated lazily so that "nothing found" is reported as null.
                if (groupChatPeers == null) {
                    groupChatPeers = new HashSet<>(1);
                }
                groupChatPeers.add(dto.getPeer_id());
            }
            return groupChatPeers;
        }

        /**
         * Feeds every non-null message DTO of {@code result} into a fresh {@link VKOwnIds}.
         *
         * @param result processed batch; must not be null
         * @return the accumulator, never null
         */
        public final VKOwnIds getOwnIds$app_fenrir_kateRelease(TmpResult result) {
            Intrinsics.checkNotNullParameter(result, "result");
            VKOwnIds collected = new VKOwnIds();
            for (TmpResult.Msg msg : result.getData()) {
                VKApiMessage dto = msg.getDto();
                if (dto != null) {
                    collected.append(dto);
                }
            }
            return collected;
        }
    }

    /**
     * Creates the processor and binds it to the application-wide singletons
     * ({@link Includes} and {@link Repository}); the queue starts empty.
     */
    public RealtimeMessagesProcessor() {
        // These two locals are never used (apparently leftovers of inlined code);
        // the static reads are kept exactly where they were.
        CoroutinesUtils unusedUtils = CoroutinesUtils.INSTANCE;
        BufferOverflow unusedOverflow = BufferOverflow.SUSPEND;
        publishSubject = SharedFlowKt.MutableSharedFlow();

        Includes di = Includes.INSTANCE;
        repositories = di.getStores();
        networker = di.getNetworkInterfaces();
        stateLock = new Object();
        queue = new LinkedList<>();
        app = di.provideApplicationContext();
        notificationsInterceptors = new HashMap<>(3);

        Repository domain = Repository.INSTANCE;
        ownersRepository = domain.getOwners();
        messagesInteractor = domain.getMessages();
    }

    /**
     * Appends {@code entry} to the tail of the pending queue while holding the state lock.
     *
     * @param entry work item to enqueue
     */
    private final void addToQueue(Entry entry) {
        final List<Entry> pending = queue;
        synchronized (stateLock) {
            pending.add(entry);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the "store" step as a suspend-function object bound to this processor.
     *
     * @return a new instance of the generated {@code andStore} lambda class
     */
    public final Function2<TmpResult, Continuation<? super Flow<TmpResult>>, Object> andStore() {
        Function2<TmpResult, Continuation<? super Flow<TmpResult>>, Object> storeStep =
                new RealtimeMessagesProcessor$andStore$1(this, null);
        return storeStep;
    }

    /**
     * Asks dialog storage which of the given group chats are missing and chains the
     * generated follow-up step onto that lookup.
     *
     * @param j          account id forwarded to storage and to the follow-up step
     * @param collection candidate group-chat ids
     * @return flow produced by the follow-up step
     */
    private final Flow<Boolean> findMissingChatsGetAndStore(long j, Collection<Long> collection) {
        return FlowKt.flatMapConcat(
                repositories.dialogs().getMissingGroupChats(j, collection),
                new RealtimeMessagesProcessor$findMissingChatsGetAndStore$1(this, j, null));
    }

    /**
     * Combines the "missing community ids" and "missing user ids" lookups of owner storage
     * into one flow, using the generated combiner lambda.
     *
     * @param j        account id forwarded to both lookups
     * @param vKOwnIds source of the community ids ({@code getGids}) and user ids ({@code getUids})
     * @return flow of the combined id list
     */
    private final Flow<List<Long>> findMissingOwnerIds(long j, VKOwnIds vKOwnIds) {
        return new CombineKt$zipImpl$$inlined$unsafeFlow$1(
                repositories.owners().getMissingCommunityIds(j, vKOwnIds.getGids()),
                repositories.owners().getMissingUserIds(j, vKOwnIds.getUids()),
                new RealtimeMessagesProcessor$findMissingOwnerIds$1(null));
    }

    /**
     * Resolves the missing owner ids and chains the generated follow-up step onto the result.
     *
     * @param j        account id
     * @param vKOwnIds owner ids referenced by the processed messages
     * @return flow produced by the follow-up step
     */
    private final Flow<Boolean> findMissingOwnersGetAndStore(long j, VKOwnIds vKOwnIds) {
        Flow<List<Long>> missingOwnerIds = findMissingOwnerIds(j, vKOwnIds);
        return FlowKt.flatMapConcat(
                missingOwnerIds,
                new RealtimeMessagesProcessor$findMissingOwnersGetAndStore$1(this, j, null));
    }

    /**
     * Tells whether the running entry or any queued entry reports {@code has(i)}.
     * The whole check runs under the state lock.
     *
     * @param i message id to look for
     * @return {@code true} if some entry already contains the id
     */
    private final boolean hasInQueueOrCurrent(int i) {
        synchronized (stateLock) {
            Entry running = current;
            if (running != null && running.has(i)) {
                return true;
            }
            for (Entry pending : queue) {
                if (pending.has(i)) {
                    return true;
                }
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a flow that first handles missing owners (if the batch references any owner ids)
     * and then missing group chats (if the batch references any), starting from an empty task flow.
     *
     * @param tmpResult batch whose messages are inspected
     * @return the chained flow; just the empty task flow when neither owners nor chats are referenced
     */
    public final Flow<Boolean> identifyMissingObjectsGetAndStore(TmpResult tmpResult) {
        Companion helpers = Companion;
        VKOwnIds ownerIds = helpers.getOwnIds$app_fenrir_kateRelease(tmpResult);
        Set<Long> groupChatIds = helpers.getChatIds$app_fenrir_kateRelease(tmpResult);
        long accountId = tmpResult.getAccountId();

        Flow<Boolean> chain = CoroutinesUtils.INSTANCE.emptyTaskFlow();
        if (ownerIds.nonEmpty()) {
            chain = FlowKt.flatMapConcat(
                    chain,
                    new RealtimeMessagesProcessor$identifyMissingObjectsGetAndStore$$inlined$andThen$1(
                            findMissingOwnersGetAndStore(accountId, ownerIds), null));
        }

        // getChatIds reports "no group chats" as null, so both null and empty end the chain here.
        if (groupChatIds == null || groupChatIds.isEmpty()) {
            return chain;
        }
        return FlowKt.flatMapConcat(
                chain,
                new RealtimeMessagesProcessor$identifyMissingObjectsGetAndStore$$inlined$andThen$2(
                        findMissingChatsGetAndStore(accountId, groupChatIds), null));
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Failure handler: prints and persists the throwable, clears the running entry and
     * tries to start the next queued one.
     *
     * @param th error raised while processing the current entry
     */
    public final void onProcessError(Throwable th) {
        th.printStackTrace();
        PersistentLogger persistentLog = PersistentLogger.INSTANCE;
        String className = Reflection.getOrCreateKotlinClass(RealtimeMessagesProcessor.class).getSimpleName();
        // The simple name may be null; an empty tag is logged in that case.
        persistentLog.logThrowable(className != null ? className : "", th);
        resetCurrent();
        startIfNotStarted();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Success handler: logs the elapsed time, shows notifications, publishes the result to
     * {@code publishSubject} from a newly launched coroutine, then moves on to the next entry.
     *
     * @param j         start timestamp in milliseconds (compared with {@code System.currentTimeMillis()})
     * @param tmpResult finished batch
     */
    public final void onResultReceived(long j, TmpResult tmpResult) {
        long elapsedMs = System.currentTimeMillis() - j;
        Logger.INSTANCE.d(TAG, "SUCCESS, data: " + tmpResult + ", time: " + elapsedMs);
        sendNotifications(tmpResult);
        // Emission happens in its own coroutine on a scope built over DefaultIoScheduler.
        BuildersKt.launch$default(
                CoroutineScopeKt.CoroutineScope(DefaultIoScheduler.INSTANCE),
                null,
                null,
                new CoroutinesUtils$myEmit$1(publishSubject, tmpResult, null),
                3);
        resetCurrent();
        startIfNotStarted();
    }

    /**
     * Promotes the head of the queue to {@code current} when nothing is running.
     *
     * @return {@code true} if an entry was promoted; {@code false} if an entry is already
     *         running or the queue is empty
     */
    private final boolean prepareForStartFirst() {
        synchronized (stateLock) {
            if (current != null || queue.isEmpty()) {
                return false;
            }
            current = queue.remove(0);
            return true;
        }
    }

    /** Marks the processor as idle by clearing {@code current} under the state lock. */
    private final void resetCurrent() {
        synchronized (stateLock) {
            current = null;
        }
    }

    /**
     * Shows a notification for every message of the batch that did not already exist, has a
     * resolved {@link Message}, and for which {@code isNotificationIntercepted} returns
     * {@code true}.
     *
     * @param tmpResult finished batch
     */
    private final void sendNotifications(TmpResult tmpResult) {
        for (TmpResult.Msg item : tmpResult.getData()) {
            if (item.isAlreadyExists()) {
                continue;
            }
            Message resolved = item.getMessage();
            if (resolved == null) {
                continue;
            }
            if (isNotificationIntercepted(tmpResult.getAccountId(), resolved.getPeerId())) {
                LongPollNotificationHelper.INSTANCE.notifyAbountNewMessage(app, resolved);
            }
        }
    }

    /**
     * Starts processing the next queued entry, if nothing is being processed right now.
     *
     * <p>{@code prepareForStartFirst()} decides whether an entry was promoted to {@code current};
     * if so, a flow pipeline is built from that entry and launched on
     * {@code NotificationScheduler.INSTANCE.getINSTANCE()}. The pipeline stages, in order:
     * a flow created from the entry, a mapping stage (the anonymous {@code Flow} below), and two
     * {@code flatMapConcat} stages backed by generated lambda classes.
     *
     * <p>NOTE(review): the mapping stage's {@code emit} was not decompiled; only the raw
     * instruction dump is available and the Java stub throws
     * {@code UnsupportedOperationException}. According to the dump it builds a
     * {@code TmpResult(id, accountId, count)} from the entry, registers every full message with
     * {@code setDto(...)} and every non-full message with {@code setBackup(...)}, and emits it
     * downstream.
     */
    private final void startIfNotStarted() {
        Entry entry;
        if (prepareForStartFirst()) {
            // Re-read the entry that prepareForStartFirst() just promoted.
            synchronized (this.stateLock) {
                entry = this.current;
                Unit unit = Unit.INSTANCE;
            }
            // Start timestamp handed to the launched block (presumably forwarded to onResultReceived — confirm).
            long currentTimeMillis = System.currentTimeMillis();
            // Boxed ignoreIfExists flag of the entry; null only when entry is null, in which case nothing is launched.
            Boolean valueOf = entry != null ? Boolean.valueOf(entry.isIgnoreIfExists()) : null;
            if (entry != null) {
                CoroutinesUtils coroutinesUtils = CoroutinesUtils.INSTANCE;
                final SafeFlow safeFlow = new SafeFlow(new RealtimeMessagesProcessor$startIfNotStarted$lambda$5$$inlined$toFlow$1(entry, null));
                BuildersKt.launch$default(NotificationScheduler.INSTANCE.getINSTANCE(), null, null, new RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$fromScopeToMain$1(FlowKt.flatMapConcat(FlowKt.flatMapConcat(new Flow<TmpResult>() { // from class: dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1

                    /* compiled from: Emitters.kt */
                    /* renamed from: dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1$2, reason: invalid class name */
                    /* loaded from: classes2.dex */
                    public static final class AnonymousClass2<T> implements FlowCollector {
                        final /* synthetic */ FlowCollector $this_unsafeFlow;

                        @DebugMetadata(c = "dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1$2", f = "RealtimeMessagesProcessor.kt", l = {50}, m = "emit")
                        /* renamed from: dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1$2$1, reason: invalid class name */
                        /* loaded from: classes2.dex */
                        public static final class AnonymousClass1 extends ContinuationImpl {
                            int I$0;
                            Object L$0;
                            Object L$1;
                            Object L$2;
                            Object L$3;
                            int label;
                            /* synthetic */ Object result;

                            public AnonymousClass1(Continuation continuation) {
                                super(continuation);
                            }

                            // Resumption entry point: stores the result, sets the sign bit of label and re-enters emit.
                            @Override // kotlin.coroutines.jvm.internal.BaseContinuationImpl
                            public final Object invokeSuspend(Object obj) {
                                this.result = obj;
                                this.label |= Integer.MIN_VALUE;
                                return AnonymousClass2.this.emit(null, this);
                            }
                        }

                        public AnonymousClass2(FlowCollector flowCollector) {
                            this.$this_unsafeFlow = flowCollector;
                        }

                        /* JADX WARN: Removed duplicated region for block: B:15:0x0038  */
                        /* JADX WARN: Removed duplicated region for block: B:8:0x0021  */
                        @Override // kotlinx.coroutines.flow.FlowCollector
                        /*
                            Code decompiled incorrectly, please refer to instructions dump.
                            To view partially-correct add '--show-bad-code' argument
                        */
                        public final java.lang.Object emit(java.lang.Object r13, kotlin.coroutines.Continuation r14) {
                            /*
                                r12 = this;
                                boolean r0 = r14 instanceof dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1.AnonymousClass2.AnonymousClass1
                                if (r0 == 0) goto L13
                                r0 = r14
                                dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1$2$1 r0 = (dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1.AnonymousClass2.AnonymousClass1) r0
                                int r1 = r0.label
                                r2 = -2147483648(0xffffffff80000000, float:-0.0)
                                r3 = r1 & r2
                                if (r3 == 0) goto L13
                                int r1 = r1 - r2
                                r0.label = r1
                                goto L18
                            L13:
                                dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1$2$1 r0 = new dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1$2$1
                                r0.<init>(r14)
                            L18:
                                java.lang.Object r14 = r0.result
                                kotlin.coroutines.intrinsics.CoroutineSingletons r1 = kotlin.coroutines.intrinsics.CoroutineSingletons.COROUTINE_SUSPENDED
                                int r2 = r0.label
                                r3 = 1
                                if (r2 == 0) goto L38
                                if (r2 != r3) goto L30
                                java.lang.Object r13 = r0.L$3
                                kotlinx.coroutines.flow.FlowCollector r13 = (kotlinx.coroutines.flow.FlowCollector) r13
                                java.lang.Object r13 = r0.L$1
                                dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1$2$1 r13 = (dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1.AnonymousClass2.AnonymousClass1) r13
                                kotlin.ResultKt.throwOnFailure(r14)
                                goto Ld2
                            L30:
                                java.lang.IllegalStateException r13 = new java.lang.IllegalStateException
                                java.lang.String r14 = "call to 'resume' before 'invoke' with coroutine"
                                r13.<init>(r14)
                                throw r13
                            L38:
                                kotlin.ResultKt.throwOnFailure(r14)
                                kotlinx.coroutines.flow.FlowCollector r14 = r12.$this_unsafeFlow
                                dev.ragnarok.fenrir.realtime.Entry r13 = (dev.ragnarok.fenrir.realtime.Entry) r13
                                dev.ragnarok.fenrir.realtime.TmpResult r2 = new dev.ragnarok.fenrir.realtime.TmpResult
                                int r4 = r13.getId()
                                long r5 = r13.getAccountId()
                                int r7 = r13.count()
                                r2.<init>(r4, r5, r7)
                                dev.ragnarok.fenrir.longpoll.FullAndNonFullUpdates r4 = r13.getUpdates()
                                boolean r5 = r4.hasFullMessages()
                                kotlin.collections.EmptyList r6 = kotlin.collections.EmptyList.INSTANCE
                                if (r5 == 0) goto L89
                                java.util.List r5 = r4.getFullMessages()
                                if (r5 != 0) goto L63
                                r5 = r6
                            L63:
                                java.util.Iterator r5 = r5.iterator()
                            L67:
                                boolean r7 = r5.hasNext()
                                if (r7 == 0) goto L89
                                java.lang.Object r7 = r5.next()
                                dev.ragnarok.fenrir.api.model.longpoll.AddMessageUpdate r7 = (dev.ragnarok.fenrir.api.model.longpoll.AddMessageUpdate) r7
                                int r8 = r7.getMessageId()
                                dev.ragnarok.fenrir.realtime.TmpResult$Msg r8 = r2.add(r8)
                                dev.ragnarok.fenrir.domain.mappers.Dto2Model r9 = dev.ragnarok.fenrir.domain.mappers.Dto2Model.INSTANCE
                                long r10 = r13.getAccountId()
                                dev.ragnarok.fenrir.api.model.VKApiMessage r7 = r9.transform(r10, r7)
                                r8.setDto(r7)
                                goto L67
                            L89:
                                boolean r5 = r4.hasNonFullMessages()
                                if (r5 == 0) goto Lbd
                                java.util.List r4 = r4.getNonFull()
                                if (r4 != 0) goto L96
                                goto L97
                            L96:
                                r6 = r4
                            L97:
                                java.util.Iterator r4 = r6.iterator()
                            L9b:
                                boolean r5 = r4.hasNext()
                                if (r5 == 0) goto Lbd
                                java.lang.Object r5 = r4.next()
                                dev.ragnarok.fenrir.api.model.longpoll.AddMessageUpdate r5 = (dev.ragnarok.fenrir.api.model.longpoll.AddMessageUpdate) r5
                                int r6 = r5.getMessageId()
                                dev.ragnarok.fenrir.realtime.TmpResult$Msg r6 = r2.add(r6)
                                dev.ragnarok.fenrir.domain.mappers.Dto2Model r7 = dev.ragnarok.fenrir.domain.mappers.Dto2Model.INSTANCE
                                long r8 = r13.getAccountId()
                                dev.ragnarok.fenrir.api.model.VKApiMessage r5 = r7.transform(r8, r5)
                                r6.setBackup(r5)
                                goto L9b
                            Lbd:
                                r13 = 0
                                r0.L$0 = r13
                                r0.L$1 = r13
                                r0.L$2 = r13
                                r0.L$3 = r13
                                r13 = 0
                                r0.I$0 = r13
                                r0.label = r3
                                java.lang.Object r13 = r14.emit(r2, r0)
                                if (r13 != r1) goto Ld2
                                return r1
                            Ld2:
                                kotlin.Unit r13 = kotlin.Unit.INSTANCE
                                return r13
                            */
                            // Decompiler stub: the real body exists only as the instruction dump above.
                            throw new UnsupportedOperationException("Method not decompiled: dev.ragnarok.fenrir.realtime.RealtimeMessagesProcessor$startIfNotStarted$lambda$9$$inlined$map$1.AnonymousClass2.emit(java.lang.Object, kotlin.coroutines.Continuation):java.lang.Object");
                        }
                    }

                    // Collects the upstream flow through AnonymousClass2, which maps each Entry to a TmpResult.
                    // NOTE(review): "Flow.this" is a decompiler artifact — presumably the captured safeFlow; confirm.
                    @Override // kotlinx.coroutines.flow.Flow
                    public Object collect(FlowCollector<? super TmpResult> flowCollector, Continuation continuation) {
                        Object collect = Flow.this.collect(new AnonymousClass2(flowCollector), continuation);
                        return collect == CoroutineSingletons.COROUTINE_SUSPENDED ? collect : Unit.INSTANCE;
                    }
                }, new RealtimeMessagesProcessor$startIfNotStarted$3$2(this, null)), new RealtimeMessagesProcessor$startIfNotStarted$3$3(valueOf, this, null)), null, this, this, currentTimeMillis), 3);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the "store to cache and return" step as a suspend-function object bound to
     * this processor.
     *
     * @return a new instance of the generated {@code storeToCacheAndReturn} lambda class
     */
    public final Function2<TmpResult, Continuation<? super Flow<TmpResult>>, Object> storeToCacheAndReturn() {
        Function2<TmpResult, Continuation<? super Flow<TmpResult>>, Object> cacheStep =
                new RealtimeMessagesProcessor$storeToCacheAndReturn$1(this, null);
        return cacheStep;
    }

    /**
     * Checks the registered interceptor pairs against an account/peer combination.
     *
     * <p>NOTE(review): the polarity is the opposite of what the name suggests — the method
     * returns {@code false} when a registered pair matches and {@code true} otherwise.
     * {@code sendNotifications} in this class shows a notification only when it returns
     * {@code true}.
     *
     * @param j  account id, compared with the first element of each registered pair
     * @param j2 peer id, compared with the second element of each registered pair
     * @return {@code false} if some registered pair equals {@code (j, j2)}, else {@code true}
     */
    @Override // dev.ragnarok.fenrir.realtime.IRealtimeMessagesProcessor
    public boolean isNotificationIntercepted(long j, long j2) {
        for (Pair<Long, Long> registered : notificationsInterceptors.values()) {
            boolean sameAccount = registered.getFirst().longValue() == j;
            if (sameAccount && registered.getSecond().longValue() == j2) {
                return false;
            }
        }
        return true;
    }

    /**
     * Exposes the flow that receives every finished batch.
     *
     * @return the internal shared flow, typed as a read-only {@link SharedFlow}
     */
    @Override // dev.ragnarok.fenrir.realtime.IRealtimeMessagesProcessor
    public SharedFlow<TmpResult> observeResults() {
        SharedFlow<TmpResult> results = publishSubject;
        return results;
    }

    /**
     * Registers a single message id for processing and starts the queue if it is idle.
     *
     * <p>The duplicate check and the enqueue are performed as one atomic step under
     * {@code stateLock}. Previously each took the lock separately, so two concurrent callers
     * with the same message id could both pass the check and enqueue duplicates.
     *
     * @param j  account id of the new entry
     * @param i  message id to process
     * @param j2 unused by this implementation
     * @param i2 unused by this implementation
     * @param z  "ignore if exists" flag stored in the new entry
     * @return the generated id of the new entry
     * @throws QueueContainsException if the running entry or a queued entry already contains {@code i}
     */
    @Override // dev.ragnarok.fenrir.realtime.IRealtimeMessagesProcessor
    public int process(long j, int i, long j2, int i2, boolean z) throws QueueContainsException {
        int requestId;
        // Java monitors are reentrant, so the nested locking inside hasInQueueOrCurrent()
        // and addToQueue() is safe while this outer lock is held.
        synchronized (this.stateLock) {
            if (hasInQueueOrCurrent(i)) {
                throw new QueueContainsException();
            }
            requestId = ID_GENERATOR.incrementAndGet();
            Entry entry = new Entry(j, requestId, z);
            entry.append(i);
            addToQueue(entry);
        }
        // Logging and starting happen outside the lock to keep the critical section short.
        Logger.INSTANCE.d(TAG, "Register entry, aid: " + j + ", mid: " + i + ", ignoreIfExists: " + z);
        startIfNotStarted();
        return requestId;
    }

    /**
     * Registers a batch of long-poll updates as one entry and starts the queue if it is idle.
     * The entry is created with its boolean flag set to {@code false}.
     *
     * @param j       account id of the new entry
     * @param updates updates to append to the entry; must not be null
     * @return the generated id of the new entry
     */
    @Override // dev.ragnarok.fenrir.realtime.IRealtimeMessagesProcessor
    public int process(long j, List<AddMessageUpdate> updates) {
        Intrinsics.checkNotNullParameter(updates, "updates");
        int requestId = ID_GENERATOR.incrementAndGet();
        Entry batch = new Entry(j, requestId, false);
        for (AddMessageUpdate update : updates) {
            batch.append(update);
        }
        addToQueue(batch);
        startIfNotStarted();
        return requestId;
    }

    /**
     * Stores an interceptor pair under the given id, replacing any pair previously stored
     * under the same id.
     *
     * @param j           interceptor id used as the map key
     * @param aidPeerPair pair later compared against (account id, peer id); must not be null
     */
    @Override // dev.ragnarok.fenrir.realtime.IRealtimeMessagesProcessor
    public void registerNotificationsInterceptor(long j, Pair<Long, Long> aidPeerPair) {
        Intrinsics.checkNotNullParameter(aidPeerPair, "aidPeerPair");
        // Autoboxing compiles to the same Long.valueOf(j) call.
        notificationsInterceptors.put(j, aidPeerPair);
    }

    /**
     * Removes the interceptor pair stored under the given id, if any.
     *
     * @param j interceptor id that was used at registration
     */
    @Override // dev.ragnarok.fenrir.realtime.IRealtimeMessagesProcessor
    public void unregisterNotificationsInterceptor(long j) {
        Long key = j;
        notificationsInterceptors.remove(key);
    }
}
