
Node.js Streams: Complete Guide to Readable, Writable, Transform, and Pipeline

13 min read · by DevToolBox

Node.js Streams Guide: Readable, Writable, Transform, Duplex, Pipeline, Backpressure & Performance

Master Node.js Streams: Readable, Writable, Transform, Duplex, and PassThrough streams. Learn pipeline API, backpressure handling, object mode, async iterators, HTTP streaming, file streaming, Web Streams API compatibility, and real-world patterns for CSV processing, log parsing, and high-performance data transformation.

TL;DR: Node.js Streams process data in chunks rather than loading everything into memory, making them essential for handling large files, HTTP requests, and real-time data. The four stream types (Readable, Writable, Transform, Duplex) compose via pipe() or the pipeline() API which handles error propagation and cleanup. Backpressure is automatic when using pipe but requires manual drain event handling with write(). Object mode streams process JavaScript objects instead of buffers. Async iterators (for await...of) provide the cleanest consumption pattern. The Web Streams API is available in Node.js for cross-platform compatibility.

Key Takeaways
  • Streams process data chunk by chunk with constant memory usage regardless of file size
  • Prefer pipeline() over pipe() — it handles error propagation and stream cleanup automatically
  • Backpressure is the key concept: pause writing when write() returns false and wait for the drain event
  • Async iterators (for await...of) are the cleanest way to consume Readable streams
  • Object mode lets streams process structured data instead of raw buffers
  • Transform streams are the building blocks for data processing pipelines
  • Web Streams API is available in Node.js for cross-runtime compatibility

1. Stream Fundamentals

Streams in Node.js are abstract interfaces for working with streaming data. Instead of loading all data into memory at once, they process it in small chunks incrementally. This makes it possible to handle gigabytes or even terabytes of data with only a few megabytes of memory.

Stream Type | Description | Common Examples
--- | --- | ---
Readable | Source of data you can read from | fs.createReadStream, http.IncomingMessage, process.stdin
Writable | Destination for data you can write to | fs.createWriteStream, http.ServerResponse, process.stdout
Transform | Modifies data as it passes through | zlib.createGzip, crypto.createCipheriv
Duplex | Independent readable and writable sides | net.Socket, tls.TLSSocket
PassThrough | Passes data through without modification | Testing, monitoring, stream teeing
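
The monitoring use of PassThrough in the table can be sketched as follows. This is a minimal illustration, not a canonical pattern: `byteCounter` and `demo` are hypothetical names, and the in-memory source and sink stand in for real files or sockets.

```javascript
const { PassThrough, Readable, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical helper: a PassThrough that tallies the bytes flowing through it
// without changing them.
function byteCounter() {
  const tap = new PassThrough();
  tap.bytes = 0;
  tap.on('data', (chunk) => { tap.bytes += chunk.length; });
  return tap;
}

async function demo() {
  const counter = byteCounter();
  const received = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      callback();
    }
  });
  // source -> counter (observes only) -> sink
  await pipeline(Readable.from(['hello ', 'world']), counter, sink);
  return { bytes: counter.bytes, text: received.join('') };
}

demo().then((result) => console.log(result));
```

The data reaches the sink unchanged; the counter only observes it.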

2. Creating Custom Streams

Readable Stream

Custom Readable streams implement the _read() method. Node.js calls this method when the consumer requests more data. Use this.push() to send chunks, and push(null) to signal the end of the stream.

const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(max) {
    super(); // default: Buffer mode
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current <= this.max) {
      this.push(String(this.current++) + '\n');
    } else {
      this.push(null); // signal end of stream
    }
  }
}

const counter = new CounterStream(5);
counter.pipe(process.stdout);
// Output: the numbers 0 through 5, one per line

Writable Stream

const { Writable } = require('stream');

class LogWriter extends Writable {
  _write(chunk, encoding, callback) {
    const line = chunk.toString().trim();
    const timestamp = new Date().toISOString();
    console.log(`[${timestamp}] ${line}`);
    callback(); // signal done, ready for next chunk
  }

  _final(callback) {
    console.log('--- Log stream closed ---');
    callback();
  }
}

const logger = new LogWriter();
logger.write('Server started\n');
logger.end('Shutdown complete\n');

Transform Stream

const { Transform } = require('stream');

class UpperCaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }

  _flush(callback) {
    // Called once at end — use for final cleanup
    this.push('\n--- END ---\n');
    callback();
  }
}

// Usage: stdin → uppercase → stdout
process.stdin
  .pipe(new UpperCaseTransform())
  .pipe(process.stdout);

3. Piping and Pipeline API

pipe() is the basic method for chaining streams, but it has a critical flaw: it does not propagate errors automatically. If a mid-chain stream errors, the source stream is not destroyed, causing memory leaks. pipeline() solves these problems.

pipe() vs pipeline()

Feature | pipe() | pipeline()
--- | --- | ---
Error propagation | Not automatic | Automatic to all streams
Stream cleanup | Streams not destroyed on error | All streams destroyed on error
Completion signal | Must listen to multiple events | Callback / Promise
Recommended for | Simple prototyping | Production code

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

// pipe() — errors not propagated
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));
// If gzip errors, readStream keeps reading!

// pipeline() — production-ready
async function compressFile(src, dest) {
  await pipeline(
    fs.createReadStream(src),
    zlib.createGzip(),
    fs.createWriteStream(dest)
  );
  console.log('Compression complete');
}

compressFile('input.txt', 'input.txt.gz')
  .catch(err => console.error('Pipeline failed:', err));
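
pipeline() also accepts async generator functions as intermediate stages, which can stand in for a small Transform class. The sketch below assumes an in-memory source and sink in place of files; `upperCase` and `run` are illustrative names.

```javascript
const { pipeline } = require('stream/promises');
const { Readable, Writable } = require('stream');

// A generator stage receives the previous stage as an async iterable
// and yields the transformed chunks.
async function* upperCase(source) {
  for await (const chunk of source) {
    yield chunk.toString().toUpperCase();
  }
}

async function run(words) {
  const out = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      out.push(chunk.toString());
      callback();
    }
  });
  // Errors thrown inside the generator reject the pipeline promise.
  await pipeline(Readable.from(words), upperCase, sink);
  return out.join('');
}

run(['a', 'b', 'c']).then((text) => console.log(text));
```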

4. Backpressure Handling

Backpressure occurs when a writable stream cannot process data as fast as it receives it. If you ignore backpressure, data accumulates in memory indefinitely, eventually crashing the process. The write() method returns false when the internal buffer is full — this is the signal to pause writing.

const fs = require('fs');

const readable = fs.createReadStream('huge-file.csv');
const writable = fs.createWriteStream('output.csv');

// Manual backpressure handling
readable.on('data', (chunk) => {
  const canContinue = writable.write(chunk);
  if (!canContinue) {
    // Buffer is full — pause reading
    readable.pause();
    writable.once('drain', () => {
      // Buffer drained — resume reading
      readable.resume();
    });
  }
});

readable.on('end', () => writable.end());

// BETTER: pipe() handles backpressure automatically
// readable.pipe(writable);

Tip: Never ignore the return value of write(). If you keep writing to a stream that returned false, the internal buffer grows past highWaterMark without limit and memory usage spikes. pipe() and pipeline() handle backpressure automatically, so prefer them in production code.
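
When the data does not come from a Readable (for example, values produced in a loop), the same rule can be written with `events.once()`. This is a sketch under stated assumptions: `writeAll` and `slowSink` are hypothetical helpers, and the 4-byte highWaterMark is deliberately tiny so that backpressure shows up with small inputs.

```javascript
const { once } = require('events');
const { Writable } = require('stream');

// Write every chunk, pausing whenever write() reports a full buffer.
// Returns how many times the loop had to wait for 'drain'.
async function writeAll(writable, chunks) {
  let waits = 0;
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      waits++;
      await once(writable, 'drain'); // resume only after the buffer empties
    }
  }
  writable.end();
  await once(writable, 'finish');
  return waits;
}

// A deliberately slow sink with a tiny buffer (illustration only).
function slowSink(store) {
  return new Writable({
    highWaterMark: 4, // bytes
    write(chunk, encoding, callback) {
      store.push(chunk.toString());
      setImmediate(callback); // simulate slow I/O
    }
  });
}

const stored = [];
writeAll(slowSink(stored), ['aaaa', 'bbbb', 'cc'])
  .then((waits) => console.log('drain waits:', waits, stored));
```

`once()` also rejects if the stream emits an error while waiting, so a failed write surfaces as a rejected promise instead of a hang.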

5. Object Mode Streams

By default, streams operate on Buffer or string data. Object mode allows streams to process any JavaScript value — objects, arrays, numbers, etc. The highWaterMark in object mode counts the number of objects rather than bytes.

const { Transform, Readable } = require('stream');

// Transform that filters JS objects
const filterAdults = new Transform({
  objectMode: true,
  transform(user, encoding, callback) {
    if (user.age >= 18) {
      this.push(user);
    }
    callback();
  }
});

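// Objects in, text out: set the two sides separately. A blanket
// objectMode: true would put the readable side in object mode as well.
const formatJSON = new Transform({
  writableObjectMode: true,
  readableObjectMode: false,
  transform(user, encoding, callback) {
    this.push(JSON.stringify(user) + '\n');
    callback();
  }
});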

// Feed objects into the pipeline
const users = Readable.from([
  { name: 'Alice', age: 25 },
  { name: 'Bob', age: 16 },
  { name: 'Charlie', age: 30 }
]);

users.pipe(filterAdults).pipe(formatJSON).pipe(process.stdout);
// {"name":"Alice","age":25}
// {"name":"Charlie","age":30}
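
To see that highWaterMark counts objects in object mode, the following sketch writes three objects to a sink whose limit is two. `probeObjectBuffer` is a hypothetical name, and the sink never completes its writes so that the items stay buffered.

```javascript
const { Writable } = require('stream');

// Returns the write() results for three objects written back to back.
function probeObjectBuffer() {
  const sink = new Writable({
    objectMode: true,
    highWaterMark: 2, // two objects, regardless of their size in bytes
    write(obj, encoding, callback) {
      // Intentionally never calls callback(): every item stays buffered.
    }
  });
  const results = [
    sink.write({ id: 1, payload: 'x'.repeat(1000) }),
    sink.write({ id: 2 }),
    sink.write({ id: 3 })
  ];
  sink.destroy(); // discard the stuck sink
  return results;
}

console.log(probeObjectBuffer()); // [ true, false, false ]
```

The first write leaves one object buffered, which is below the limit. The second reaches the limit, so write() returns false even though the second object is tiny.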

6. Stream Events and Error Handling

Each stream type emits specific events. Listening for these events correctly is the key to writing robust stream code. Unhandled error events will crash the process.

Event | Stream Type | When Emitted
--- | --- | ---
data | Readable | When a chunk of data is available
end | Readable | When there is no more data to read
drain | Writable | When buffer is drained and safe to write again
finish | Writable | When all data has been flushed to underlying system
error | All | When an error occurs
close | All | When the stream and its underlying resource are closed

const { pipeline } = require('stream/promises');
const { Transform } = require('stream');
const fs = require('fs');

async function safeProcess() {
  try {
    await pipeline(
      fs.createReadStream('data.json'),
      new Transform({
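        transform(chunk, enc, cb) {
          // Assumes the whole document arrives as a single chunk (a file
          // smaller than the 64 KiB default read size); larger inputs
          // must be buffered or split into records first.
          try {
            const parsed = JSON.parse(chunk);
            cb(null, JSON.stringify(parsed) + '\n');
          } catch (e) {
            cb(new Error('Invalid JSON: ' + e.message));
          }
        }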
      }),
      fs.createWriteStream('output.jsonl')
    );
    console.log('Processing complete');
  } catch (err) {
    // All streams are automatically destroyed
    console.error('Pipeline error:', err.message);
  }
}
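
Outside a pipeline, the end or failure of a single stream can be awaited with `finished()` from `stream/promises` instead of wiring up `end`, `error`, and `close` listeners by hand. A minimal sketch, with `outcome` and `demo` as illustrative names and a made-up error message:

```javascript
const { finished } = require('stream/promises');
const { Readable } = require('stream');

// Resolves to 'ok' when the stream ends cleanly, or to the error message.
async function outcome(stream) {
  stream.resume(); // discard the data; only completion matters here
  try {
    await finished(stream);
    return 'ok';
  } catch (err) {
    return err.message;
  }
}

async function demo() {
  const good = Readable.from(['a', 'b']);
  const bad = new Readable({
    read() {
      this.destroy(new Error('disk unplugged')); // simulated failure
    }
  });
  return Promise.all([outcome(good), outcome(bad)]);
}

demo().then((results) => console.log(results));
```

Because `finished()` attaches its own error listener, the failing stream does not crash the process.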

7. File Streaming

File streams are the most common use case for Node.js streams. fs.createReadStream() and fs.createWriteStream() process files chunk by chunk with constant memory. For large files, the difference versus readFile/writeFile is dramatic.

const fs = require('fs');
const { pipeline } = require('stream/promises');
const zlib = require('zlib');

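// Read a 10GB log file with only ~64KB buffered at a time
async function processLargeFile() {
  const input = fs.createReadStream('server.log', {
    highWaterMark: 64 * 1024,  // 64KB chunks
    encoding: 'utf8'
  });

  let lineCount = 0;
  let errorCount = 0;
  let remainder = ''; // partial line carried over between chunks

  for await (const chunk of input) {
    const lines = (remainder + chunk).split('\n');
    remainder = lines.pop(); // the last piece may be an incomplete line
    lineCount += lines.length;
    errorCount += lines.filter(
      l => l.includes('ERROR')
    ).length;
  }

  // Count a final line that has no trailing newline
  if (remainder) {
    lineCount++;
    if (remainder.includes('ERROR')) errorCount++;
  }

  console.log(`Lines: ${lineCount}, Errors: ${errorCount}`);
}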

// Compress a file with streaming
async function compressFile(src, dest) {
  await pipeline(
    fs.createReadStream(src),
    zlib.createGzip({ level: 9 }),
    fs.createWriteStream(dest)
  );
}
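
The same chunk-by-chunk pattern works for any incremental computation over a file, such as a checksum. The sketch below is illustrative: `sha256File` and `demo` are hypothetical names, and the demo writes a throwaway file into the OS temp directory so it can run on its own.

```javascript
const crypto = require('crypto');
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hash a file of any size while holding only one chunk in memory at a time.
async function sha256File(filePath) {
  const hash = crypto.createHash('sha256');
  for await (const chunk of fs.createReadStream(filePath)) {
    hash.update(chunk);
  }
  return hash.digest('hex');
}

async function demo() {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'stream-demo-'));
  const file = path.join(dir, 'sample.txt');
  const content = 'line\n'.repeat(50000); // 250,000 bytes, several chunks
  fs.writeFileSync(file, content);
  try {
    const streamed = await sha256File(file);
    const direct = crypto.createHash('sha256').update(content).digest('hex');
    return streamed === direct;
  } finally {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}

demo().then((same) => console.log('hashes match:', same));
```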

8. HTTP Streaming

Node.js HTTP requests and responses are both streams. http.IncomingMessage is a Readable stream and http.ServerResponse is a Writable stream. Leveraging this enables efficient large file uploads/downloads and request proxying.

const http = require('http');
const fs = require('fs');
const { pipeline } = require('stream/promises');

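const server = http.createServer(async (req, res) => {
  try {
    if (req.url === '/download') {
      // Stream a large file to the client. Node.js applies chunked
      // transfer encoding itself when no Content-Length is set.
      res.writeHead(200, {
        'Content-Type': 'application/octet-stream'
      });
      await pipeline(
        fs.createReadStream('large-dataset.csv'),
        res
      );
    } else if (req.url === '/upload' && req.method === 'POST') {
      // Stream upload directly to disk
      await pipeline(
        req,
        fs.createWriteStream('upload.bin')
      );
      res.end('Upload complete');
    } else {
      res.writeHead(404);
      res.end('Not found');
    }
  } catch (err) {
    // pipeline() has already destroyed the streams involved
    console.error('Request failed:', err.message);
    if (!res.headersSent) res.writeHead(500);
    if (!res.destroyed) res.end();
  }
});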

server.listen(3000);
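
On the client side, the response object is a Readable as well, so it can be consumed incrementally. The following sketch is self-contained under a few assumptions: it starts a throwaway server on an ephemeral loopback port, and `streamOverHttp` is a hypothetical helper name.

```javascript
const http = require('http');
const { Readable } = require('stream');
const { pipeline } = require('stream/promises');

// Serve `parts` as a streamed body, fetch it, and count the bytes received
// without buffering the whole response.
async function streamOverHttp(parts) {
  const server = http.createServer((req, res) => {
    res.writeHead(200, { 'Content-Type': 'text/plain' });
    pipeline(Readable.from(parts), res).catch(() => res.destroy());
  });
  await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve));
  const { port } = server.address();

  try {
    const res = await new Promise((resolve, reject) => {
      // agent: false uses a one-off connection instead of the shared pool
      http.get({ host: '127.0.0.1', port, agent: false }, resolve)
        .on('error', reject);
    });
    let bytes = 0;
    for await (const chunk of res) bytes += chunk.length;
    return bytes;
  } finally {
    server.close();
  }
}

streamOverHttp(['ab', 'cd\n']).then((n) => console.log('bytes received:', n));
```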

9. Async Iterators with Streams

Readable streams implement the async iterable protocol. Using for await...of loops to consume stream data is cleaner and handles backpressure automatically. This is the recommended way to consume streams in modern Node.js.

const fs = require('fs');
const readline = require('readline');

// Read file line by line with async iterators
async function processLines(filePath) {
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    crlfDelay: Infinity
  });

  const stats = { total: 0, errors: 0, warnings: 0 };

  for await (const line of rl) {
    stats.total++;
    if (line.includes('ERROR')) stats.errors++;
    if (line.includes('WARN')) stats.warnings++;
  }

  return stats;
}

// Create a Readable from an async generator
const { Readable } = require('stream');

async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    yield JSON.stringify({ id: i, ts: Date.now() }) + '\n';
  }
}

const stream = Readable.from(generateData());
stream.pipe(process.stdout);
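
Leaving a for await...of loop early also cleans up the stream. The sketch below takes a few items from an endless source and then checks the `destroyed` flag; `takeFirst`, `numbers`, and `demo` are illustrative names.

```javascript
const { Readable } = require('stream');

// Collect the first `n` items, then stop reading.
async function takeFirst(stream, n) {
  const items = [];
  for await (const item of stream) {
    items.push(item);
    if (items.length >= n) break; // exiting the loop destroys the stream
  }
  return items;
}

async function* numbers() {
  let i = 0;
  while (true) yield i++; // endless source
}

async function demo() {
  const stream = Readable.from(numbers());
  const items = await takeFirst(stream, 3);
  return { items, destroyed: stream.destroyed };
}

demo().then((result) => console.log(result));
```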

10. Stream Composition Patterns

Combining multiple Transform streams into reusable data processing pipelines simplifies complex logic. Each Transform handles a single responsibility, composed via pipeline.

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const { createReadStream, createWriteStream } = require('fs');

// Reusable transforms
function parseCSVLine() {
  let header = null;
  return new Transform({
    objectMode: true,
    transform(line, enc, cb) {
      const cols = line.toString().trim().split(',');
      if (!header) { header = cols; return cb(); }
      const obj = {};
      header.forEach((h, i) => obj[h] = cols[i]);
      cb(null, obj);
    }
  });
}

function filterBy(field, value) {
  return new Transform({
    objectMode: true,
    transform(obj, enc, cb) {
      if (obj[field] === value) this.push(obj);
      cb();
    }
  });
}

function toJSON() {
  return new Transform({
    writableObjectMode: true,
    readableObjectMode: false,
    transform(obj, enc, cb) {
      cb(null, JSON.stringify(obj) + '\n');
    }
  });
}

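// Split raw file chunks into lines so parseCSVLine() sees one line at a time
function splitLines() {
  let tail = '';
  return new Transform({
    readableObjectMode: true,
    transform(chunk, enc, cb) {
      const lines = (tail + chunk.toString()).split('\n');
      tail = lines.pop(); // keep the partial last line for the next chunk
      for (const line of lines) {
        if (line.trim()) this.push(line);
      }
      cb();
    },
    flush(cb) {
      if (tail.trim()) this.push(tail);
      cb();
    }
  });
}

// Compose: CSV → lines → parse → filter → JSON
// (wrapped in an async function: top-level await is unavailable in CommonJS)
async function main() {
  await pipeline(
    createReadStream('users.csv'),
    splitLines(),
    parseCSVLine(),
    filterBy('role', 'admin'),
    toJSON(),
    createWriteStream('admins.jsonl')
  );
}

main().catch(err => console.error('Pipeline failed:', err));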

11. Memory Efficiency and Performance

The core value of streams is memory efficiency. The comparison below shows the difference between stream and non-stream approaches for processing large files.

Approach | Memory for a 1GB File | Use Case
--- | --- | ---
fs.readFileSync() | ~1GB+ | Tiny config files only
fs.readFile() | ~1GB+ | Files under 100MB
createReadStream() | ~64KB | Any file size

// Memory comparison: Buffer vs Stream
const fs = require('fs');

// BAD: loads entire file into memory
async function bufferApproach() {
  const data = await fs.promises.readFile('big.csv', 'utf8');
  const lines = data.split('\n'); // 2x memory!
  return lines.filter(l => l.includes('ERROR'));
}

// GOOD: constant memory, processes line by line
async function streamApproach() {
  const rl = require('readline').createInterface({
    input: fs.createReadStream('big.csv')
  });
  const errors = [];
  for await (const line of rl) {
    if (line.includes('ERROR')) errors.push(line);
  }
  return errors;
}

// Performance tip: tune highWaterMark
// Defaults: 64 KiB for fs.createReadStream(); 16 KiB for generic streams
// (raised to 64 KiB in Node.js 22)
// Increase for sequential reads of large files
fs.createReadStream('data.bin', {
  highWaterMark: 256 * 1024 // 256KB for throughput
});

12. Web Streams API (Node.js Compatibility)

The Web Streams API (ReadableStream, WritableStream, TransformStream) is the WHATWG standard available in browsers, Deno, Cloudflare Workers, and Node.js. Node.js provides conversion methods for interoperability.

const { Readable } = require('stream');

// Convert Node stream → Web stream
const nodeReadable = Readable.from(['hello', ' ', 'world']);
const webReadable = Readable.toWeb(nodeReadable);

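// Consume with Web Streams API
// (wrapped in an async function: top-level await is unavailable in CommonJS)
async function readAll() {
  const reader = webReadable.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log(value);
  }
}

readAll();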

// Convert Web stream → Node stream
const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue('data chunk 1');
    controller.enqueue('data chunk 2');
    controller.close();
  }
});

const nodeStream = Readable.fromWeb(webStream);
nodeStream.pipe(process.stdout);
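
The Web Streams counterpart of a Transform is `TransformStream`, chained with `pipeThrough()`. A minimal sketch, assuming Node.js 18 or later, where these classes are globals; `shout` is an illustrative name.

```javascript
// ReadableStream and TransformStream are globals in Node.js 18 and later.
async function shout(words) {
  const source = new ReadableStream({
    start(controller) {
      for (const word of words) controller.enqueue(word);
      controller.close();
    }
  });

  const upper = new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    }
  });

  // Node.js ReadableStream instances are async iterable.
  const out = [];
  for await (const chunk of source.pipeThrough(upper)) out.push(chunk);
  return out;
}

shout(['hello', 'world']).then((out) => console.log(out));
```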

13. Real-World Patterns

CSV Processing Pipeline

const { Transform } = require('stream');
const { pipeline } = require('stream/promises');
const fs = require('fs');

// Split buffer chunks into individual lines
function lineSplitter() {
  let buffer = '';
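  return new Transform({
    // Emit each line as a discrete item; byte-mode output does not
    // promise to preserve chunk boundaries.
    readableObjectMode: true,
    transform(chunk, enc, cb) {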
      buffer += chunk.toString();
      const lines = buffer.split('\n');
      buffer = lines.pop(); // keep partial line
      for (const line of lines) {
        if (line.trim()) this.push(line);
      }
      cb();
    },
    flush(cb) {
      if (buffer.trim()) this.push(buffer);
      cb();
    }
  });
}

async function processCSV(input, output) {
  let header = null;
  let count = 0;

  await pipeline(
    fs.createReadStream(input),
    lineSplitter(),
    new Transform({
      objectMode: true,
      transform(line, enc, cb) {
        const cols = line.toString().split(',');
        if (!header) { header = cols; return cb(); }
        const row = {};
        header.forEach((h, i) => row[h.trim()] = cols[i]);
        count++;
        cb(null, JSON.stringify(row) + '\n');
      }
    }),
    fs.createWriteStream(output)
  );

  console.log(`Processed ${count} rows`);
}

Log Parsing and Aggregation

const { Transform } = require('stream');

// Parse structured log lines into objects
function logParser() {
  return new Transform({
    objectMode: true,
    transform(line, enc, cb) {
      const match = line.toString().match(
        /^\[(.*?)\]\s+(\w+)\s+(.*)/
      );
      if (match) {
        this.push({
          timestamp: new Date(match[1]),
          level: match[2],
          message: match[3]
        });
      }
      cb();
    }
  });
}

// Aggregate counts by level
function aggregator() {
  const counts = {};
  return new Transform({
    objectMode: true,
    transform(entry, enc, cb) {
      counts[entry.level] = (counts[entry.level] || 0) + 1;
      cb();
    },
    flush(cb) {
      this.push(JSON.stringify(counts, null, 2));
      cb();
    }
  });
}

Duplex Stream Example

const { Duplex } = require('stream');

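class MessageProtocol extends Duplex {
  constructor() {
    super();
    this._queue = [];          // framed messages not yet read
    this._readPending = false; // true while the reader waits for data
  }

  _write(chunk, encoding, callback) {
    // Writable side: receive raw bytes, frame them
    const msg = chunk.toString().trim();
    const framed = Buffer.from(
      JSON.stringify({ len: msg.length, data: msg }) + '\n'
    );
    this._queue.push(framed);
    if (this._readPending) this._read(); // wake a waiting reader
    callback();
  }

  _final(callback) {
    // Writable side ended: queue the end-of-stream marker for the reader
    this._queue.push(null);
    if (this._readPending) this._read();
    callback();
  }

  _read() {
    // Readable side: output framed messages. When the queue is empty,
    // wait for _write() instead of polling with a timer.
    this._readPending = this._queue.length === 0;
    if (!this._readPending) this.push(this._queue.shift());
  }
}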

Conclusion

Node.js Streams are the core tool for handling large-scale data. Master the four stream types (Readable, Writable, Transform, Duplex) and their composition, use pipeline() over pipe() for error handling and resource cleanup, understand backpressure to prevent memory overflow, and leverage async iterators for cleaner code. When you need cross-runtime compatibility, use the Web Streams API and convert between the two APIs with toWeb()/fromWeb(). Stream-oriented thinking is the foundation of high-performance Node.js programming — any data larger than a few MB should be processed with streams.

Frequently Asked Questions

What are the four types of Node.js streams?

Node.js has four fundamental stream types: Readable (source of data, e.g., fs.createReadStream), Writable (destination for data, e.g., fs.createWriteStream), Transform (modifies data as it passes through, e.g., zlib.createGzip), and Duplex (both readable and writable independently, e.g., net.Socket). There is also PassThrough, a trivial Transform that passes data unchanged.

What is backpressure and how do you handle it?

Backpressure occurs when a writable stream cannot process data as fast as it receives it. When write() returns false, you must pause writing and wait for the drain event. pipe() and pipeline() handle backpressure automatically. Ignoring backpressure leads to excessive memory usage and potential crashes.

What is the difference between pipe() and pipeline()?

pipe() chains streams but does not propagate errors or clean up resources. If a mid-chain stream errors, the source keeps reading and may leak memory. pipeline() propagates errors to all streams, destroys all streams on error, and supports a callback or promise for completion. Always prefer pipeline() in production code.

How do async iterators work with streams?

Readable streams implement the async iterable protocol, so you can consume them with for await...of loops. Backpressure is handled automatically, and the stream is destroyed when the loop exits via break or throw.

What is object mode in Node.js streams?

By default, streams operate on Buffer or string data. Object mode (objectMode: true) allows streams to process any JavaScript value. The highWaterMark counts objects rather than bytes. Useful for parsed JSON records, database rows, or CSV lines.

How do you stream large files without running out of memory?

Use fs.createReadStream() to read the file in chunks and pipe it to a writable destination via pipeline(). Memory usage stays constant regardless of file size. Never use readFileSync() or readFile() for large files.

How do you handle errors in stream pipelines?

Use the pipeline() function which propagates errors to all streams and calls a single callback on completion or error. With the promise version (from stream/promises), wrap it in try/catch. Always handle stream errors to prevent unhandled exceptions from crashing the process.

What is the Web Streams API and how does it differ?

The Web Streams API is the WHATWG standard available in browsers, Deno, Workers, and Node.js. Node streams offer a larger API surface (pipe(), pipeline(), object mode) and integrate directly with core modules such as fs, http, and zlib. Web Streams are more portable across runtimes. Node.js provides Readable.toWeb() and Readable.fromWeb() for conversion.
