Clojure's core.async pipeline-async off-by-two error explained
Quite recently I’ve been working on my asynchronous programming library for Fennel, and at the same time fiddling with clojure.core.async for my other project.
I’ve found an interesting function called pipeline-async, which seemed to be a good fit for my task.
Here’s the documentation for this function:
user> (doc clojure.core.async/pipeline-async)
-------------------------
clojure.core.async/pipeline-async
([n to af from] [n to af from close?])
Takes elements from the from channel and supplies them to the to
channel, subject to the async function af, with parallelism n. af
must be a function of two arguments, the first an input value and
the second a channel on which to place the result(s). The
presumption is that af will return immediately, having launched some
asynchronous operation whose completion/callback will put results on
the channel, then close! it. Outputs will be returned in order
relative to the inputs. By default, the to channel will be closed
when the from channel closes, but can be determined by the close?
parameter. Will stop consuming the from channel if the to channel
closes. See also pipeline, pipeline-blocking.
So the basic idea is that you can spawn n asynchronous tasks that work in parallel, processing data from the from channel and putting the results onto the to channel in the order of the inputs.
The order is important here: it’s the reason this function behaves the way I’ll show later.
Here’s a demonstration of this function processing data:
user> (require '[clojure.core.async :as a])
user> (let [out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value channel]
                   (a/go (a/<! (a/timeout 1000))
                         (a/>! channel (inc value))
                         (a/close! channel)))]
        (a/pipeline-async 3 out task data)
        (time (a/<!! (a/into [] out))))
"Elapsed time: 2017.977058 msecs"
[1 2 3 4 5 6 7 8 9 10]
In this example, I’ve created an out channel and a data channel containing numbers from 0 to 9.
Then I’ve created a task function that receives a value and a channel, sleeps for one second, puts the incremented value onto that channel, and then closes it.
After that, I started pipeline-async with a parallelism of 3, passing our channels and the task as arguments.
Finally, we use a/into to convert the channel to a vector and await the result with a/<!!.
Kinda like pmap on a limited thread pool, except we’re doing things in an asynchronous way here.
So, as you can see, even though our asynchronous function sleeps for one second on each element, thanks to the parallelism of 3 we’ve finished processing ten numbers in just two seconds!
Wait, I don’t think that math checks out… shouldn’t it be close to four seconds instead?
I mean, 10 divided by 3 is not 2.
Let’s try with a parallelism of 1 instead; it should take around ten seconds:
user> (let [out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go (a/<! (a/timeout 1000))
                         (a/>! chan (inc value))
                         (a/close! chan)))]
        (a/pipeline-async 1 out task data)
        (time (a/<!! (a/into [] out))))
"Elapsed time: 4011.54529 msecs"
[1 2 3 4 5 6 7 8 9 10]
Wait, now it looks like it has a parallelism of three! Let’s add some logging:
user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go (a/>! log (str "started job " value))
                         (a/<! (a/timeout 1000))
                         (a/>! chan (inc value))
                         (a/close! chan)
                         (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (a/pipeline-async 1 out task data)
        (time (a/<!! (a/into [] out)))
        (a/close! log))
started job 2
started job 1
started job 0
finished job 1
finished job 0
finished job 2
started job 3
started job 4
started job 5
finished job 4
finished job 3
finished job 5
started job 6
started job 7
started job 8
finished job 6
finished job 7
finished job 8
started job 9
finished job 9
"Elapsed time: 4011.423488 msecs"
Ho, now I see it: there’s an off-by-two error in pipeline-async.
Instead of launching 1 task, in this example 3 tasks were launched!
That explains why we see four seconds with a parallelism of 1 and only two seconds with a parallelism of 3: in the latter case 5 tasks are running simultaneously, and ten over five is indeed two.
Off-by-two error
Now, this is actually a known bug, submitted to Clojure’s JIRA back in 2016.
However, it was closed with a “won’t fix” status, and I don’t think anyone is working on it.
I’ve studied the code of the pipeline* function, which is the basis for all pipeline-* variants in core.async, and couldn’t figure out why this happens just by looking at the code.
So, I decided to port this function to Fennel, using the channel implementation from my fennel-async library. I did it, and as can be seen, the comment in my implementation mentions the same “off-by-two” error.
Because I got it as well.
Somehow.
Initially, I thought that my implementation might avoid this bug, because instead of callbacks I use Lua’s coroutines to park threads, and a scheduler that runs through all asynchronous tasks in a loop.
However, as it turned out later, neither the channel implementation nor the core scheduling principles are strictly related to the problem.
It’s just how the code of pipeline-async works.
Let’s look at the code of the pipeline* function, which is what pipeline-async calls internally:
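Here’s a condensed sketch of it, reconstructed from the core.async sources:

```clojure
(defn- pipeline*
  ([n to xf from close? ex-handler type]
   (assert (pos? n))
   (let [...
         jobs (chan n)                      ;; line 5
         results (chan n)                   ;; line 6
         process (fn [[v p :as job]] ...)
         async (fn [[v p :as job]]
                 (if (nil? job)
                   (do (close! results) nil)
                   (let [res (chan 1)]
                     (xf v res)             ;; line 12: the asynchronous task is launched here
                     (put! p res)
                     true)))]
     (dotimes [_ n]
       (case type
         (:blocking :compute) ...
         :async (go-loop []                 ;; line 18: go-1, consumes jobs
                  (let [job (<! jobs)]
                    (when (async job)
                      (recur))))))
     (go-loop []                            ;; line 22: go-2, consumes from
       (let [v (<! from)]
         (if (nil? v)
           (close! jobs)
           (let [p (chan 1)]
             (>! jobs [v p])
             (>! results p)
             (recur)))))
     (go-loop []                            ;; line 30: go-3, consumes results, preserving order
       (let [p (<! results)]
         (if (nil? p)
           (when close? (close! to))
           (let [res (<! p)]
             (loop []
               (let [v (<! res)]
                 (when (and (not (nil? v)) (>! to v))
                   (recur))))
             (recur))))))))
```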
I’ve hidden some parts that are uninteresting to us (the ... markers), because this function can work in two modes: asynchronous and parallel.
We’re only interested in the asynchronous one, so the process function is of no interest to our case, and the same goes for the (:blocking :compute) branch.
If called with n equal to 1, this function spawns three go-loop threads, which are asynchronous threads running on the core.async thread pool.
These threads can be parked when needed and are based on the callback system this library implements.
I’m not going to explain how core.async works, because it is a huge topic, well covered by the author of the go macro.
In short, each <! in this code is a point of transformation that turns all the code after the <! call into a callback attached to the channel inside the go block.
Maybe not exactly like that, but it’s beside the point, really.
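As a rough mental model (this is a simplification, not the macro’s actual expansion), a parking take inside a go block behaves much like registering a callback with a/take!:

```clojure
(def c (a/chan))

;; A parking take inside a go block...
(a/go (println "got" (a/<! c)))
;; ...behaves roughly like an asynchronous take with a callback:
;; (a/take! c (fn [v] (println "got" v)))

;; Putting a value completes the pending take and runs the rest of the block:
(a/>!! c 42) ;; prints "got 42"
```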
What you need to know about this function is that, at the very least, it will spawn three internal threads, and instinctively you might think that this is why there are three tasks running instead of the one you’d expect. And you’re right, in a way, but not exactly: it’s the interplay of these threads and the channels they use for synchronization that causes this. Let me explain.
Explanation
So let’s try to understand why the off-by-two error happens in this code. After reimplementing this function in Fennel, and spending a good amount of time thinking instead of sleeping, I think I understand why it works the way it does.
The main source of the issue is that this function wants to retain the order of outputs relative to inputs. I don’t exactly know why this decision was made: when processing data by asynchronously transferring it between channels, I kinda expect the order to get mixed up, since some tasks may take more time than others, but for some reason this function was implemented specifically to prevent that from happening. So, how does it do it?
I’m going to refer to specific lines in the pipeline* source code shown above, which is not identical to the original source, so keep that in mind.
It’s best to keep the code and the explanation side by side while reading.
On lines 5 and 6 we create two channels, jobs and results.
These channels are buffered, with the buffer size equal to our parallelism parameter n, and this is what should give us the back-pressure needed to limit the number of running tasks.
Then there are three go-loop blocks, defined at lines 18, 22, and 30; I’ll call them go-1, go-2, and go-3 respectively.
So, for a parallelism of 1, the process goes like this:
1. go-2 takes data from the from channel.
2. go-2 puts the job to the jobs channel, and a new channel to the results channel on the next line.

Current state: jobs: full, results: full.

3. go-1 takes a job from the jobs channel and spawns the asynchronous task at line 12.
    This task receives a value v and a channel res, to which it will put the result and then close! it. This will be important for go-3 later.
4. go-1 puts res to the p channel.

Current state: jobs: empty, results: full.

5. go-3 takes a value from the results channel.
6. go-3 waits on the p channel to get the res channel from step 3.
7. go-3 waits for the asynchronous job to finish.
    This is when the asynchronous block started by the function we passed as an argument to pipeline-async is awaited. Until it closes the res channel, the go-3 thread will be parked.

Current state: jobs: empty, results: empty, go-3: parked, one task is running.

8. go-2 takes data from the from channel.
9. go-2 puts the job to the jobs channel, and a new channel to the results channel on the next line.

Current state: jobs: full, results: full, go-3: parked, one task is running.

10. go-1 takes a job from the jobs channel and spawns the asynchronous task at line 12.
    This is when the second task is spawned, despite the fact that the first one is still running.
11. go-1 puts res to the p channel.
    This should advance the go-3 block, but it is already parked.

Current state: jobs: empty, results: full, go-3: parked, two tasks are running.

We’re still not done: go-3 is still parked, awaiting the first task.

12. go-2 takes data from the from channel yet again.
13. go-2 puts the job to the jobs channel.
14. go-2 wants to put a new channel to the results channel, but the results channel is full, so go-2 parks.

Current state: jobs: full, results: full, go-3: parked, go-2: parked, two tasks are running.

15. go-1 takes a job from the jobs channel and spawns the asynchronous task at line 12.
    This is when the third task is spawned, despite the fact that the first one is still running.
16. go-1 puts res to the p channel.
    This should advance the go-3 block, but it is already parked.
17. go-1 wants to take a new job from the jobs channel, but the producer go-2 is parked, so go-1 parks as well.

Current state: jobs: empty, results: full, go-3: parked, go-2: parked, go-1: parked, three tasks are running.
Phew! This is quite a convoluted process if you ask me!
So the reason for the two extra running jobs is that, while the go-3 thread is parked, go-2 manages to push two more tasks through before it finally parks on the full results channel.
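To make the parking mechanics concrete, here’s how a buffered channel absorbs puts; a/offer! attempts a put without parking and reports whether it succeeded:

```clojure
;; A channel with a buffer of size 1, like `jobs` and `results` when n = 1:
(def buf (a/chan 1))

(a/offer! buf :first)  ;; => true: the buffer absorbs the put even without a taker
(a/offer! buf :second) ;; => nil: the buffer is full, so a real >! here would have to park
```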
I’m not sure if you’ve noticed, but we can turn this off-by-two error into an off-by-one error simply by changing the order in which we put things onto the jobs and results channels.
So we simply need to change this:
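That is, the two puts in the go-2 loop, which in the core.async source appear in this order:

```clojure
(>! jobs [v p])
(>! results p)
```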
Into this:
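```clojure
(>! results p)
(>! jobs [v p])
```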
Now the whole process will park a bit earlier, and only two tasks will run, as per step 10. This doesn’t fix the error, but it makes it a bit more manageable in cases where you’re not aware of the problem in the first place and are trying to access a resource with a limited number of connections. It can still cause problems, though.
Unordered pipeline-async implementation
The issue can be fixed if we don’t maintain the order of results and instead put them onto the to channel in the order of task completion. Here’s the implementation of such a function:
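In sketch form (a reconstruction of the idea rather than the exact listing, so details may differ), it looks like this:

```clojure
(defn pipeline-async-unordered
  "Like pipeline-async, but puts results onto `to` in completion order,
  running at most n tasks at a time."
  ([n to af from] (pipeline-async-unordered n to af from true))
  ([n to af from close?]
   (assert (pos? n))
   ;; A closed channel pre-filled with n-1 values: the worker that takes
   ;; nil from it knows that every other worker has already finished.
   (let [last? (a/to-chan! (range (dec n)))]
     (dotimes [_ n]
       (a/go-loop []
         (if-some [v (a/<! from)]
           (let [res (a/chan 1)]
             (af v res)                     ; launch the asynchronous task
             (when (loop []                 ; drain its results onto `to`
                     (if-some [r (a/<! res)]
                       (when (a/>! to r) (recur))
                       true))
               (recur)))
           ;; `from` is exhausted: the last worker closes `to`, if desired
           (when (and close? (nil? (a/<! last?)))
             (a/close! to))))))))
```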
An interesting trick here is the extra channel that we create with n-1 elements in it.
Each worker, once it has processed all the data and the from channel is closed, will try to take from that channel.
If the result is nil, it means the other workers have already exhausted it, we’re the last one, and it is our responsibility to close! the to channel, if desired.
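To see how that close-signalling channel behaves, here’s a tiny illustration (a/to-chan! returns an already-closed channel containing the given values):

```clojure
(def signal (a/to-chan! (range 2)))  ;; n = 3 workers, so n-1 = 2 values

(a/<!! signal) ;; => 0   first worker to finish: not the last one
(a/<!! signal) ;; => 1   second worker: not the last one either
(a/<!! signal) ;; => nil exhausted: this worker is the last, so it closes `to`
```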
The same example from above using this variant of pipeline shows that there are exactly 3 tasks running at a time:
user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go (a/>! log (str "started job " value))
                         (a/<! (a/timeout 1000))
                         (a/>! chan (inc value))
                         (a/close! chan)
                         (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (pipeline-async-unordered 3 out task data)
        (time (a/<!! (a/into [] out))))
started job 2
started job 1
started job 0
finished job 0
finished job 1
finished job 2
started job 3
started job 4
started job 5
finished job 4
finished job 3
finished job 5
started job 6
started job 7
started job 8
finished job 6
finished job 7
finished job 8
started job 9
finished job 9
"Elapsed time: 4007.215739 msecs"
[2 3 1 5 4 6 7 8 9 10]
As can be seen, the order of elements is not the same as the order of inputs, but we get exactly three tasks running at a time.
This actually has another huge benefit over the default pipeline-async.
You see, if the first-ever task in pipeline-async takes the longest to complete, that’s how long it will take to get the first result from the to channel.
Yes, the other N-1 elements will be processed in parallel; however, until the first-ever task is finished, no more tasks will be taken at all!
See for yourself:
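The setup is the same as before, except only job 0 sleeps (for 10 seconds), we use the stock pipeline-async with a parallelism of 1, and we time only the first value taken from out; roughly:

```clojure
(let [log (a/chan 10)
      out (a/chan)
      data (a/to-chan! (range 10))
      task (fn [value chan]
             (a/go (a/>! log (str "started job " value))
                   (when (= value 0)
                     (a/<! (a/timeout 10000)))
                   (a/>! chan (inc value))
                   (a/close! chan)
                   (a/>! log (str "finished job " value))))]
  (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
  (a/pipeline-async 1 out task data)
  (time (a/<!! out)))
```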
In this example, I’ve made it so that only the first task sleeps for 10 seconds.
In the log, you can see that we’ve started three jobs (because of the off-by-two error), and then jobs 2 and 1 finished immediately.
Then we waited for job 0 to finish before starting job 3.
Thus, the time for the first value to appear on the out channel is 10 seconds.
If we use pipeline-async-unordered here, you can see that we have a much higher throughput:
(let [log (a/chan 10)
      out (a/chan)
      data (a/to-chan! (range 10))
      task (fn [value chan]
             (a/go (a/>! log (str "started job " value))
                   (when (= value 0)
                     (a/<! (a/timeout 10000)))
                   (a/>! chan (inc value))
                   (a/close! chan)
                   (a/>! log (str "finished job " value))))]
  (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
  (pipeline-async-unordered 3 out task data)
  (time (a/<!! out)))
started job 2
started job 1
started job 0
"Elapsed time: 4.602578 msecs"
finished job 1
finished job 2
started job 3
finished job 3
finished job 0
3
In this case, the first job to complete was job 2, then jobs 1 and 3, and long after that, job 0.
If we change the example a bit and make the other jobs sleep for, say, 1 second, we’ll still get more throughput with the unordered version:
user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go (a/>! log (str "started job " value))
                         (if (= value 0)
                           (a/<! (a/timeout 10000))
                           (a/<! (a/timeout 1000)))
                         (a/>! chan (inc value))
                         (a/close! chan)
                         (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (a/pipeline-async 1 out task data) ;; 1 here is 3 actually
        (time (a/<!! (a/into [] out))))
started job 2
started job 0
started job 1
finished job 1
finished job 2
finished job 0
started job 3
started job 4
started job 5
finished job 3
finished job 4
finished job 5
started job 6
started job 7
started job 8
finished job 6
finished job 7
finished job 8
started job 9
finished job 9
"Elapsed time: 13011.037871 msecs"
[1 2 3 4 5 6 7 8 9 10]
Here you can see that until job 0 finished, we had to wait before we could take on other jobs.
Because of that, we first spent 10 seconds on the first three tasks, and then 3 more seconds on the remaining ones.
With the unordered version and the same parallelism, we spend at most 10 seconds in total, because the other tasks can still run even though the first task hasn’t completed yet:
user> (let [log (a/chan 10)
            out (a/chan)
            data (a/to-chan! (range 10))
            task (fn [value chan]
                   (a/go (a/>! log (str "started job " value))
                         (if (= value 0)
                           (a/<! (a/timeout 10000))
                           (a/<! (a/timeout 1000)))
                         (a/>! chan (inc value))
                         (a/close! chan)
                         (a/>! log (str "finished job " value))))]
        (a/go-loop [] (when-some [line (a/<! log)] (println line) (recur)))
        (pipeline-async-unordered 3 out task data)
        (time (a/<!! (a/into [] out))))
started job 0
started job 2
started job 1
finished job 1
finished job 2
started job 3
started job 4
finished job 4
finished job 3
started job 5
started job 6
finished job 5
finished job 6
started job 7
started job 8
finished job 7
finished job 8
started job 9
finished job 9
finished job 0
"Elapsed time: 10003.531318 msecs"
[2 3 5 4 6 7 9 8 10 1]
I hope this gives a clear illustration of why you might want the unordered version.
A possible fix for pipeline-async
Apart from the already mentioned line swap, which turns this problem from off-by-two into off-by-one, I can’t come up with any other ideas.
We kind of have to put channels onto the results channel and then take them out, awaiting the inner channel, in order to preserve the order.
As a hack, we can subtract one from the number of threads we spawn in the dotimes block if n is greater than 1, but this still leaves us with situations where two threads run instead of one.
Disallowing a parallelism of 1 is an option too, as it doesn’t make any sense to use this function with such parallelism anyway.
In other words:
(defn pipeline-async
  "..."
  ([n to af from] (pipeline-async n to af from true))
  ([n to af from close?]
   (assert (> n 1) "don't use pipeline-async with parallelism of 1")
   (pipeline* (dec n) to af from close? nil :async)))
This should take care of most use cases, but it isn’t a proper solution by any means.
Given that I have the same problem in my fennel-async library, I may eventually find a solution for it and propose it for the Clojure implementation too, unless it relies on Lua-specific features that can’t easily be replicated without full coroutine support on the JVM or the other runtimes Clojure targets.
Until then, I hope this was a useful and interesting read!
A proper fix for pipeline-async
After a bit more time, I think I figured out a proper way to fix the pipeline-async function.
All we really need is a way to synchronize the go-loop that processes results with the go-loop that consumes jobs.
We can add one more channel to the mix, like so:
diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj
index 55ae450..66bfad0 100644
--- a/src/main/clojure/clojure/core/async.clj
+++ b/src/main/clojure/clojure/core/async.clj
@@ -530,6 +530,7 @@ to catch and handle."
nil))
jobs (chan n)
results (chan n)
+ finishes (and (= type :async) (chan n))
process (fn [[v p :as job]]
(if (nil? job)
(do (close! results) nil)
@@ -540,7 +541,9 @@ to catch and handle."
true)))
async (fn [[v p :as job]]
(if (nil? job)
- (do (close! results) nil)
+ (do (close! results)
+ (close! finishes)
+ nil)
(let [res (chan 1)]
(xf v res)
(put! p res)
@@ -554,6 +557,7 @@ to catch and handle."
:async (go-loop []
(let [job (<! jobs)]
(when (async job)
+ (<! finishes)
(recur))))))
(go-loop []
(let [v (<! from)]
@@ -572,6 +576,8 @@ to catch and handle."
(let [v (<! res)]
(when (and (not (nil? v)) (>! to v))
(recur))))
+ (when finishes
+ (>! finishes :done))
(recur))))))))
;;todo - switch pipe arg order to match these (to/from)
This essentially means that the thread that spawns jobs will park until the thread that processes job results signals that the job is finished.
This way we basically ensure that there will be no more than n active jobs.
I will look into the process of submitting this fix to the core.async library soon.
Edit Nov 23 2022:
I’ve submitted the patch from above as a Clojure Q&A question, because it seems there’s no other way to suggest changes unless you’re part of the Clojure development team with access to the project’s JIRA. The post is here. Alex Miller replied that they don’t think this is a problem that needs fixing, so I guess the bug is here to stay.
It is interesting, however, that neither pipeline nor pipeline-blocking shares this behavior, as their implementations wait for the task result directly on the job-consumer thread. This makes it more awkward to combine them with the asynchronous variant when you need the same number of threads in each stage.
It is also a bit weird that none of the pipeline variants return the out channel so that you could pass it through and connect pipelines, so I end up using something like this when I need to mix them:
(defn process [parallelism blocking-fn data-ch]
  (let [out-ch (chan)]
    (pipeline-blocking parallelism out-ch (map blocking-fn) data-ch)
    out-ch))

(defn process-async [parallelism async-fn data-ch]
  (assert (> parallelism 2)) ; Compensate for the ASYNC-163
  (let [out-ch (chan)]
    (pipeline-async (- parallelism 2) out-ch async-fn data-ch)
    out-ch))
Much like other Clojure functions that work with collections, these two accept the data channel as the last argument, so you can use the thread-last macro with them:
user> (->> (range 10)
           (to-chan!)
           (process 10 (fn [v] (Thread/sleep 1000) (inc v)))
           (process-async 10 (fn [v c]
                               (go (<! (timeout 1000))
                                   (>! c (inc v))
                                   (close! c))))
           (a/into [])
           <!!
           time)
"Elapsed time: 2011.683893 msecs"
[2 3 4 5 6 7 8 9 10 11]
This comes at the cost of not being able to use pipeline-async with a parallelism of less than three, but it doesn’t make sense to do that anyway.
Now, for real, thanks for reading!