This looks great. Are there any plans for topic-based pubsub?
With AMQP (specifically, RabbitMQ) I can set up a fanout topology such that a publisher simply publishes to an exchange. Clients then bind their queues to this exchange based on a routing key, and any messages matching the key will be copied there. If no clients have bound, messages disappear into the aether.
For example, the publisher can use the exchange "changes" to write messages with keys such as "create.post.123" or "update.user.321". A consumer can then bind a queue to the exchange with the filters "create." and "update." to get all create and update messages. A different app can listen to just ".user." to get anything related to users.
This is a really powerful tool, allowing publishers to be dumb firehoses and clients to surgically pick what they need. Of course you don't need exchanges and bindings to accomplish this, only topics.
> If no clients have bound, messages disappear into the aether
To everyone who thinks this sounds scary, don't worry. You can bind dedicated "Undelivered queues" and "Dead letter" queues to exchanges to make sure that when routing fails, you don't lose any messages.
We're using RabbitMQ in a few newer projects and it's really a joy to work with!
Let's say you have a web app that sends emails, and you have many worker processes actually sending emails. If you use something like Disque, instead of sending the email directly from the web app, you send a message to Disque, into the send_email queue: "Please send an email to foo@example.com". Workers sending emails will fetch from the send_email queue, and will get the message. Workers sending emails are supposed to acknowledged to Disque when they were actually able to process the message, so if they fail in some way, the message is re-queued and delivered again to the same or another worker, after a time you specify. So basically it adds a reliable middle-layer between the different moving parts of your application. The user will receive its email, or during failures it may receive the email multiple times, but it can never happen that no email is received by the user, since Disque guarantees that the message will be removed only once acknowledged.
It depends on the use-case. You may want the request to your web server to wait for a response from your transaction processor (e.g. Stripe) before returning.
That shouldn't be done in a web request. Not only can it lead to wasted time in the request handler, but it can lead to race conditions like double purchases.
> it can lead to race conditions like double purchases
"Let's return as soon as possible rather than blocking" is not the real way to handle race conditions. All you are doing is reducing the surface (time) that such a thing can happen in. It makes more sense to plan these things out using something else (e.g. locking).
How do you implement "no really, this request should not return until the server has successfully completed external communication"?
You can do some sort of websocket-y mechanism to tell the client that the server has completed a previous request, or polling, but both of those bring a lot of infrastructure overhead.
How are you architecting things that an upstream error will cause your server to die?
Look, in both cases, the request needs to be synchronous. You need to tell the customer that their purchase went through. If you're going to add a whole other bus into the thing and make it asynchronous and have an elaborate polling solution on the client, that's fine, but I would (and do) just have them wait the two extra seconds and either say "your card has been charged" or handle the rare case with "there has been an upstream error, please don't close this window while we retry the request".
If Stripe is your payment processor, how do you propose that one should process payments if Stripe is down? How does making payment processing asynchronous help? Why would your entire server die because Stripe's API failed to return?
Message queues are "email for applications". They allow you to break up your application into components that talk to each other across processes, machines and time. For example if you need to print and send a letter to your customer, you can have the web frontend send a message to the printer task. Those messages can be queued up (in case the printer is down), and handled slowly or quickly by the printer. If you have two printers you can have them both pulling jobs off the same queue. Good message queues can offer reliability guarantees like at most once delivery, and persistence (saving message queues to disk in case machines go down). Also primitives like pub/sub where applications subscribe to data (think: stock feeds), and other applications publish that data.
Simple use cases for message queues in a web application include:
- Improving response latency by moving deferrable tasks out of the page serving code
- Improving response latency by simplifying implementation of concurrent tasks (whether across cores or servers)
- Allowing capacity planning for average load instead of peak load for deferrable tasks
The bigger story is that message queues make a good substrate for building fault-tolerant, highly available, scalable distributed systems that are easy to reason about and have predictable performance. This is because system components can be cleanly decoupled, implemented with redundancy and scalability, the message queue itself takes over much of the complexity of failover and delivery/ordering guarantees, and the message queue makes an excellent point for monitoring the system, giving visibility into logjams and problems in a live system.
Personally, I look forward to using this instead of Redis and RabbitMQ as the message broker for Celery. Celery is a background tasks framework for Python. You write code that defines tasks, then certain things trigger tasks. Classic example: user uploads a video, then in the background the video is re-encoded in some standard format. So after the upload is done, the web application triggers a task by sending a message to the message broker (RabbitMQ, Redis, or now Disque).
Why not just use RabbitMQ or Redis? Well, RabbitMQ is, in my experience, complex and fragile. It's got a ton of different features, which means you have to configure it to do anything beyond just the basics, and its management tools are somewhat lacking (why rabbitmqctl and rabbitmqadmin?). I recently started switching to Redis, because Redis is pretty much plug and play. It just works, and even minimal configuration that is necessary for this use case is very clear and simple. Moreover, it's got a very simple API for examining what's going on, no complex permissions management, vhosts, etc. It's only downside? It's not a message broker; it just has some of the right primitives to act like one.
This is not to say that RabbitMQ or Redis are bad. They are great for what they do. I simply don't want to use them as the backend for Celery for the reasons stated above.
rabbitmqctl is the core tool to interact with a node. It can show the cluster status, status of the node process etc., stop and start the app and so on. It works at a lower level.
rabbitmqadmin comes with the Management plugin, and is a client for the plugin's HTTP API. You have to enable the plugin to expose that API. The Management plugin adds some overhead, I believe (it samples statistics continually to serve through the API), and as a result it's optional. Not everyone would want or need to enable it.
rabbitmqctl does the basics, whereas rabbitmqadmin is a higher-level tool.
Thanks for that. Yes, I figured that out at some point. Unfortunately, IIRC there were cases where I needed to accomplish what seemed like pretty basic things, but they could only be done with the Management plugin. This was on the order of "does this user's password hash match what I have?" The reason was to get Puppet to perform configuration management on the node. It worked eventually, but cost me an hour or two, whereas complete lack of user management (and permissions managed at the network level), is more appropriate for my use cases and works just as well.
I did not. I probably should have, but there are only so many hours in the day, and I didn't have one for this particular task. Since I still use RabbitMQ in some places I expect I will be dealing with this more, and if the situation doesn't improve I'll contact them then.
INFO and LLEN. By contrast, here's my command to view the length of the queue with Rabbit:
sudo rabbitmqctl -p example list_queues name messages messages_ready messages_unacknowledged consumers | grep 'celery \|other_celery '
Note that I need to use sudo to get this information here. I am sure the Management plug could help here. I do have Flower set up, but it gets terribly confused about multiple queues and multiple task servers and sets of workers, so it's basically useless. It seems to latch onto one of the queues and only shows the stats for that in most of its views.
as a communication layer between modules or micro services or any distributed system. if you want to push global updates to a system, you can push a message out to a channel and all of the subscribers will get the message.
I would suggest that one of the reasons so many people use Redis as a message queue is because they do not want another piece of infrastructure. The fact that people use Redis as a message queue does not, in my mind, mean that this is for lack of proven alternatives. I think this is a cool project, but for me, I will stick with Redis.
I think it's more than that other message queues are "another piece of infrastructure." In fact, many other message queues are several pieces of software, and while I can appreciate the modular-nature / separation of concerns, it can be very overwhelming to introduce, for example, JVM, Zookeeper, another DB, and the queuing software to handle the above. I'd much rather have a single, small, and yet still powerful piece of software like Redis, and if Disque can be that without the performance issues introduced by mapping a message queue over Redis, that seems to be a huge gain.
This is different enough from Redis that I don't see how they directly compete. Of course, if you already have a system setup and no performance constraints with what you have, of course you shouldn't change.
For giggles I spent an hour this evening and added support for Disque to the pluggable system I help maintain [0].
Things I can say: the ruby disque library isn't really fleshed out yet. It's alpha, so that's fine, but it's got a ways to go. For example, it doesn't directly expose ACKJOB as a command. The server is equally alpha, things like HELP don't do anything yet, and some options on some commands appear broken. But hey, it was a fun way to spend an hour.
Okay, looking at this I can see it as a decent replacement for Beanstalk, which has been my go-to simple queue server for years now, but has been on inactive development for some time.
I built a failover system for it involving NFS mounts and heartbeat, but having it all be automatic would be quite nice. Looking forward to a prod-ready version.
While this looks promising, why not make Disque part of Redis by introducing the "message queue" data structure? This would allow to build more powerful, application-specific messaging abstractions, for instance a compare-and-swap-key-with-reliable-broadcast (CASKWREB in Redis terminology).
It is not possible to replicate Disque features as a Redis data type, it is fundamentally a different best unfortunately. The project started exactly as a feature for Redis but was migrated away as a separated system because while I was working on it the tension between what Redis was and what Disque would be were bigger every day.
That's close. It's meant to be more like dysfunctional queue.
From the README:
DIStributed QUEue but is also a joke with "dis" as negation (like in disorder) of the strict concept of queue, since Disque is not able to guarantee the strict ordering you expect from something called queue. And because of this tradeof it gains many other interesting things.
How do people solve the resource allocation problem with distributed job queues?
By resource allocation problem, I mean that jobs may be small (so that lots of them can occur in parallel) or large (occupying a significant fraction of a machine's CPU, memory, bandwidth, whatever), and may be mixed together. Trying to do too much can effectively crash a system with OOM killer or paging.
Does everybody just roll their own resource-based scheduler?
In the case of RabbitMQ, it starts paging messages to disk, IIRC. If you run into descriptor / memory / disk limits, it starts turning away new messages until it returns back to a safe threshold.
There's a certain amount it can do, but ultimately, you have to apply backpressure at some point unless you're willing to start losing messages at the message broker layer.
After that, there's still a lot you could do: spool them to disk on the publisher, retry later, etc. You still risk message loss, filling up THOSE disks, etc... but again, something has to eventually give.
I'm not worried about too many messages piling up; I'm worried about the consumers pulling too many messages off and acting on them concurrently using up all the resources on the machine, or pulling off too few messages, and leaving too many resources idle.
My first exposure to the resource allocation problem and solutions came at Google, whose system evolved into Kubernetes (now an open source project). It's so damn effective I hope it takes off everywhere.
Well, my first concern about a "job queue" (as opposed to a message queue) is getting work done efficiently and reliably; do too little, and resources aren't being used efficiently, but do too much, and you might never get anything done.
We have this exact problem at the startup where I work. We have a home-made solution (a combo of Akka and Play for API / admin UI) that works OK, but really we'd prefer not to be in the business of writing job queues and schedulers.
Something big and cluster-oriented like Hadoop isn't a good fit; we typically only have one or two actual machines servicing jobs, because of our business model. Financial entities don't like their data being mixed with other people's data, so we give everybody tiny little networks of VMs, and can't farm work out to a giant cluster.
Redis is used as the backing storage for our homemade job queue. But without resource monitoring and allocation, Disque doesn't help with our problem. I'm sure it'll find much use elsewhere, though.
Both are very equal it seems, with only some minor differences. Disque isn't used in production systems yet so most likely the real comparisons will arrive when Disque is more mature.
Some key differences are that beanstalkd offers (feature wise):
1. Bury / kick
2. Priorities
3. Designed for speed
And Disque:
1. Designed for HA, scalable, distributed
2. Fault tolerant
3. Peek to multiple jobs
With the amount of momentum behind Redis and the fairly unknown beanstalkd, I guess Disque will gain popularity quite soon.
I think the thing this is most comparable to is NSQ, but NSQ takes a different approach to distribution.
- disque: send to one, read from one. The message is handed off to N replicas, and efforts are made to avoid duplicated or dropped messages. Disque will refuse new messages if RAM is filled across the cluster.
- nsq: send to any. Read from all. NSQ nodes do not communicate or coordinate with each other. Since only one node originates the message, there's no duplication, but a node outage can drop messages, and a partition can isolate them with no consumer. NSQ can grow its queue beyond RAM, so it will keep accepting new messages even if it is partitioned from the consumer.
Personally I think NSQ's approach looks like it's doing a lot less work and achieving almost all the same guarantees.
Tbh, I'd have preferred a synchronous replication option for Redis nodes. That seems to be essentially the "improvement" in Disque and it'd be easier to maintain two replication nodes than two separate projects imo.
Hello, Redis + sync repl is a valid point of view IMHO, there are reasons why this was discarded, but it is definitely an option. However Redis + sync replication does not give you Disque at all. Disque is a specialized distributed system hard to simulate with Redis, nodes runs protocols in order to coordinate re-queueing the message, in order to federate to route messages, and so forth.
I am basically using something I could fit in Redis + sync replication [in terms of data model / function] as a job queue presently so I suppose that is just where my mind jumps to.
Looking forward to giving this a try - we currently use an offshoot of "resque" for the Java ecosystem called "jesque", but it's always good to explore alternatives.
I'm excited to see what comes out of this. I'm a huge fan of Redis, mainly because it gives you a bunch of really tiny, easy-to-understand primitives that you can combine to make something very powerful.
I don't know if this will follow the same philosophy, but I'll be keeping a close eye on this as it evolves.
I would say that Disque follows the minimalistic philosophy of Redis. However, Disque is a specialization of one of the most common use cases of Redis: queues. So I wouldn't expect so many primitives--Disque knows about jobs and queues, so you don't have to build those yourself like people have been doing on top of Redis.
As a RabbitMQ user, I'll switch to the first viable alternative that is production ready.
RabbitMQ's clustering isn't great. It's sensitive to partitions, which can occur not just from actual network hiccups but also simply due to high CPU or I/O load, and it does not have a good strategy to recover from such partitions.
RabbitMQ is not multi-master by default. A queue is owned by a specific node, and if you have a partitioned cluster, that queue (and related objects such as exchanges and bindings) will disappear from other nodes.
You can patch RabbitMQ's clustering by enabling "high availability", which is their term for mirrored queues. Each queue will get a designated master, and be replicated to other nodes automatically. If a partition happens, the nodes elect a node to become a new master for a mirrored queue.
Unfortunately, this opens the cluster up to conflicts. Let's say you get brief partition. Now all the nodes see each other again, and you have conflicting queues: Node A used to be master of queue X, now node B is also master of queue X. During the split, their contents diverged a little bit. But RabbitMQ has no way to consolidate the two masters, so the queue is not operational.
To fix this, either you need to reconstruct the queue manually (usually impossible from an application's point of view), or wipe it (hardly a solution) or simply have RabbitMQ automatically pick a winning master and discard the other master(s). This mode is called "autoheal", and picks based on which master has the most messages; the previous master(s) are wiped and become slaves. This is coincidentally the only mode in which RabbitMQ can continue to run after a partition without manual intervention.
In practice, recovery has proved flaky for us. Nodes stay partitioned even after they should be able to see each other. We have also encountered a lot of bugs — for example, bindings or exchanges disappear on some nodes but not on others, or queues are inexplicably lost, or nodes otherwise just misbehave. We're on a cloud provider which is otherwise rock solid; of all the software (databases etc.) we employ in our clusters, RabbitMQ is the only one that misbehaves.
This is anecdotal, of course. Fortunately, the author of Jepsen, Kyle Kingsbury/"Aphyr", has done the maths to back this up, demonstrating that RabbitMQ's clustering is both theoretically and practically unsound [1].
This may be overly harsh. RabbitMQ is a decent project. RabbitMQ has a lot of features. Things like routing keys, flexible durability, TTLs and dead letter exchanges are great [†]. When it works, it works really well. But in the real world, I wouldn't want to run it more than two nodes, and preferably not at all.
[†] Although unfortunately DLXes are effectively unusable in a looping topology configuration (ie., for timed retries), as AMQP frames will increase indefinitely in size.
How do the AMQP frames increase in size indefinitely? (I use DLXs and message TTLs to do timed retries, without issue, although at a very low retry rate, so maybe I'm just not hitting your purported issue with increasing frame sizes?)
Every time a dead-lettering happens, RabbitMQ will do this [1]:
The dead-lettering process adds an array to the header of each dead-lettered message named x-death. This array contains an entry for each time the message was dead-lettered.
This table is never truncated. It exists in the AMQP frame and will grow indefinitely.
The AMQP spec mandates that clients negotiate a frame-max size per connection [2], and I believe RabbitMQ is in violation of the spec through this behaviour if the client specifies a non-zero value. (RabbitMQ even ignored this negotiation prior to 3.1, and there's a document [3] which indicates it ignores limits entirely.)
As a result, clients that follow frame-max strictly according to spec (such as amqplib for Node.js [4]) will refuse to handle violating frames, which is understandable given that clients also like to have predictable buffer allocation.
http://antirez.com/news/88 Salvatore talked a little about his motivations a little over a month ago. While he doesn't offer a direct comparison, it may help.
Interesting. Gearman with a few additional promises (sudh as providing transactional and durability guarantees). I'm looking forward to seeing this mature.
I haven't tried it actually. I evaluated it a few years ago and I remember that the queue persistance was missing (though now I see it is supported), and it lacked handling retries out of the box (which Gearman does).
Now I've wrote my own wrapper to handle retries on the client side so I might give it a try.
We use in live environment for about 3 years. We didn't had any incident with it. Persistence is a file, that you can backup regularly to a secondary storage.
- We have on peek 10k OPs/second.
- features we like is priority, TTR, pause tube (while deploying new code)
- responds really fast to stats (we have on a Stats/Graphite dashboard)
With AMQP (specifically, RabbitMQ) I can set up a fanout topology such that a publisher simply publishes to an exchange. Clients then bind their queues to this exchange based on a routing key, and any messages matching the key will be copied there. If no clients have bound, messages disappear into the aether.
For example, the publisher can use the exchange "changes" to write messages with keys such as "create.post.123" or "update.user.321". A consumer can then bind a queue to the exchange with the filters "create." and "update." to get all create and update messages. A different app can listen to just ".user." to get anything related to users.
This is a really powerful tool, allowing publishers to be dumb firehoses and clients to surgically pick what they need. Of course you don't need exchanges and bindings to accomplish this, only topics.