hotmess.codes

On streams and promises

July 04, 2019 by Christian Danielsen

I recently worked on a service that involved consuming large amounts of data, processing it in various ways, and then sending it somewhere else. This seemed like the perfect opportunity to dig into one of node’s core concepts: streams.

Streams are everywhere in node, and once you get the hang of using them, they are delightful. I found many solid articles that cover the basics, but I wanted to touch on something that came up while combining streams with another core JS concept: promises.

Let’s say we wanted to create a function that would stream a large data source to a file:

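const fs = require('fs')

const streamToFile = (inputStream, filePath) => {
  // Open the destination file for writing
  const fileWriteStream = fs.createWriteStream(filePath)
  // Start piping; nothing here tells the caller when (or whether) it finishes
  inputStream
    .pipe(fileWriteStream)
}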

Assuming the arguments were valid, this would get the job done, but this function would start the streaming process and then return immediately with undefined. Not very helpful, as we probably would want to know when the stream had finished (or if something went wrong), so we could react accordingly.

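Luckily, streams are all event emitters, so we can register listener callbacks on the various events a stream emits. The .on method returns the stream it was called on, and .pipe returns the destination stream (here, the file write stream). Listeners chained after .pipe therefore only hear about the write side, since .pipe does not forward errors from the source stream. With that in mind, we can succinctly chain these registrations on the back of the .pipe call: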

const reactToStreamEvents = error => {
  if (error) {
    console.log('Uh oh!')
  } else {
    console.log('All done!')
  }
}

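const streamToFile = (inputStream, filePath, callback) => {
  const fileWriteStream = fs.createWriteStream(filePath)
  // .pipe does not forward source errors, so listen on the input stream too
  inputStream.on('error', callback)
  inputStream
    .pipe(fileWriteStream) // returns fileWriteStream
    .on('finish', callback) // called with no arguments on success
    .on('error', callback) // called with the write error on failure
}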

// Some later usage...

streamToFile(
  someYugeReadableStream,
  '/path/to/bigFile.txt',
  reactToStreamEvents
)
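
To see which stream those chained listeners actually end up on, here is a small sketch using two throwaway PassThrough streams (source and destination are hypothetical stand-ins, not part of the service described above). It only checks what .pipe and .on hand back:

```javascript
const { PassThrough } = require('stream')

// Two in-memory streams standing in for a real source and a real file stream
const source = new PassThrough()
const destination = new PassThrough()

// .pipe hands back the destination, not the source
const piped = source.pipe(destination)
console.log(piped === destination) // true

// .on hands back the stream it was called on, so both listeners
// below are registered on destination
const chained = piped.on('finish', () => {}).on('error', () => {})
console.log(chained === destination) // true
```

So anything chained after .pipe is listening to the write side of the pipeline.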

This works, but promises provide a nicer way to accomplish the same thing, by wrapping the stream code:

const streamToFile = (inputStream, filePath) => {
  return new Promise((resolve, reject) => {
    const fileWriteStream = fs.createWriteStream(filePath)
    // .pipe does not forward source errors, so reject on those as well
    inputStream.on('error', reject)
    inputStream
      .pipe(fileWriteStream)
      .on('finish', resolve)
      .on('error', reject)
  })
}

Now streamToFile immediately returns a promise object that we can pass around, await in some other async function, or chain .then and .catch handlers to. It abstracts away the core of what we were doing with the reactToStreamEvents callback: signaling for success and failure, while offloading the responsibility of reacting to whatever code might be consuming the promise, giving us more flexibility. Neat.
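
As a rough sketch of what consuming that promise could look like, the snippet below awaits streamToFile inside an async function and then chains .then and .catch on the result. The main function, the temp file name, and the tiny Readable.from input are all made up for illustration:

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')
const { Readable } = require('stream')

// streamToFile from above, with an 'error' listener on the input stream
// as well, repeated here so the sketch runs on its own
const streamToFile = (inputStream, filePath) => {
  return new Promise((resolve, reject) => {
    const fileWriteStream = fs.createWriteStream(filePath)
    inputStream.on('error', reject)
    inputStream
      .pipe(fileWriteStream)
      .on('finish', resolve)
      .on('error', reject)
  })
}

// Hypothetical consumer: await the promise, react with try/catch
const main = async () => {
  const filePath = path.join(os.tmpdir(), 'streamToFile-example.txt')
  try {
    await streamToFile(Readable.from(['hello, ', 'streams']), filePath)
    console.log('All done!')
  } catch (error) {
    console.log('Uh oh!', error.message)
  }
  // Read the file back so we can see what was written
  return fs.promises.readFile(filePath, 'utf8')
}

// .then and .catch work on the same promise chain
const written = main()
written.then(contents => console.log(contents)).catch(console.error)
```

The reacting code now lives entirely with the caller, which is the flexibility the promise buys us.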

While I think promises are pretty straightforward once you use them for a while, the async/await syntactic sugar makes them really simple to reason about. If I can use either (which is almost always), I generally use async/await. In this case though, we cannot simply await inside streamToFile, because the event emitter interface expects you to pass a function (a callback!) as the second argument to .on(event, callback).

The un-sugary (but still quite sexy, imho) Promise executor function gives us just the callbacks we need: resolve and reject. As such, plain jane promises and streams play quite nicely together. In my next post, I’ll cover the peculiarities of testing our streamToFile function.
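
As an aside for readers on newer versions of node (15 and later): the built-in stream/promises module exports a pipeline function that returns a promise and rejects if any stream in the chain errors. This is not what the service described above used; it is just a minimal sketch of the same streamToFile idea, with a made-up temp file and input:

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')
const { Readable } = require('stream')
const { pipeline } = require('stream/promises') // node 15 and later

// pipeline returns a promise, so this variant can be an async function
const streamToFile = async (inputStream, filePath) => {
  await pipeline(inputStream, fs.createWriteStream(filePath))
}

// Hypothetical usage: write two chunks, then read the file back
const filePath = path.join(os.tmpdir(), 'pipeline-example.txt')
const finished = streamToFile(Readable.from(['plain ', 'jane']), filePath)
  .then(() => fs.promises.readFile(filePath, 'utf8'))

finished.then(contents => console.log(contents)).catch(console.error)
```

Unlike a bare .pipe call, pipeline also watches the source stream for errors.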


©2023 Christian Danielsen. All Rights Reserved.