Process a stream of data using a gatherer


Mar 17, 2025 - 14:23

Discover how to build a gatherer using the java.util.stream package

By Nathan Good

This article was originally published on IBM Developer.

Since Java SE 8, streams in the java.util.stream package have provided the means to process large amounts of data using syntax that is also concise and declarative.

Java 22 added gatherers to the Streams API as a preview feature. A gatherer lets you build a stream processing element that can "...transform elements in a one-to-one, one-to-many, many-to-one, or many-to-many fashion" (read JEP 461: Stream Gatherers (Preview)).

Gatherers are still a preview feature as of Java 23, but you can enable preview features for a single project and try gatherers out before they are generally available in a release.

About this series

With the java.util.stream package, you can concisely and declaratively express possibly-parallel bulk operations on collections, arrays, and other data sources. In this series, gain a comprehensive understanding of the Streams library and learn how to use it to best advantage.

This article builds on the other articles in the series that explore the java.util.stream library by demonstrating how you can use gatherers to process data. The sample code in this article is in this GitHub repo.

Using a gatherer to process data

A few months ago, I was talking to someone about a scenario that is a perfect use case for using a gatherer to process a stream of data.

The scenario was posting near-real-time transcription of city council meetings to a social media platform. The social media platform limits the size of its post to only 300 characters, which means that I have to chunk data (as it comes in) into readable chunks that are made up of whole words. That is, it's not good enough to just chunk the data into 300 characters--I need to keep track of words, so that a post doesn't just cut off in the middle of a word.

Stream processing is perfect for something like this. First, when I start processing, I have no idea how much data there is going to be. It could be just a few minutes of transcribed text or it could be several hours of transcribed text. Second, I want to process this in near-real-time, which means I cannot wait until I have all of the text, load it into memory, and then carve it up like I may have been tempted to do before the Java Streams library was available.
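
For comparison, the in-memory approach that I just ruled out might look like the sketch below. The NaiveChunker class and its chunk method are hypothetical names that I made up for illustration; they are not part of the sample repo. The sketch shows the output the chunking should produce (whole words, no post longer than the limit), but it needs the entire transcript as a single String before it can emit anything, which is the limitation a gatherer removes. It also assumes that a single word longer than the limit is kept whole in its own post.

```java
import java.util.ArrayList;
import java.util.List;

// Naive in-memory baseline (hypothetical helper, not from the article's repo).
final class NaiveChunker {

    static List<String> chunk(String text, int maxLength) {
        List<String> posts = new ArrayList<>();
        StringBuilder post = new StringBuilder();
        for (String word : text.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // happens only when the input is blank
            }
            // Start a new post when adding " " + word would exceed the limit.
            if (post.length() > 0 && post.length() + 1 + word.length() > maxLength) {
                posts.add(post.toString());
                post.setLength(0);
            }
            if (post.length() > 0) {
                post.append(' ');
            }
            // Assumption: a word longer than maxLength is kept whole in its own post.
            post.append(word);
        }
        if (post.length() > 0) {
            posts.add(post.toString());
        }
        return posts;
    }

    public static void main(String[] args) {
        // Prints [the quick, brown fox, jumps]
        System.out.println(chunk("the quick brown fox jumps", 9));
    }
}
```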

So, in this article's example, I will demonstrate how to process an unknown amount of data, arriving as a stream of characters, into a stream of Java Strings that each contain only whole words and are no longer than a specified maximum length. The code will end up looking like this:

List<String> posts = srcBuffer.lines()
    .gather(new CharactersGatherer().andThen(
        new MediaPostGatherer(maxLength.intValue()))).toList();

But first, you will need to enable preview features if you're using either Java 22 or 23.

Enabling preview features

As I mentioned in the introduction, gatherers are a preview feature starting in Java 22 and still as of Java 23. In Java, you can enable preview features with a flag that you provide to the JVM: --enable-preview.

If you are like me, you use an integrated development environment (IDE) like Visual Studio Code or you use Apache Maven to manage your Java project, so I'll cover how to enable preview features in both.

Enabling preview features in VS Code

If you do not enable Java preview features in VS Code, references to the Gatherer interface will show up as compile errors--as if the interface does not exist.

A simple way to enable Java preview features in VS Code is to use Ctrl+Shift+P and then search for "Java: Open Project Settings." On the Compiler tab, select Enable preview features.

Note: these are project-specific settings, so it is safe to turn this on for a project while you experiment with the code without affecting your other Java projects.

Enabling preview features in Apache Maven

If you want to use Apache Maven to manage your project, you can create the project using the Maven archetype and then optionally add the Maven wrapper:

$ mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes \
  -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.5
$ mvn wrapper:wrapper

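To enable Java preview features in Maven when compiling Java projects, you configure the compiler plugin in pom.xml:

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.13.0</version>
          <configuration>
            <enablePreview>true</enablePreview>
          </configuration>
        </plugin>
        <!-- ... -->
      </plugins>
    </pluginManagement>
  </build>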

The Maven archetype created a .mvn/jvm.config file in my project, into which I added the following line so that I can use Maven to run my Java project with preview features enabled:

--enable-preview

Among other things, this allows me to use the ./mvnw exec:java task to execute the main Java class in the project. Using the Maven exec plugin is not strictly necessary, but it makes running the Java app easier once my classpath starts to grow.

With preview features enabled in either your IDE or Apache Maven, you are now ready to write an implementation of the Gatherer interface.

Understanding the Gatherer interface

The declaration of the Gatherer interface from the java.util.stream package looks like this:

public interface Gatherer<T, A, R> {
    ...
}

Here, T is the type of the objects in the incoming stream. A is the type of the aggregator: the (usually mutable) state object that serves as a holding area if you need any context while doing the processing. R is the return type, which is the type of the objects in the outgoing stream.
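
To make the three type parameters concrete, here is a minimal sketch that uses the Gatherer.ofSequential factory method instead of a full class. The TypeParametersDemo class and the runningTotal method are hypothetical names chosen for illustration. On Java 22 or 23 this needs preview features enabled as described earlier.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class TypeParametersDemo {

    // T = Integer (incoming elements), A = int[] (mutable state), R = String (outgoing elements).
    static Gatherer<Integer, int[], String> runningTotal() {
        return Gatherer.ofSequential(
                // Initializer: creates the state object A once per stream evaluation.
                () -> new int[1],
                // Integrator: consumes one T, updates A, and pushes an R downstream.
                (total, element, downstream) -> {
                    total[0] += element;
                    return downstream.push(element + " -> " + total[0]);
                });
    }

    public static void main(String[] args) {
        List<String> out = Stream.of(1, 2, 3).gather(runningTotal()).toList();
        System.out.println(out); // [1 -> 1, 2 -> 3, 3 -> 6]
    }
}
```

The input and output types differ here (Integer in, String out) only to make it obvious which parameter is which.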

Because the first gatherer in the example processes a stream of strings into a stream of characters, in sequence from all the strings, the implementation of the Gatherer interface looks like this:

public class CharactersGatherer implements Gatherer<String, CharBuffer, Character> {
    ...
}

You can read about the integrator and finisher for this implementation in the next two sections.

The second gatherer will accept a stream of characters and gather the characters into strings of characters, with each string containing whole words that are limited to a certain character length.

It wasn't strictly necessary to do this in two steps, but doing it this way is useful for the example for two reasons. First, it demonstrates how gatherers can be chained. This is explored in greater detail in "Chaining gatherers together". Second, it demonstrates two different implementations, one of which doesn't even need to use the aggregator.

Understanding the integrator

One of the methods of the Gatherer interface is integrator(), which returns an Integrator<A, T, R>.

At first, the Integrator interface declaration can seem a little non-intuitive because its type parameters are in a different order than the Gatherer's: the state type A comes first, followed by the input type T.

The integrator uses the aggregator (A) as a holding area or class while integrating the input type (T) into the return type (R). For the uses of this example, the important method of the Integrator interface is integrate:

boolean integrate(A state, T element, Downstream<? super R> downstream);

The integrate method processes the incoming element (T), using the state (A), and pushes the result downstream (Downstream<? super R>) if necessary.

For the first gatherer, CharactersGatherer, the integrator method looks like this:


    @Override
    public Integrator<CharBuffer, String, Character> integrator() {
        return Integrator.of(
                (buffer, str, downstream) -> {
                    str.chars().forEachOrdered(c -> {
                        if ((char)c != '\n') {
                            downstream.push(Character.valueOf((char) c));
                        }
                    });
                    // There is no point at which we would stop processing the stream.
                    return true;
                });
    }

The code processes each incoming String, str, and simply pushes each of its characters (from the stream of characters returned by chars()) downstream. There is no other processing, apart from skipping newlines because we don't need them. The buffer, of type CharBuffer (A), is not used in this method.
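
Because this gatherer never touches its state, a stateless variant is also possible. The sketch below is my own hypothetical LineCharactersGatherer, not the class from the sample repo. It declares the state type as Void, overrides only integrator() (the other Gatherer methods have default implementations), and returns false as soon as downstream.push() reports that no more elements are wanted, so that it cooperates with short-circuiting operations such as limit().

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical stateless variant of a strings-to-characters gatherer.
final class LineCharactersGatherer implements Gatherer<String, Void, Character> {

    @Override
    public Integrator<Void, String, Character> integrator() {
        return Integrator.of((unused, line, downstream) -> {
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (c == '\n') {
                    continue; // skip newlines, as in the article's version
                }
                // push() returns false when downstream wants no more elements.
                if (!downstream.push(Character.valueOf(c))) {
                    return false;
                }
            }
            return true;
        });
    }

    public static void main(String[] args) {
        List<Character> chars = Stream.of("ab\n", "cd")
                .gather(new LineCharactersGatherer())
                .toList();
        System.out.println(chars); // [a, b, c, d]
    }
}
```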

The second gatherer, MediaPostGatherer, is a bit more complicated. It needs to accept the incoming stream of characters and hold them until it reaches the maximum length. Once the maximum has been reached, the integrator looks at the current character. If the character is a space, it sends what it has in the state downstream. If it is not a space, it rewinds through the characters until it finds a space, sends everything before that space downstream, and holds back the remainder in the state for the next chunk.

    @Override
    public Integrator<StringBuffer, Character, String> integrator() {
        return Integrator.of(
                (buffer, current, downstream) -> {
                    if (buffer.length() == 0 && current.equals(SPACE)) {
                        // Don't add leading spaces.
                        return true;
                    }
                    // Before adding the character, check whether the buffer is full.
                    if (buffer.length() == maxLength) {
                        var tempBuffer = new StringBuffer();
                        if (!current.equals(SPACE)) {
                            // The buffer ends in the middle of a word. Add the current
                            // character, then roll back to the last space, collecting
                            // the characters after it (in reverse order) in a temp buffer.
                            buffer.append(current.charValue());
                            for (int i = buffer.length() - 1; i > 0; i--) {
                                var c = buffer.charAt(i);
                                if (c == SPACE.charValue()) {
                                    // Push the whole words downstream and keep the
                                    // partial word for the next post.
                                    downstream.push(buffer.substring(0, i + 1).trim());
                                    buffer.delete(0, buffer.length());
                                    buffer.append(tempBuffer.reverse());
                                    return true;
                                } else {
                                    tempBuffer.append(c);
                                }
                            }
                            // No space found: the buffer holds a single word longer than
                            // maxLength. Assumption (not in the original code): split the
                            // word at the limit and keep the current character for the
                            // next post, so that it is not appended a second time below.
                            downstream.push(buffer.substring(0, maxLength));
                            buffer.delete(0, maxLength);
                            return true;
                        } else {
                            // The buffer ends on a word boundary: push it downstream,
                            // clear it, and skip the space so that the next post does
                            // not start with one.
                            downstream.push(buffer.toString().trim());
                            buffer.delete(0, buffer.length());
                            return true;
                        }
                    }
                    buffer.append(current.charValue());
                    return true;
                });
    }

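The integrate() method returns true here because there is no point at which this gatherer should stop processing. An integrator can instead return false to signal that it wants no more elements; the gatherer then stops processing the current stream, which is useful for performance because the remaining elements are never pulled from upstream.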
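
As a small illustration of returning false, the sketch below passes integers through until the running sum would exceed a limit and then stops. The ShortCircuitDemo class and the whileSumAtMost method are hypothetical names for this example only. Because the integrator asks for no more elements, the pipeline finishes even though the source stream is infinite.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class ShortCircuitDemo {

    // Hypothetical gatherer: forwards elements while the running sum stays <= limit.
    static Gatherer<Integer, int[], Integer> whileSumAtMost(int limit) {
        return Gatherer.ofSequential(
                () -> new int[1],
                (sum, element, downstream) -> {
                    if (sum[0] + element > limit) {
                        // Returning false tells the stream to stop sending elements.
                        return false;
                    }
                    sum[0] += element;
                    return downstream.push(element);
                });
    }

    public static void main(String[] args) {
        // The source is infinite; the gatherer makes the pipeline finite.
        List<Integer> out = Stream.iterate(1, i -> i + 1)
                .gather(whileSumAtMost(10))
                .toList();
        System.out.println(out); // [1, 2, 3, 4]
    }
}
```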

Now that you have seen how the integrate() methods are implemented in the two different gatherers, the next piece to look at is the finisher.

Understanding the finisher

The purpose of the finisher() method of the Gatherer is to perform any remaining processing on the state that wasn't done in the integrate() method when it has come to the end of the stream.

In the MediaPostGatherer example, the StringBuffer used to hold the characters as they are processed will have whatever characters are left over when we have come to the end of the incoming stream of characters.

The buffer should hold no more than the maximum number of characters, so in this case the method is simple: it just flushes the rest of the characters downstream without any further processing.


    @Override
    public BiConsumer<StringBuffer, Downstream<? super String>> finisher() {
        return (buffer, downstream) -> {
            if (!buffer.isEmpty()) {
                downstream.push(buffer.toString().trim());
            }
        };
    }
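
To see the initializer, integrator, and finisher working together in one place, here is a compact sketch of a related gatherer built with Gatherer.ofSequential. It is a simplification that I made up for illustration (the WordWrapDemo class and wrap method are hypothetical): it takes whole words rather than characters as input, and it assumes a word longer than the limit is emitted on its own. Without the finisher, the last partial line ("jumps" in the usage below) would be silently dropped.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class WordWrapDemo {

    // Hypothetical gatherer: joins incoming words into lines of at most maxLength characters.
    static Gatherer<String, StringBuilder, String> wrap(int maxLength) {
        return Gatherer.ofSequential(
                // Initializer: the holding area for the line being built.
                () -> new StringBuilder(),
                // Integrator: flush the line when the next word would not fit.
                (line, word, downstream) -> {
                    boolean more = true;
                    if (line.length() > 0 && line.length() + 1 + word.length() > maxLength) {
                        more = downstream.push(line.toString());
                        line.setLength(0);
                    }
                    if (line.length() > 0) {
                        line.append(' ');
                    }
                    line.append(word);
                    return more;
                },
                // Finisher: flush whatever is left at the end of the stream.
                (line, downstream) -> {
                    if (line.length() > 0) {
                        downstream.push(line.toString());
                    }
                });
    }

    public static void main(String[] args) {
        List<String> lines = Stream.of("the", "quick", "brown", "fox", "jumps")
                .gather(wrap(9))
                .toList();
        System.out.println(lines); // [the quick, brown fox, jumps]
    }
}
```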

Chaining gatherers together

Up to this point, you have seen two different implementations of the Gatherer interface. A very powerful feature of gatherers is that you can use the andThen() method to chain gatherers together. I can process a stream of strings into another stream of strings by chaining the two gatherers together: a stream of strings into a stream of characters, and then a stream of characters into a stream of strings of a maximum size. That provides flexibility through composability:

    List<String> posts = srcBuffer.lines()
            .gather(new CharactersGatherer().andThen(
                new MediaPostGatherer(maxLength.intValue())
                )
            ).toList();
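
The only requirement for chaining is that the output type of the first gatherer is compatible with the input type of the second. The sketch below illustrates this with a hypothetical words() gatherer (my own example, not from the sample repo) chained to the built-in Gatherers.windowFixed gatherer: lines become words, and words become pairs.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class ChainingDemo {

    // Hypothetical one-to-many gatherer: splits each line into words.
    static Gatherer<String, Void, String> words() {
        return Gatherer.ofSequential((unused, line, downstream) -> {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty() && !downstream.push(word)) {
                    return false; // downstream wants no more elements
                }
            }
            return true;
        });
    }

    public static void main(String[] args) {
        // String -> String (words), then String -> List<String> (windows of two).
        Gatherer<String, ?, List<String>> pairsOfWords =
                words().andThen(Gatherers.windowFixed(2));

        List<List<String>> pairs = Stream.of("a b c", "d e")
                .gather(pairsOfWords)
                .toList();
        System.out.println(pairs); // [[a, b], [c, d], [e]]
    }
}
```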

Running the complete example

Here is the complete example, as shown in a unit test. The unit test processes a source file and compares the result to another file that contains the expected output. The source file does not have any line breaks; in the file containing the expected output, each line shows what one "post" should look like:

    public void testCreateMediaPosts(String inputFile, String expectedFile, Integer maxLength) {
        // Read from a file that has a bunch of text in it, and convert the content into
        // individual posts that are limited to maxLength characters, with each one
        // containing only whole words...
        try (
                InputStream isSrc = this.getClass().getClassLoader().getResourceAsStream("examples/%s".formatted(inputFile));
                InputStream isExpected = this.getClass().getClassLoader()
                        .getResourceAsStream("examples/%s".formatted(expectedFile));) {
            try (
                    InputStreamReader srcReader = new InputStreamReader(isSrc);
                    BufferedReader srcBuffer = new BufferedReader(srcReader);
                    InputStreamReader expReader = new InputStreamReader(isExpected);
                    BufferedReader expBuffer = new BufferedReader(expReader);) {
                List<String> posts = srcBuffer.lines()
                        .gather(new CharactersGatherer().andThen(
                            new MediaPostGatherer(maxLength.intValue())
                            )
                        ).toList();
                List<String> expected = expBuffer.lines().toList();
                assertEquals(expected.size(), posts.size());
                for (int i = 0; i < expected.size(); i++) {
                    assertEquals(expected.get(i), posts.get(i));
                }
            }
        } catch (Exception ex) {
            fail("Error while loading resources for tests:", ex);
        }
    }

Using built-in gatherers

In addition to writing your own gatherers, there are a number of built-in Gatherer implementations provided on the Gatherers class. These gatherers include fold, mapConcurrent, scan, windowFixed, and windowSliding.

An example of using the built-in gatherer fold is shown here:

    @Test
    public void testFold() {
        var actual = Stream.of(1, 2, 3, 4, 5)
                .gather(
                        Gatherers.fold(
                                () -> Integer.valueOf(1),
                                (result, element) -> {
                                    return result * element;
                                }))
                // remember, output of a gatherer is a stream, so if we want a
                // single value, we just call findFirst()
                .findFirst()
                .get();

        // This would end up being 5 factorial
        assertEquals(Integer.valueOf(120), actual);
    }

You can use the built-in gatherers by themselves or compose them using the andThen method to chain them together.
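
Two of the other built-in gatherers are sketched below for comparison with fold. The BuiltInGatherersDemo class is a hypothetical name for this example. Unlike fold, which emits a single result at the end of the stream, windowSliding emits overlapping groups as soon as they are complete, and scan emits every intermediate result.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class BuiltInGatherersDemo {

    // Overlapping windows of three consecutive elements.
    static List<List<Integer>> slidingWindows() {
        return Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.windowSliding(3))
                .toList();
    }

    // Running sum: one output element per input element.
    static List<Integer> runningSums() {
        return Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.scan(() -> 0, (sum, n) -> sum + n))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(slidingWindows()); // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
        System.out.println(runningSums());    // [1, 3, 6, 10, 15]
    }
}
```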

Summary

In these examples, I demonstrated how to build two gatherers: one that processes a stream of strings into a stream of characters and one that processes the stream of characters into a stream of strings each with a maximum size.

The examples show how the gatherers are chained together to perform even more complex processing in a very concise syntax.