"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.IOStream = void 0; const cancellable_1 = require("./cancellable"); const stream_1 = require("stream"); class IOStream extends stream_1.Duplex { constructor(impl) { super({}); this.impl = impl; this.pending = new Set(); this.cancellable = new cancellable_1.Cancellable(); } async _destroy(error, callback) { this.cancellable.cancel(); for (const operation of this.pending) { try { await operation; } catch (e) { } } try { await this.impl.close(); } catch (e) { } callback(error); } _read(size) { const operation = this.impl.read(size, this.cancellable) .then((data) => { const isEof = data.length === 0; if (isEof) { this.push(null); return; } this.push(data); }) .catch((error) => { if (this.impl.isClosed) { this.push(null); } this.emit("error", error); }); this.track(operation); } _write(chunk, encoding, callback) { let data; if (Buffer.isBuffer(chunk)) { data = chunk; } else { data = Buffer.from(chunk, encoding); } const operation = this.impl.write(data, this.cancellable) .then(() => { callback(null); }) .catch((error) => { callback(error); }); this.track(operation); } track(operation) { this.pending.add(operation); operation .catch(_ => { }) .finally(() => { this.pending.delete(operation); }); } } exports.IOStream = IOStream;