Why event listener "pause" execution of async function to start new execution?

229 Views Asked by At

Why does the Node.js event listener "pause" the execution of an async function to start second execution if to emit the same event second time? And the second question: how does it possible to finish first execution, then start the second one?

I.e., if to launch this code in Node.js:

import { EventEmitter } from "events";

let event = new EventEmitter();

event.on("myEvent", async function () {
  console.log("Start");
  await new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(console.log("Do job"));
    }, 1000);
  });
  console.log("Finish");
});

event.emit("myEvent"); // first emit
event.emit("myEvent"); // second emit

Then I'm getting such result:

Start
Start
Do job
Finish
Do job
Finish

Hovewer I'd like to see this:

Start
Do job
Finish
Start
Do job
Finish

UPDATE Below I put real code which contains the described problem

const web3 = new Web3(
  new Web3.providers.WebsocketProvider(
    "wss://eth-mainnet.g.alchemy.com/v2/<API-KEY>"
  )
);
let walletAddress = "0x123";

let options = {
  topics: [web3.utils.sha3("Transfer(address,address,uint256)")],
};

let subscription = web3.eth.subscribe("logs", options);

subscription.on("data", async (event) => {
  if (event.topics.length == 3) {
    let transaction = decodeTransaction(event); //just using web3.eth.abi.decodeLog(...)
    if (
      transaction.from === walletAddress ||
      transaction.to === walletAddress
    ) {
      const contract = new web3.eth.Contract(abi, event.address);
      let coinSymbol = await contract.methods.symbol().call(); //<-- The issue starts here
      await redisClient.hSet(
        walletAddress,
        coinSymbol,
        transaction.value
      );
    }
  }
});
1

There are 1 best solutions below

4
On

The key here is that async functions don't block the interpreter and EventEmitter events don't wait for async event handlers to resolve their promise.

So, this is what happens:

  1. The first event.emit() gets called. This synchronously triggers the myEvent handler function to get called.
  2. That function executes. After outputting start, it hits an await. That causes it to suspend further execution of the function and immediately return a promise back to the caller. This causes the first event.emit(...) to be done as the eventEmitter object is not promise-aware - it pays no attention at all to the promise that your event handler function returns.
  3. The second event.emit() gets called. This synchronously triggers the myEvent handler function to get called.
  4. That function executes. After outputting start, it hits an await. That causes it to suspend further execution of the function and immediately return a promise back to the caller. This causes the second event.emit(...) to be done.

So, this is why your output starts with:

Start
Start

Then, sometime later (after a setTimeout() fires), console.log("Do job") outputs the promise gets resolved which causes the await to be satisfied and the function resumes execution after the await. This then outputs Finish.

So, at this point, the first timer has fired and you have:

Start
Start
Do job
Finish

Then the second setTimeout() fires and does the same and then you have this which is your full output:

Start
Start
Do job
Finish
Do job
Finish

The key here is that the EventEmitter class is not promise-aware for it's event handlers. It pays no attention to the promise that your async function returns and thus does not "wait" for it to resolve before allowing the rest of your code to continue executing.


If what you're trying to do in your subscription.on('data', ...) code is to serialize the processing of each event so you finish processing one before starting the processing of the next one, then you can queue your events and only process the next one when the prior one has finished. If a new event arrives while you're still processing a previous one, it just gets put in the queue and stays there until the prior one is done processing.

Here's how that code could look:

const eventQueue = [];
let eventInProgress = false;

async function processEvent(event) {
    try {
        eventInProgress = true;
        if (event.topics.length == 3) {
            let transaction = decodeTransaction(event); //just using web3.eth.abi.decodeLog(...)
            if (
                transaction.from === walletAddress ||
                transaction.to === walletAddress
            ) {
                const contract = new web3.eth.Contract(abi, event.address);
                let coinSymbol = await contract.methods.symbol().call(); //<-- The issue starts here
                await redisClient.hSet(
                    walletAddress,
                    coinSymbol,
                    transaction.value
                );
            }
        }
    } catch (e) {
        // have to decide what to do with rejections from either of the await statements
        console.log(e);
    } finally {
        eventInProgress = false;
        // if there are more events to process, then process the oldest one
        if (eventQueue.length) {
            processEvent(eventQueue.shift());
        }
    }
}

subscription.on("data", (event) => {
    // serialize the processing of events
    if (eventInProgress) {
        eventQueue.push(event);
    } else {
        processEvent(event);
    }
});