

A Passable Explanation - Node.js Streams

Posted on Aug 13, 2021 by Alexander Morse.

You’ve got a lot of options for handling asynchronous data in Node.js, and each one has a slightly different personality. Promises? Easy to misunderstand at first, but friendly and reliable once you get to know them. A sort of asynchronous golden retriever. Events? Unfeeling and mercenary — they’ll follow orders to the letter. Good for people who like to delegate.

Then you've got streams, the stand-offish postal workers of the async world.

Streams work for you, but not with you, making it simple to receive and send data, of any size, to and from almost anywhere. Just take a source stream, a destination stream, and glue them together with the pipe method. It’s almost too easy!

Just don’t expect to get much say in how these streams go about their business — at best, you might get to provide a couple options during their construction. After that, they can feel a lot like black boxes with a nice API. Useful, but opaque.

Of course, streams aren’t that hard to grasp — the documentation for the Node.js Stream API explains in plain English how to go about creating one. These things aren’t inherently mysterious. I think a lot of the black-boxiness of the interface (at least for web developers like me) comes from the strong divide between stream implementors and consumers. Promises are useful everywhere, both in client code and libraries, so everyone tends to get used to them. By contrast, not every developer will need to write a custom stream — they’ll get by just fine using those written by others.

Like with Promises, though, the best way to get a feel for how streams work is to write a few from scratch. So let’s spend some time doing that.

Keep in mind that the goal of this article isn’t to provide a complete guide to implementing and using streams — in fact, I’m about to write some very silly code that ignores a number of real-world concerns (for example, I will use the term ‘backpressure’ exactly once in this article — that was it just now). Once you’ve got a handle on the general behavior of streams, you should be in a good place to learn about proper implementation — just try to learn it from somebody smarter than me.

Let's get started. There are really only two types of stream that need to be studied: Readable and Writable. Throughout the article, it will be useful to think of both in terms of the job that each is expected to perform. Once we understand what those jobs are, implementing new streams is a piece of cake.

Readable Streams

There are a few basic examples of Readables in the Node.js library that we could cover. For example, fs.createReadStream(filename) creates a Readable that outputs the contents of a file. Let’s forget about that for now, though, and skip straight to writing our own.
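
(Purely for reference before we forget about it: consuming one of those built-in file streams looks roughly like the sketch below. The filename is just a placeholder, and the ‘data’ event it listens on is something we’ll cover properly in a bit.)

const fs = require("fs");

// Assumes a file named "example.txt" exists next to this script.
const fileReadable = fs.createReadStream("example.txt");

// Log each chunk of the file as it arrives.
fileReadable.on("data", (chunk) => {
  console.log(chunk.toString());
});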

Implementing Readables

Again, the key thing to keep in mind about Readables is that they are objects with a job. Specifically, they have the job of giving us data. We don’t care how the Readable gets the data — file-reading, API-fetching, ferret-smuggling — anything goes.

Let’s take our example Readable. This will be a simple stream that emits a series of three letters: ‘a’, ‘b’, and ‘c’. After that, the stream will signal that it has finished emitting data. We’ll call it "alphabetReadable".

Let’s start by creating the Readable.

const { Readable } = require("stream");

const alphabetReadable = new Readable({});

// Use the readable somehow...?

Run this code, and it doesn’t throw an error, at least. But it does raise a good question - where exactly does a Readable’s data come from? And how do we get data out of it? Let’s start with the first question.

Remember, the Readable has the job of fetching data (somehow) and placing it onto an internal buffer. But it doesn’t know how to do that job yet, so we need to teach it how. In this case, we can do that by passing the read function into the Readable constructor. See how we can make data available for reading with the push method:

const { Readable } = require("stream");

const alphabetReadable = new Readable({
  read(n) {
    // Push some letters to the buffer.
    this.push("a");
    this.push("b");
    this.push("c");

    // End the stream.
    this.push(null);
  },
});

Okay! Now we’ve taught our Readable how to do its job…and that’s really all there is to it. At this point, we should take a step back and let the underlying system handle things — streams don’t like to be micromanaged, even those we’ve written ourselves.

All we need to know is that the method will get called whenever more data should be pushed to the buffer. In fact, unless we end the stream by pushing ‘null’, we can expect the method to be called repeatedly. For example, if we removed the null value from the stream, we’d get an infinite, repeating sequence of ‘abc’s.
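
As a rough sketch of that idea, here’s a hypothetical countingReadable (my name, nothing standard) that pushes a single number per call and only ends itself after the fifth call:

const { Readable } = require("stream");

let count = 0;

const countingReadable = new Readable({
  // Each call pushes exactly one value. Without the
  // push(null) below, this stream would produce numbers
  // forever.
  read(n) {
    count++;
    this.push(String(count));

    // End the stream after five values.
    if (count >= 5) {
      this.push(null);
    }
  },
});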

Reading the Readable

So, now our Readable knows how to fetch and push the data we want to read from it. But now that we know how to implement a Readable, how do we actually use it?

When nothing is actively reading the Readable, we say that it is in “Paused Mode”. When it’s paused, we can read from it manually with the read method, specifying how many bytes we want to read off the buffer at once. Let’s read them off one-by-one:

const { Readable } = require("stream");

const alphabetReadable = new Readable({
  read(n) {
    // Push some letters to the buffer.
    this.push("a");
    this.push("b");
    this.push("c");

    // End the stream.
    this.push(null);
  },
});

// toString() converts each piece of data
// back into the original character.
console.log(alphabetReadable.read(1).toString()); // a
console.log(alphabetReadable.read(1).toString()); // b
console.log(alphabetReadable.read(1).toString()); // c
console.log(alphabetReadable.read(1)); // null

Depending on the stream, manually calling read like this can be cumbersome. That’s why we typically work with Readables in “Flowing Mode”. In this mode, all data generated by the Readable will be pushed automatically into some destination. There are a few ways to turn on Flowing Mode, but one of the simplest is to listen on the Readable’s ‘data’ event, like so:

const { Readable } = require("stream");

const alphabetReadable = new Readable({
  read(n) {
    // Push some letters to the buffer.
    this.push("a");
    this.push("b");
    this.push("c");

    // End the stream.
    this.push(null);
  },
});

// Listen on the 'data' event for new data.
alphabetReadable.on("data", (data) => {
console.log(data.toString());
});

Now, each time some data is pushed to the buffer, it will automatically trigger the listener function.

There is another way of using Flowing Mode — the pipe method. But to fully appreciate how that works, let’s take a look at Writables next.

Writable Streams

Just like Readables, a Writable has a specific job. As you might expect, its job involves “writing” the data that we pass to it somehow. It could “write” the data in a number of different ways: it might write the data to a text file, or send it across the internet. Ours will just log the data to the console.

Once again, we need to teach the Writable how to do its job. Since we’ve seen the pattern already, I’ll go ahead and give a full example up front:

const { Writable } = require("stream");

const loggerWritable = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());

    callback();
  },
});

// Write some characters to be processed.
loggerWritable.write("a");
loggerWritable.write("b");

// Write 'c' and also end the stream.
loggerWritable.end("c");

As you can see, the pattern is mostly the same - we teach the Writable how to deal with new data by providing it with the write function. The biggest difference is in the function signature: you’ll notice there are three arguments instead of one.

The first argument to write, ‘chunk’, is the data that has been written to the buffer. The second is ‘encoding’, which specifies the character encoding (if the chunk is a string). The third argument, callback, is important. This should be called only when the current chunk is finished being processed — this ensures that all chunks are written in the correct order. Since it doesn’t take any time to log each chunk, we can just invoke the callback immediately.

Finally, let’s look at how we’re using the Writable. We write chunks of data to it with the writable.write(...) method, and we can signal that we won’t write anything further with the writable.end(...) method, which optionally takes one last chunk to be written.
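
To make the role of that callback a little more concrete, here’s a sketch of a Writable that pretends each chunk takes 100 milliseconds to process (the delay is made up). The next chunk isn’t handled until we invoke the callback:

const { Writable } = require("stream");

const slowWritable = new Writable({
  write(chunk, encoding, callback) {
    // Pretend the chunk takes a while to process.
    setTimeout(() => {
      console.log(`finished processing ${chunk.toString()}`);

      // Only now do we signal that we're ready for the next chunk.
      callback();
    }, 100);
  },
});

// Each chunk is logged roughly 100ms after the previous one.
slowWritable.write("a");
slowWritable.write("b");
slowWritable.end("c");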

We can teach our Writable to process data however we want — it could choose to ignore data it doesn’t like, or it could concatenate data over multiple chunks and output it all at once. Take this example, in which our Writable simply counts the number of chunks we’ve sent to it, only logging the number once the stream has ended:

const { Writable } = require("stream");

const loggerWritable = new Writable({
  // Keep track of the chunks pushed to
  // the stream.
  write(chunk, encoding, callback) {
    this.chunkCount = this.chunkCount || 0;
    this.chunkCount++;
    callback();
  },

  // Log our final total just before the
  // stream ends.
  final(callback) {
    console.log(this.chunkCount);
    callback();
  },
});

loggerWritable.write("a");
loggerWritable.write("b");
loggerWritable.end("c");

Here, we make use of the final method — this is called after we’ve signaled that there is no more incoming data (by calling end), but before the stream closes entirely. It’s useful for cleaning up underlying resources, or for writing “final” bits of data. Like with the write method, we need to invoke the callback to signal when we’re done.

(You might notice the slightly awkward syntax in the write method that we use to ensure that the chunkCount attribute is properly set. To this, I should probably repeat what I mentioned earlier: read the documentation if you want to write real-world streams. “Real” streams are usually implemented using prototypal inheritance, which allows for finer-grained control over things like internal state. Passing constructor options works great for simple streams, but it can start to break down with more complex behaviors.)
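
If you’re curious what that looks like, here’s a rough class-based sketch of the same chunk-counting stream. Note the underscore-prefixed _write and _final methods, and the constructor where the state gets set up properly:

const { Writable } = require("stream");

class ChunkCounter extends Writable {
  constructor(options) {
    super(options);
    this.chunkCount = 0;
  }

  _write(chunk, encoding, callback) {
    this.chunkCount++;
    callback();
  }

  _final(callback) {
    console.log(this.chunkCount);
    callback();
  }
}

const counter = new ChunkCounter();
counter.write("a");
counter.write("b");
counter.end("c"); // logs 3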

So far as the basics go, this is all there is to Writables.

Duplex Streams

Pop quiz! What do you get when you cross a Readable and a Writable?

If you’ve caught onto the spirit of this article so far, your answer might sound something like this: you get an object that performs the jobs of both a Readable and Writable. We have a name for this object: the Duplex.

The most iconic Duplex is probably the TCP Socket, which is used to let two systems communicate over the internet - we can read data that the other system sends us, and write data that should be sent back.
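
(Just to give a flavor, using one of those sockets looks roughly like this. The host and port are placeholders; you’d need a real server on the other end.)

const net = require("net");

// Placeholder host and port.
const socket = net.createConnection(8080, "localhost");

// Read data that the other system sends us...
socket.on("data", (data) => {
  console.log(data.toString());
});

// ...and write data that should be sent back.
socket.write("hello");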

I wouldn’t want to spoil this article by accidentally writing something useful, though. So let’s just combine our earlier Readable and Writable examples into a single Duplex:

const { Duplex } = require("stream");

const pointlessDuplex = new Duplex({
  read(n) {
    // Push some letters to the buffer.
    this.push("a");
    this.push("b");
    this.push("c");

    // End the stream.
    this.push(null);
  },

  write(chunk, encoding, callback) {
    console.log(chunk.toString());

    callback();
  },
});

// Use the Readable half
pointlessDuplex.on("data", (data) => {
console.log(data.toString());
});

// Use the Writable half
pointlessDuplex.write("c");
pointlessDuplex.write("d");
pointlessDuplex.end("e");

This might be the dumbest code I’ve ever written, and it’s facing some serious competition there. Usually, the Readable and Writable halves will have some sort of logical relation, but that relation will depend on the type of Duplex.
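
If you’d like a slightly less pointless sketch, here’s an echo-style Duplex where the two halves actually relate: whatever gets written to it is pushed straight back out the Readable side.

const { Duplex } = require("stream");

const echoDuplex = new Duplex({
  // The Readable half has nothing to fetch on its own;
  // data only arrives via the Writable half below.
  read(n) {},

  // Whatever gets written is pushed back out through
  // the Readable half.
  write(chunk, encoding, callback) {
    this.push(chunk);
    callback();
  },

  // Once no more data will be written, end the Readable half too.
  final(callback) {
    this.push(null);
    callback();
  },
});

echoDuplex.on("data", (data) => {
  console.log(data.toString()); // a, b, c
});

echoDuplex.write("a");
echoDuplex.write("b");
echoDuplex.end("c");

That “whatever goes in comes back out” pattern turns out to be common enough that it gets its own stream type.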

Transform Streams

The Transform is a special case of a Duplex. Like a Duplex, it has two halves: Readable and Writable. However, these two halves aren’t just related — the output of the Readable half is directly informed by the input of the Writable half. That is, its job is to transform the written data in some way.

So how do we teach a Transform to do its job? We’ll use the same approach as we did with Readable and Writable, but now we’ll use a special transform method. Here’s an example that uppercases everything we write to it.

const { Transform } = require("stream");

const uppercaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

// Use the Readable half
uppercaseTransform.on("data", (data) => {
console.log(data.toString());
});

// Use the Writable half
uppercaseTransform.write("a");
uppercaseTransform.write("b");
uppercaseTransform.end("c");

As you can see, the transform function is kind of a cross between read and write. It has the signature of write, but we’re expected to push data to the Readable buffer as if it was a read method. We also need to call the provided callback when we’re finished transforming the data.

As we saw with Writables, the transform function doesn’t need to be one-to-one. We also have access to the flush method, which is like final for Writables, except that it’s used for pushing data to the Readable buffer. Take this example:

const { Transform } = require("stream");

const concatTransform = new Transform({
  // Store all received chunks and don't push
  // anything.
  transform(chunk, encoding, callback) {
    this.chunks = this.chunks || [];
    this.chunks.push(chunk.toString());
    callback();
  },

  // Concatenate all chunks and push the whole
  // thing at once.
  flush(callback) {
    const finalString = this.chunks.join("");
    this.push(finalString);
    callback();
  },
});

// Use the Readable half
concatTransform.on("data", (data) => {
console.log(data.toString());
});

// Use the Writable half
concatTransform.write("a");
concatTransform.write("b");
concatTransform.end("c");

This stream will only ever push a single chunk to its Readable half — the concatenated sum of all the data written to the Writable half.

The 'pipe' Method

If you’ve seen Node.js streams covered elsewhere, or used them yourself, you might have noticed I’ve avoided talking about the Readable’s pipe method.

Sorry about that. Like I said at the start of this article, I think pipe makes things a little too simple. It’s easy to start using it as a shortcut, never needing to think too hard about what the connected streams are doing.

But now that we’ve spent time studying streams, we know exactly what each of them is doing. So now it’s time to make things a lot easier on ourselves.

Let’s start simple: pipe is a method of Readables, into which we pass a Writable. Using it switches the Readable into “Flowing Mode”, so it will automatically write its data to the Writable.

Let’s take our simple Readable and Writable examples, and chain them together with pipe.

const { Readable, Writable } = require("stream");

const alphabetReadable = new Readable({
  read(n) {
    // Push some letters to the buffer.
    this.push("a");
    this.push("b");
    this.push("c");

    // End the stream.
    this.push(null);
  },
});

const loggerWritable = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());

    callback();
  },
});

alphabetReadable.pipe(loggerWritable);

(Incidentally, process.stdout is also a Writable stream. We could pipe the Readable into that to get a similar effect.)
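
Something like this, assuming a fresh copy of the alphabetReadable from above:

// process.stdout is a Writable, so we can pipe straight into it.
alphabetReadable.pipe(process.stdout);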

That’s already pretty useful, but there’s more: the pipe function returns the same Writable we pass to it. That’s pretty useless on its own...unless the stream is not just a Writable, but a Duplex or (more likely) a Transform. Then it’s also a Readable. If it's a Readable, then we can call pipe on it again.

Let’s take the last example, but this time add our uppercase transform stream to the middle of the chain:

const { Readable, Writable, Transform } = require("stream");

const alphabetReadable = new Readable({
  read(n) {
    // Push some letters to the buffer.
    this.push("a");
    this.push("b");
    this.push("c");

    // End the stream.
    this.push(null);
  },
});

const uppercaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

const loggerWritable = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());

    callback();
  },
});

alphabetReadable
  .pipe(uppercaseTransform)
  .pipe(loggerWritable);

And that’s all there is to pipe.

Object Mode

One last thing — if you’ve been experimenting with streams, you might have noticed that they’re picky about the values you can work with: by default, they only accept Buffers and Uint8Arrays, though they’re nice enough to automatically convert certain types for us, like strings. Try to write something like an object, though, and you’ll get an error.

But because streams are so useful, there is an option to turn on ‘object mode’, which lets us work with any JavaScript value we want. It changes a few things about the streams (instead of working with bytes, you’re working with discrete objects), but in general everything will work the same.

You can turn on objectMode by setting the following properties on your streams:

const { Readable, Writable, Duplex } = require("stream");

const myReadable = new Readable({
  objectMode: true,
  // ...
});

const myWritable = new Writable({
  objectMode: true,
  // ...
});

const myDuplex = new Duplex({
  readableObjectMode: true,
  writableObjectMode: true,
});
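
As a quick sketch of what that buys us, here’s an object-mode Readable pushing plain objects instead of bytes (the objects themselves are just made-up sample data):

const { Readable } = require("stream");

const objectReadable = new Readable({
  objectMode: true,
  read(n) {
    // In object mode we can push arbitrary values, not just bytes.
    this.push({ letter: "a" });
    this.push({ letter: "b" });
    this.push(null);
  },
});

objectReadable.on("data", (data) => {
  console.log(data.letter); // a, b
});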

Conclusion

Once again, I should stress that I haven’t really covered how to write streams for real-world use cases — we looked at implementing them only to better understand how they work, and to hopefully make functions like pipe seem less magical. I don’t think that any of the remaining “real-world” concerns are especially hard to implement, but they do deserve careful consideration. Plus, you’ll find that you can do more tricks with streams than we’ve covered here. Read the docs.