123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477 |
- // Copyright Joyent, Inc. and other Node contributors.
- //
- // Permission is hereby granted, free of charge, to any person obtaining a
- // copy of this software and associated documentation files (the
- // "Software"), to deal in the Software without restriction, including
- // without limitation the rights to use, copy, modify, merge, publish,
- // distribute, sublicense, and/or sell copies of the Software, and to permit
- // persons to whom the Software is furnished to do so, subject to the
- // following conditions:
- //
- // The above copyright notice and this permission notice shall be included
- // in all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
- // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
- // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
- // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
- // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
- // USE OR OTHER DEALINGS IN THE SOFTWARE.
- // A bit simpler than readable streams.
- // Implement an async ._write(chunk, cb), and it'll handle all
- // the drain event emission and buffering.
- module.exports = Writable;
- /*<replacement>*/
- var Buffer = require('buffer').Buffer;
- /*</replacement>*/
- Writable.WritableState = WritableState;
- /*<replacement>*/
- var util = require('core-util-is');
- util.inherits = require('inherits');
- /*</replacement>*/
- var Stream = require('stream');
- util.inherits(Writable, Stream);
- function WriteReq(chunk, encoding, cb) {
- this.chunk = chunk;
- this.encoding = encoding;
- this.callback = cb;
- }
- function WritableState(options, stream) {
- var Duplex = require('./_stream_duplex');
- options = options || {};
- // the point at which write() starts returning false
- // Note: 0 is a valid value, means that we always return false if
- // the entire buffer is not flushed immediately on write()
- var hwm = options.highWaterMark;
- var defaultHwm = options.objectMode ? 16 : 16 * 1024;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
- // object stream flag to indicate whether or not this stream
- // contains buffers or objects.
- this.objectMode = !!options.objectMode;
- if (stream instanceof Duplex)
- this.objectMode = this.objectMode || !!options.writableObjectMode;
- // cast to ints.
- this.highWaterMark = ~~this.highWaterMark;
- this.needDrain = false;
- // at the start of calling end()
- this.ending = false;
- // when end() has been called, and returned
- this.ended = false;
- // when 'finish' is emitted
- this.finished = false;
- // should we decode strings into buffers before passing to _write?
- // this is here so that some node-core streams can optimize string
- // handling at a lower level.
- var noDecode = options.decodeStrings === false;
- this.decodeStrings = !noDecode;
- // Crypto is kind of old and crusty. Historically, its default string
- // encoding is 'binary' so we have to make this configurable.
- // Everything else in the universe uses 'utf8', though.
- this.defaultEncoding = options.defaultEncoding || 'utf8';
- // not an actual buffer we keep track of, but a measurement
- // of how much we're waiting to get pushed to some underlying
- // socket or file.
- this.length = 0;
- // a flag to see when we're in the middle of a write.
- this.writing = false;
- // when true all writes will be buffered until .uncork() call
- this.corked = 0;
- // a flag to be able to tell if the onwrite cb is called immediately,
- // or on a later tick. We set this to true at first, because any
- // actions that shouldn't happen until "later" should generally also
- // not happen before the first write call.
- this.sync = true;
- // a flag to know if we're processing previously buffered items, which
- // may call the _write() callback in the same tick, so that we don't
- // end up in an overlapped onwrite situation.
- this.bufferProcessing = false;
- // the callback that's passed to _write(chunk,cb)
- this.onwrite = function(er) {
- onwrite(stream, er);
- };
- // the callback that the user supplies to write(chunk,encoding,cb)
- this.writecb = null;
- // the amount that is being written when _write is called.
- this.writelen = 0;
- this.buffer = [];
- // number of pending user-supplied write callbacks
- // this must be 0 before 'finish' can be emitted
- this.pendingcb = 0;
- // emit prefinish if the only thing we're waiting for is _write cbs
- // This is relevant for synchronous Transform streams
- this.prefinished = false;
- // True if the error was already emitted and should not be thrown again
- this.errorEmitted = false;
- }
- function Writable(options) {
- var Duplex = require('./_stream_duplex');
- // Writable ctor is applied to Duplexes, though they're not
- // instanceof Writable, they're instanceof Readable.
- if (!(this instanceof Writable) && !(this instanceof Duplex))
- return new Writable(options);
- this._writableState = new WritableState(options, this);
- // legacy.
- this.writable = true;
- Stream.call(this);
- }
- // Otherwise people can pipe Writable streams, which is just wrong.
- Writable.prototype.pipe = function() {
- this.emit('error', new Error('Cannot pipe. Not readable.'));
- };
- function writeAfterEnd(stream, state, cb) {
- var er = new Error('write after end');
- // TODO: defer error events consistently everywhere, not just the cb
- stream.emit('error', er);
- process.nextTick(function() {
- cb(er);
- });
- }
- // If we get something that is not a buffer, string, null, or undefined,
- // and we're not in objectMode, then that's an error.
- // Otherwise stream chunks are all considered to be of length=1, and the
- // watermarks determine how many objects to keep in the buffer, rather than
- // how many bytes or characters.
- function validChunk(stream, state, chunk, cb) {
- var valid = true;
- if (!util.isBuffer(chunk) &&
- !util.isString(chunk) &&
- !util.isNullOrUndefined(chunk) &&
- !state.objectMode) {
- var er = new TypeError('Invalid non-string/buffer chunk');
- stream.emit('error', er);
- process.nextTick(function() {
- cb(er);
- });
- valid = false;
- }
- return valid;
- }
- Writable.prototype.write = function(chunk, encoding, cb) {
- var state = this._writableState;
- var ret = false;
- if (util.isFunction(encoding)) {
- cb = encoding;
- encoding = null;
- }
- if (util.isBuffer(chunk))
- encoding = 'buffer';
- else if (!encoding)
- encoding = state.defaultEncoding;
- if (!util.isFunction(cb))
- cb = function() {};
- if (state.ended)
- writeAfterEnd(this, state, cb);
- else if (validChunk(this, state, chunk, cb)) {
- state.pendingcb++;
- ret = writeOrBuffer(this, state, chunk, encoding, cb);
- }
- return ret;
- };
- Writable.prototype.cork = function() {
- var state = this._writableState;
- state.corked++;
- };
- Writable.prototype.uncork = function() {
- var state = this._writableState;
- if (state.corked) {
- state.corked--;
- if (!state.writing &&
- !state.corked &&
- !state.finished &&
- !state.bufferProcessing &&
- state.buffer.length)
- clearBuffer(this, state);
- }
- };
- function decodeChunk(state, chunk, encoding) {
- if (!state.objectMode &&
- state.decodeStrings !== false &&
- util.isString(chunk)) {
- chunk = new Buffer(chunk, encoding);
- }
- return chunk;
- }
- // if we're already writing something, then just put this
- // in the queue, and wait our turn. Otherwise, call _write
- // If we return false, then we need a drain event, so set that flag.
- function writeOrBuffer(stream, state, chunk, encoding, cb) {
- chunk = decodeChunk(state, chunk, encoding);
- if (util.isBuffer(chunk))
- encoding = 'buffer';
- var len = state.objectMode ? 1 : chunk.length;
- state.length += len;
- var ret = state.length < state.highWaterMark;
- // we must ensure that previous needDrain will not be reset to false.
- if (!ret)
- state.needDrain = true;
- if (state.writing || state.corked)
- state.buffer.push(new WriteReq(chunk, encoding, cb));
- else
- doWrite(stream, state, false, len, chunk, encoding, cb);
- return ret;
- }
- function doWrite(stream, state, writev, len, chunk, encoding, cb) {
- state.writelen = len;
- state.writecb = cb;
- state.writing = true;
- state.sync = true;
- if (writev)
- stream._writev(chunk, state.onwrite);
- else
- stream._write(chunk, encoding, state.onwrite);
- state.sync = false;
- }
- function onwriteError(stream, state, sync, er, cb) {
- if (sync)
- process.nextTick(function() {
- state.pendingcb--;
- cb(er);
- });
- else {
- state.pendingcb--;
- cb(er);
- }
- stream._writableState.errorEmitted = true;
- stream.emit('error', er);
- }
- function onwriteStateUpdate(state) {
- state.writing = false;
- state.writecb = null;
- state.length -= state.writelen;
- state.writelen = 0;
- }
- function onwrite(stream, er) {
- var state = stream._writableState;
- var sync = state.sync;
- var cb = state.writecb;
- onwriteStateUpdate(state);
- if (er)
- onwriteError(stream, state, sync, er, cb);
- else {
- // Check if we're actually ready to finish, but don't emit yet
- var finished = needFinish(stream, state);
- if (!finished &&
- !state.corked &&
- !state.bufferProcessing &&
- state.buffer.length) {
- clearBuffer(stream, state);
- }
- if (sync) {
- process.nextTick(function() {
- afterWrite(stream, state, finished, cb);
- });
- } else {
- afterWrite(stream, state, finished, cb);
- }
- }
- }
- function afterWrite(stream, state, finished, cb) {
- if (!finished)
- onwriteDrain(stream, state);
- state.pendingcb--;
- cb();
- finishMaybe(stream, state);
- }
- // Must force callback to be called on nextTick, so that we don't
- // emit 'drain' before the write() consumer gets the 'false' return
- // value, and has a chance to attach a 'drain' listener.
- function onwriteDrain(stream, state) {
- if (state.length === 0 && state.needDrain) {
- state.needDrain = false;
- stream.emit('drain');
- }
- }
- // if there's something in the buffer waiting, then process it
- function clearBuffer(stream, state) {
- state.bufferProcessing = true;
- if (stream._writev && state.buffer.length > 1) {
- // Fast case, write everything using _writev()
- var cbs = [];
- for (var c = 0; c < state.buffer.length; c++)
- cbs.push(state.buffer[c].callback);
- // count the one we are adding, as well.
- // TODO(isaacs) clean this up
- state.pendingcb++;
- doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
- for (var i = 0; i < cbs.length; i++) {
- state.pendingcb--;
- cbs[i](err);
- }
- });
- // Clear buffer
- state.buffer = [];
- } else {
- // Slow case, write chunks one-by-one
- for (var c = 0; c < state.buffer.length; c++) {
- var entry = state.buffer[c];
- var chunk = entry.chunk;
- var encoding = entry.encoding;
- var cb = entry.callback;
- var len = state.objectMode ? 1 : chunk.length;
- doWrite(stream, state, false, len, chunk, encoding, cb);
- // if we didn't call the onwrite immediately, then
- // it means that we need to wait until it does.
- // also, that means that the chunk and cb are currently
- // being processed, so move the buffer counter past them.
- if (state.writing) {
- c++;
- break;
- }
- }
- if (c < state.buffer.length)
- state.buffer = state.buffer.slice(c);
- else
- state.buffer.length = 0;
- }
- state.bufferProcessing = false;
- }
- Writable.prototype._write = function(chunk, encoding, cb) {
- cb(new Error('not implemented'));
- };
- Writable.prototype._writev = null;
- Writable.prototype.end = function(chunk, encoding, cb) {
- var state = this._writableState;
- if (util.isFunction(chunk)) {
- cb = chunk;
- chunk = null;
- encoding = null;
- } else if (util.isFunction(encoding)) {
- cb = encoding;
- encoding = null;
- }
- if (!util.isNullOrUndefined(chunk))
- this.write(chunk, encoding);
- // .end() fully uncorks
- if (state.corked) {
- state.corked = 1;
- this.uncork();
- }
- // ignore unnecessary end() calls.
- if (!state.ending && !state.finished)
- endWritable(this, state, cb);
- };
- function needFinish(stream, state) {
- return (state.ending &&
- state.length === 0 &&
- !state.finished &&
- !state.writing);
- }
- function prefinish(stream, state) {
- if (!state.prefinished) {
- state.prefinished = true;
- stream.emit('prefinish');
- }
- }
- function finishMaybe(stream, state) {
- var need = needFinish(stream, state);
- if (need) {
- if (state.pendingcb === 0) {
- prefinish(stream, state);
- state.finished = true;
- stream.emit('finish');
- } else
- prefinish(stream, state);
- }
- return need;
- }
- function endWritable(stream, state, cb) {
- state.ending = true;
- finishMaybe(stream, state);
- if (cb) {
- if (state.finished)
- process.nextTick(cb);
- else
- stream.once('finish', cb);
- }
- state.ended = true;
- }
|