Andrey Listopadov

Lazy Sequences and Java Streams

@programming java clojure ~10 minutes read

When Clojure 1.12.0 was released in September, the release notes listed a lot of cool features, such as virtual threads, but one caught my eye in particular: support for Java Streams and functional interfaces. That was just what I needed at work to port one of our internal libraries to Java, to make it easier to reuse from projects written in different JVM languages. Little did I know that Java Streams suck.

The problem

We have a small library that implements queues with lazy sequences. It has both producer and consumer parts, so several projects can reuse the same library, one being a producer, and the other being a consumer. Today we’re interested in the consumer part.

One of the features this library has is that we can have several separate queues, and we can take items from all of them in a fixed order. Each queue is sorted, so when we have multiple queues, we need to get elements in order and still in a lazy way. Let’s look at some examples:

(def queue1
  (filter odd? (range)))

(def queue2
  (filter even? (range)))

Here, I created two infinite sequences of sorted numbers, just as an example:

user> (take 10 queue1)
(1 3 5 7 9 11 13 15 17 19)
user> (take 10 queue2)
(0 2 4 6 8 10 12 14 16 18)

It’s not that different from actual queues used in our library, as each sequence is potentially infinite and sorted. What we need to get as a result is a single queue with items of both queues combined with retained ordering:

(0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 ...)

Lazy Seq solution

To achieve this we can implement a merge sort function that will produce a single queue from multiple queues:

(defn merge-sort-sequences [sequences]
  (lazy-seq
   (let [[sequence & sequences] (sort-by first sequences)]
     (cons (first sequence)
           (merge-sort-sequences (conj sequences (rest sequence)))))))

Note that I’ve omitted checks for things like empty sequences for the sake of simplicity.

In short, it evaluates like this:

  1. Let’s say, we have [(1 3 5 7...) (0 2 4 6...)] as an input;
  2. We sort it by the first element of each sequence: [(0 2 4 6...) (1 3 5 7...)];
  3. We take the first element from the first queue: 0, and cons it to the recursive call to which we pass back all of the sequences, except we remove the first element from the first queue;
    • Since the function returns a lazy sequence the recursive call is effectively trampolined.

And we can see that it works:

user> (take 10 (merge-sort-sequences [queue1 queue2]))
(0 1 2 3 4 5 6 7 8 9)
user> (take 20 (merge-sort-sequences
                [(take-nth 3 (range))          ; (0 3 6 9 12 ...)
                 (take-nth 3 (drop 1 (range))) ; (1 4 7 10 13 ...)
                 (take-nth 3 (drop 2 (range))) ; (2 5 8 11 14 ...)
                ]))
(0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19)
user> (take 20 (merge-sort-sequences
                [(filter odd? (range))
                 (filter even? (drop 20 (range)))]))
   (1 3 5 7 9 11 13 15 17 19 20 21 22 23 24 25 26 27 28 29)
;;  \_first_sequence_only_/  \___both_sequences_merged___/

Of course, it won’t work if sequence items are in arbitrary order, but that’s not the case for our implementation.

Java Stream solution

Now, as I mentioned, I had to rewrite this library in Java so that it could be reused in projects written in other JVM languages. It is possible to use a Clojure library from, say, Scala, but it is quite tedious. We either have to AOT-compile the library to a jar and expose a bunch of methods via gen-class, or load Clojure at runtime, compile the sources, and use the library that way. Either route involves so many hoops to jump through that hardly anyone on our team would want to do it. And the library in question is small, about 400 LOC with documentation strings and comments, so rewriting it in Java wouldn’t be that hard.

Or so I thought.

I’m not a Java programmer, and I have very limited knowledge of Java. Thankfully, Java is a simple language, unless you wrap everything in an unnecessary amount of classes, use abstract factory builders, and so on. This library, thankfully, required neither of those cursed patterns - it’s a single static class with no need for instantiation, with a bunch of pure methods.

So, knowing that Java has a Stream class, and looking at its interface I thought that I would be able to implement this library. And in truth, it wasn’t a problem, until I got to the lazy merge sort part. That’s when it started looking cursed.

First of all, a Stream is not a data structure - you can’t work with it as if it were data, you have to use pipelines, and then either consume it or pass it around. Moreover, most examples use streams as an intermediate transformation step and return a collection, or suggest passing in a transformer, instead of returning a stream, so I wonder where this is coming from:

Java APIs increasingly return Streams and are hard to consume because they do not implement interfaces that Clojure already supports, and hard to interop with because Clojure doesn’t directly implement Java functional interfaces.

From Clojure 1.12.0 release notes.

Anyhow, I’ve created functions that return streams of items in the queue, much like the ones I showed above. So it was time to implement the merge sort. Let’s look at the skeleton of our function:

public static Stream<Object> mergeSortStreams(Stream<Object>[] streams) {
	return Stream.generate(() -> {
		// implement merge sort somehow...
	});
}

Stream.generate(Supplier) produces an infinite stream, generated by calling the supplier. Basically, it’s what we need here: we can do our sorting inside the supplier. However, there’s a problem - Streams are not data structures, and there’s no way to take one element without consuming the stream. I mean, there’s stream.findFirst() but if we look at the documentation for it, we’ll see that:

Optional<T> findFirst()

Returns an Optional describing the first element of this stream, or an empty Optional if the stream is empty. If the stream has no encounter order, then any element may be returned.

This is a short-circuiting terminal operation.

Returns: an Optional describing the first element of this stream, or an empty Optional if the stream is empty

Throws: NullPointerException - if the element selected is null

What is a short-circuiting terminal operation, you ask? A terminal operation may traverse the stream to produce a result or a side effect. A short-circuiting terminal operation does the same, but even if presented with an infinite stream, it can finish in a finite time. And after the terminal operation is performed, the stream is considered consumed, and can no longer be used.

But even if we could use findFirst without closing the stream, it wouldn’t be useful to us, because remember - we need to take the first element from each stream and sort the streams themselves. But findFirst is a destructive operation, it removes the element from the stream.
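To make the "consumed" part concrete, here is a minimal sketch. The class and method names (ConsumedStreamDemo, findFirstTwice) are made up for illustration and are not part of the library. It calls findFirst on an infinite stream, which finishes because the operation is short-circuiting, and then tries to touch the same stream again:

```java
import java.util.stream.Stream;

class ConsumedStreamDemo {
    // Hypothetical helper: reports what happens when findFirst is called twice.
    static String findFirstTwice() {
        Stream<Integer> evens = Stream.iterate(0, i -> i + 2);
        // Short-circuiting: returns even though the stream is infinite.
        int first = evens.findFirst().orElseThrow();
        try {
            // The stream was consumed by the first terminal operation.
            evens.findFirst();
            return "second call succeeded";
        } catch (IllegalStateException e) {
            return "first=" + first + ", second call failed";
        }
    }

    public static void main(String[] args) {
        System.out.println(findFirstTwice());
    }
}
```

The second call fails with an IllegalStateException, so there is no way to come back to the same stream for its next element.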

In Clojure, sequences are immutable - all we can do is construct a new sequence if we wish to add items or take its tail if we need fewer items. Thus first does nothing to the sequence in question: we can freely call it on any sequence, obtain an element, do stuff with it, and be done. You can think of first as an iterator’s peek, where you look at the next element without advancing the iterator.

Thankfully, we can convert a Stream to an Iterator, with it staying lazy and potentially infinite. Only, there’s no peek method in the base Iterator interface in Java. Oh well.
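As a small illustration (the class and method names are invented for this sketch), pulling a few elements from an infinite stream through its iterator returns immediately, but every call to next() moves the iterator forward, so there is no way to just look at the head:

```java
import java.util.Iterator;
import java.util.stream.Stream;

class StreamIteratorDemo {
    // Hypothetical helper: takes three elements from an infinite stream of odd numbers.
    static String firstThree() {
        Iterator<Integer> odds = Stream.iterate(1, i -> i + 2).iterator();
        int a = odds.next(); // 1
        int b = odds.next(); // 3, because next() always advances the iterator
        int c = odds.next(); // 5
        return a + " " + b + " " + c;
    }

    public static void main(String[] args) {
        System.out.println(firstThree()); // prints "1 3 5"
    }
}
```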

Well, we can always implement our own wrapper for the Iterator interface:

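package org.example;

import java.util.Iterator;
import java.util.NoSuchElementException;

public class PeekingIterator<T> implements Iterator<T> {
    private final Iterator<T> iterator;
    private boolean peeked = false; // true while peekedItem holds a buffered element
    private T peekedItem;

    public PeekingIterator(Iterator<T> it) { iterator = it; }

    @Override
    public T next() {
        if (peeked) {
            // Hand out the buffered element and clear the buffer.
            peeked = false;
            T tmp = peekedItem;
            peekedItem = null;
            return tmp;
        }
        return iterator.next();
    }

    // A buffered element still counts as a remaining element.
    @Override
    public boolean hasNext() { return peeked || iterator.hasNext(); }

    // Returns the next element without advancing past it.
    public T peek() {
        if (!peeked) {
            if (!iterator.hasNext()) throw new NoSuchElementException();
            peeked = true;
            peekedItem = iterator.next();
        }
        return peekedItem;
    }
}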

Of course, we could use a dependency, but due to circumstances, we have to keep the amount of dependencies as low as possible. Now, we can get back and implement our merge sort:

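import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

public static Stream<Object> mergeSortStreams(Stream<Object>[] streams) {
	// Wrap every stream so that we can look at its head without consuming it.
	List<PeekingIterator<Object>> iterators = new ArrayList<>();
	for (Stream<Object> s : streams) {
		iterators.add(new PeekingIterator<>(s.iterator()));
	}
	return Stream.generate(() -> {
		// Move the iterator with the smallest head to the front, then take that head.
		iterators.sort(Comparator.comparingInt(a -> (Integer) a.peek()));
		// List.getFirst() needs Java 21+; get(0) does the same on older JDKs.
		return iterators.getFirst().next();
	});
}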

Testing it with Clojure examples from above reveals that it works as expected:

System.out.println(
		mergeSortStreams(
				new Stream[]{
						Stream.iterate(1, i -> i + 2),
						Stream.iterate(0, i -> i + 2)
				})
				.limit(20)
				.collect(Collectors.toList())
);
// => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
System.out.println(
		mergeSortStreams(
				new Stream[]{
						Stream.iterate(0, i -> i + 3),
						Stream.iterate(1, i -> i + 3),
						Stream.iterate(2, i -> i + 3)
				})
				.limit(20)
				.collect(Collectors.toList())
);
// => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
System.out.println(
		mergeSortStreams(
				new Stream[]{
						Stream.iterate(1, i -> i + 2),
						Stream.iterate(20, i -> i + 2)
				})
				.limit(20)
				.collect(Collectors.toList())
);
// => [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]

Obviously, I’m omitting most of the things in the code here, but this should be enough to give you an idea of what I had to do.
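For comparison, here is a rough sketch of a variation that is not what the library does: instead of re-sorting the whole list for every element, it keeps the streams in a PriorityQueue ordered by their current head, and it stops when all inputs are exhausted instead of assuming infinite streams. The names HeapMerge, Source, merge, demoInfinite and demoFinite are hypothetical, and the sketch assumes non-null elements and Java 16+ for records. Unlike the version above, it pulls the first element of every stream as soon as merge is called.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class HeapMerge {
    // Hypothetical helper: the current head of a stream plus the iterator holding the rest.
    private record Source<T>(T head, Iterator<T> rest) {}

    static <T> Stream<T> merge(List<Stream<T>> streams, Comparator<? super T> order) {
        PriorityQueue<Source<T>> heap =
                new PriorityQueue<>(Comparator.comparing((Source<T> s) -> s.head(), order));
        for (Stream<T> s : streams) {
            Iterator<T> it = s.iterator();
            // Empty streams are skipped; note that this reads each head eagerly.
            if (it.hasNext()) heap.add(new Source<>(it.next(), it));
        }
        Iterator<T> merged = new Iterator<>() {
            @Override
            public boolean hasNext() { return !heap.isEmpty(); }

            @Override
            public T next() {
                Source<T> smallest = heap.poll();
                if (smallest == null) throw new NoSuchElementException();
                // Put the source back with its next head, unless it is exhausted.
                if (smallest.rest().hasNext()) {
                    heap.add(new Source<>(smallest.rest().next(), smallest.rest()));
                }
                return smallest.head();
            }
        };
        // Turn the iterator back into a sequential, ordered stream.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(merged, Spliterator.ORDERED), false);
    }

    static List<Integer> demoInfinite() {
        Stream<Integer> odds = Stream.iterate(1, i -> i + 2);
        Stream<Integer> evens = Stream.iterate(0, i -> i + 2);
        return merge(List.of(odds, evens), Comparator.<Integer>naturalOrder())
                .limit(10)
                .collect(Collectors.toList());
    }

    static List<Integer> demoFinite() {
        Stream<Integer> a = Stream.of(1, 4);
        Stream<Integer> b = Stream.of(2, 3, 5);
        Stream<Integer> c = Stream.<Integer>empty();
        return merge(List.of(a, b, c), Comparator.<Integer>naturalOrder())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demoInfinite()); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(demoFinite());   // [1, 2, 3, 4, 5]
    }
}
```

Whether the heap is worth it depends on how many queues are merged at once; for two or three streams, sorting a tiny list on every element is unlikely to matter.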

Sequences vs Streams

Let’s compare our versions.

Clojure:

(defn merge-sort-sequences [sequences]
  (lazy-seq
    (let [[sequence & sequences] (sort-by first sequences)]
      (cons (first sequence)
            (merge-sort-sequences (conj sequences (rest sequence)))))))

A pretty straightforward way of doing this kind of operation, in my opinion.

Java:

public static Stream<Object> mergeSortStreams(Stream<Object>[] streams) {
	List<PeekingIterator<Object>> iterators = new ArrayList<>();
	for (Stream<Object> s : streams) {
		iterators.add(new PeekingIterator<>(s.iterator()));
	}
	return Stream.generate(() -> {
		iterators.sort(Comparator.comparingInt(a -> (Integer) a.peek()));
		return iterators.getFirst().next();
	});
}

The general idea is the same, but the fact that we had to create a peeking iterator and store it in a list is disturbing. In Clojure, we manipulate lazy sequences as if they were ordinary data. In Java, we don’t have any real data, so we have to make our own way of accessing it. We have to create an intermediate array list and sort it every time an item of the stream is generated. The same happens in Clojure, of course, when we call sort-by, and this is possibly worse, as in Java we only create the list once and sort it in place, while in Clojure we create a new list every time. The JVM is good at collecting garbage, though, and consuming this sequence takes far more time than cleaning up the garbage, but it is a thing to consider. Stream.generate also returns an unordered stream, and streams can be processed in parallel, so I’m not sure how my PeekingIterator would behave there.

And all of that is simply because Java streams are a half-baked interface made in a rush, or at least it feels like that. Yes, it supports data pipelines with its own implementation of map, filter, etc. However, it makes me appreciate Clojure even more, because it has one implementation of map that works across everything (also, it has transducers). In a more complete version of this Java library, we have to map over streams, transform them into iterators just to do some stuff that is not implemented for streams, transform iterators back into streams, and so forth. The Clojure version is much more straightforward and concise.

In hindsight, I wish it were easier to use Clojure from other JVM languages. It would save me the time it took to re-implement everything in Java for sure.

In the end, I hooked the old Clojure implementation to use the Java version as a core and retained the interface by converting streams to sequences via stream-seq!. It passed all of the library tests, so I moved on.