Recently, I wanted to make use of the pool pattern, which is generally pretty simple in Go. Specifically, however, I wanted to be able to dynamically cap the level of concurrency for any given set of tasks submitted to the pool.
To clarify, let’s say we have a pool that consists of workers. For any given job A, consisting of tasks , we want no more than of the tasks in A to run concurrently, where and . My use case is a system to test HTTP resources. Each job might be a specific set of endpoints. I might want to hit some more “gently” than others, hence the need to dynamically cap the level of concurrency.
Having worked with the Erlang ecosystem a bit, I really like the idea of passing messages between independent “processes”. This is a very natural and fairly simple abstraction.
Go is a little different, though. In Erlang you create a process and then pass around its PID, which can be used to send it messages (like having its address). In Go, a goroutine (which we can think of as a process) is decoupled and independent (although shared mutable state is still possible).
In order to communicate between goroutines, Go makes use of channels, which are like pipes or queues and can be one- or two-way. This means that if you want to spawn a goroutine and then pass it messages, you need to give it a reference to the channel you plan to use.
I’ve included a very simple example below (note that there is a race condition in this code, it doesn’t matter because the point is to illustrate how channels work). In this case I have made the channel accessible to the goroutine using a closure, but I could have also passed it into the function.
This basic pattern can be used to construct a goroutine pool. We can spawn several goroutines that listen on a channel until they receive a task, complete it, send the result back through another channel, then start listening again. They’ll stop listening when the channel is “closed”. We can use a Wait Group, which is similar to a semaphore, to make sure we don’t move on until all the workers are finished.
This is great except for the fact that, given a set of tasks as described above, we might execute up to N of them concurrently depending on the workload of our pool. We need a way to group tasks together into what I called “jobs” above.
One solution (there may be others) is to take advantage of the fact that channels in Go are themselves just values, so they can be passed through other channels. Instead of workers that pull jobs from a shared queue, they can pull queues (channels) from a shared queue (channel).
Note that now our jobs channel is a channel of channels of integers. So before we submit the tasks associated with a particular job, we decide how many of the workers may work on these tasks concurrently and we submit the task channel that many times. The we feed the tasks into the task channel and, at most, that many workers receive our tasks.
A couple of caveats are in order. First, it is perfectly possible that fewer than the maximum number of workers will process the tasks if the rest are busy. In this case a worker will grab a task channel that has already been closed and immediately discard it. For this reason, this strategy might not be the best for long-running pools (eventually you could end up with a lot of closed channels in your queue, maybe that causes a problem for you, maybe it doesn’t).
Another thing to note is that each job (group of tasks) now requires its own channel. This might not be great for situations where each job is quite small and there are many jobs.
In any event, you can play around with the code and see for yourself that it works. Change the “1” on line 26 to a “3” and you should notice that the results come back mixed up instead of in order.
Image credit: Thomas Hawk