'use strict'; const safeBuffer = require('safe-buffer'); const Limiter = require('async-limiter'); const zlib = require('zlib'); const bufferUtil = require('./BufferUtil'); const Buffer = safeBuffer.Buffer; const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]); const EMPTY_BLOCK = Buffer.from([0x00]); const kWriteInProgress = Symbol('write-in-progress'); const kPendingClose = Symbol('pending-close'); const kTotalLength = Symbol('total-length'); const kCallback = Symbol('callback'); const kBuffers = Symbol('buffers'); const kError = Symbol('error'); const kOwner = Symbol('owner'); // // We limit zlib concurrency, which prevents severe memory fragmentation // as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913 // and https://github.com/websockets/ws/issues/1202 // // Intentionally global; it's the global thread pool that's an issue. // let zlibLimiter; /** * permessage-deflate implementation. */ class PerMessageDeflate { /** * Creates a PerMessageDeflate instance. * * @param {Object} options Configuration options * @param {Boolean} options.serverNoContextTakeover Request/accept disabling * of server context takeover * @param {Boolean} options.clientNoContextTakeover Advertise/acknowledge * disabling of client context takeover * @param {(Boolean|Number)} options.serverMaxWindowBits Request/confirm the * use of a custom server window size * @param {(Boolean|Number)} options.clientMaxWindowBits Advertise support * for, or request, a custom client window size * @param {Number} options.level The value of zlib's `level` param * @param {Number} options.memLevel The value of zlib's `memLevel` param * @param {Number} options.threshold Size (in bytes) below which messages * should not be compressed * @param {Number} options.concurrencyLimit The number of concurrent calls to * zlib * @param {Boolean} isServer Create the instance in either server or client * mode * @param {Number} maxPayload The maximum allowed message length */ constructor (options, isServer, maxPayload) { this._maxPayload = maxPayload | 0; this._options = options || {}; this._threshold = this._options.threshold !== undefined ? this._options.threshold : 1024; this._isServer = !!isServer; this._deflate = null; this._inflate = null; this.params = null; if (!zlibLimiter) { const concurrency = this._options.concurrencyLimit !== undefined ? this._options.concurrencyLimit : 10; zlibLimiter = new Limiter({ concurrency }); } } /** * @type {String} */ static get extensionName () { return 'permessage-deflate'; } /** * Create extension parameters offer. * * @return {Object} Extension parameters * @public */ offer () { const params = {}; if (this._options.serverNoContextTakeover) { params.server_no_context_takeover = true; } if (this._options.clientNoContextTakeover) { params.client_no_context_takeover = true; } if (this._options.serverMaxWindowBits) { params.server_max_window_bits = this._options.serverMaxWindowBits; } if (this._options.clientMaxWindowBits) { params.client_max_window_bits = this._options.clientMaxWindowBits; } else if (this._options.clientMaxWindowBits == null) { params.client_max_window_bits = true; } return params; } /** * Accept extension offer. * * @param {Array} paramsList Extension parameters * @return {Object} Accepted configuration * @public */ accept (paramsList) { paramsList = this.normalizeParams(paramsList); var params; if (this._isServer) { params = this.acceptAsServer(paramsList); } else { params = this.acceptAsClient(paramsList); } this.params = params; return params; } /** * Releases all resources used by the extension. * * @public */ cleanup () { if (this._inflate) { if (this._inflate[kWriteInProgress]) { this._inflate[kPendingClose] = true; } else { this._inflate.close(); this._inflate = null; } } if (this._deflate) { if (this._deflate[kWriteInProgress]) { this._deflate[kPendingClose] = true; } else { this._deflate.close(); this._deflate = null; } } } /** * Accept extension offer from client. * * @param {Array} paramsList Extension parameters * @return {Object} Accepted configuration * @private */ acceptAsServer (paramsList) { const accepted = {}; const result = paramsList.some((params) => { if ( (this._options.serverNoContextTakeover === false && params.server_no_context_takeover) || (this._options.serverMaxWindowBits === false && params.server_max_window_bits) || (typeof this._options.serverMaxWindowBits === 'number' && typeof params.server_max_window_bits === 'number' && this._options.serverMaxWindowBits > params.server_max_window_bits) || (typeof this._options.clientMaxWindowBits === 'number' && !params.client_max_window_bits) ) { return; } if ( this._options.serverNoContextTakeover || params.server_no_context_takeover ) { accepted.server_no_context_takeover = true; } if ( this._options.clientNoContextTakeover || (this._options.clientNoContextTakeover !== false && params.client_no_context_takeover) ) { accepted.client_no_context_takeover = true; } if (typeof this._options.serverMaxWindowBits === 'number') { accepted.server_max_window_bits = this._options.serverMaxWindowBits; } else if (typeof params.server_max_window_bits === 'number') { accepted.server_max_window_bits = params.server_max_window_bits; } if (typeof this._options.clientMaxWindowBits === 'number') { accepted.client_max_window_bits = this._options.clientMaxWindowBits; } else if ( this._options.clientMaxWindowBits !== false && typeof params.client_max_window_bits === 'number' ) { accepted.client_max_window_bits = params.client_max_window_bits; } return true; }); if (!result) throw new Error("Doesn't support the offered configuration"); return accepted; } /** * Accept extension response from server. * * @param {Array} paramsList Extension parameters * @return {Object} Accepted configuration * @private */ acceptAsClient (paramsList) { const params = paramsList[0]; if ( this._options.clientNoContextTakeover === false && params.client_no_context_takeover ) { throw new Error('Invalid value for "client_no_context_takeover"'); } if ( (typeof this._options.clientMaxWindowBits === 'number' && (!params.client_max_window_bits || params.client_max_window_bits > this._options.clientMaxWindowBits)) || (this._options.clientMaxWindowBits === false && params.client_max_window_bits) ) { throw new Error('Invalid value for "client_max_window_bits"'); } return params; } /** * Normalize extensions parameters. * * @param {Array} paramsList Extension parameters * @return {Array} Normalized extensions parameters * @private */ normalizeParams (paramsList) { return paramsList.map((params) => { Object.keys(params).forEach((key) => { var value = params[key]; if (value.length > 1) { throw new Error(`Multiple extension parameters for ${key}`); } value = value[0]; switch (key) { case 'server_no_context_takeover': case 'client_no_context_takeover': if (value !== true) { throw new Error(`invalid extension parameter value for ${key} (${value})`); } params[key] = true; break; case 'server_max_window_bits': case 'client_max_window_bits': if (typeof value === 'string') { value = parseInt(value, 10); if ( Number.isNaN(value) || value < zlib.Z_MIN_WINDOWBITS || value > zlib.Z_MAX_WINDOWBITS ) { throw new Error(`invalid extension parameter value for ${key} (${value})`); } } if (!this._isServer && value === true) { throw new Error(`Missing extension parameter value for ${key}`); } params[key] = value; break; default: throw new Error(`Not defined extension parameter (${key})`); } }); return params; }); } /** * Decompress data. Concurrency limited by async-limiter. * * @param {Buffer} data Compressed data * @param {Boolean} fin Specifies whether or not this is the last fragment * @param {Function} callback Callback * @public */ decompress (data, fin, callback) { zlibLimiter.push((done) => { this._decompress(data, fin, (err, result) => { done(); callback(err, result); }); }); } /** * Compress data. Concurrency limited by async-limiter. * * @param {Buffer} data Data to compress * @param {Boolean} fin Specifies whether or not this is the last fragment * @param {Function} callback Callback * @public */ compress (data, fin, callback) { zlibLimiter.push((done) => { this._compress(data, fin, (err, result) => { done(); callback(err, result); }); }); } /** * Decompress data. * * @param {Buffer} data Compressed data * @param {Boolean} fin Specifies whether or not this is the last fragment * @param {Function} callback Callback * @private */ _decompress (data, fin, callback) { const endpoint = this._isServer ? 'client' : 'server'; if (!this._inflate) { const key = `${endpoint}_max_window_bits`; const windowBits = typeof this.params[key] !== 'number' ? zlib.Z_DEFAULT_WINDOWBITS : this.params[key]; this._inflate = zlib.createInflateRaw({ windowBits }); this._inflate[kTotalLength] = 0; this._inflate[kBuffers] = []; this._inflate[kOwner] = this; this._inflate.on('error', inflateOnError); this._inflate.on('data', inflateOnData); } this._inflate[kCallback] = callback; this._inflate[kWriteInProgress] = true; this._inflate.write(data); if (fin) this._inflate.write(TRAILER); this._inflate.flush(() => { const err = this._inflate[kError]; if (err) { this._inflate.close(); this._inflate = null; callback(err); return; } const data = bufferUtil.concat( this._inflate[kBuffers], this._inflate[kTotalLength] ); if ( (fin && this.params[`${endpoint}_no_context_takeover`]) || this._inflate[kPendingClose] ) { this._inflate.close(); this._inflate = null; } else { this._inflate[kWriteInProgress] = false; this._inflate[kTotalLength] = 0; this._inflate[kBuffers] = []; } callback(null, data); }); } /** * Compress data. * * @param {Buffer} data Data to compress * @param {Boolean} fin Specifies whether or not this is the last fragment * @param {Function} callback Callback * @private */ _compress (data, fin, callback) { if (!data || data.length === 0) { process.nextTick(callback, null, EMPTY_BLOCK); return; } const endpoint = this._isServer ? 'server' : 'client'; if (!this._deflate) { const key = `${endpoint}_max_window_bits`; const windowBits = typeof this.params[key] !== 'number' ? zlib.Z_DEFAULT_WINDOWBITS : this.params[key]; this._deflate = zlib.createDeflateRaw({ memLevel: this._options.memLevel, level: this._options.level, flush: zlib.Z_SYNC_FLUSH, windowBits }); this._deflate[kTotalLength] = 0; this._deflate[kBuffers] = []; // // `zlib.DeflateRaw` emits an `'error'` event only when an attempt to use // it is made after it has already been closed. This cannot happen here, // so we only add a listener for the `'data'` event. // this._deflate.on('data', deflateOnData); } this._deflate[kWriteInProgress] = true; this._deflate.write(data); this._deflate.flush(zlib.Z_SYNC_FLUSH, () => { var data = bufferUtil.concat( this._deflate[kBuffers], this._deflate[kTotalLength] ); if (fin) data = data.slice(0, data.length - 4); if ( (fin && this.params[`${endpoint}_no_context_takeover`]) || this._deflate[kPendingClose] ) { this._deflate.close(); this._deflate = null; } else { this._deflate[kWriteInProgress] = false; this._deflate[kTotalLength] = 0; this._deflate[kBuffers] = []; } callback(null, data); }); } } module.exports = PerMessageDeflate; /** * The listener of the `zlib.DeflateRaw` stream `'data'` event. * * @param {Buffer} chunk A chunk of data * @private */ function deflateOnData (chunk) { this[kBuffers].push(chunk); this[kTotalLength] += chunk.length; } /** * The listener of the `zlib.InflateRaw` stream `'data'` event. * * @param {Buffer} chunk A chunk of data * @private */ function inflateOnData (chunk) { this[kTotalLength] += chunk.length; if ( this[kOwner]._maxPayload < 1 || this[kTotalLength] <= this[kOwner]._maxPayload ) { this[kBuffers].push(chunk); return; } this[kError] = new Error('max payload size exceeded'); this[kError].closeCode = 1009; this.removeListener('data', inflateOnData); this.reset(); } /** * The listener of the `zlib.InflateRaw` stream `'error'` event. * * @param {Error} err The emitted error * @private */ function inflateOnError (err) { // // There is no need to call `Zlib#close()` as the handle is automatically // closed when an error is emitted. // this[kOwner]._inflate = null; this[kCallback](err); }