Как связаны контракты EventEmitter и Readable?nodejs-60

Контракты EventEmitter и Readable тесно связаны через механизм событийной модели Node.js. Readable поток является расширением EventEmitter, что формирует мощную абстракцию для работы с данными.

Иерархия наследования

const { Readable } = require('stream');
const { EventEmitter } = require('events');

console.log(Readable.prototype instanceof EventEmitter); // true

Это означает, что все Readable потоки:

  1. Наследуют все методы EventEmitter (on, emit, once и др.)
  2. Должны соблюдать контракт событийного интерфейса
  3. Могут быть использованы везде, где ожидается EventEmitter

Ключевые события Readable, реализующие контракт EventEmitter

1. Событие 'data'

const readable = new Readable({
  read() {
    this.push('Hello ');
    this.push('World');
    this.push(null); // EOF
  }
});

readable.on('data', (chunk) => {
  console.log('Received chunk:', chunk.toString());
});

2. Событие 'end'

readable.on('end', () => {
  console.log('No more data');
});

3. Событие 'error'

readable.on('error', (err) => {
  console.error('Stream error:', err);
});

Реализация контракта в Readable

Базовый скелет Readable класса:

class Readable extends EventEmitter {
  constructor(options) {
    super(); // Инициализация EventEmitter
    // ... инициализация состояния потока
  }

  _read(size) {
    // Абстрактный метод, должен быть реализован
  }

  pipe(destination) {
    // Реализация pipe использует EventEmitter под капотом
    this.on('data', (chunk) => {
      destination.write(chunk);
    });

    this.on('end', () => {
      destination.end();
    });
  }
}

Важные аспекты взаимодействия

  1. Режимы работы: Readable может работать в:

    • Режиме паузы (paused) - требует явного вызова read()
    • Режиме потока (flowing) - данные автоматически поступают через 'data'
  2. Буферизация: Readable управляет внутренним буфером, но уведомляет о данных через события

  3. Обратный давление: Реализуется через комбинацию событий и методов:

    writable.on('drain', () => {
      readable.resume();
    });
    

Пример полного цикла событий

const readable = fs.createReadStream('file.txt');

readable.on('open', (fd) => {
  console.log('File descriptor:', fd);
});

readable.on('ready', () => {
  console.log('Stream is ready');
});

readable.on('data', (chunk) => {
  console.log('Chunk size:', chunk.length);
  readable.pause(); // Приостановка потока
  setTimeout(() => readable.resume(), 1000);
});

readable.on('end', () => {
  console.log('All data consumed');
});

readable.on('close', () => {
  console.log('Stream closed');
});

Отличия от чистого EventEmitter

  1. Строгий контракт событий: Readable требует определенного набора событий
  2. Управление ресурсами: Автоматическое освобождение ресурсов при закрытии
  3. Внутреннее состояние: Специфические состояния (чтение, пауза, завершено)

Практическое значение связи

  1. Интероперабельность: Любой код, работающий с EventEmitter, может работать с Readable
  2. Композиция потоков: Возможность создавать цепочки обработчиков через pipe()
  3. Обработка ошибок: Единый механизм для всей экосистемы Node.js

Резюмируем:

Readable поток в Node.js — это специализированный EventEmitter с жестко определенным контрактом событий и дополнительной логикой управления потоком данных. Эта связь позволяет использовать всю мощь событийной модели для обработки данных, сохраняя при этом контроль над памятью и ресурсами. Понимание этой связи критически важно для эффективной работы с потоками в Node.js.