package net.jokubasdargis.rxbus;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;

/* loaded from: classes.dex */
/**
 * {@link Dispatcher} implementation that subscribes each {@link Station} to the {@link Bus}
 * at most once and remembers the resulting {@link Subscription}, keyed by the station.
 */
final class DefaultDispatcher implements Dispatcher {
    private final Bus bus;
    private final Scheduler busScheduler;
    private final ErrorListener errorListener;
    private final Flusher flusher;
    /** Active bus subscriptions, one per registered station. */
    private final Map<Station<?>, Subscription> subscriptions =
            new ConcurrentHashMap<Station<?>, Subscription>(8);

    /**
     * Subscriber that hands every emitted item to a single {@link Station}.
     *
     * @param <T> type of the items delivered to the station
     */
    private static class StationSubscriber<T> extends Subscriber<T> {
        private final ErrorListener errorListener;
        private final Flusher flusher;
        private final Station<T> station;

        StationSubscriber(Station<T> station, Flusher flusher, ErrorListener errorListener) {
            this.station = station;
            this.flusher = flusher;
            this.errorListener = errorListener;
        }

        /** Intentionally a no-op: stream completion requires no action here. */
        @Override
        public void onCompleted() {
            // Nothing to do.
        }

        /**
         * Intentionally a no-op: errors signalled by the upstream are not reported.
         *
         * @param th the upstream error, currently ignored
         */
        @Override
        public void onError(Throwable th) {
            // NOTE(review): upstream errors are dropped silently while errors thrown by
            // Station.receive are sent to errorListener in onNext -- confirm this asymmetry
            // is deliberate before forwarding th to the listener.
        }

        /**
         * Delivers {@code t} to the station. Anything thrown by the station is passed to the
         * error listener instead of propagating, and the station is always scheduled on the
         * flusher afterwards, whether delivery succeeded or failed.
         *
         * @param t the item to deliver
         */
        @Override
        public void onNext(T t) {
            try {
                this.station.receive(t);
            } catch (Throwable th) {
                this.errorListener.onError(th);
            } finally {
                this.flusher.schedule(this.station);
            }
        }

        @Override
        public String toString() {
            return "StationSubscriber{station=" + this.station + '}';
        }
    }

    private DefaultDispatcher(Bus bus, Scheduler scheduler, Flusher flusher, ErrorListener errorListener) {
        this.bus = bus;
        this.busScheduler = scheduler;
        this.flusher = flusher;
        this.errorListener = errorListener;
    }

    /**
     * Creates a dispatcher.
     *
     * @param bus           bus that stations are subscribed to
     * @param scheduler     scheduler passed to {@code bus.subscribe} for every registration
     * @param flusher       flusher on which a station is scheduled after each delivery
     * @param errorListener receiver of anything thrown by a station while handling an item
     * @return a new dispatcher instance
     */
    public static Dispatcher create(Bus bus, Scheduler scheduler, Flusher flusher, ErrorListener errorListener) {
        return new DefaultDispatcher(bus, scheduler, flusher, errorListener);
    }

    /**
     * Subscribes {@code station} to {@code queue} on the bus unless the station already has a
     * recorded subscription, in which case the call does nothing.
     *
     * @param queue   queue to subscribe to
     * @param station station that will receive the queue's items
     * @param <T>     type of the items carried by the queue
     */
    @Override
    public <T> void register(Queue<T> queue, Station<T> station) {
        // The check and the insert must happen as one step, so both run under the same lock.
        synchronized (this.subscriptions) {
            if (this.subscriptions.containsKey(station)) {
                return;
            }
            StationSubscriber<T> subscriber =
                    new StationSubscriber<T>(station, this.flusher, this.errorListener);
            Subscription subscription = this.bus.subscribe(queue, subscriber, this.busScheduler);
            this.subscriptions.put(station, subscription);
        }
    }
}
