My Adventure with NATS
NATS likes to call itself the "central nervous system of distributed computing" but really it's just a modern take on traditional message queues such as Kafka, RabbitMQ, and so on. Still, the design principles and the configurability of NATS made it a compelling choice for a recent project I worked on. Unfortunately, the hyper-flexibility of NATS also makes it harder to implement correctly when you have little experience with it, and so if anyone else is looking to get started with NATS then I hope this helps to avoid some pitfalls.
Why NATS?
Let me set the scene: the company was all-in with Google Cloud, we already had Kafka and Cloud Pub/Sub message streams for other parts of our system, and the application being developed needed an at-least-once delivery guarantee for at least a subset of incoming messages. Additionally, I wanted to use a push-subscription so that I could simplify my consumer to just accept messages over an HTTP endpoint or a TCP connection.
Technical Note: for those unfamiliar with message queue integrations, a push-subscription refers to passively "listening" for incoming messages. That means you have no control over which messages you receive, how often you receive them, or in which order you receive them. A pull-subscription on the other hand means your application code has to explicitly request a new batch of messages from the queue, which gives developers significantly more flow control but requires a bit more in-application management. Modern libraries make the effort low but I was naively hoping to get away with the lowest effort possible with this particular choice.
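To make the push/pull distinction concrete, here is a minimal in-memory sketch in Python. It is a toy model rather than the API of NATS or Cloud Pub/Sub; ToyQueue, subscribe_push, and fetch are hypothetical names invented for the illustration.

```python
from collections import deque


class ToyQueue:
    """Toy in-memory queue illustrating push vs. pull delivery (not a real broker API)."""

    def __init__(self):
        self._messages = deque()
        self._push_handler = None

    def subscribe_push(self, handler):
        # Push: the broker calls the handler whenever a message arrives.
        self._push_handler = handler

    def publish(self, message):
        if self._push_handler is not None:
            # The consumer has no say in when or how fast this is called.
            self._push_handler(message)
        else:
            self._messages.append(message)

    def fetch(self, batch_size):
        # Pull: the consumer asks for at most batch_size messages when it is ready.
        batch = []
        while self._messages and len(batch) < batch_size:
            batch.append(self._messages.popleft())
        return batch


# Push: every published message is handed to the callback immediately.
pushed = []
push_queue = ToyQueue()
push_queue.subscribe_push(pushed.append)
for i in range(5):
    push_queue.publish(i)

# Pull: messages wait in the queue until the consumer requests a batch.
pull_queue = ToyQueue()
for i in range(5):
    pull_queue.publish(i)
first_batch = pull_queue.fetch(2)
second_batch = pull_queue.fetch(10)
```

In the pull case the consumer chose to take two messages and then the rest; in the push case it simply received all five as they were published.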
Cloud Pub/Sub push-based subscriptions require a named URI, i.e. they won't accept an internal IP address. I didn't want to spend the time trying to get proper DNS resolution to happen from Cloud Pub/Sub to a service running inside Kubernetes (via GKE), so I was planning to go with Kafka. But Kafka, in the configuration we had it in at the time, couldn't give me true at-least-once delivery guarantees. Therefore I decided to run a proof-of-concept build using NATS.
NATS has a very simple-to-use set of Helm charts and I was able to quickly set up and start using NATS from my application within a single day. I also installed a Kubernetes Controller for NATS which meant I could maintain my streams and consumers as Kubernetes manifests, effectively allowing me to codify the entire NATS ecosystem using an existing configuration scheme.
But what about the messages already ending up in Cloud Pub/Sub and Kafka? I had two small consumers for each of those brokers that acted as forwarding agents. On the Kafka pipeline there was also a bit of data transformation happening, but primarily these consumers just took a message and passed it along to NATS as quickly as possible.
It was easy, it was effective, it was friendly. At least, it was for a while.
Starting Strong
In the beginning, NATS was incredible to work with. The application was written in Elixir and the official library for it was a little lacking, but overall it was a simple text-over-TCP communication protocol and all of the necessary abstractions and interfaces worked exactly as necessary to get push-subscriptions working.
Although pull-subscriptions were supported by the library, a relatively new feature called Jetstream was not natively supported in the Elixir library. This meant I could not pull messages from a Jetstream Consumer resource without writing a fair bit of custom code. Since I wanted push-subscriptions anyway this seemed like a no-brainer. But, spoiler alert, I would end up wanting pull-subscriptions later on and the lack of support for this with Jetstream Consumers delayed my attempts to transition which ended up exacerbating my problems.
But I'm getting ahead of myself. We had a very strong start using push-subscriptions. Everything seemed to be going great until we had a significantly larger-than-typical message throughput occur and everything went wrong.
Flexible... In a Dangerous Way
There are lots of ways you can fine-tune your NATS resources. But really the biggest change to behavior comes from choosing push or pull consumers. The documentation didn't explicitly state this at the time I was initially getting started with Jetstream, but now it includes this handy snippet:
We recommend pull consumers for new projects. In particular when scalability, detailed flow control or error handling are a concern.
Even though push consumers technically are "simpler" in how you integrate them with your application code, pull consumers are still suggested for new projects. Although it doesn't explicitly say so (but it should), my assertion after using NATS is that pull consumers are recommended because the default configuration options will hurt you less when things go wrong. You can react to problems with greater flexibility.
Also notice that it says when scalability is a concern. Well, this definitely needed to scale dynamically for me, so obviously this would have been a great note to have had prominent within the documentation when Jetstream was first made generally available.
Alright, so, I should have gone with pull consumers but didn't. I guess at the very least I can say that I've learned a lot more about NATS than I would have liked to in a relatively short amount of time trying to debug all the compounding issues when problems arose. And let's start with the first problem: dropping new messages even when I had explicitly set the discard policy to drop old messages first and prioritize storing new messages.
Always Set Storage Limits
Maximum storage limits can be configured for message age, disk space, or total number of messages. The retention policy can be limits-based (always store new messages until you hit one of the configured limits), interest-based (driven by what consumers still need), or "work queue", which is complicated and not something I used, so I won't discuss it directly. However, the big thing to keep in mind here is: if you don't configure a maximum storage limit, that's equivalent to saying "don't accept new messages when you hit a natural limit".
In reality, there's only one "natural limit" that you would need to worry about if you didn't explicitly configure any of the storage limits: disk space. It's totally possible to run out of disk space if your messages become uncharacteristically backed up. And if you run out of disk space, NATS just throws out incoming messages. Publishers, to my knowledge, weren't even receiving a visible error. This meant a message would be published, it looked successful, we moved on, and then NATS immediately dropped it with no chance of consumers ever seeing it.
Apparently this is "intended behavior" and the maintainers of NATS explicitly told me that there's no point in even changing the documentation to be more clear on this subject because it's "obvious". But you can choose your discard policy to be either new or old messages, and even though the new messages were being dropped rather than discarded, the end result was equivalent to the "discard new" policy. And I was using the default "discard old" policy. I explained as much but my complaints fell on deaf ears.
In short: if you want your discard policy to be respected, you must set at least one maximum storage limit option explicitly. You specifically should be setting MaxBytes at the very least.
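The interaction can be sketched with a toy model in Python. This is not the NATS API and makes no claim about how the server is implemented; ToyStream and its fields are hypothetical, and the model only reproduces the behavior described above: the discard policy is consulted when a configured limit is hit, while running out of disk drops the incoming message regardless.

```python
class ToyStream:
    """Toy model of a stream with an optional configured byte limit (not the NATS API)."""

    def __init__(self, disk_bytes, max_bytes=None, discard="old"):
        self.disk_bytes = disk_bytes  # physical capacity: the "natural limit"
        self.max_bytes = max_bytes    # configured limit; None means unset
        self.discard = discard        # "old" or "new"; only consulted at max_bytes
        self.messages = []

    def used(self):
        return sum(len(m) for m in self.messages)

    def publish(self, payload):
        """Return True if the payload was stored, False if it was dropped."""
        if self.max_bytes is not None:
            if self.discard == "new" and self.used() + len(payload) > self.max_bytes:
                return False
            # discard == "old": evict the oldest messages until the new one fits.
            while self.messages and self.used() + len(payload) > self.max_bytes:
                self.messages.pop(0)
        if self.used() + len(payload) > self.disk_bytes:
            # Out of disk: the new message is lost whatever the discard policy says.
            return False
        self.messages.append(payload)
        return True


payloads = [b"AAAA", b"BBBB", b"CCCC", b"DDDD"]

# No configured limit: the disk fills up and new messages are lost.
unlimited = ToyStream(disk_bytes=10, discard="old")
results_unlimited = [unlimited.publish(p) for p in payloads]

# MaxBytes-style limit below disk capacity: "discard old" now actually applies.
limited = ToyStream(disk_bytes=10, max_bytes=8, discard="old")
results_limited = [limited.publish(p) for p in payloads]
```

With no limit set, the stream keeps the two oldest payloads and loses the two newest. With the limit set below disk capacity, it keeps the two newest, which is what "discard old" was supposed to mean. Unlike this model, which returns False, the publishers in my case saw no error at all.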
Be Careful With Wildcards
The reason why I had run out of disk space was because of a bug with how changing the subject filter on a Consumer interacted with an interest-based retention policy.
For me, I liked the idea of an interest-based retention policy because it meant that a message was only deleted from the Stream once all attached Consumers had acknowledged it. Of course, this meant that the Stream needed to know which Consumers were using an explicit ack policy for which subjects so that it could keep track of when to delete a message. It seemed like the obvious choice: don't delete a message until all consumers have seen it. Cool.
Unfortunately, I noticed that I was using a wildcard on the Consumer subject filter and, because my application code was ignoring subjects it didn't need to process, there were a ton of messages sitting in storage that would never get proactively deleted. That's why I eventually hit my disk space limit, saw that my discard policy wasn't being applied, and found out why.
I had tried to change from a wildcard filter to a finite list of explicit subjects that I knew my code would actually try to process. But that didn't do anything. It ended up being a bug which I investigated on behalf of NATS and it was eventually fixed.
Even though this is fixed and you can now change the subject filter on a consumer and have message interest updated accordingly in the Stream, I would still recommend avoiding wildcards at the Consumer level. Wildcards on Streams are totally acceptable, although keep in mind that a limits-based retention policy will store everything that matches the Stream subject filter. For interest and work queue retention policies, it's the Consumer subject filters which are most important.
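Here is a rough Python sketch of why the wildcard hurt. The matcher follows the NATS subject rules (dot-separated tokens, `*` matching exactly one token, `>` matching one or more trailing tokens). The retention function is a deliberately simplified toy model, and it assumes the messages my code ignored were never acked. The subject names are made up for the example.

```python
def subject_matches(pattern, subject):
    """Return True if a NATS-style subject filter matches a concrete subject."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for index, token in enumerate(pattern_tokens):
        if token == ">":
            # ">" must be the last token and matches one or more remaining tokens.
            return index == len(pattern_tokens) - 1 and len(subject_tokens) > index
        if index >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[index]:
            return False
    return len(pattern_tokens) == len(subject_tokens)


def retained(stored_subjects, consumer_filters, acked_subjects):
    """Subjects still held under a toy interest-based policy.

    A message is kept while some consumer filter shows interest in it and
    the application has not acknowledged it.
    """
    return [
        subject
        for subject in stored_subjects
        if any(subject_matches(f, subject) for f in consumer_filters)
        and subject not in acked_subjects
    ]


stored = ["events.signup", "events.click", "events.click", "events.purchase"]
acked = {"events.signup", "events.purchase"}  # the only subjects the app handles

retained_wildcard = retained(stored, ["events.*"], acked)
retained_explicit = retained(stored, ["events.signup", "events.purchase"], acked)
```

With the wildcard filter, both ignored "events.click" messages stay in storage indefinitely. With explicit filters, nothing is left behind.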
You Probably Need Pull Consumers
For any consumers which have an explicit ack policy, the MaxAckPending option is used, which defaults to 1,000. I'll just copy/paste the description here:
Defines the maximum number of messages, without an acknowledgement, that can be outstanding. Once this limit is reached message delivery will be suspended. This limit applies across all of the consumer's bound subscriptions.
You can technically set this to -1, which then means no upper bound limit. However, I was not able to edit this value to -1 after it had been set to a value greater than zero. In theory, the option should accept -1 after the consumer has been created, but I think it was just a bug that I couldn't do so. And that ended up being highly problematic.
You see, even if you set this value very high, you have to consider how quickly your code will run into messages it can't process, each of which then sits in a pending state for the full ack timeout duration. I had my timeout set to something like 5 seconds, but I could easily deliver 10k messages across all of my consumer instances in under 5 seconds. Even with the limit raised to 100k, I was reaching ~80k pending acks at peak.
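A back-of-the-envelope way to reason about this is Little's law: pending acks ≈ rate of unacked deliveries × ack timeout. The Python sketch below is only an estimate. It assumes every unacked message sits for the full timeout and it ignores redeliveries; the function names are invented for the illustration, and the inputs are the rough figures quoted in this post.

```python
def steady_state_pending(unacked_per_second, ack_wait_seconds):
    """Little's law estimate: items in flight = arrival rate * time spent in flight."""
    return unacked_per_second * ack_wait_seconds


def required_unacked_rate(pending, ack_wait_seconds):
    """Rate of unacked deliveries needed to sustain a given pending count."""
    return pending / ack_wait_seconds


# ~80k pending with a ~5 second ack timeout implies this many unacked deliveries/second.
implied_rate = required_unacked_rate(80_000, 5)

# The same rate with a 1 second ack timeout would hold far fewer messages pending.
pending_with_short_wait = steady_state_pending(implied_rate, 1)
```

Under those assumptions the ack timeout scales the pending count linearly, so shortening it is as effective as raising MaxAckPending by the same factor.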
But how would a pull consumer help with this, you might be wondering? With a push consumer, the MaxAckPending option is the only form of flow control; nothing the application does can slow delivery down. With a pull consumer, the application itself can slow down and be more selective in how it proceeds when individual instances are overwhelmed.
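The way MaxAckPending suspends delivery can be sketched with a small toy model in Python. This is not NATS client code; ToyPushConsumer is a hypothetical class that only mimics the documented rule that delivery stops once the number of unacknowledged messages reaches the limit.

```python
class ToyPushConsumer:
    """Toy model of push delivery gated by a MaxAckPending-style limit."""

    def __init__(self, max_ack_pending):
        self.max_ack_pending = max_ack_pending
        self.pending = set()

    def deliver(self, backlog):
        """Deliver from the backlog until it is empty or the pending limit is hit."""
        delivered = []
        while backlog and len(self.pending) < self.max_ack_pending:
            message_id = backlog.pop(0)
            self.pending.add(message_id)
            delivered.append(message_id)
        return delivered

    def ack(self, message_id):
        # Acknowledging frees one slot, which lets delivery resume.
        self.pending.discard(message_id)


backlog = list(range(10))
consumer = ToyPushConsumer(max_ack_pending=3)

first = consumer.deliver(backlog)    # fills all three pending slots
stalled = consumer.deliver(backlog)  # nothing delivered: limit reached
consumer.ack(1)
resumed = consumer.deliver(backlog)  # one slot free, so one more message
```

The application's only lever here is how fast it acks. It cannot ask for fewer messages or skip ahead, which is exactly the control a pull consumer gives back.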
In my case, my application code was redirecting a huge number of messages to a single instance, which caused every consumer instance that talked to that one overloaded instance to effectively time out. That meant a growing list of pending acks: because Jetstream wasn't explicitly supported in the official library, I didn't have a simple way to explicitly nack messages at the time.
Once I had the ability to explicitly nack messages, that definitely helped, but I still had a ton of messages timing out due to the bottleneck of a single "power user" within our system. But there was one more thing that I should have considered sooner...
More Streams and Consumers, Not Fewer
With NATS, it is actually more beneficial to have more streams and more consumers, not fewer. Even if two kinds of messages seem similar, you might actually want separate consumers. Streams are good for gathering similar messages but consumers are good for separating the consumption logic for a set of messages.
In my case, I had two similar but distinct events being triggered, and one of them was way more important to handle than the other. But I had a single consumer and therefore the less-frequent-but-more-important messages were getting unnecessarily buried under the far-more-frequent-yet-less-important messages. Once I split them into two separate consumers, my problems became significantly more manageable.
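As a rough illustration of the "buried" effect, the Python sketch below counts how many messages sit in front of an important one when everything shares a single consumer versus when each subject gets its own. The subject names and burst size are hypothetical, and the model ignores everything about real delivery except ordering.

```python
def position_in_queue(messages, wanted_subject):
    """Number of messages delivered before the first one with wanted_subject."""
    for position, (subject, _payload) in enumerate(messages):
        if subject == wanted_subject:
            return position
    return None


def split_into_consumers(messages):
    """Group messages by subject, one queue per (hypothetical) consumer."""
    queues = {}
    for subject, payload in messages:
        queues.setdefault(subject, []).append((subject, payload))
    return queues


# Hypothetical burst: 1,000 low-priority events arrive just before one important event.
burst = [("events.activity", i) for i in range(1000)] + [("events.payment", "p1")]

behind_when_shared = position_in_queue(burst, "events.payment")
queues = split_into_consumers(burst)
behind_when_split = position_in_queue(queues["events.payment"], "events.payment")
```

Sharing one consumer puts the important message behind the entire burst. With its own consumer it is first in line.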
With a second consumer in place, I could also use the push consumer flow control via MaxAckPending to my advantage: I had no limit on how many important messages could be pending acknowledgement and this was fine because I could guarantee there would never be enough, even during a 100x peak burst, to overload the pipeline in the same way the other type of event was doing. And then I could actually significantly lower the MaxAckPending for the original consumer so that I could avoid overwhelming my application instances during large bursts.
I also had idempotency logic built into the application code, so I was getting exactly-once behavior despite using an at-least-once delivery guarantee. This meant that I could lower the AckWait option to expire pending messages more quickly and I could be certain that I would never incorrectly process the same message twice. So even if 70% of the pending messages were for a single user, it would only wait one second at most before moving onto the next batch of messages.
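For anyone unfamiliar with the idea, one common way to get that idempotency is to remember which message IDs have already been processed and skip repeats. The Python sketch below shows that general pattern, not the implementation I actually used. IdempotentHandler is a hypothetical name, and the in-memory set is an assumption that only works for a single process; a real deployment would need shared, durable storage.

```python
class IdempotentHandler:
    """Run a processing function at most once per message ID (in-memory sketch)."""

    def __init__(self, process):
        self._process = process
        self._seen = set()

    def handle(self, message_id, payload):
        """Process the payload unless this ID was already handled; return True if it ran."""
        if message_id in self._seen:
            return False
        self._process(payload)
        # Record the ID only after processing succeeds, so failures can be retried.
        self._seen.add(message_id)
        return True


processed = []
handler = IdempotentHandler(processed.append)

# A short AckWait means redeliveries are common; the second delivery is a no-op.
results = [
    handler.handle("msg-1", "charge user 42"),
    handler.handle("msg-1", "charge user 42"),  # redelivery of the same message
    handler.handle("msg-2", "charge user 7"),
]
```

With a guard like this in place, an aggressive AckWait costs you some duplicate deliveries but never duplicate side effects.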
Or Just Use Cloud Pub/Sub
Honestly, after all of this was done I just shrugged and looked at migrating to Cloud Pub/Sub. Since pull consumers are preferred for greater flow control and scalability, I could do pull subscriptions via Cloud Pub/Sub. But then I wouldn't need to worry about things like running out of disk space and dropping incoming messages, or constantly tweaking MaxAckPending to figure out what worked best for me, or implementing explicit nack replies to work around throughput problems.
And Broadway has an official Cloud Pub/Sub integration for pull subscriptions, so it actually should have been relatively simple to do even from the onset, but I hadn't done enough research into the Broadway Cloud Pub/Sub integration and that was simply an oversight on my part. I did so much research on so many other things but somehow didn't bother looking specifically at the Cloud Pub/Sub integration.
However, I do think that NATS supplies a range of useful possibilities that you just can't get with existing message queues. NATS, for example, allows for request/reply messages, which is actually how acknowledgements are handled. Basically any request generates a unique internal subject and awaits the next message to be sent on that subject before handing that back to the publishing client and closing the reply subject.
If you're wondering why that's useful, imagine gRPC for a moment. You need to have a protobuf schema, which needs a protobuf compiler, to create language-specific protobuf application code to help with encoding/decoding messages, and you also have to create a service stub to actually invoke RPC functions, and honestly it's just a massive amount of overhead to set up and maintain that whole thing. Instead, you could just subscribe to an "RPC subject" (e.g. myapp.rpc) and then use request/reply messages to emulate RPC functions. It's all binary data, so you can use whatever data transport format you want on top of it, even just opting for JSON strings if that's simplest.
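The mechanics are simple enough to sketch in a few lines of Python. ToyBus below is a hypothetical, synchronous in-memory stand-in and not the NATS client API; it only mirrors the request/reply flow described above (create a unique reply subject, publish with it attached, take the first reply, close the subject). The "add" method and the JSON message shape are made up for the example.

```python
import itertools
import json


class ToyBus:
    """Toy in-memory pub/sub bus with request/reply (not the NATS client API)."""

    def __init__(self):
        self._handlers = {}
        self._inbox_ids = itertools.count(1)

    def subscribe(self, subject, handler):
        self._handlers[subject] = handler

    def unsubscribe(self, subject):
        self._handlers.pop(subject, None)

    def publish(self, subject, data, reply_to=None):
        handler = self._handlers.get(subject)
        if handler is not None:
            handler(data, reply_to)

    def request(self, subject, data):
        # Create a unique one-off reply subject, listen on it, then publish.
        inbox = f"_INBOX.{next(self._inbox_ids)}"
        replies = []
        self.subscribe(inbox, lambda reply, _reply_to: replies.append(reply))
        self.publish(subject, data, reply_to=inbox)
        self.unsubscribe(inbox)  # close the reply subject
        return replies[0] if replies else None


bus = ToyBus()


def handle_rpc(data, reply_to):
    # Decode the JSON "call", dispatch on the method name, reply on the inbox.
    call = json.loads(data)
    result = sum(call["params"]) if call["method"] == "add" else None
    bus.publish(reply_to, json.dumps({"result": result}).encode())


bus.subscribe("myapp.rpc", handle_rpc)

request_body = json.dumps({"method": "add", "params": [2, 3]}).encode()
response = json.loads(bus.request("myapp.rpc", request_body))
no_listener = bus.request("myapp.missing", request_body)
```

There is no schema, compiler, or stub here, just bytes on a subject. The trade-off is that nothing checks the message shape for you, so both sides have to agree on it by convention.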
I actually did this and it was really nice. It's even nicer if you're already using NATS, because this effectively removes the need to build entirely separate integration points for internal service communication by re-using NATS and running request/reply messages.
And that's among the reasons why NATS calls itself a "connective technology that powers modern distributed systems". It's a little grandiose but the potential certainly is there. The flexibility is outstanding. But the documentation needs to be a lot more clear about how combinations of configuration options play out. And there should be a lot more practical guides to look at for examples of non-trivial implementations, including the configuration of streams and consumers.
Overall though, I'm happy with NATS, and I might consider it again in the future, but I'd only use pull consumers, and only if we're not already using other sufficient message queues.