Hacker Newsnew | past | comments | ask | show | jobs | submitlogin

Hey, please consider my rants to be of good faith, plus I'm a rookie :-)

On Rx, the step method is synchronous. It works for the original Rx protocol and it works for RxJS because RxJS has implemented backpressure with the suspend/resume model (e.g. in the observer you get a handle with which you can suspend or resume the data source), but otherwise the model is pretty much like the original, in that the source calls `onNext(elem)` continuously until it has nothing left, so it is pretty much compatible with transducers. RxJava started as a clone of Rx.NET, but is now migrating to the Reactive Streams protocol, which means that the observer gets a `request(n)` function and a `cancel()`.

And this changes everything because `request(n)` and `cancel()` have asynchronous behavior, very much unlike the step function in transducers. And furthermore, implementing operators that are asynchronous in nature requires juggling with `request(n)` and with buffering. For example the classic flatMap (or concatMap as called in Rx) requires to `request(1)`, then subscribe to the resulting stream and only after we're done we need to request the next element in the original stream.

Then there's the issue that the protocol can be dirty for practical reasons. For example `onComplete` can be sent without the observer doing a `request(n)`, so no back-pressure can happen for that final notification and if the logic in the last onNext is happening asynchronously, well this can mean concurrency issues even for simple operators. Although truth be told for such cases there isn't much that the transducers stuff can do.

I might be wrong, I'd love to see how people can implement this stuff on top of transducers.



Guidelines | FAQ | Lists | API | Security | Legal | Apply to YC | Contact

Search: