Firebase onWrite cloud function does not run intermittently

179 Views Asked by At

I have been using firebase-queue for years to invoke jobs for years, but recently it started failing with transaction errors competing on resources, and had serious scaling issue.

So I migrated firebase-queue to cloud function and it seemed to be fast and very scalable initially. I wrote a simple script that is hooked up by onWrite on queue/task/{taskId}. But it seems like sometimes onWrite is never triggered randomly. I can not see the specific job triggered in the logs at all.

We are using blaze plan, promise is returned on the function, timelimit is set as maximum, 99.9% of the jobs are taking less than 5000ms.

We have like 50K invocations per day, and around 100 jobs are not triggered. This is so intermittent and have no clue on that. One strange thing is I don't see any errors on log viewer, but on the invocations graph, I can see some load error and connection error, and still no idea what are they. Any insight to resolve the issue would be appreciated.

const functions = require(`firebase-functions`);
const admin = require(`firebase-admin`);
const _ = require(`lodash`);
const specs = require(`./specs.json`);
const queueSpecs = require(`./queue-specs.js`);

const SERVER_TIMESTAMP = { ".sv": `timestamp` };

admin.initializeApp();

const workers = {};

queueSpecs.forEach((spec) => {
    let specString = spec;

    if (typeof spec === `object`) {
        specString = spec.id;
    }

    workers[specString] = require(`./lib/workers/${specString}`);
});

const taskHandler = (change, context) => {
    const data = change.after.val();
    const ref = change.after.ref;
    const { taskID } = context.params;

    if (!data) {
        return null;
    }

    const worker = workers[data._state];

    if (!worker) {
        console.error(`Worker not found for task ${taskID} - ${data._state}`);
        return null;
    }

    const specID = data._state.replace(`/`, `_`);
    const spec = specs[specID];

    if (!spec) {
        console.error(`Spec not found for task ${taskID} - ${specID}`);
        return null;
    }

    console.log(
        `Running ${taskID} - ${data._state} - ${JSON.stringify(data, null, 2)}`
    );

    return new Promise((resolve, reject) => {
        const taskResolve = newTask => {
            const updates = _.clone(newTask);
            let newState = _.get(newTask, `_new_state`, ``);

            newState = newState || spec.finished_state;

            if (!updates || !newState) {
                console.log(`Exiting task - ${taskID}`);
                return ref.set(null).then(resolve, reject);
            }

            updates._state = newState || spec.finished_state;
            updates._state_changed = SERVER_TIMESTAMP;
            updates._owner = null;
            updates._progress = null;
            updates._error_details = null;

            console.log(`Resolving`);

            return ref.update(updates).then(resolve, reject);
        };

        const taskReject = error => {
            const updates = {};
            let errorString;

            if (_.isError(error)) {
                errorString = error.message;
            } else if (_.isString(error)) {
                errorString = error;
            } else if (!_.isUndefined(error) && !_.isNull(error)) {
                errorString = error.toString();
            }

            if (updates._state === `error`) {
                console.log(`Exiting task on reject - ${taskID}`);
                return ref.set(null).then(resolve, reject);
            }

            updates._state = spec.error_state || `error`;
            updates._state_changed = SERVER_TIMESTAMP;
            updates._owner = null;
            updates._progress = null;
            updates._error_details = {
                previous_state: spec.in_progress_state,
                error: errorString,
                errorStack: _.get(error, `stack`, null),
                attempts: 1
            };

            console.log(`Rejecting`);
            return ref.update(updates).then(resolve, reject);
        };

        const taskProgress = () => {
            // eslint-disable-line
            // progress is never used, thus just resolving
            const updates = {};

            console.log(`Progress ????`);

            return ref.update(updates).then(resolve, reject);
        };

        worker(data, taskProgress, taskResolve, taskReject);
    });
};

exports.taskRunner = functions.database
    .ref(`/queue/tasks/{taskID}`)
    .onWrite(taskHandler);
0

There are 0 best solutions below