
The Fundamentals of Node.js Streams — SitePoint


Node.js is asynchronous and event-driven in nature. Because of this, it's very good at handling I/O-bound tasks. If you're working on an app that performs I/O operations, you can take advantage of the streams available in Node.js. So, let's explore streams in detail and understand how they can simplify I/O.

Key Takeaways

  • Node.js streams, which are asynchronous and event-driven, can simplify I/O operations by efficiently handling data in smaller, manageable chunks.
  • Streams can be categorized as Readable, Writable, Duplex (both readable and writable), or Transform (modifying data as it passes through).
  • The pipe() function is a useful tool in Node.js streams, allowing data to be read from a source and written to a destination without manually managing the data flow.
  • Modern Node.js provides utilities like stream.pipeline() and stream.finished(), along with Promise-based APIs, for better error handling and flow control.
  • Streams can be used with async/await patterns for cleaner, more maintainable code.

What Are Streams?

Streams in Node.js are inspired by Unix pipes and provide a mechanism to read data from a source and pipe it to a destination in a streaming fashion.

Simply put, a stream is nothing but an EventEmitter that implements some special methods. Depending on the methods implemented, a stream becomes Readable, Writable, Duplex, or Transform. Readable streams let you read data from a source, while writable streams let you write data to a destination.

If you have already worked with Node.js, you may have come across streams. For example, in a Node.js based HTTP server, request is a readable stream and response is a writable stream. You may also have used the fs module, which lets you work with both readable and writable file streams.
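
For example, here is a minimal sketch of such a server (the port and messages are purely illustrative), where the incoming request is consumed as a readable stream and the response is written as a writable stream:

const http = require('http');

const server = http.createServer((req, res) => {
  let body = '';
  req.setEncoding('utf8');

  // req is a readable stream: collect the request body chunk by chunk
  req.on('data', (chunk) => {
    body += chunk;
  });

  // res is a writable stream: write the response once reading has ended
  req.on('end', () => {
    res.write(`Received ${body.length} characters\n`);
    res.end('Done.');
  });
});

server.listen(3000);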

Let's look at the different types of streams. In this article, we will focus primarily on readable and writable streams, but we will also briefly cover Duplex and Transform streams.

Readable Stream

A readable stream lets you read data from a source. The source can be anything: a simple file on your file system, a buffer in memory, or even another stream. As streams are EventEmitters, they emit several events at various points. We will use these events to work with the streams.

Reading from Streams

The simplest way to read data from a stream is to listen to the data event and attach a callback. When a chunk of data is available, the readable stream emits a data event and your callback executes. Take a look at the following snippet:


const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';

readableStream.on('data', function(chunk) {
  data += chunk;
});

readableStream.on('end', function() {
  console.log(data);
});

readableStream.on('error', (err) => {
  console.error('Error reading stream:', err);
});

The function call fs.createReadStream() gives you a readable stream. Initially, the stream is in a static state. As soon as you listen to the data event and attach a callback, it starts flowing. After that, chunks of data are read and passed to your callback. The stream implementor decides how often the data event is emitted. For example, an HTTP request may emit a data event once every few KB of data are read. When you are reading data from a file, you may decide to emit a data event once a line is read.

When there is no more data to read (the end is reached), the stream emits an end event. In the above snippet, we listen to this event to get notified when the end is reached.

With modern ECMAScript features, we can rewrite this using async/await:

const fs = require('fs');

// Collect all chunks from a stream and resolve to the full string
const streamToString = async (stream) => {
  const chunks = [];

  for await (const chunk of stream) {
    chunks.push(typeof chunk === 'string' ? chunk : chunk.toString());
  }

  return chunks.join('');
};

async function readFile() {
  try {
    const readableStream = fs.createReadStream('file.txt');
    const content = await streamToString(readableStream);
    console.log(content);
  } catch (err) {
    console.error('Error reading file:', err);
  }
}

readFile();

Here, we're using several newer JavaScript features:

  1. The for await...of loop allows us to iterate over async iterables (like streams in Node.js).
  2. We're creating a streamToString helper function that collects all chunks from a stream and returns a Promise that resolves to the full string.
  3. We wrap everything in a try/catch block for proper error handling.
  4. This approach is more linear and easier to read than the event-based approach.

Now there are two modes a Readable stream can operate in:

1. Flowing mode – Data is read automatically and provided as quickly as possible through events.
2. Paused mode – You must explicitly call read() to get data chunks repeatedly until every chunk of data has been read.

const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';
let chunk;

readableStream.on('readable', function() {
  while ((chunk = readableStream.read()) != null) {
    data += chunk;
  }
});

readableStream.on('end', function() {
  console.log(data);
});

The read() function reads some data from the internal buffer and returns it. When there is nothing to read, it returns null. So, in the while loop we check for null and terminate the loop. Note that the readable event is emitted when a chunk of data can be read from the stream.

Setting Encoding

By default, the data you read from a stream is a Buffer object. If you are reading strings, this may not be suitable for you. So, you can set the encoding on the stream by calling Readable.setEncoding(), as shown below.

const fs = require('fs');
const readableStream = fs.createReadStream('file.txt');
let data = '';

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  data += chunk;
});

readableStream.on('end', function() {
  console.log(data);
});

In the above snippet we set the encoding to utf8. As a result, the data is interpreted as utf8 and passed to your callback as a string.

Piping

Piping is a great mechanism that lets you read data from the source and write it to the destination without managing the flow yourself. Take a look at the following snippet:

const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.pipe(writableStream);

The above snippet uses the pipe() function to write the content of file1 to file2. As pipe() manages the data flow for you, you don't have to worry about slow or fast data flow. This makes pipe() a neat tool to read and write data. You should also note that pipe() returns the destination stream, so you can easily utilize this to chain multiple streams together. Let's see how!

However, one limitation of pipe() is that it doesn't provide good error handling. That's where modern Node.js provides better utilities:

const fs = require('fs');
const { pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

async function copyFile() {
  try {
    const readableStream = fs.createReadStream('file1.txt');
    const writableStream = fs.createWriteStream('file2.txt');

    await pipelineAsync(readableStream, writableStream);
    console.log('File copied successfully');
  } catch (err) {
    console.error('Pipeline failed:', err);
  }
}

copyFile();

Here:

  1. We're using the pipeline function from the stream module, which automatically handles errors and resource cleanup.
  2. We convert the callback-based pipeline to a Promise using promisify.
  3. We can then use async/await for a cleaner flow.
  4. All errors are properly caught in a single try/catch block.
  5. If any stream in the pipeline emits an error, pipeline automatically destroys all streams and calls the callback with the error.

Chaining

Assume that you have an archive and want to decompress it. There are a number of ways to achieve this, but the easiest and cleanest way is to use piping and chaining. Have a look at the following snippet:

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'));

First, we create a simple readable stream from the file input.txt.gz. Next, we pipe this stream into another stream, zlib.createGunzip(), to un-gzip the content. Lastly, as streams can be chained, we add a writable stream in order to write the un-gzipped content to the file.

A more robust approach using pipeline:

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('input.txt.gz'),
  zlib.createGunzip(),
  fs.createWriteStream('output.txt'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

Here we're using pipeline with multiple streams:

  1. Unlike pipe(), which doesn't properly forward errors, pipeline handles errors from any stream in the chain.
  2. If any stream in the pipeline fails (for example, if the file doesn't exist or the content isn't valid gzip), the callback receives the error.
  3. pipeline automatically cleans up resources by destroying all streams if any stream errors.
  4. The last argument is a callback that tells us whether the operation succeeded or failed.

Additional Methods

We discussed some of the important concepts in readable streams. Here are some more stream methods you need to know (a short sketch of them in use follows the list):

  1. Readable.pause() – This method pauses the stream. If the stream is already flowing, it won't emit data events anymore. The data will be kept in the buffer. If you call this on a static (non-flowing) stream, there is no effect and the stream stays paused.
  2. Readable.resume() – Resumes a paused stream.
  3. readable.unpipe() – This removes destination streams from pipe destinations. If an argument is passed, it stops the readable stream from piping into that particular destination stream. Otherwise, all the destination streams are removed.
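
Here is a minimal sketch of pause() and resume() in action (the file name and delay are placeholders); the stream stops emitting data events while paused and picks up again on resume:

const fs = require('fs');

const readableStream = fs.createReadStream('file.txt');
readableStream.setEncoding('utf8');

readableStream.on('data', (chunk) => {
  console.log(`Read ${chunk.length} characters`);

  // Pause the flow, then pick it up again a moment later
  readableStream.pause();
  setTimeout(() => readableStream.resume(), 100);
});

readableStream.on('end', () => {
  console.log('Finished reading');
});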

Writable Streams

Writable streams let you write data to a destination. Like readable streams, these are also EventEmitters and emit various events at various points. Let's see the various methods and events available in writable streams.

Writing to Streams

To write data to a writable stream you need to call write() on the stream instance. The following snippet demonstrates this technique.

const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
});

The above code is straightforward. It simply reads chunks of data from an input stream and writes them to the destination using write(). This function returns a Boolean value indicating whether the operation was successful.

The return value of writableStream.write(chunk) indicates whether the internal buffer is ready for more data, which is crucial for handling backpressure:

  • true: The data was successfully written, and you can continue writing more data immediately.
  • false: The internal buffer is full (it has reached the highWaterMark limit). This doesn't mean an error occurred, but it signals that you should pause writing to prevent overloading the buffer. You should wait for the 'drain' event before resuming writing.

A better approach that handles backpressure:

const fs = require('fs');
const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  // Pause reading if the writable stream's buffer is full
  const canContinue = writableStream.write(chunk);
  if (!canContinue) {
    readableStream.pause();
  }
});

writableStream.on('drain', function() {
  // The buffer has emptied, so it is safe to read again
  readableStream.resume();
});

readableStream.on('end', function() {
  writableStream.end();
});

readableStream.on('error', (err) => {
  console.error('Read error:', err);
  writableStream.end();
});

writableStream.on('error', (err) => {
  console.error('Write error:', err);
});

This example handles backpressure, which is a vital concept in streams:

  1. When write() returns false, it means the internal buffer is full and we should stop sending more data.
  2. We pause the readable stream to stop receiving data temporarily.
  3. When the writable stream emits 'drain', it means the buffer has emptied and we can resume reading.
  4. We've also added proper error handling for both streams.
  5. When reading completes, we call end() on the writable stream to signal completion.
  6. This approach prevents memory from growing unbounded when the writer can't keep up with the reader.

End of Data

When you don't have any more data to write, you can simply call end() to notify the stream that you have finished writing. Assuming res is an HTTP response object, you often do the following to send the response to the browser:

res.write('Some Data!!');
res.end('Ended.');

When end() is called and every chunk of data has been flushed, a finish event is emitted by the stream. Just note that you can't write to the stream after calling end(). For example, the following will result in an error.

res.write('Some Data!!');
res.end();
res.write('Trying to write again'); // raises an error

Here are some important events related to writable streams (a small sketch follows the list):

  1. error – Emitted to indicate that an error has occurred while writing/piping.
  2. pipe – When a readable stream is piped into a writable stream, this event is emitted by the writable stream.
  3. unpipe – Emitted when you call unpipe on the readable stream and stop it from piping into the destination stream.
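
A small sketch of the pipe and unpipe events in practice (file names are placeholders); the writable stream is notified whenever a readable stream is attached to or detached from it:

const fs = require('fs');

const readableStream = fs.createReadStream('file1.txt');
const writableStream = fs.createWriteStream('file2.txt');

writableStream.on('pipe', () => {
  console.log('A readable stream started piping into the writer');
});

writableStream.on('unpipe', () => {
  console.log('The readable stream stopped piping into the writer');
});

readableStream.pipe(writableStream);
readableStream.unpipe(writableStream); // triggers the unpipe event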

Duplex and Transform Streams

Duplex streams are readable and writable streams combined. They maintain two separate internal buffers, one for reading and one for writing, which operate independently from each other.

Duplex streams are useful when you need simultaneous but separate input and output streams, such as in network sockets (like TCP).

const { Duplex } = require('stream');

const myDuplex = new Duplex({
  read(size) {
    // Push the next uppercase letter, then signal the end after 'Z'
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  },
  write(chunk, encoding, callback) {
    // Log whatever is written to the stream
    console.log(chunk.toString());
    callback();
  }
});

myDuplex.currentCharCode = 65; // start at 'A'

This example creates a custom Duplex stream:

  1. The read() method generates uppercase letters from A to Z (ASCII codes 65–90).
  2. Each time read() is called, it pushes the next letter and increments the counter.
  3. When we reach 'Z', we push null to signal the end of the read stream.
  4. The write() method simply logs any data written to the stream to the console.
  5. Duplex streams are useful when you need independent read and write operations in a single stream.

Transform streams are a special type of Duplex stream that can modify or transform the data as it is written and read. Unlike Duplex streams, where the input and output are separate, Transform streams have their output directly related to the input. Typical examples include zlib streams for compression/decompression and crypto streams for encryption/decryption.

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    // Convert each chunk to uppercase and push it to the readable side
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin
  .pipe(upperCaseTr)
  .pipe(process.stdout);

This Transform stream example:

  1. Creates a transform stream that converts input text to uppercase.
  2. The transform() method takes input chunks, transforms them, and pushes them to the output.
  3. We're piping from standard input, through our transformer, to standard output.
  4. When you run this code, anything you type will be displayed in uppercase.
  5. Transform streams are ideal for processing or modifying data as it flows through, like parsing JSON, converting encodings, or encrypting data.

Conclusion

That covers the fundamentals of streams. Streams, pipes, and chaining are among the core and most powerful features in Node.js. Used responsibly, streams can indeed help you write neat and performant code to perform I/O. Just make sure to handle stream errors and close streams appropriately to prevent memory leaks.

With the newer additions to the Node.js API like stream.pipeline(), stream.finished(), and Promise-based stream APIs, handling streams has become more robust and easier to work with. When dealing with large amounts of data, streams should be your go-to solution for efficient memory usage and performance.
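
stream.finished() hasn't been shown yet, so here is a minimal sketch (the file name is a placeholder); it invokes a callback once a stream has been fully consumed or has errored:

const fs = require('fs');
const { finished } = require('stream');

const readableStream = fs.createReadStream('file.txt');

// Drain the stream without keeping the data
readableStream.resume();

finished(readableStream, (err) => {
  if (err) {
    console.error('Stream failed:', err);
  } else {
    console.log('Stream is done reading');
  }
});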


What are Node.js streams?

Node.js streams are a feature of the Node.js standard library that lets you work with data in a more efficient and scalable way, by processing it in smaller, more manageable chunks, as opposed to loading entire data sets into memory.

What are the main types of Node.js streams?

Node.js streams come in four main types: Readable, Writable, Duplex, and Transform. Readable streams are for reading data, Writable streams are for writing data, Duplex streams allow both reading and writing, and Transform streams modify the data as it passes through.

How do I create a Readable stream in Node.js?

To create a Readable stream, you can use the stream.Readable class provided by Node.js. You can extend this class and implement the _read method to provide data to be read.
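
As a minimal sketch (the Counter class and its limit are made up for illustration), a custom Readable only has to push data from _read and push null when it is done:

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(limit) {
    super();
    this.current = 1;
    this.limit = limit;
  }

  _read() {
    if (this.current > this.limit) {
      this.push(null); // no more data
    } else {
      this.push(`${this.current++}\n`);
    }
  }
}

new Counter(5).pipe(process.stdout); // prints 1 to 5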

What are the common use cases for Readable streams?

Readable streams are useful for reading large files, processing data from external sources like HTTP requests, and handling data in real time, such as log file monitoring.

How do I create a Writable stream in Node.js?

To create a Writable stream, you can use the stream.Writable class provided by Node.js. You need to implement the _write method to handle data as it's written to the stream.
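
A minimal sketch (the Logger class is made up for illustration): a custom Writable implements _write and calls the callback once each chunk has been handled:

const { Writable } = require('stream');

class Logger extends Writable {
  _write(chunk, encoding, callback) {
    console.log('Writing:', chunk.toString());
    callback(); // signal that this chunk has been processed
  }
}

const logger = new Logger();
logger.write('hello');
logger.end('world');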

What are some common uses of Writable streams?

Writable streams are used for saving data to files, sending data to external services, or processing and filtering data as it's written.

What is a Duplex stream in Node.js?

A Duplex stream is a combination of a Readable and a Writable stream, allowing both reading and writing. It's useful when you need to transform data while also providing an interface for further data input.

What are Transform streams and when should I use them?

Transform streams are a subclass of Duplex streams that allow data to be modified as it passes through. They're often used for tasks like data compression, encryption, and parsing.

How can I pipe data between streams in Node.js?

You can pipe data between streams using the .pipe() method. For example, you can pipe data from a Readable stream to a Writable stream, allowing for efficient data transfer without manually managing the data flow.

Are there any best practices for working with Node.js streams?

Some best practices include using streams for handling large datasets efficiently, handling errors and backpressure correctly, and using the util.promisify function for working with streams in a more promise-friendly way.

What are the advantages of using stream.pipeline() over pipe()?

The stream.pipeline() method provides automatic error handling and cleanup of resources when an error occurs, which pipe() doesn't. It also provides a callback when the operation completes or errors, and has a Promise-based version for use with async/await.

How can I convert traditional callback-based stream operations to use Promises?

You can use the util.promisify() function to convert callback-based stream methods to Promise-based ones. Additionally, Node.js provides built-in Promise-based APIs for streams in the 'stream/promises' module, starting from Node.js 15.0.0.
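
For example, here is a minimal sketch using the Promise-based pipeline from 'stream/promises' (file names are placeholders; requires Node.js 15 or later):

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream/promises');

async function gzipFile() {
  // Compress input.txt into input.txt.gz
  await pipeline(
    fs.createReadStream('input.txt'),
    zlib.createGzip(),
    fs.createWriteStream('input.txt.gz')
  );
  console.log('Pipeline succeeded');
}

gzipFile().catch((err) => console.error('Pipeline failed:', err));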

How do I handle backpressure in Node.js streams?

Backpressure occurs when a writable stream can't keep up with the readable stream providing data. You can handle this by monitoring the return value of the write() method and pausing the readable stream if it returns false, then resuming when the 'drain' event is emitted.
