Node.js Streams Guide: Readable, Writable, Transform, Duplex, Pipeline, Backpressure & Performance
Master Node.js Streams: Readable, Writable, Transform, Duplex, and PassThrough streams. Learn pipeline API, backpressure handling, object mode, async iterators, HTTP streaming, file streaming, Web Streams API compatibility, and real-world patterns for CSV processing, log parsing, and high-performance data transformation.
- Streams process data chunk by chunk with constant memory usage regardless of file size
- Prefer pipeline() over pipe() — it handles error propagation and stream cleanup automatically
- Backpressure is the key concept: pause writing when write() returns false and wait for the drain event
- Async iterators (for await...of) are the cleanest way to consume Readable streams
- Object mode lets streams process structured data instead of raw buffers
- Transform streams are the building blocks for data processing pipelines
- Web Streams API is available in Node.js for cross-runtime compatibility
1. Stream Fundamentals
Streams in Node.js are abstract interfaces for working with streaming data. Instead of loading all data into memory at once, they process it in small chunks incrementally. This makes it possible to handle gigabytes or even terabytes of data with only a few megabytes of memory.
| Stream Type | Description | Common Examples |
|---|---|---|
| Readable | Source of data you can read from | fs.createReadStream, http.IncomingMessage, process.stdin |
| Writable | Destination for data you can write to | fs.createWriteStream, http.ServerResponse, process.stdout |
| Transform | Modifies data as it passes through | zlib.createGzip, crypto.createCipheriv |
| Duplex | Independent readable and writable sides | net.Socket, WebSocket |
| PassThrough | Passes data through without modification | Testing, monitoring, stream teeing |
2. Creating Custom Streams
Readable Stream
Custom Readable streams implement the _read() method. Node.js calls this method when the consumer requests more data. Use this.push() to send chunks, and push(null) to signal the end of the stream.
const { Readable } = require('stream');
class CounterStream extends Readable {
constructor(max) {
super(); // default: Buffer mode
this.max = max;
this.current = 0;
}
_read() {
if (this.current <= this.max) {
this.push(String(this.current++) + '\n');
} else {
this.push(null); // signal end of stream
}
}
}
const counter = new CounterStream(5);
counter.pipe(process.stdout);
// Output: 0 1 2 3 4 5
Writable Stream
const { Writable } = require('stream');
class LogWriter extends Writable {
_write(chunk, encoding, callback) {
const line = chunk.toString().trim();
const timestamp = new Date().toISOString();
console.log(`[${timestamp}] ${line}`);
callback(); // signal done, ready for next chunk
}
_final(callback) {
console.log('--- Log stream closed ---');
callback();
}
}
const logger = new LogWriter();
logger.write('Server started\n');
logger.end('Shutdown complete\n');
Transform Stream
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
_flush(callback) {
// Called once at end — use for final cleanup
this.push('\n--- END ---\n');
callback();
}
}
// Usage: stdin → uppercase → stdout
process.stdin
.pipe(new UpperCaseTransform())
.pipe(process.stdout);
3. Piping and Pipeline API
pipe() is the basic method for chaining streams, but it has a critical flaw: it does not propagate errors automatically. If a mid-chain stream errors, the source stream is not destroyed, causing memory leaks. pipeline() solves these problems.
pipe() vs pipeline()
| Feature | pipe() | pipeline() |
|---|---|---|
| Error propagation | Not automatic | Automatic to all streams |
| Stream cleanup | Streams not destroyed on error | All streams destroyed on error |
| Completion signal | Must listen to multiple events | Callback / Promise |
| Recommended for | Simple prototyping | Production code |
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
// pipe() — errors not propagated
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'));
// If gzip errors, readStream keeps reading!
// pipeline() — production-ready
async function compressFile(src, dest) {
await pipeline(
fs.createReadStream(src),
zlib.createGzip(),
fs.createWriteStream(dest)
);
console.log('Compression complete');
}
compressFile('input.txt', 'input.txt.gz')
.catch(err => console.error('Pipeline failed:', err));
4. Backpressure Handling
Backpressure occurs when a writable stream cannot process data as fast as it receives it. If you ignore backpressure, data accumulates in memory indefinitely, eventually crashing the process. The write() method returns false when the internal buffer is full — this is the signal to pause writing.
const fs = require('fs');
const readable = fs.createReadStream('huge-file.csv');
const writable = fs.createWriteStream('output.csv');
// Manual backpressure handling
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// Buffer is full — pause reading
readable.pause();
writable.once('drain', () => {
// Buffer drained — resume reading
readable.resume();
});
}
});
readable.on('end', () => writable.end());
// BETTER: pipe() handles backpressure automatically
// readable.pipe(writable);
5. Object Mode Streams
By default, streams operate on Buffer or string data. Object mode allows streams to process any JavaScript value — objects, arrays, numbers, etc. — except null, which is reserved as the end-of-stream signal. The highWaterMark in object mode counts the number of objects rather than bytes.
const { Transform, Readable } = require('stream');
// Transform that filters JS objects
const filterAdults = new Transform({
objectMode: true,
transform(user, encoding, callback) {
if (user.age >= 18) {
this.push(user);
}
callback();
}
});
const formatJSON = new Transform({
// objectMode would force BOTH sides into object mode, so
// configure each side separately:
writableObjectMode: true, // accepts objects
readableObjectMode: false, // emits strings
transform(user, encoding, callback) {
this.push(JSON.stringify(user) + '\n');
callback();
}
});
// Feed objects into the pipeline
const users = Readable.from([
{ name: 'Alice', age: 25 },
{ name: 'Bob', age: 16 },
{ name: 'Charlie', age: 30 }
]);
users.pipe(filterAdults).pipe(formatJSON).pipe(process.stdout);
// {"name":"Alice","age":25}
// {"name":"Charlie","age":30}6. Stream Events and Error Handling
Each stream type emits specific events. Listening for these events correctly is the key to writing robust stream code. Unhandled error events will crash the process.
| Event | Stream Type | When Emitted |
|---|---|---|
| data | Readable | When a chunk of data is available |
| end | Readable | When there is no more data to read |
| drain | Writable | When buffer is drained and safe to write again |
| finish | Writable | When all data has been flushed to underlying system |
| error | All | When an error occurs |
| close | All | When the stream and its underlying resource are closed |
const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const fs = require('fs');
async function safeProcess() {
try {
await pipeline(
fs.createReadStream('data.json'),
new Transform({
transform(chunk, enc, cb) {
try {
// assumes the full document fits in one chunk — larger
// inputs must be buffered before parsing
const parsed = JSON.parse(chunk);
cb(null, JSON.stringify(parsed) + '\n');
} catch (e) {
cb(new Error('Invalid JSON: ' + e.message));
}
}
}),
fs.createWriteStream('output.jsonl')
);
console.log('Processing complete');
} catch (err) {
// All streams are automatically destroyed
console.error('Pipeline error:', err.message);
}
}
7. File Streaming
File streams are the most common use case for Node.js streams. fs.createReadStream() and fs.createWriteStream() process files chunk by chunk with constant memory. For large files, the difference versus readFile/writeFile is dramatic.
const fs = require('fs');
const { pipeline } = require('stream/promises');
const zlib = require('zlib');
// Read a 10GB log file — only ~64KB in memory
async function processLargeFile() {
const input = fs.createReadStream('server.log', {
highWaterMark: 64 * 1024, // 64KB chunks
encoding: 'utf8'
});
let lineCount = 0;
let errorCount = 0;
for await (const chunk of input) {
const lines = chunk.split('\n');
// approximate: a line spanning two chunks is counted twice;
// use readline (section 9) when exact lines matter
lineCount += lines.length;
errorCount += lines.filter(
l => l.includes('ERROR')
).length;
}
console.log(`Lines: ${lineCount}, Errors: ${errorCount}`);
}
// Compress a file with streaming
async function compressFile(src, dest) {
await pipeline(
fs.createReadStream(src),
zlib.createGzip({ level: 9 }),
fs.createWriteStream(dest)
);
}
8. HTTP Streaming
Node.js HTTP requests and responses are both streams. http.IncomingMessage is a Readable stream and http.ServerResponse is a Writable stream. Leveraging this enables efficient large file uploads/downloads and request proxying.
const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');
const server = http.createServer(async (req, res) => {
if (req.url === '/download') {
// Stream a large file to the client
res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Transfer-Encoding': 'chunked'
});
try {
await pipeline(
fs.createReadStream('large-dataset.csv'),
res
);
} catch (err) {
// a disconnecting client rejects the pipeline
res.destroy();
}
}
if (req.url === '/upload' && req.method === 'POST') {
// Stream upload directly to disk
try {
await pipeline(
req,
fs.createWriteStream('upload.bin')
);
res.end('Upload complete');
} catch (err) {
res.statusCode = 500;
res.end('Upload failed');
}
}
});
server.listen(3000);
9. Async Iterators with Streams
Readable streams implement the async iterable protocol. Using for await...of loops to consume stream data is cleaner and handles backpressure automatically. This is the recommended way to consume streams in modern Node.js.
const fs = require('fs');
const readline = require('readline');
// Read file line by line with async iterators
async function processLines(filePath) {
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity
});
const stats = { total: 0, errors: 0, warnings: 0 };
for await (const line of rl) {
stats.total++;
if (line.includes('ERROR')) stats.errors++;
if (line.includes('WARN')) stats.warnings++;
}
return stats;
}
// Create a Readable from an async generator
const { Readable } = require('stream');
async function* generateData() {
for (let i = 0; i < 1000; i++) {
yield JSON.stringify({ id: i, ts: Date.now() }) + '\n';
}
}
const stream = Readable.from(generateData());
stream.pipe(process.stdout);
10. Stream Composition Patterns
Combining multiple Transform streams into reusable data processing pipelines simplifies complex logic. Each Transform handles a single responsibility, composed via pipeline.
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');
// Reusable transforms
function parseCSVLine() {
let header = null;
return new Transform({
objectMode: true,
transform(line, enc, cb) {
const cols = line.toString().trim().split(',');
if (!header) { header = cols; return cb(); }
const obj = {};
header.forEach((h, i) => obj[h] = cols[i]);
cb(null, obj);
}
});
}
function filterBy(field, value) {
return new Transform({
objectMode: true,
transform(obj, enc, cb) {
if (obj[field] === value) this.push(obj);
cb();
}
});
}
function toJSON() {
return new Transform({
writableObjectMode: true,
readableObjectMode: false,
transform(obj, enc, cb) {
cb(null, JSON.stringify(obj) + '\n');
}
});
}
// Compose: CSV → parse → filter → JSON
// (parseCSVLine expects one line per chunk — in practice put a
// line splitter first, as in section 13)
async function exportAdmins() {
await pipeline(
createReadStream('users.csv'),
parseCSVLine(),
filterBy('role', 'admin'),
toJSON(),
createWriteStream('admins.jsonl')
);
}
exportAdmins().catch(console.error);
11. Memory Efficiency and Performance
The core value of streams is memory efficiency. The comparison below shows the difference between stream and non-stream approaches for processing large files.
| Approach | 1GB File Memory | Use Case |
|---|---|---|
| fs.readFileSync() | ~1GB+ | Tiny config files only |
| fs.readFile() | ~1GB+ | Files under 100MB |
| createReadStream() | ~64KB | Any file size |
// Memory comparison: Buffer vs Stream
const fs = require('fs');
// BAD: loads entire file into memory
async function bufferApproach() {
const data = await fs.promises.readFile('big.csv', 'utf8');
const lines = data.split('\n'); // 2x memory!
return lines.filter(l => l.includes('ERROR'));
}
// GOOD: constant memory, processes line by line
async function streamApproach() {
const rl = require('readline').createInterface({
input: fs.createReadStream('big.csv')
});
const errors = [];
for await (const line of rl) {
if (line.includes('ERROR')) errors.push(line);
}
return errors;
}
// Performance tip: tune highWaterMark
// Defaults: 16 KiB (16384 bytes) for a generic Readable,
// 64 KiB for fs.createReadStream
// Increase for sequential reads of large files
fs.createReadStream('data.bin', {
highWaterMark: 256 * 1024 // 256KB for throughput
});
12. Web Streams API (Node.js Compatibility)
The Web Streams API (ReadableStream, WritableStream, TransformStream) is the WHATWG standard available in browsers, Deno, Cloudflare Workers, and Node.js. Node.js provides conversion methods for interoperability.
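Before the conversion helpers, here is the WHATWG side on its own — a TransformStream wired up with pipeThrough (Node 18+, where these constructors are global):

```javascript
// Pure Web Streams: no require('stream') involved.
const upper = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  }
});

const source = new ReadableStream({
  start(controller) {
    controller.enqueue('hello ');
    controller.enqueue('web streams');
    controller.close();
  }
});

// Web ReadableStreams are async-iterable in Node.js
async function collect() {
  let out = '';
  for await (const chunk of source.pipeThrough(upper)) {
    out += chunk;
  }
  return out;
}
const result = collect();
result.then((s) => console.log(s)); // HELLO WEB STREAMS
```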
const { Readable } = require('stream');
// Convert Node stream → Web stream
const nodeReadable = Readable.from(['hello', ' ', 'world']);
const webReadable = Readable.toWeb(nodeReadable);
// Consume with Web Streams API
const reader = webReadable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(value);
}
// Convert Web stream → Node stream
const webStream = new ReadableStream({
start(controller) {
controller.enqueue('data chunk 1');
controller.enqueue('data chunk 2');
controller.close();
}
});
const nodeStream = Readable.fromWeb(webStream);
nodeStream.pipe(process.stdout);
13. Real-World Patterns
CSV Processing Pipeline
const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');
// Split buffer chunks into individual lines
function lineSplitter() {
let buffer = '';
return new Transform({
transform(chunk, enc, cb) {
buffer += chunk.toString();
const lines = buffer.split('\n');
buffer = lines.pop(); // keep partial line
for (const line of lines) {
if (line.trim()) this.push(line);
}
cb();
},
flush(cb) {
if (buffer.trim()) this.push(buffer);
cb();
}
});
}
async function processCSV(input, output) {
let header = null;
let count = 0;
await pipeline(
fs.createReadStream(input),
lineSplitter(),
new Transform({
objectMode: true,
transform(line, enc, cb) {
const cols = line.toString().split(',');
if (!header) { header = cols; return cb(); }
const row = {};
header.forEach((h, i) => row[h.trim()] = cols[i]);
count++;
cb(null, JSON.stringify(row) + '\n');
}
}),
fs.createWriteStream(output)
);
console.log(`Processed ${count} rows`);
}
Log Parsing and Aggregation
const { Transform } = require('stream');
// Parse structured log lines into objects
function logParser() {
return new Transform({
objectMode: true,
transform(line, enc, cb) {
const match = line.toString().match(
/^\[(.*?)\]\s+(\w+)\s+(.*)/
);
if (match) {
this.push({
timestamp: new Date(match[1]),
level: match[2],
message: match[3]
});
}
cb();
}
});
}
// Aggregate counts by level
function aggregator() {
const counts = {};
return new Transform({
objectMode: true,
transform(entry, enc, cb) {
counts[entry.level] = (counts[entry.level] || 0) + 1;
cb();
},
flush(cb) {
this.push(JSON.stringify(counts, null, 2));
cb();
}
});
}
Duplex Stream Example
const { Duplex } = require('stream');
class MessageProtocol extends Duplex {
constructor() {
super();
this._buffer = [];
this._waiting = false;
}
_write(chunk, encoding, callback) {
// Writable side: receive raw bytes, frame them
const msg = chunk.toString().trim();
const framed = Buffer.from(
JSON.stringify({ len: msg.length, data: msg }) + '\n'
);
if (this._waiting) {
// reader is waiting — deliver directly
this._waiting = false;
this.push(framed);
} else {
this._buffer.push(framed);
}
callback();
}
_read() {
// Readable side: output framed messages; never call _read()
// recursively — set a flag and let the next _write() push instead
const item = this._buffer.shift();
if (item) {
this.push(item);
} else {
this._waiting = true;
}
}
}
Conclusion
Node.js Streams are the core tool for handling large-scale data. Master the four stream types (Readable, Writable, Transform, Duplex) and their composition, use pipeline() over pipe() for error handling and resource cleanup, understand backpressure to prevent memory overflow, and leverage async iterators for cleaner code. When you need cross-runtime compatibility, use the Web Streams API and convert between the two APIs with toWeb()/fromWeb(). Stream-oriented thinking is the foundation of high-performance Node.js programming — any data larger than a few MB should be processed with streams.
Frequently Asked Questions
What are the four types of Node.js streams?
Node.js has four fundamental stream types: Readable (source of data, e.g., fs.createReadStream), Writable (destination for data, e.g., fs.createWriteStream), Transform (modifies data as it passes through, e.g., zlib.createGzip), and Duplex (both readable and writable independently, e.g., net.Socket). There is also PassThrough, a trivial Transform that passes data unchanged.
What is backpressure and how do you handle it?
Backpressure occurs when a writable stream cannot process data as fast as it receives it. When write() returns false, you must pause writing and wait for the drain event. pipe() and pipeline() handle backpressure automatically. Ignoring backpressure leads to excessive memory usage and potential crashes.
What is the difference between pipe() and pipeline()?
pipe() chains streams but does not propagate errors or clean up resources. If a mid-chain stream errors, the source keeps reading and may leak memory. pipeline() propagates errors to all streams, destroys all streams on error, and supports a callback or promise for completion. Always prefer pipeline() in production code.
How do async iterators work with streams?
Readable streams implement the async iterable protocol, so you can consume them with for await...of loops. Backpressure is handled automatically, and the stream is destroyed when the loop exits via break or throw.
What is object mode in Node.js streams?
By default, streams operate on Buffer or string data. Object mode (objectMode: true) allows streams to process any JavaScript value. The highWaterMark counts objects rather than bytes. Useful for parsed JSON records, database rows, or CSV lines.
How do you stream large files without running out of memory?
Use fs.createReadStream() to read the file in chunks and pipe it to a writable destination via pipeline(). Memory usage stays constant regardless of file size. Never use readFileSync() or readFile() for large files.
How do you handle errors in stream pipelines?
Use the pipeline() function which propagates errors to all streams and calls a single callback on completion or error. With the promise version (from stream/promises), wrap it in try/catch. Always handle stream errors to prevent unhandled exceptions from crashing the process.
What is the Web Streams API and how does it differ?
The Web Streams API is the WHATWG standard available in browsers, Deno, Workers, and Node.js. Node streams are more feature-rich with pipe(), pipeline(), and better backpressure. Web Streams are more portable. Node.js provides Readable.toWeb() and Readable.fromWeb() for conversion.