To make "IPC" calls in JS more graceful, I wrote the event-invoke library

Background

Our team recently needed to develop a new Node.js module that involves managing and communicating between multiple processes. In simplified terms, the master process needs to call methods in a worker process frequently. I designed and implemented a small library, event-invoke, so that these calls can be made simply and gracefully.

Node.js provides the child_process module. In the master process, calling fork, spawn, or a similar method creates a worker process and returns its handle (cp for short). With fork, an IPC channel is established between parent and child (with spawn, only when 'ipc' is included in the stdio option). The master process can send IPC messages to the worker with cp.send(), and the worker can send IPC messages back to its parent with process.send(), which gives duplex communication. (Process management involves more complex work and is not covered in this article for now.)

Minimal implementation

Given the above, the IPC channel and the process objects let us implement inter-process communication in an event-driven way. A few lines of code are enough for the basic calling logic, for example:

// master.js
const child_process = require('child_process');
const cp = child_process.fork('./worker.js');

function invoke() {
  cp.send({ name: 'methodA', args: [] });
  cp.on('message', (packet) => {
    console.log('result: %j', packet.payload);
  });
}

invoke();

// worker.js
const methodMap = {
  methodA() {}
};

// In the worker, messages from the parent arrive on the global process object
process.on('message', async (packet) => {
  const { name, args } = packet;
  const result = await methodMap[name](...args);
  process.send({ name, payload: result });
});

Looking closely at this implementation, the invoke call is clearly not elegant: every call registers another message listener, so listeners pile up under heavy call volume, and a lot of extra design is needed to guarantee that each response is matched to its request. We would like something simpler: a single invoke method that takes a method name and arguments and returns a Promise, so that an IPC call feels like calling a local method and the details of message passing stay hidden.

// Hypothetical IPC call
const res1 = await invoker.invoke('sleep', 1000);
console.log('sleep 1000ms:', res1);
const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
console.log('max(1, 2, 3):', res2);
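
To make the listener problem of the naive version concrete, here is a minimal sketch that replays the same invoke pattern against an in-process stand-in. The names fakeCp and naiveInvoke are hypothetical and exist only for this illustration; a plain EventEmitter plays the role of the child-process handle and answers each send() with one reply.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the child process handle: each send() is
// answered with exactly one 'message' event on a later tick.
const fakeCp = new EventEmitter();
fakeCp.send = (packet) => {
  setImmediate(() => fakeCp.emit('message', { name: packet.name, payload: 'done' }));
};

let deliveries = 0;

// Same shape as the naive invoke(): send, then attach a new listener.
function naiveInvoke(name) {
  fakeCp.send({ name, args: [] });
  fakeCp.on('message', () => {
    deliveries += 1; // runs for every reply, not only for "its own"
  });
}

naiveInvoke('methodA');
naiveInvoke('methodA');
naiveInvoke('methodA');

// Three calls have left three listeners attached to the same channel.
console.log('listeners:', fakeCp.listenerCount('message'));

// Once the three replies arrive, each one is seen by all three listeners.
setImmediate(() => console.log('deliveries:', deliveries));
```

Three calls leave three listeners behind, and the three replies trigger nine listener invocations in total, because nothing ties a reply to the call that caused it.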

Process design

From the call model, the roles can be abstracted as Invoker and Callee, corresponding to the service caller and the service provider, with the details of message communication encapsulated inside them. The bridge between parent_process and child_process is the IPC channel provided by the operating system. From the API's perspective, it can be reduced to two Event objects (cp in the main process and process in the child process). As the two ends of the duplex channel, these Event objects are tentatively named InvokerChannel and CalleeChannel.

Key entities and processes are as follows:

  • All callable methods are registered in the Callee and stored in functionMap
  • When the user calls invoker.invoke():

    • A promise object is created, returned to the user, and saved in the promise map
    • Each call generates an id so that calls and execution results correspond one to one
    • A timeout is applied; a call that exceeds it rejects its promise directly
  • The Invoker sends the method-call message to the Callee through the Channel
  • The Callee parses the received message, runs the method identified by name, and sends the result and completion status (success or exception) back to the Invoker through the Channel
  • The Invoker parses the message and looks up the corresponding promise by id+name; on success it resolves the promise, and on failure it rejects it
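
The flow above can be sketched in a few dozen lines. This is an illustration under stated assumptions, not the actual event-invoke source: SketchInvoker, SketchCallee, createChannelPair, the packet fields (id, name, args, ok, payload), and the 50 ms timeout are all hypothetical, and two in-process EventEmitters stand in for cp and process so the example runs in a single file.

```javascript
const { EventEmitter } = require('events');

// Caller side: turns invoke(name, ...args) into a packet plus a pending promise.
class SketchInvoker {
  constructor(channel, { timeout = 1000 } = {}) {
    this.channel = channel;
    this.timeout = timeout;
    this.seq = 0;
    this.promiseMap = new Map(); // "id:name" -> { resolve, reject, timer }
    channel.on('message', (packet) => this.onMessage(packet));
  }

  invoke(name, ...args) {
    const id = ++this.seq;
    const key = `${id}:${name}`;
    return new Promise((resolve, reject) => {
      // Timeout control: a call that takes too long rejects its own promise.
      const timer = setTimeout(() => {
        this.promiseMap.delete(key);
        reject(new Error(`invoke "${name}" timed out after ${this.timeout}ms`));
      }, this.timeout);
      this.promiseMap.set(key, { resolve, reject, timer });
      this.channel.send({ id, name, args });
    });
  }

  onMessage({ id, name, ok, payload }) {
    const key = `${id}:${name}`;
    const pending = this.promiseMap.get(key);
    if (!pending) return; // unknown id, or a reply that arrived after the timeout
    clearTimeout(pending.timer);
    this.promiseMap.delete(key);
    if (ok) pending.resolve(payload);
    else pending.reject(new Error(payload));
  }
}

// Provider side: looks up the method by name and reports success or failure.
class SketchCallee {
  constructor(channel) {
    this.channel = channel;
    this.functionMap = new Map();
  }

  register(fn) {
    this.functionMap.set(fn.name, fn);
  }

  listen() {
    this.channel.on('message', async ({ id, name, args }) => {
      try {
        const fn = this.functionMap.get(name);
        if (!fn) throw new Error(`method "${name}" is not registered`);
        const payload = await fn(...args);
        this.channel.send({ id, name, ok: true, payload });
      } catch (err) {
        this.channel.send({ id, name, ok: false, payload: err.message });
      }
    });
  }
}

// Two in-process emitters wired back to back stand in for cp and process.
function createChannelPair() {
  const a = new EventEmitter();
  const b = new EventEmitter();
  a.send = (packet) => setImmediate(() => b.emit('message', packet));
  b.send = (packet) => setImmediate(() => a.emit('message', packet));
  return [a, b];
}

async function demo() {
  const [invokerChannel, calleeChannel] = createChannelPair();
  const callee = new SketchCallee(calleeChannel);
  callee.register(function max(...args) { return Math.max(...args); });
  callee.register(function never() { return new Promise(() => {}); });
  callee.listen();

  const invoker = new SketchInvoker(invokerChannel, { timeout: 50 });
  const max = await invoker.invoke('max', 1, 2, 3);
  const missing = await invoker.invoke('missing').catch((err) => err.message);
  const timedOut = await invoker.invoke('never').catch((err) => err.message);
  return { max, missing, timedOut };
}

demo().then((results) => console.log(results));
```

Only one message listener is attached per side, however many calls are in flight; the id carried in each packet does the matching that the naive version left to chance.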

In fact, this design is not limited to IPC calls; it can be applied directly in browser scenarios as well. For example, cross-iframe calls can wrap window.postMessage(), cross-tab calls can use the storage event, and a Web Worker can use worker.postMessage() as the communication bridge.
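
As a rough illustration of how a postMessage-style transport could be given the same send() / 'message' shape, the sketch below wraps a MessagePort from Node's worker_threads module, used here as a stand-in for a browser port so the example runs under Node. PortChannel and roundTrip are hypothetical names and not part of event-invoke; in a browser the adapter would instead listen with addEventListener and read event.data.

```javascript
const { EventEmitter } = require('events');
const { MessageChannel } = require('worker_threads');

// Hypothetical adapter: exposes a MessagePort through the same
// send() / 'message' shape that cp and process offer.
class PortChannel extends EventEmitter {
  constructor(port) {
    super();
    this.port = port;
    this._onPortMessage = (data) => this.emit('message', data);
    this.port.on('message', this._onPortMessage);
  }

  send(data) {
    this.port.postMessage(data);
  }

  destroy() {
    this.port.off('message', this._onPortMessage);
    this.port.close();
  }
}

// Round trip over a MessageChannel: one port per side.
function roundTrip(packet) {
  const { port1, port2 } = new MessageChannel();
  const invokerSide = new PortChannel(port1);
  const calleeSide = new PortChannel(port2);
  return new Promise((resolve) => {
    calleeSide.once('message', (received) => {
      // Release both ports so the process can exit.
      invokerSide.destroy();
      calleeSide.destroy();
      resolve(received);
    });
    invokerSide.send(packet);
  });
}

roundTrip({ id: 1, name: 'max', args: [1, 2, 3] })
  .then((received) => console.log('callee side received:', received));
```

Because the Invoker and Callee only depend on this small channel shape, swapping the transport should not require touching the call-matching logic.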

Quick start

With the design above, the implementation was straightforward, and I used my spare time to finish development and documentation quickly. Source code: https://github.com/x-cold/event-invoke

Install the dependency

npm i -S event-invoke

Parent-child process communication example

Example code:

// parent.js
const cp = require('child_process');
const { Invoker } = require('event-invoke');

const invokerChannel = cp.fork('./child.js');

const invoker = new Invoker(invokerChannel);

async function main() {
  const res1 = await invoker.invoke('sleep', 1000);
  console.log('sleep 1000ms:', res1);
  const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
  console.log('max(1, 2, 3):', res2);
  invoker.destroy();
}

main();

// child.js
const { Callee } = require('event-invoke');

const calleeChannel = process;

const callee = new Callee(calleeChannel);

// async method
callee.register(async function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
});

// sync method
callee.register(function max(...args) {
  return Math.max(...args);
});

callee.listen();

Customizing the Channel for PM2 inter-process calls

Example code:

// pm2.config.cjs
module.exports = {
  apps: [
    {
      script: 'invoker.js',
      name: 'invoker',
      exec_mode: 'fork',
    },
    {
      script: 'callee.js',
      name: 'callee',
      exec_mode: 'fork',
    }
  ],
};

// callee.js
import net from 'net';
import pm2 from 'pm2';
import {
  Callee,
  BaseCalleeChannel
} from 'event-invoke';

const messageType = 'event-invoke';
const messageTopic = 'some topic';

class CalleeChannel extends BaseCalleeChannel {
  constructor() {
    super();
    this._onProcessMessage = this.onProcessMessage.bind(this);
    process.on('message', this._onProcessMessage);
  }

  onProcessMessage(packet) {
    if (packet.type !== messageType) {
      return;
    }
    this.emit('message', packet.data);
  }

  send(data) {
    pm2.list((err, processes) => {
      if (err) { throw err; }
      const list = processes.filter(p => p.name === 'invoker');
      const pmId = list[0].pm2_env.pm_id;
      pm2.sendDataToProcessId({
        id: pmId,
        type: messageType,
        topic: messageTopic,
        data,
      }, function (err, res) {
        if (err) { throw err; }
      });
    });
  }

  destory() {
    process.off('message', this._onProcessMessage);
  }
}

const channel = new CalleeChannel();
const callee = new Callee(channel);

// async method
callee.register(async function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
});

// sync method
callee.register(function max(...args) {
  return Math.max(...args);
});

callee.listen();

// keep your process alive
net.createServer().listen();

// invoker.js
import pm2 from 'pm2';
import {
  Invoker,
  BaseInvokerChannel
} from 'event-invoke';

const messageType = 'event-invoke';
const messageTopic = 'some topic';

class InvokerChannel extends BaseInvokerChannel {
  constructor() {
    super();
    this._onProcessMessage = this.onProcessMessage.bind(this);
    process.on('message', this._onProcessMessage);
  }

  onProcessMessage(packet) {
    if (packet.type !== messageType) {
      return;
    }
    this.emit('message', packet.data);
  }

  send(data) {
    pm2.list((err, processes) => {
      if (err) { throw err; }
      const list = processes.filter(p => p.name === 'callee');
      const pmId = list[0].pm2_env.pm_id;
      pm2.sendDataToProcessId({
        id: pmId,
        type: messageType,
        topic: messageTopic,
        data,
      }, function (err, res) {
        if (err) { throw err; }
      });
    });
  }

  connect() {
    this.connected = true;
  }

  disconnect() {
    this.connected = false;
  }

  destory() {
    process.off('message', this._onProcessMessage);
  }
}

const channel = new InvokerChannel();
channel.connect();

const invoker = new Invoker(channel);

setInterval(async () => {
  const res1 = await invoker.invoke('sleep', 1000);
  console.log('sleep 1000ms:', res1);
  const res2 = await invoker.invoke('max', [1, 2, 3]); // 3
  console.log('max(1, 2, 3):', res2);
}, 5 * 1000);

Next steps

At present, event-invoke has the basic ability to make "IPC" calls gracefully, with 100% code coverage and fairly complete type definitions. Anyone interested can use it directly and raise any questions in an Issue.

Areas that still need improvement:

  • Richer examples covering usage scenarios such as cross-iframe, cross-tab, and Web Worker
  • A general-purpose Channel that works out of the box
  • Friendlier exception handling

Keywords: Javascript node.js Front-end TypeScript pm2

Added by twm on Sat, 12 Feb 2022 16:28:03 +0200