1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
/// Relays events to a downstream `P` until a terminal event (`.error` or
/// `.finish`) has been delivered or `dispose()` has been called.
class Sink<P: PublisherType>: Disposable {
    /// Set by `dispose()`; while `true`, `forward(event:)` drops incoming events.
    private var _disposed = false

    /// Downstream receiver that every accepted event is published to.
    private let _forward: P

    /// Invoked by `run()` with a publisher built around `forward(event:)`;
    /// the disposable it returns is stored for release in `dispose()`.
    private let _eventGenerator: (Publisher<P.Element>) -> Disposable

    /// Collects the disposables registered by `run()`.
    private let _compositeDisposable = CompositeDisposable()

    /// - Parameters:
    ///   - forward: The downstream receiver of relayed events.
    ///   - eventGenerator: Closure handed a publisher when `run()` is called;
    ///     it returns a disposable that this sink keeps.
    init(forward: P, eventGenerator: @escaping (Publisher<P.Element>) -> Disposable) {
        self._forward = forward
        self._eventGenerator = eventGenerator
    }

    /// Builds a publisher around `forward(event:)`, hands it to the event
    /// generator, and registers the returned disposable with this sink.
    func run() {
        // NOTE(review): the method reference captures `self` strongly —
        // presumably intentional so the sink outlives the caller; confirm.
        let generated = _eventGenerator(Publisher<P.Element>(forward(event:)))
        _compositeDisposable.add(disposable: generated)
    }

    /// Publishes `event` downstream unless the sink is already disposed, then
    /// disposes the sink if the event was `.error` or `.finish`.
    private func forward(event: Event<P.Element>) {
        if _disposed {
            return
        }

        // Deliver first so the terminal event itself still reaches downstream.
        _forward.pub(event: event)

        switch event {
        case .error, .finish:
            dispose()
        default:
            break
        }
    }

    /// Marks the sink as disposed and disposes everything registered by `run()`.
    func dispose() {
        _disposed = true
        _compositeDisposable.dispose()
    }
}
|