Node.js: scenarios where asynchronous iterators can be used

The previous article explained iterators. If you are not yet familiar with them, you can review "Easy to master iterators in ES6: from understanding to implementation". At present, no built-in JavaScript object has a [Symbol.asyncIterator] property by default. In Node.js, however, some core modules (Stream and Events) and some third-party npm modules (such as mongodb) already support the Symbol.asyncIterator property. This article explores the scenarios in which asynchronous iterators are used in Node.js. Feel free to leave a comment and join the discussion.

Contents

  • Using asyncIterator in Events
    • events.on() example 1
    • events.on() example 2
    • Starting a Node.js server with events.on()
    • The events.on() asynchronous iterator in the Node.js source
  • Using asyncIterator in Stream
    • Asynchronous iterator and Readable
    • How Readable implements asyncIterator in the Node.js source
    • Asynchronous iterator and Writable
  • Using asyncIterator in MongoDB
    • cursor in MongoDB
    • Source code analysis of MongoDB asynchronous iterator implementation
    • Traversing the cursor with for await...of
    • Piping the cursor into a writable stream
  • timers/promises support
    • setInterval features
    • setInterval API
    • Two examples

Using asyncIterator in Events

Node.js v12.16.0 added the events.on(emitter, eventName) method. It returns an asynchronous iterator over the eventName events emitted by emitter.

events.on() example 1

In the following example, the for await...of loop prints only ['Hello']. When the error event is emitted, the loop throws and try...catch catches the error.

const { on, EventEmitter } = require('events');

(async () => {
  const ee = new EventEmitter();
  const ite = on(ee, 'foo');

  process.nextTick(() => {
    ee.emit('foo', 'Hello');
    ee.emit('error', new Error('unknown mistake.'))
    ee.emit('foo', 'Node.js');
  });

  try {
    for await (const event of ite) {
      console.log(event); // prints ['Hello']
    }
  } catch (err) {
    console.log(err.message); // unknown mistake.
  }
})();

In the above example, when the EventEmitter instance ee emits the error event, the error is thrown and the loop exits. The listeners that on() registered on ee are also removed.

events.on() example 2

The body of a for await...of loop runs sequentially and processes only one event at a time, even if several events are emitted at once. This pattern is therefore not recommended when events need to be handled concurrently. The reason becomes clear when we walk through the events.on() source code below.

In the example below, two events are emitted back to back. The loop body simulates a 2-second delay, so handling of the second event is delayed as well.

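const { on, EventEmitter } = require('events');
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

(async () => {
  const ee = new EventEmitter();
  const ite = on(ee, 'foo');

  process.nextTick(() => {
    ee.emit('foo', 'Hello');
    ee.emit('foo', 'Node.js');
    // ite.return(); // calling this ends the for await...of traversal
    // ite.throw(new Error('stop')); // makes the iterator throw this error
  });

  try {
    for await (const event of ite) {
      console.log(event); // prints ['Hello'], then ['Node.js'] 2 seconds later
      await sleep(2000);
    }
  } catch (err) {
    console.log(err.message);
  }

  // Unreachable: the loop keeps waiting for more 'foo' events
  console.log('This will not be executed');
})();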

The last line of the code above never runs. Both emitted events get handled, but the iterator does not terminate; the loop keeps waiting for the next 'foo' event. So when does it terminate? Only in two cases: an error occurs (for example, ee emits 'error'), or we call the iterator's return() or throw() method ourselves.
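Leaving the loop with break is another way to reach return(), because for await...of calls the iterator's return() method when the loop exits early. The sketch below uses takeEvents(), a hypothetical helper that is not part of the events module. It collects a fixed number of events and then breaks. Afterwards, the 'foo' listener that on() registered has been removed.

```javascript
const { on, EventEmitter } = require('events');

// Hypothetical helper: collect `limit` 'foo' events, then leave the loop with break.
// break makes for await...of call the iterator's return(), which removes the listener.
async function takeEvents(limit) {
  const ee = new EventEmitter();
  const received = [];

  process.nextTick(() => {
    ee.emit('foo', 'Hello');
    ee.emit('foo', 'Node.js');
  });

  for await (const [value] of on(ee, 'foo')) {
    received.push(value);
    if (received.length >= limit) break;
  }

  return { received, listeners: ee.listenerCount('foo') };
}

takeEvents(2).then(({ received, listeners }) => {
  console.log(received);  // [ 'Hello', 'Node.js' ]
  console.log(listeners); // 0
});
```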

Starting a Node.js server with events.on()

An earlier article, "Hello Node.js" written in a way you haven't seen before, included a snippet that starts an HTTP server with events.on(). A reader asked about it in the comments. This section covers the asynchronous iterator behind events.on(), which makes that snippet easy to explain.

The code is as follows:

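// ES module (e.g. server.mjs); top-level await needs Node.js v14.8.0 or later
import { createServer as server } from 'http';
import { on } from 'events';
const ee = on(server().listen(3000), 'request');
// Each iteration receives the (req, res) arguments of one 'request' event
for await (const [{ url }, res] of ee)
  if (url === '/hello')
    res.end('Hello Node.js!');
  else
    res.end('OK!');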

The code looks unusual, but the core idea is simple. events.on() returns an asynchronous iterator over the 'request' events of the server created by createServer(), and for await...of traverses that iterator. Each client request corresponds to one emit('request', req, res) on the server, so each iteration receives a [req, res] pair.

Because the loop body runs sequentially, the next request is handled only after the previous one has finished. An HTTP server has to handle requests concurrently, so please do not use this pattern in a real server!
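The cost of sequential processing is easy to see with a plain EventEmitter. In the sketch below, handle() is a hypothetical stand-in for per-request work. When concurrent is false, the loop awaits each handler, so B starts only after A ends. When concurrent is true, the loop starts each handler without awaiting it, so the two overlap. This only illustrates the trade-off: real code would also have to handle errors from handlers that are not awaited.

```javascript
const { on, EventEmitter } = require('events');

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical handler standing in for the work done per event/request.
async function handle(name, log) {
  log.push(`start ${name}`);
  await sleep(50);
  log.push(`end ${name}`);
}

async function run(concurrent) {
  const ee = new EventEmitter();
  const log = [];
  const pending = [];
  process.nextTick(() => {
    ee.emit('job', 'A');
    ee.emit('job', 'B');
  });

  let count = 0;
  for await (const [name] of on(ee, 'job')) {
    if (concurrent) {
      pending.push(handle(name, log)); // start the work without waiting for it
    } else {
      await handle(name, log); // the next event waits until this one is done
    }
    if (++count === 2) break;
  }
  await Promise.all(pending);
  return log;
}

(async () => {
  console.log(await run(false)); // [ 'start A', 'end A', 'start B', 'end B' ]
  console.log(await run(true));  // [ 'start A', 'start B', 'end A', 'end B' ]
})();
```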

The events.on() asynchronous iterator in the Node.js source

The events module exports the on() method directly. The method connects an asynchronous iterator to an instance of the EventEmitter class, and the implementation is quite clever. The core source code is explained below. Once you understand it, you could implement events.on() yourself.

  • Line {1}: ObjectSetPrototypeOf (Object.setPrototypeOf from Node's internal primordials) sets the prototype of the iterator object, which implements three methods: next(), return() and throw().
  • Line {2}: the asynchronous iteration protocol requires an async iterable to have a Symbol.asyncIterator method (SymbolAsyncIterator in the code below). The method takes no arguments and returns an asynchronous iterator; here it returns the iterator object itself.
  • Line {3}: the new prototype is AsyncIteratorPrototype, the second argument to ObjectSetPrototypeOf.
  • Line {4}: eventTargetAgnosticAddListener registers the listener for the event. For an EventEmitter it simply calls emitter.on(name, listener).
  • Line {5}: if the event being iterated is not 'error' itself, addErrorHandlerIfEventEmitter also registers errorHandler for the 'error' event, in the same way as line {4}.
  • Line {6}: eventHandler() is the listener registered above, and it runs every time the event is emitted. This is where the EventEmitter and the asynchronous iterator meet. When a new event arrives, eventHandler takes the first element from the unconsumedPromises array and resolves it with an iterator result built by createIterResult(). That result has the same { value, done } shape a standard iterator's next() returns.
  • Let's continue and see where the entries in unconsumedPromises come from.

module.exports = EventEmitter;
module.exports.on = on;

function on(emitter, event) {
  const unconsumedEvents = [];
  const unconsumedPromises = [];
  const iterator = ObjectSetPrototypeOf({ // {1}
    next() { .... },
    return() { ... },
    throw(err) { ... },
    [SymbolAsyncIterator]() { // {2}
      return this;
    }
  }, AsyncIteratorPrototype); // {3}
  eventTargetAgnosticAddListener(emitter, event, eventHandler); // {4}
  if (event !== 'error') {
    addErrorHandlerIfEventEmitter(emitter, errorHandler); // {5}
  }
  return iterator;
              
  function eventHandler(...args) { // {6}
    const promise = unconsumedPromises.shift();
    if (promise) {
      // A next() call is already waiting: resolve its promise with { value: args, done: false }
      promise.resolve(createIterResult(args, false));
    } else {
      // The for await...of body runs sequentially, so only one event is handled at a time. Events emitted while the previous one is still being processed are stored in unconsumedEvents, and the loop's following next() calls consume them in order.
      unconsumedEvents.push(args);
    }
  }
}

function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
  ...
  emitter.on(name, listener);
}
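You can observe both branches of eventHandler() by calling next() by hand instead of using for await...of. In this sketch, main() is just a demo function. An event emitted before anyone calls next() is buffered in unconsumedEvents. An event emitted after next() resolves the promise stored in unconsumedPromises. Calling return() at the end makes later next() calls report done: true.

```javascript
const { on, EventEmitter } = require('events');

async function main() {
  const ee = new EventEmitter();
  const ite = on(ee, 'foo');

  // No next() call is waiting yet, so eventHandler buffers this event in unconsumedEvents.
  ee.emit('foo', 'buffered');
  const first = await ite.next();

  // Now next() is called first: nothing is buffered, so it returns a pending promise
  // that stays in unconsumedPromises until eventHandler resolves it.
  const pending = ite.next();
  ee.emit('foo', 'late');
  const second = await pending;

  // return() removes the listeners; every later next() reports done: true.
  await ite.return();
  const third = await ite.next();

  return [first, second, third];
}

main().then(results => results.forEach(result => console.log(result)));
```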

The following is the next() method implementation of the iterator object:

  • Line {1}: unconsumed events are consumed first.
  • Line {2}: if an error has occurred, a rejected promise is returned. error is assigned, for example, when the iterator's throw() method is called, or when an 'error' event arrives while no next() call is waiting. This branch then runs on the following call to next().
  • Line {3}: if the iterator is finished, the returned promise resolves with done set to true and the loop ends. finished is set when the iterator's return() method is called.
  • Line {4}: this is where the entries of unconsumedPromises come from. When for await...of traverses the asynchronous iterator, it calls next() automatically. If there is no buffered event, no error, and the iterator is not finished, execution reaches line {4}. A Promise is created there, but it is not resolved immediately; its resolve and reject functions are stored in the unconsumedPromises array. This explains the problem described in "events.on() example 2": while for await...of waits on this promise, the code after the loop does not run. When the event is emitted, the listener calls resolve and the loop body runs. for await...of then calls next() again and creates a new pending Promise. This repeats until the emitter emits an 'error' event or the iterator's return() method is called.

const iterator = ObjectSetPrototypeOf({
  next() {
    // {1} First, we consume all unread messages
    const value = unconsumedEvents.shift();
    if (value) {
      return PromiseResolve(createIterResult(value, false));
    }

    // {2} If an error occurred, return a rejected promise; event listening stops after this error
    if (error) {
      const p = PromiseReject(error);
      // Only the first element errors
      error = null;
      return p;
    }

    // {3} If the iterator is finished, resolve with done set to true
    if (finished) {
      return PromiseResolve(createIterResult(undefined, true));
    }

    // {4} Wait until an event occurs
    return new Promise(function(resolve, reject) {
      unconsumedPromises.push({ resolve, reject });
    });
  }
  ...
}, AsyncIteratorPrototype);
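Putting the pieces together, here is a simplified sketch of events.on(). myOn() is a hypothetical name. It follows the structure of the source above (two queues, eventHandler, errorHandler, next() and return()) but leaves out throw(), AbortSignal support and EventTarget handling. Treat it as an illustration, not a drop-in replacement.

```javascript
const { EventEmitter } = require('events');

// myOn(): a hypothetical, simplified re-implementation of events.on().
function myOn(emitter, event) {
  const unconsumedEvents = [];   // events emitted while no next() call was waiting
  const unconsumedPromises = []; // next() calls still waiting for an event
  let error = null;
  let finished = false;

  function eventHandler(...args) {
    const pending = unconsumedPromises.shift();
    if (pending) pending.resolve({ value: args, done: false });
    else unconsumedEvents.push(args);
  }

  function errorHandler(err) {
    finished = true;
    const pending = unconsumedPromises.shift();
    if (pending) pending.reject(err);
    else error = err; // reported by the next call to next()
    iterator.return();
  }

  const iterator = {
    next() {
      if (unconsumedEvents.length > 0) {
        return Promise.resolve({ value: unconsumedEvents.shift(), done: false });
      }
      if (error) {
        const p = Promise.reject(error);
        error = null;
        return p;
      }
      if (finished) return Promise.resolve({ value: undefined, done: true });
      return new Promise((resolve, reject) => {
        unconsumedPromises.push({ resolve, reject });
      });
    },
    return() {
      emitter.removeListener(event, eventHandler);
      emitter.removeListener('error', errorHandler);
      finished = true;
      for (const pending of unconsumedPromises.splice(0)) {
        pending.resolve({ value: undefined, done: true });
      }
      return Promise.resolve({ value: undefined, done: true });
    },
    [Symbol.asyncIterator]() {
      return this;
    }
  };

  emitter.on(event, eventHandler);
  if (event !== 'error') emitter.on('error', errorHandler);
  return iterator;
}

// Same idea as "events.on() example 1", with one extra event that gets buffered.
async function demo() {
  const ee = new EventEmitter();
  process.nextTick(() => {
    ee.emit('foo', 'Hello');
    ee.emit('foo', 'Node.js');
    ee.emit('error', new Error('unknown mistake.'));
  });
  const seen = [];
  try {
    for await (const [value] of myOn(ee, 'foo')) seen.push(value);
  } catch (err) {
    seen.push(`error: ${err.message}`);
  }
  return { seen, listeners: ee.listenerCount('foo') + ee.listenerCount('error') };
}

demo().then(({ seen }) => console.log(seen)); // [ 'Hello', 'Node.js', 'error: unknown mistake.' ]
```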

Using asyncIterator in Stream

Readable streams in the Node.js stream module gained experimental support for the [Symbol.asyncIterator] property in v10.0.0, so a readable stream can be traversed with for await...of. As of v11.14.0, this support is no longer experimental.

Asynchronous iterator and Readable

First, create a readable stream named readable with the fs module.

const fs = require('fs');
const readable = fs.createReadStream('./hello.txt', {
  encoding: 'utf-8',
  highWaterMark: 1
});

Previously, reading a file through a stream meant listening for the data event, concatenating the chunks, and detecting completion in the end event, as shown below:

function readText(readable) {
  let data = '';
  return new Promise((resolve, reject) => {
    readable.on('data', chunk => {
      data += chunk;
    })
    readable.on('end', () => {
      resolve(data);
    });
    readable.on('error', err => {
      reject(err);
    });
  })
}

Now it can be implemented in a simpler way through asynchronous iterators, as follows:

async function readText(readable) {
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  return data;
}

Now we can call readText to test.

(async () => {
  try {
    const res = await readText(readable);
    console.log(res); // Hello Node.js
  } catch (err) {
    console.log(err.message);
  }
})();

When for await...of traverses readable and the loop ends early because of a break statement or a thrown error, the stream is destroyed as well.

In the above example, the highWaterMark option set when creating the readable stream determines the size of each chunk. To make the effect easy to see, we set highWaterMark to 1 when creating readable, so only one byte (one character, for ASCII text) is read at a time.
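A small sketch can confirm the early-exit behaviour described above. readFirst() is a hypothetical helper. The sketch uses stream.Readable.from() instead of a file so it needs no input file. After break leaves the loop, the stream reports destroyed === true.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: returns only the first chunk of a readable stream.
async function readFirst(readable) {
  let first = null;
  for await (const chunk of readable) {
    first = chunk;
    break; // leaving early makes for await...of call the iterator's return()
  }
  return first;
}

(async () => {
  const readable = Readable.from(['H', 'e', 'l', 'l', 'o']);
  console.log(await readFirst(readable)); // H
  console.log(readable.destroyed);        // true
})();
```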

How Readable implements asyncIterator in the Node.js source

for...of traverses synchronous iterators, and for await...of does the same for asynchronous iterators. Before the loop starts, it calls the Symbol.asyncIterator method of the iterable object (here, readable) to get an asynchronous iterator. On every iteration, it then calls that iterator's next() method to get the next result.
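The steps above can be written out by hand. forAwaitOf() below is a hypothetical helper that mirrors the basic protocol: get the iterator once, then await next() until done is true. The real for await...of also calls return() on early exit and falls back to Symbol.iterator, both of which this sketch omits.

```javascript
// Hypothetical, simplified desugaring of `for await (const value of iterable) body(value)`.
async function forAwaitOf(iterable, body) {
  const iterator = iterable[Symbol.asyncIterator](); // called once, before the loop
  while (true) {
    const { value, done } = await iterator.next(); // called on every iteration
    if (done) break;
    await body(value);
  }
}

async function* letters() {
  yield 'H';
  yield 'i';
}

forAwaitOf(letters(), value => console.log(value)); // H, then i
```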

This article uses the Node.js v14.x source as an example. When we call fs.createReadStream() to create a readable stream, it calls the ReadStream constructor internally:

// https://github.com/nodejs/node/blob/v14.x/lib/fs.js#L2001
function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options);
}

The ReadStream constructor itself contains nothing related to async iteration. What matters is that ReadStream inherits from Readable in the stream module: it calls the Readable constructor, and its prototype chain includes Readable.prototype.

function ReadStream(path, options) {
  ...
  Readable.call(this, options);
}

Now let's look at Readable itself.

The Symbol.asyncIterator method (SymbolAsyncIterator in Node's internal code) is defined on Readable.prototype. It returns an iterator object created by an async generator function.

// Called by for await...of to get the asynchronous iterator
Readable.prototype[SymbolAsyncIterator] = function() {
  let stream = this;
  ...
  const iter = createAsyncIterator(stream);
  iter.stream = stream;
  return iter;
};

// An async generator function that creates the asynchronous iterator object
async function* createAsyncIterator(stream) {
  let callback = nop;

  function next(resolve) {
    if (this === stream) {
      callback();
      callback = nop;
    } else {
      callback = resolve;
    }
  }

  const state = stream._readableState;

  let error = state.errored;
  let errorEmitted = state.errorEmitted;
  let endEmitted = state.endEmitted;
  let closeEmitted = state.closeEmitted;
 
  // The error, end and close events control when to end the iterator traversal.
  stream
    .on('readable', next)
    .on('error', function(err) {
      error = err;
      errorEmitted = true;
      next.call(this);
    })
    .on('end', function() {
      endEmitted = true;
      next.call(this);
    })
    .on('close', function() {
      closeEmitted = true;
      next.call(this);
    });

  try {
    while (true) {
      // stream.read() pulls data from the internal buffer and returns it. If there is no readable data, null is returned
      // After readable.destroy() is called, stream.destroyed is true (stream is the readable object)
      const chunk = stream.destroyed ? null : stream.read();
      if (chunk !== null) {
        yield chunk; // The key step: each yield resolves the caller's next() with { value: chunk, done: false }
      } else if (errorEmitted) {
        throw error;
      } else if (endEmitted) {
        break;
      } else if (closeEmitted) {
        break;
      } else {
        await new Promise(next);
      }
    }
  } catch (err) {
    destroyImpl.destroyer(stream, err);
    throw err;
  } finally {
    if (state.autoDestroy || !endEmitted) {
      // TODO(ronag): ERR_PREMATURE_CLOSE?
      destroyImpl.destroyer(stream, null);
    }
  }
}

The source above shows that an async generator function creates readable's asynchronous iterator. Besides traversing it with for await...of, you can also call the next() method of the returned iterator directly:

const ret = readable[Symbol.asyncIterator]()
console.log(await ret.next()); // { value: 'H', done: false }
console.log(await ret.next()); // { value: 'e', done: false }
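Here is a stripped-down sketch that shows the core loop of createAsyncIterator() without the surrounding bookkeeping. iterateReadable() is a hypothetical name. It calls read() while data is buffered and yields each chunk; otherwise it waits for a 'readable', 'end' or 'error' event. Unlike the Node.js source, it ignores 'close' and the autoDestroy option.

```javascript
const { Readable } = require('stream');

// Hypothetical, simplified version of createAsyncIterator().
async function* iterateReadable(stream) {
  let wake = () => {};
  let ended = false;
  let error = null;
  const notify = () => {
    const resolve = wake;
    wake = () => {};
    resolve();
  };

  stream.on('readable', notify);
  stream.on('end', () => { ended = true; notify(); });
  stream.on('error', err => { error = err; notify(); });

  try {
    while (true) {
      const chunk = stream.read(); // null when the internal buffer is empty
      if (chunk !== null) {
        yield chunk;
      } else if (error) {
        throw error;
      } else if (ended) {
        return;
      } else {
        await new Promise(resolve => { wake = resolve; }); // wait for the next event
      }
    }
  } finally {
    if (!ended) stream.destroy(); // stopped early or failed: release the stream
  }
}

(async () => {
  for await (const chunk of iterateReadable(Readable.from(['a', 'b', 'c']))) {
    console.log(chunk); // a, b, c
  }
})();
```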

Asynchronous iterator and Writable

We now know how to read data from a readable stream by traversing its asynchronous iterator. But how do we send the values of an asynchronous iterator into a writable stream? That is the topic of this section.

Create a readable stream from an iterator

The Node.js stream module provides a handy method, stream.Readable.from(). It takes an iterable object that implements the Symbol.asyncIterator or Symbol.iterator protocol and builds a Node.js readable stream from it.

The example below comes from the earlier article "Easy to master iterators in ES6: from understanding to implementation". r1 is the iterable object we created there, and stream.Readable.from() turns it into a readable stream named readable.

const stream = require('stream');

function Range(start, end) {
  this.start = start;
  this.end = end;
}
// Count from start on every iteration, so r1 can be iterated more than once
Range.prototype[Symbol.asyncIterator] = async function* () {
  for (let id = this.start; id <= this.end; id++) {
    yield id;
  }
};
const r1 = new Range(0, 3);
const readable = stream.Readable.from(r1);
readable.on('data', chunk => {
  console.log(chunk); // 0 1 2 3
});

Piping an asynchronous iterator into a writable stream

stream.pipeline() connects a series of streams and generator functions and passes the data through them. It notifies us when the pipeline has finished or failed.

util.promisify() converts pipeline into a promise-based function.

const util = require('util');
const pipeline = util.promisify(stream.pipeline); // Convert to promise form

(async () => {
  try {
    const readable = stream.Readable.from(r1);
    const writeable = fs.createWriteStream('range.txt');
    await pipeline(
      readable,
      async function* (source) {
        for await (const chunk of source) {
          yield chunk.toString();
        }
      },
      writeable
    );
    console.log('Pipeline success');
  } catch (err) {
    console.log(err.message);
  }
})()

A writable stream that is not in object mode accepts only chunks that are strings, Buffers or Uint8Arrays; any other type makes the write fail with an error. Our custom iterable r1 yields numbers, so the values need to be converted. The async generator function in the middle of the pipeline turns each value it receives into a string.

Using asyncIterator in MongoDB

Besides the official Node.js modules covered above, MongoDB also supports asynchronous iteration, although there is little documentation about it. In MongoDB, asynchronous iteration is built on the concept of a cursor.

cursor in MongoDB

Take the native Node.js driver, the mongodb module, as an example. Calling db.collection(...).find() returns a cursor. To access the documents we need to iterate over the cursor, although in practice we often just call its toArray() method.

Let's look at an example. We have a database named example containing a collection named books, which holds two documents.


The following code queries all documents in the books collection. The myCursor variable is the cursor object, and it does not iterate by itself. The cursor's hasNext() method checks whether there is another document; if there is, next() returns it.

The log output below shows that the third call to hasNext() returns false. Calling next() at that point reports an error because the cursor has been closed, meaning there is no more data to traverse.

const MongoClient = require('mongodb').MongoClient;
const dbConnectionUrl = 'mongodb://127.0.0.1:27017/example';

(async () => {
  const client = await MongoClient.connect(dbConnectionUrl, { useUnifiedTopology: true });
  const bookColl = client.db('example').collection('books');
  const myCursor = await bookColl.find();
 
  console.log(await myCursor.hasNext()); // true
  console.log((await myCursor.next()).name); // Explain Node.js in simple terms
  console.log(await myCursor.hasNext()); // true
  console.log((await myCursor.next()).name); // Node.js actual combat
  console.log(await myCursor.hasNext()); // false
  console.log((await myCursor.next()).name); // MongoError: Cursor is closed
})()

You can also check by calling next() directly. It returns the next document if there is one; otherwise it returns null.

console.log((await myCursor.next()).name);
console.log((await myCursor.next()).name);
console.log((await myCursor.next()));

Source code analysis of MongoDB asynchronous iterator implementation

In MongoDB, a cursor signals that it has reached its end when hasNext() returns false or next() returns null. JavaScript's asynchronous iteration protocol works differently. It requires an object with a Symbol.asyncIterator property that returns an iterator, and that iterator's results take the form { value, done }.

Fortunately, the MongoDB Node.js driver already implements this for us. Let's look at how it does so in the source code.

  • find method

The find method returns an iterable cursor object.

// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/collection.js#L470

Collection.prototype.find = deprecateOptions(
  {
    name: 'collection.find',
    deprecatedOptions: DEPRECATED_FIND_OPTIONS,
    optionsIndex: 1
  },
  function(query, options, callback) {
    const cursor = this.s.topology.cursor(
      new FindOperation(this, this.s.namespace, findCommand, newOptions),
      newOptions
    );

    return cursor;
  }
);

  • CoreCursor

This is the core of the implementation. CoreCursor is the core cursor class, and every cursor in the mongodb Node.js driver is based on it. If the runtime supports asynchronous iterators, the driver sets a Symbol.asyncIterator property on CoreCursor's prototype. That property returns a Promise-based asynchronous iterator object, which conforms to JavaScript's standard definition of an asynchronous iterable.

// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/core/cursor.js#L610

if (SUPPORTS.ASYNC_ITERATOR) {
  CoreCursor.prototype[Symbol.asyncIterator] = require('../async/async_iterator').asyncIterator;
}
// https://github.com/mongodb/node-mongodb-native/blob/3.6/lib/async/async_iterator.js#L16

// async function* asyncIterator() {
//   while (true) {
//     const value = await this.next();
//     if (!value) {
//       await this.close();
//       return;
//     }

//     yield value;
//   }
// }

// TODO: change this to the async generator function above
function asyncIterator() {
  const cursor = this;

  return {
    next: function() {
      return Promise.resolve()
        .then(() => cursor.next())
        .then(value => {
          if (!value) {
            return cursor.close().then(() => ({ value, done: true }));
          }
          return { value, done: false };
        });
    }
  };
}

For now, the iterator is implemented with plain Promises. The TODO comment in the code indicates that the driver may later rewrite its asynchronous iterator as the async generator function in the commented-out block. Either way, nothing changes for us as users.
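You can try the async generator from the TODO comment without a database. In the sketch below, createFakeCursor() is a hypothetical in-memory stand-in for a driver cursor: next() resolves to a document or null, and close() marks the cursor as closed. Attaching the generator as Symbol.asyncIterator is enough for for await...of to work. The cursor closes once next() returns null.

```javascript
// Hypothetical in-memory stand-in for a MongoDB cursor (not the driver's API surface).
function createFakeCursor(docs) {
  let index = 0;
  const cursor = {
    closed: false,
    next() {
      return Promise.resolve(index < docs.length ? docs[index++] : null);
    },
    close() {
      cursor.closed = true;
      return Promise.resolve();
    }
  };
  // for await...of calls this method with `this` bound to the cursor
  cursor[Symbol.asyncIterator] = asyncIterator;
  return cursor;
}

// The async generator from the commented-out block in the driver source.
async function* asyncIterator() {
  while (true) {
    const value = await this.next();
    if (!value) {
      await this.close();
      return;
    }
    yield value;
  }
}

(async () => {
  const cursor = createFakeCursor([{ name: 'Book A' }, { name: 'Book B' }]);
  for await (const doc of cursor) {
    console.log(doc.name); // Book A, then Book B
  }
  console.log(cursor.closed); // true
})();
```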

Traversing the cursor with for await...of

Rewriting the example above with a for await...of loop makes it much simpler:

const myCursor = await bookColl.find();
for await (const val of myCursor) {
  console.log(val.name);
}

Aggregation cursors in MongoDB work the same way, so no further analysis is needed:

const myCursor = await bookColl.aggregate();
for await (const val of myCursor) {
  console.log(val.name);
}

When traversing a large data set, the cursor loads data from MongoDB in batches. We don't have to hold all of the data in the server's memory at once, so memory pressure stays low.

Piping the cursor into a writable stream

A MongoDB cursor is itself an async iterable, so stream.Readable.from() can turn it into a readable stream, which can then be written to a file through a stream.

Note, however, that a MongoDB cursor returns one document at a time, and each document is a plain object. Writing it directly would make the writable stream throw an argument type error. Writable streams are not in object mode by default and accept only strings, Buffers and Uint8Arrays. That is why an async generator function sits in the middle of the pipeline: it converts each document it receives into a Buffer before it reaches the writable stream.

const myCursor = await bookColl.find();
const readable = stream.Readable.from(myCursor);
await pipeline(
  readable,
  async function* (source) {
    for await (const chunk of source) {
      yield Buffer.from(JSON.stringify(chunk) + '\n'); // one JSON document per line
    }
  },
  fs.createWriteStream('books.txt')
);
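You can exercise the same pipeline without MongoDB. In this sketch, an array of plain objects stands in for the cursor, since Readable.from() accepts any iterable. An in-memory Writable in the default non-object mode replaces fs.createWriteStream(), and toNdjson() is a hypothetical helper name. Each document is serialized with a trailing newline, so the output is newline-delimited JSON.

```javascript
const stream = require('stream');
const util = require('util');

const pipeline = util.promisify(stream.pipeline);

// Plain objects stand in for the documents a MongoDB cursor would yield.
const docs = [{ name: 'Book A' }, { name: 'Book B' }];

async function toNdjson(source) {
  const written = [];
  // In-memory writable in default (non-object) mode: like fs.createWriteStream(),
  // it only accepts strings, Buffers and Uint8Arrays.
  const sink = new stream.Writable({
    write(chunk, encoding, callback) {
      written.push(chunk.toString());
      callback();
    }
  });
  await pipeline(
    stream.Readable.from(source),
    async function* (objects) {
      for await (const doc of objects) {
        yield Buffer.from(JSON.stringify(doc) + '\n'); // object -> Buffer
      }
    },
    sink
  );
  return written.join('');
}

toNdjson(docs).then(text => process.stdout.write(text));
// {"name":"Book A"}
// {"name":"Book B"}
```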

timers/promises support

timers/promises provides Promise-based versions of the timer functions. To use them, import them from timers/promises at the top of the file; otherwise the global, callback-based timer functions are used.

import { setInterval } from 'timers/promises';

setInterval features

Node.js v15.9.0 added a setInterval() based on an async generator function to the timers/promises module. It has the following features:

  • It returns an asynchronous iterator that produces a value every delay milliseconds, with each step driven by a Promise.
  • It can be iterated with for await...of.
  • It can be cancelled with an AbortController.
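Here is a hypothetical interval() generator that illustrates an interval built on an async generator. It is not the Node.js implementation and has no ref or signal support. It only shows why for await...of and break fit such an API naturally.

```javascript
// Hypothetical async-generator interval; not the timers/promises implementation.
async function* interval(delay, value) {
  while (true) {
    await new Promise(resolve => setTimeout(resolve, delay)); // wait one period
    yield value;
  }
}

(async () => {
  let count = 0;
  for await (const v of interval(100, 'tick')) {
    count++;
    console.log(count, v); // 1 tick, then 2 tick, then 3 tick
    if (count === 3) break; // break calls the generator's return(), ending the interval
  }
})();
```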

setInterval API

  • delay: the number of milliseconds to wait between iterations. Defaults to 1.
  • value: the value produced by the iterator on each iteration.
  • options.ref: set to false to indicate that the scheduled timeout between iterations should not require the Node.js event loop to remain active. Defaults to true.
  • options.signal: an optional AbortSignal (the signal property of an AbortController instance) used to cancel the timer.

setInterval(delay, value, { ref: false, signal });

Two examples

Example 1: iterating the timer with for await...of

const ac = new AbortController();
const { signal } = ac;
const delay = 1000;
setTimeout(() => ac.abort(), 5100);
let i=0;
try {
  for await (const startTime of setInterval(delay, Date.now(), { ref: false, signal })) {
    console.log(Date.now(), i, startTime);
    i++;
  }
} catch (err) {
  // AbortError: The operation was aborted
  console.error(err);
}

Example 2: stopping with break. setInterval returns an iterator created by an async generator function, so you can also use a break statement to stop the timer once a condition is met.

i = 0; // reset the counter left over from example 1
try {
  for await (const v of setInterval(delay)) {
    console.log(Date.now(), i);
    i++;
    if(i >= 2) {
      console.log(Date.now(), i, 'break');
      break;
    }
  }
} catch (err) {
  console.error(err);
}

Reference

  • https://nodejs.org/dist/latest-v14.x/docs/api/stream.html#stream_readable_symbol_asynciterator
  • https://nodejs.org/dist/latest-v14.x/docs/api/events.html#events_events_on_emitter_eventname
  • https://docs.mongodb.com/manual/tutorial/iterate-a-cursor/index.html


Added by peterj on Wed, 09 Feb 2022 06:40:44 +0200