One of ZooKeeper’s nice features is the ability to set up a watch on a node, and be updated whenever it changes.
This is a multi-part series:
1. ephemeral nodes, 2. observable<zookeeper>, 3. where is zookeeper?, 4. .net api and tasks, 5. c# iobservable
One of ZooKeeper’s nice features is the ability to set up a watch on a node, and be updated whenever it changes. The Java API (usable from .NET) is straightforward, if a bit verbose.
You can getData for a node (or just check that it exists), and register a Watcher that will be pinged if the node changes.
Like many remote observables, ZooKeeper only gives you one update. If you want to get more updates, you have to re-watch the node. (Oracle continuous query notifications work the same way.) It’s a defensive mechanism so that the server isn’t swamped with lots of callback registrations if clients go missing.
As before, we wanted to translate this very imperative, fairly verbose, model into a more composable model. We want a query that looks more like
The IObservable pattern returns a sequence of results that may terminate in success or with an error.
As with our previous getData example, there are some fairly obvious semantics: we should raise Maybe.Empty if the node does not exist (or is deleted), and Maybe.Return<byte[]> if the node comes into existence or changes values. But what about starting and ending the stream?
We decided there was no obvious reason the observable stream should end, since nodes can be deleted and recreated at will. We can leave that decision to the client of this API, who could TakeUntil(m => m.IsEmpty) if she felt like it, or unsubscribe when the user no longer cared to observe the value (such as a web socket watching a progress bar).
How should the stream start? A typical observable stream only emits values when something changes, such as a new stock quote, or key being pressed. But it would be frustrating to have to first do a getData call, and follow it with a watchData if you needed to know the current value of something and watch for changes. So we decided that the stream would immediately return with the current value of getData, e.g. empty or a byte array, then follow it with the result of changes.
We have lots to do at once
Let’s do it all from one object, and kick everything off in the constructor, as before
Just as with Tasks, there’s no natural IObservable in the ZooKeeper API that we can work with. Instead, we implement our own. In general I’ve noticed that directly using an ISubject — tempting in this case — is an anti-pattern. It usually results in a bunch of race conditions and lifetime management issues that are complicated. It’s best if you just take an IObserver and call methods on it directly, managing the IObservable implementation by using the Rx framework’s Observable.Create. Assuming the watcher works correctly, you use it quite simply, like this:
Every time we call WatchData, we return an object which can start a query. The query won’t actually start until someone subscribes to the observable, triggering the call to the lambda supplied, which creates the ZookeeperNodeDataChangeWatcher we’ll describe in a bit.
We want to both read data and register for watches. The getData will get data and register a watch. The exists call simply establishes a watch on a node — it doesn’t matter why we watch it, we will get events whenever it changes. As we did previously, we pass this as the callback object for both calls, which will be used for both the asynchronous result and the watch updates.
It’s a slightly tedious state machine to manage, but we just soldier through the cases. First, this callback comes from our first async getData, i.e. as soon as we start.
Now whether we’ve found data or an empty node, we are subscribed for further events. Subscribing for further events was done by calling the exists API, which also needs an asynchronous callback
And finally, after doing all that work to sign up for watches, here’s where we’re told something has happened:
While none of this code is particularly complicated, it comes in so many varieties in the underlying ZK API, it’s easy to get confused and implement lots of callbacks. By registering all callbacks on one object and re-subscribing for watches on new events, we roll up enormous complexity to a very simple and composable interface. In a sense, the ZK API has spilled its guts into its API, and we’ve tried to stitch them back together.
What about the various calls to IsDisposed in the code above? What was that about? Observable.Create expects to receive a IDisposable object to do its work. The created Observable will call the Dispose method when the caller unsubscribes. This allows us to remove the watch and stop taking up server resources. As you see above, this is straightforward to do in some cases: we just don’t register for another watch when we get a watch update if we have been disposed. But what if a watch is already outstanding and we haven’t received a callback yet?
There’s no API to unregister interest in a watch. It’s a known bug/feature request, but wasn’t released when we wrote the code above. This means excessive querying of nodes with this feature will result in a memory leak in the server and client. We managed that with some IOC Container trickery, which we should really describe sometime as well.
You’ll also notice there’s no code to manage multi-threaded access to the member variables of our watcher, or calls to the supplied IObserver. This lovely state of affairs is due to the design choices we and the Zookeeper team made:
Our C# programmers found this interface much easier to work with than the primitives exposed by the Java API, mostly because it rolled up common behaviors in composable monads like Task and IObservable. This is one of the reasons a simple SWIG-like generated interface doesn’t always do the trick when using APIs from a different language. The verbose, callable-object paradigm that ZK found convenient to expose to C and Java programmers wasn’t very like modern C# programmers expect, and made it hard to re-use the massive leverage available in libraries like the TPL and Reactive Extensions. It takes extra work to make language-specific bindings in this way, but the work is worth it.
Someone in the node.js ecosystem did a similar wrapper (zkplus) for ZooKeeper there. He put it best:
If you haven’t worked with the ZooKeeper API directly, and question the value of this project, do so, then get back to me.
Tell us what you need and one of our experts will get back to you.