package tv.twitch.android.core.pubsub;

import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.subjects.PublishSubject;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
import kotlin.jvm.internal.Intrinsics;
import tv.twitch.android.core.crashreporter.CrashReporterUtil;
import tv.twitch.android.core.user.TwitchAccountManager;
import tv.twitch.android.sdk.GenericSubscriberListener;
import tv.twitch.android.sdk.IPubsubController;
import tv.twitch.android.util.LogArg;
import tv.twitch.android.util.RxHelperKt;

/**
 * Shares PubSub topic connections between subscribers.
 *
 * <p>One {@link PublishSubject} of raw string payloads is kept per active topic in
 * {@code activeTopics}. A listener is connected on the underlying {@link IPubsubController}
 * when a topic subject is registered, and disconnected again once that subject has no
 * observers left.
 *
 * <p>NOTE(review): {@code activeTopics} is a plain {@link LinkedHashMap} with no
 * synchronization; confirm that subscribe/cancel always happen on a single thread.
 */
@Singleton
public final class PubSubController {
    private final TwitchAccountManager accountManager;
    /** Raw payload subjects keyed by the topic they are connected to. */
    private final Map<PubSubResourceTopic, PublishSubject<String>> activeTopics;
    private final Gson gson;
    private final IPubsubController pubsubController;

    /**
     * @param pubsubController SDK controller used to connect/disconnect topic listeners
     * @param accountManager   source of the login state and user id used when subscribing
     * @param gsonFactory      factory for the {@link Gson} instance used to parse payloads
     */
    @Inject
    public PubSubController(IPubsubController pubsubController, TwitchAccountManager accountManager, GsonPubSubFactory gsonFactory) {
        Intrinsics.checkNotNullParameter(pubsubController, "pubsubController");
        Intrinsics.checkNotNullParameter(accountManager, "accountManager");
        Intrinsics.checkNotNullParameter(gsonFactory, "gsonFactory");
        this.pubsubController = pubsubController;
        this.accountManager = accountManager;
        this.activeTopics = new LinkedHashMap<PubSubResourceTopic, PublishSubject<String>>();
        this.gson = gsonFactory.createGsonInstance();
    }

    /** @return the Gson instance used to deserialize topic payloads */
    public final Gson getGson() {
        return this.gson;
    }

    /**
     * Disconnects the listener for {@code topic} and forgets its subject, but only when the
     * topic is currently registered and its subject has no remaining observers.
     *
     * @param topic the topic to release if it is no longer observed
     */
    public final void maybeUnsubscribeFromTopic(PubSubResourceTopic topic) {
        Intrinsics.checkNotNullParameter(topic, "topic");
        PublishSubject<String> subject = this.activeTopics.get(topic);
        if (subject == null || subject.hasObservers()) {
            // Either never registered, or somebody is still listening.
            return;
        }
        this.activeTopics.remove(topic);
        this.pubsubController.disconnectGenericTopicListener(topic.getSubscription());
    }

    /**
     * Creates a new subject for {@code topic}, connects a listener that forwards every raw
     * payload into it, and stores it in {@code activeTopics} (replacing any previous entry
     * for the same topic).
     *
     * @param topic the topic to connect
     * @param i     forwarded as the second argument of
     *              {@code connectGenericTopicListener}; {@code subscribeToTopic} passes the
     *              current user id here
     * @return the newly created subject that receives the topic's raw payloads
     */
    public final PublishSubject<String> registerTopicSubject(PubSubResourceTopic topic, int i) {
        Intrinsics.checkNotNullParameter(topic, "topic");
        final PublishSubject<String> subject = PublishSubject.create();
        GenericSubscriberListener listener = new GenericSubscriberListener() {
            @Override
            public void eventTopicData(String data) {
                Intrinsics.checkNotNullParameter(data, "data");
                // Forward into the captured subject (not a qualified `this`, which has no
                // enclosing PublishSubject instance to refer to).
                subject.onNext(data);
            }
        };
        this.pubsubController.connectGenericTopicListener(topic.getSubscription(), i, listener);
        this.activeTopics.put(topic, subject);
        return subject;
    }

    /**
     * Returns a stream of {@code topic} messages deserialized to {@code objectType}.
     *
     * <p>If the topic requires authentication and the user is not logged in, the problem is
     * reported through {@code CrashReporterUtil.throwDebugAndLogProd} and an empty stream is
     * returned. Otherwise the existing subject for the topic is reused, or a new one is
     * registered with the current user id.
     *
     * <p>Payloads that fail to parse, or that parse to {@code null}, are skipped. Errors
     * reaching the stream are logged as non-fatal, and when the stream terminates or is
     * cancelled the topic is released via {@link #maybeUnsubscribeFromTopic}.
     *
     * @param topic      the topic to subscribe to
     * @param objectType the class each JSON payload is deserialized into
     * @param <T>        element type of the returned stream
     * @return a Flowable of parsed messages, or an empty Flowable when auth is missing
     */
    public final <T> Flowable<T> subscribeToTopic(final PubSubResourceTopic topic, final Class<T> objectType) {
        Intrinsics.checkNotNullParameter(topic, "topic");
        Intrinsics.checkNotNullParameter(objectType, "objectType");
        if (topic.getRequiresAuth() && !this.accountManager.isLoggedIn()) {
            CrashReporterUtil.INSTANCE.throwDebugAndLogProd(R$string.pubsub_logged_out_subscribe, new LogArg.Unsafe(topic.getTopicName()));
            return Flowable.<T>empty();
        }

        PublishSubject<String> subject = this.activeTopics.get(topic);
        if (subject == null) {
            subject = registerTopicSubject(topic, this.accountManager.getUserId());
        }

        Observable<T> parsedMessages = subject.flatMap(new Function<String, ObservableSource<? extends T>>() {
            @Override
            public ObservableSource<? extends T> apply(String it) {
                Intrinsics.checkNotNullParameter(it, "it");
                T parsed;
                try {
                    parsed = PubSubController.this.getGson().fromJson(it, objectType);
                } catch (JsonParseException ignored) {
                    // Best effort: one malformed payload must not terminate the stream.
                    return Observable.<T>empty();
                }
                if (parsed == null) {
                    // Gson yields null for empty/"null" input and RxJava 2 rejects null
                    // items, so skip it the same way as an unparseable payload.
                    return Observable.<T>empty();
                }
                return Observable.just(parsed);
            }
        });

        return RxHelperKt.flow(parsedMessages).doOnError(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) {
                CrashReporterUtil.INSTANCE.logNonFatalException(throwable, R$string.failed_to_correctly_map_json_data_from_pubsub_topic);
            }
        }).doFinally(new Action() {
            @Override
            public void run() {
                // Drop the topic connection once the last subscriber is gone.
                PubSubController.this.maybeUnsubscribeFromTopic(topic);
            }
        });
    }
}
