How to pause and resume multipart uploads for large files using the AWS S3 v3 JavaScript SDK?

I have found the documentation for handling multipart uploads using the AWS v3 JavaScript SDK. Here is the documentation on using the CreateMultipartUploadCommand. They also provide an Upload() helper in @aws-sdk/lib-storage that abstracts away some of the multipart upload handling.

However, I couldn't find any easy way to use the SDK to pause and resume multipart uploads at a later time. Nor could I find a way to transparently handle expiring temporary credentials that were obtained using the AssumeRoleCommand. The maximum session duration for the credentials varies between 1 and 12 hours, and according to the AWS documentation, "Role chaining limits your Amazon Web Services CLI or Amazon Web Services API role session to a maximum of one hour." Since I am using role chaining, I am limited to 1 hour and will need to transparently refresh the credentials if an upload takes longer than that.
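
For context, the credentials come from something like the sketch below (the role ARN, region, and session name are placeholders; the source confirms only that AssumeRoleCommand with role chaining is involved):

import { STSClient, AssumeRoleCommand } from "@aws-sdk/client-sts";
import { S3Client } from "@aws-sdk/client-s3";

// Assume the chained role and build an S3 client from the temporary credentials.
// DurationSeconds cannot exceed 3600 here because of the role-chaining limit.
async function buildChainedS3Client(): Promise<S3Client> {
  const sts = new STSClient({ region: "us-east-1" }); // placeholder region
  const { Credentials } = await sts.send(
    new AssumeRoleCommand({
      RoleArn: "arn:aws:iam::123456789012:role/upload-role", // placeholder
      RoleSessionName: "multipart-upload",
      DurationSeconds: 3600,
    })
  );
  return new S3Client({
    region: "us-east-1",
    credentials: {
      accessKeyId: Credentials!.AccessKeyId!,
      secretAccessKey: Credentials!.SecretAccessKey!,
      sessionToken: Credentials!.SessionToken,
      expiration: Credentials!.Expiration,
    },
  });
}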

1 Answer

I ended up just copying the AWS Upload() file (plus its dependent files, minus the index files) and modifying the Upload() class to suit my needs. I will post my entire ModifiedUpload.ts file here, but you will need to make adjustments, as I am using a custom-built S3Service class for the appS3Service property. It is only passed in because it holds the S3 client and is used to regenerate a new S3 client. It should be easy enough to change or remove.

I would encourage you to download the existing AWS files and then use either git or your IDE's compare functionality to diff the new ModifiedUpload.ts against the original Upload.ts, so you can easily see what changes I made and make your own modifications.

Libraries I use that weren't in the original SDK:

  • "async-lock": "^1.3.1"
  • "@types/async-lock": "^1.1.5"
  • "ngx-logger": "^5.0.11"
  • "dayjs": "^1.11.3"
  • "rxjs": "~7.4.0"

High level overview of changes I made:

Pausing and Resuming

  1. Pass in a resumeUploadId to Upload() if you have a multipart upload you wish to resume. See the ListMultipartUploadsCommand for retrieving a list of uncompleted uploads and their IDs.
  2. Update the Upload.done() method to check for resumeUploadId, and call a new checkForPartialUploads() method, which in turn calls listMultipartUploadRetryWrapper().
  3. listMultipartUploadRetryWrapper() will pull all your uploaded parts and push them to this.uploadedParts, and will then call __notifyProgress() to notify the upstream callers of the progress.
  4. That's basically it. uploadedParts was a pre-existing property, so once we push our existing parts into it, the rest is already handled.

I did update __doConcurrentUpload() to check whether the signal was aborted (user paused) and all concurrent uploads have finished. In that case it emits an "allUploadsCompletedAfterAbort" custom event, so your upstream callers can keep the 'resume' button disabled until all pending uploads have completed. In your upstream caller you will want to subscribe to the customEvents property, which currently pushes only two event types, "allUploadsCompletedAfterAbort" and "noUploadPartsFound". You can use the latter to show the user an error indicating that the upload has to start over from scratch. A sketch of that subscription follows below.
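
As a minimal sketch, wiring up the customEvents Subject from an upstream caller might look like this (wireUploadEvents and the two callbacks are hypothetical helpers, not part of the SDK):

import { ModifiedUpload } from "./modified-upload"; // path is an assumption

// Hypothetical helper: route the two custom events into UI callbacks.
function wireUploadEvents(
  upload: ModifiedUpload,
  onAllAbortedUploadsDone: () => void, // e.g. re-enable the "Resume" button
  onNoPartsFound: () => void           // e.g. warn that the upload restarts from scratch
): void {
  upload.customEvents.subscribe((event) => {
    if (event === 'allUploadsCompletedAfterAbort') {
      onAllAbortedUploadsDone();
    } else if (event === 'noUploadPartsFound') {
      onNoPartsFound();
    }
  });
}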

So when a user wants to pause, you can call the abort() method on your Upload() object. If the user wants to resume in that same session where you still have the first Upload() object (FirstUpload), you can create a new Upload() object and pass FirstUpload.getUploadId() as the resumeUploadId in the secondaryOptions param, for example:
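
A rough sketch of that same-session pause/resume flow (the bucket, key, file, and secondaryOptions values are placeholders from your own app, and the calls are assumed to run inside an async handler):

// assumes s3Client, file, s3Url, objId, objType, appS3Service and logger already exist
const firstUpload = new ModifiedUpload(
  { client: s3Client, params: { Bucket: "my-bucket", Key: "big-file.zip", Body: file } },
  { s3Url, objId, objType, appS3Service, logger, resumeUploadId: '' }
);
firstUpload.on("httpUploadProgress", (p) => console.log(`${p.loaded}/${p.total}`));
firstUpload.done().catch((e) => {
  if (e?.name !== "AbortError") throw e; // abort() rejects done() with an AbortError
});

// user clicks "Pause"
await firstUpload.abort();

// user clicks "Resume" (ideally after 'allUploadsCompletedAfterAbort' has fired)
const resumedUpload = new ModifiedUpload(
  { client: s3Client, params: { Bucket: "my-bucket", Key: "big-file.zip", Body: file } },
  { s3Url, objId, objType, appS3Service, logger, resumeUploadId: firstUpload.getUploadId() }
);
await resumedUpload.done();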

If the user refreshes the page in the middle of uploading, or closes it and comes back later, then you will need to use the ListMultipartUploadsCommand to retrieve the upload ID you want (let the user pick, default to the latest, etc.) and pass it to the Upload() constructor as the resumeUploadId, something like the sketch below.
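
A minimal sketch of looking up a resumable upload ID for a key after a refresh (picking the most recent upload here is an arbitrary choice):

import { S3Client, ListMultipartUploadsCommand } from "@aws-sdk/client-s3";

// Find an in-progress multipart upload for this key so it can be resumed.
async function findResumableUploadId(client: S3Client, bucket: string, key: string): Promise<string | undefined> {
  const response = await client.send(new ListMultipartUploadsCommand({ Bucket: bucket, Prefix: key }));
  const candidates = (response.Uploads ?? []).filter((u) => u.Key === key);
  // newest first; you could let the user pick instead
  candidates.sort((a, b) => (b.Initiated?.getTime() ?? 0) - (a.Initiated?.getTime() ?? 0));
  return candidates[0]?.UploadId; // pass this as resumeUploadId, or undefined/'' to start fresh
}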

Transparent refresh of S3Client/S3 Credentials

  1. I passed the s3Url, objType (specific to our app and relevant for our S3 path), objId and appS3Service properties in via the new second parameter, UploadSecondaryOptions. These are just used so our S3Service can generate new credentials. You will want to modify this (or remove it if you don't need the transparent credential refresh).
  2. Within the concurrentUploadRetryWrapper method (created from __doConcurrentUpload) I use the async-lock library to acquire a lock on 's3credentialskey', just so that only one of our processes is checking credentials at a time.
  3. I check the expiration date on our client credentials.
  4. If the credentials are expiring within 10 minutes, I call our custom appS3Service to generate new credentials. A hypothetical sketch of such a service follows below.
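
For reference, a minimal sketch of what an S3Service like this might look like; the real one is specific to our app, and fetchTemporaryCredentialsFromBackend below is a hypothetical backend call:

import { S3Client } from "@aws-sdk/client-s3";

// Hypothetical backend call returning fresh short-lived credentials for this object.
declare function fetchTemporaryCredentialsFromBackend(
  s3Url: string, objId: string, objType: string, forceRefresh: boolean
): Promise<{ region: string; accessKeyId: string; secretAccessKey: string; sessionToken: string; expiration: string }>;

export class S3Service {
  public client: S3Client;

  // Signature matches how ModifiedUpload calls it; where the credentials come from is up to you.
  async generateS3Client(s3Url: string, objId: string, objType: string, forceRefresh: boolean): Promise<void> {
    const creds = await fetchTemporaryCredentialsFromBackend(s3Url, objId, objType, forceRefresh);
    this.client = new S3Client({
      region: creds.region,
      credentials: {
        accessKeyId: creds.accessKeyId,
        secretAccessKey: creds.secretAccessKey,
        sessionToken: creds.sessionToken,
        expiration: new Date(creds.expiration),
      },
    });
  }
}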

Other changes

You will notice that in some places I just added some retry logic for redundancy. If a user is uploading large files, I wanted the process to be as resilient as possible.

// Modified from AWS Upload class https://github.com/aws/aws-sdk-js-v3/blob/main/lib/lib-storage/src/Upload.ts
/* eslint-disable */
import { AbortController, AbortSignal } from "@aws-sdk/abort-controller";
import {
  AbortMultipartUploadCommandOutput,
  CompletedPart,
  CompleteMultipartUploadCommand,
  CompleteMultipartUploadCommandInput,
  CompleteMultipartUploadCommandOutput,
  CreateMultipartUploadCommand,
  CreateMultipartUploadCommandOutput,
  ListPartsCommand,
  ListPartsCommandOutput,
  PutObjectCommand,
  PutObjectCommandInput,
  PutObjectTaggingCommand,
  S3Client,
  Tag,
  UploadPartCommand,
} from "@aws-sdk/client-s3";
import { Progress, BodyDataTypes } from "@aws-sdk/lib-storage";
import {
  EndpointParameterInstructionsSupplier,
  getEndpointFromInstructions,
  toEndpointV1,
} from "@aws-sdk/middleware-endpoint";
import { HttpRequest } from "@aws-sdk/protocol-http";
import { extendedEncodeURIComponent } from "@aws-sdk/smithy-client";
import { Endpoint } from "@aws-sdk/types";
import { EventEmitter } from "events";

import { byteLength } from "./bytelength";
import { getChunk } from "./chunker";
import * as dayjs from "dayjs";
import { S3Service } from "../s3.service";
import { NGXLogger } from "ngx-logger";
import { Options } from "./types";
import * as AsyncLock from "async-lock";
import { Subject } from "rxjs";
import { HttpStatusCode } from "@angular/common/http";

export interface RawDataPart {
  partNumber: number;
  data: BodyDataTypes;
  lastPart?: boolean;
}

// New class I created. S3Service appS3Service was custom to our app. It is just used
// to regenerate a new s3 client with new creds.  
// s3Url, objId, objType are only used for that purpose, and you
// may not need any of those properties
export class UploadSecondaryOptions {
  public s3Url: string
  public objId: string
  public objType: string
  public appS3Service: S3Service
  public resumeUploadId: string
  public logger: NGXLogger
}

const S3_RETRY_COUNT = 3;

const MIN_PART_SIZE = 1024 * 1024 * 5;

export class ModifiedUpload extends EventEmitter {
  /**
   * S3 multipart upload does not allow more than 10000 parts.
   */
  private MAX_PARTS = 10000;

  // Defaults.
  private queueSize = 4;
  private partSize = MIN_PART_SIZE;
  private leavePartsOnError = false;
  private tags: Tag[] = [];

  private client: S3Client;
  private params: PutObjectCommandInput;

  // used for reporting progress.
  private totalBytes?: number;
  private bytesUploadedSoFar: number;

  // used in the upload.
  private abortController: AbortController;
  private concurrentUploaders: Promise<void>[] = [];
  private createMultiPartPromise?: Promise<CreateMultipartUploadCommandOutput>;

  private uploadedParts: CompletedPart[] = [];
  private uploadId?: string;
  uploadEvent?: string;

  private isMultiPart = true;
  private singleUploadResult?: CompleteMultipartUploadCommandOutput;

  // ---- custom properties not in aws sdk -----
  private s3Url: string
  private objId: string
  private objType: string
  private appS3Service: S3Service
  private logger: NGXLogger
  private asyncLock = new AsyncLock()
  private resumeUploadId: string = ''
  private runningUploads: number = 0
  private allUploadsCompletedAfterAbort = false
  private createMultipartFailed = false
  public customEvents: Subject<'noUploadPartsFound' | 'allUploadsCompletedAfterAbort'> = new Subject()
  // -------------------------------------------

  constructor(options: Options, secondaryOptions: UploadSecondaryOptions) {
    super();

    // set defaults from options.
    this.queueSize = options.queueSize || this.queueSize;
    this.partSize = options.partSize || this.partSize;
    this.leavePartsOnError = options.leavePartsOnError || this.leavePartsOnError;
    this.tags = options.tags || this.tags;

    this.client = options.client;
    this.params = options.params;

    this.s3Url = secondaryOptions.s3Url
    this.objId = secondaryOptions.objId
    this.objType = secondaryOptions.objType
    this.appS3Service = secondaryOptions.appS3Service
    this.logger = secondaryOptions.logger
    this.resumeUploadId = secondaryOptions.resumeUploadId

    this.__validateInput();

    // set progress defaults
    this.totalBytes = byteLength(this.params.Body);
    this.bytesUploadedSoFar = 0;
    this.abortController = options.abortController ?? new AbortController();
  }

  async abort(): Promise<void> {
    /**
     * Abort stops all new uploads and immediately exits the top level promise on this.done()
     * Concurrent threads in flight clean up eventually.
     */
    this.abortController.abort();
  }

  public async done(): Promise<CompleteMultipartUploadCommandOutput | AbortMultipartUploadCommandOutput> {
    if (this.resumeUploadId) {
      await this.checkForPartialUploads()
    }
    return await Promise.race([this.__doMultipartUpload(), this.__abortTimeout(this.abortController.signal)]);
  }

  public getUploadId() {
    return this.uploadId
  }

  public getRunningUploads() {
    return this.runningUploads
  }

  public getBytesUploadedSoFar() {
    return this.bytesUploadedSoFar
  }

  private async checkForPartialUploads() {
    if (!this.isMultiPart) {
      return
    }

    this.uploadId = this.resumeUploadId

    try {
      await this.listMultipartUploadRetryWrapper(0)
    } catch (error: any) {
      this.logger.error('Error occurred listing parts in AWS:')
      this.logger.error(error)
      this.uploadId = null
      this.resumeUploadId = null
      this.bytesUploadedSoFar = 0
      this.uploadedParts = []

      // tell the calling component we are going to start uploading from scratch
      this.customEvents.next('noUploadPartsFound')
    }
  }

  public on(event: 'httpUploadProgress', listener: (progress: Progress) => void): this {
    this.uploadEvent = event;
    return super.on(event, listener);
  }

  private async __uploadUsingPut(dataPart: RawDataPart): Promise<void> {
    this.isMultiPart = false;
    const params = { ...this.params, Body: dataPart.data };

    const clientConfig = this.client.config;
    const requestHandler = clientConfig.requestHandler;
    const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null;
    const uploadEventListener = (event: ProgressEvent) => {
      this.bytesUploadedSoFar = event.loaded;
      this.totalBytes = event.total;
      this.__notifyProgress({
        loaded: this.bytesUploadedSoFar,
        total: this.totalBytes,
        part: dataPart.partNumber,
        Key: this.params.Key,
        Bucket: this.params.Bucket,
      });
    };

    if (eventEmitter !== null) {
      // The requestHandler is the xhr-http-handler.
      eventEmitter.on("xhr.upload.progress", uploadEventListener);
    }

    const resolved = await Promise.all([this.client.send(new PutObjectCommand(params)), clientConfig?.endpoint?.()]);
    const putResult = resolved[0];
    let endpoint: Endpoint | undefined = resolved[1];

    if (!endpoint) {
      endpoint = toEndpointV1(
        await getEndpointFromInstructions(params, PutObjectCommand as EndpointParameterInstructionsSupplier, {
          ...clientConfig,
        })
      );
    }

    if (!endpoint) {
      throw new Error('Could not resolve endpoint from S3 "client.config.endpoint()" nor EndpointsV2.');
    }

    if (eventEmitter !== null) {
      eventEmitter.off("xhr.upload.progress", uploadEventListener);
    }

    const locationKey = this.params
      .Key!.split("/")
      .map((segment) => extendedEncodeURIComponent(segment))
      .join("/");
    const locationBucket = extendedEncodeURIComponent(this.params.Bucket!);

    const Location: string = (() => {
      const endpointHostnameIncludesBucket = endpoint.hostname.startsWith(`${locationBucket}.`);
      const forcePathStyle = this.client.config.forcePathStyle;
      if (forcePathStyle) {
        return `${endpoint.protocol}//${endpoint.hostname}/${locationBucket}/${locationKey}`;
      }
      if (endpointHostnameIncludesBucket) {
        return `${endpoint.protocol}//${endpoint.hostname}/${locationKey}`;
      }
      return `${endpoint.protocol}//${locationBucket}.${endpoint.hostname}/${locationKey}`;
    })();

    this.singleUploadResult = {
      ...putResult,
      Bucket: this.params.Bucket,
      Key: this.params.Key,
      Location,
    };
    const totalSize = byteLength(dataPart.data);

    this.__notifyProgress({
      loaded: totalSize,
      total: totalSize,
      part: 1,
      Key: this.params.Key,
      Bucket: this.params.Bucket,
    });
  }

  private async __createMultipartUpload(): Promise<void> {
    await this.createMultipartUploadRetryWrapper(0)
  }

  private async createMultipartUploadRetryWrapper(retryCount: number) {
    let thrownError = null
    try {
      if (!this.createMultiPartPromise) {
        const createCommandParams = { ...this.params, Body: undefined };
        this.createMultiPartPromise = this.client.send(new CreateMultipartUploadCommand(createCommandParams));
      }
      const createMultipartUploadResult = await this.createMultiPartPromise;
      if (createMultipartUploadResult.$metadata.httpStatusCode !== HttpStatusCode.Ok) {
        throw new Error(`Non-200 code received when attempting to create multipart upload`)
      }
      this.uploadId = createMultipartUploadResult.UploadId;
    } catch (error) {
      this.logger.error(`Error creating multipart upload on attempt ${retryCount}:`)
      this.logger.error(error)
      thrownError = error
    }

    if (thrownError && retryCount < S3_RETRY_COUNT) {
      await this.createMultipartUploadRetryWrapper(retryCount+1)
    } else if (thrownError) {
      this.createMultipartFailed = true
      throw thrownError
    }
  }

  private async listMultipartUploadRetryWrapper(retryCount: number) {
    this.uploadedParts = []
    this.bytesUploadedSoFar = 0
    let thrownError = null

    try {
      const listPartsCommand = new ListPartsCommand({
        Bucket: this.params.Bucket,
        Key: this.params.Key,
        UploadId: this.uploadId,
      })
      const response: ListPartsCommandOutput = await this.client.send(listPartsCommand)
      if (response.$metadata?.httpStatusCode !== 200) {
        throw new Error('Non-200 HTTP code returned listing multipart upload parts')
      }

      if (response.Parts?.length) {
        for (let part of response.Parts) {
          this.uploadedParts.push({
            PartNumber: part.PartNumber,
            ETag: part.ETag,
            ...(part.ChecksumCRC32 && { ChecksumCRC32: part.ChecksumCRC32 }),
            ...(part.ChecksumCRC32C && { ChecksumCRC32C: part.ChecksumCRC32C }),
            ...(part.ChecksumSHA1 && { ChecksumSHA1: part.ChecksumSHA1 }),
            ...(part.ChecksumSHA256 && { ChecksumSHA256: part.ChecksumSHA256 }),
          });
          if (part.Size) {
            this.bytesUploadedSoFar += part.Size
          }
        }

        if (this.uploadedParts?.length) {
          // make sure parent immediately sees how much we've already uploaded in the past
          this.uploadedParts.sort((partA, partB) => partA.PartNumber - partB.PartNumber)
          this.__notifyProgress({
            loaded: this.bytesUploadedSoFar,
            total: this.totalBytes,
            part: this.uploadedParts[this.uploadedParts.length - 1].PartNumber,
            Key: this.params.Key,
            Bucket: this.params.Bucket,
          });
        }
      } else {
        throw new Error('No uploaded parts found.')
      }
    } catch (error) {
      this.logger.error(`Error listing multipart upload ${this.params.Key} with id ${this.uploadId} on attempt ${retryCount}:`)
      this.logger.error(error)
      thrownError = error
    }

    if (thrownError && retryCount < S3_RETRY_COUNT) {
      await this.listMultipartUploadRetryWrapper(retryCount+1)
    } else if (thrownError) {
      throw thrownError
    }
  }

  private async __doConcurrentUpload(dataFeeder: AsyncGenerator<RawDataPart, void, undefined>): Promise<void> {
    for await (const dataPart of dataFeeder) {
      this.runningUploads++
      try {
        await this.concurrentUploadRetryWrapper(dataPart, 0)
      } catch (error) {
        throw error
      } finally {
        this.runningUploads--

        // if aborted, and the threadcount is 0, and we have not yet pushed this customEvent, push customEvent 'allUploadsCompletedAfterAbort'
        if (this.abortController.signal.aborted && this.runningUploads <= 0 && !this.allUploadsCompletedAfterAbort) {
          this.allUploadsCompletedAfterAbort = true
          this.customEvents.next('allUploadsCompletedAfterAbort')
        }
      }
    }
  }

  private async concurrentUploadRetryWrapper(dataPart: RawDataPart, retryCount: number) {
    if (this.resumeUploadId && this.uploadedParts.some((completedPart) => completedPart.PartNumber === dataPart.partNumber)) {
      // then this part was already uploaded so skip it
      return;
    }

    if (this.uploadedParts.length > this.MAX_PARTS) {
      throw new Error(`Exceeded ${this.MAX_PARTS} as part of the upload to ${this.params.Key} and ${this.params.Bucket}.`);
    }

    // check if the client credentials are about to expire, and refresh them if so.
    // awaiting the lock ensures this part isn't sent with soon-to-expire credentials
    let credentialRefreshFailed = false
    await this.asyncLock.acquire('s3credentialskey', async () => {
      try {
        let credentialsExpiration: Date = (await this.client.config.credentials()).expiration
        if (dayjs().add(10, 'minutes').isAfter(dayjs(credentialsExpiration))) {
          // then credentials will expire within 10 minutes so generate a new client
          await this.appS3Service.generateS3Client(this.s3Url, this.objId, this.objType, true)
          this.client = this.appS3Service.client
        }
      } catch (error: any) {
        this.logger.error(`Error occurred generating new s3 credentials:`)
        this.logger.error(error)
        credentialRefreshFailed = true
      }
    })

    // retry outside the lock so we don't re-acquire 's3credentialskey' while still holding it
    if (credentialRefreshFailed && retryCount < S3_RETRY_COUNT) {
      return await this.concurrentUploadRetryWrapper(dataPart, retryCount + 1)
    }

    try {
      if (this.abortController.signal.aborted) {
        return;
      }

      // Use put instead of multi-part for one chunk uploads.
      if (dataPart.partNumber === 1 && dataPart.lastPart) {
        return await this.__uploadUsingPut(dataPart);
      }

      if (!this.uploadId) {
        await this.asyncLock.acquire('createMultipartUpload', async () => {
          // then check to make sure we have no upload id first.
          // This second check on uploadId is needed inside the lock
          // as another thread may have finished. We also don't put the asyncLock
          // on the outside of the if() since we'd be locking every thread,
          // and we need the abort check after
          if (!this.uploadId && !this.createMultipartFailed) {
            await this.__createMultipartUpload();
          }
        })
        if (this.abortController.signal.aborted) {
          return;
        }
      }

      const partSize: number = byteLength(dataPart.data) || 0;

      const requestHandler = this.client.config.requestHandler;
      const eventEmitter: EventEmitter | null = requestHandler instanceof EventEmitter ? requestHandler : null;

      let lastSeenBytes = 0;
      const uploadEventListener = (event: ProgressEvent, request: HttpRequest) => {
        const requestPartSize = Number(request.query["partNumber"]) || -1;

        if (requestPartSize !== dataPart.partNumber) {
          // ignored, because the emitted event is not for this part.
          return;
        }

        if (event.total && partSize) {
          this.bytesUploadedSoFar += event.loaded - lastSeenBytes;
          lastSeenBytes = event.loaded;
        }

        this.__notifyProgress({
          loaded: this.bytesUploadedSoFar,
          total: this.totalBytes,
          part: dataPart.partNumber,
          Key: this.params.Key,
          Bucket: this.params.Bucket,
        });
      };

      if (eventEmitter !== null) {
        // The requestHandler is the xhr-http-handler.
        eventEmitter.on("xhr.upload.progress", uploadEventListener);
      }

      const partResult = await this.client.send(
        new UploadPartCommand({
          ...this.params,
          UploadId: this.uploadId,
          Body: dataPart.data,
          PartNumber: dataPart.partNumber,
        }),
      );

      if (eventEmitter !== null) {
        eventEmitter.off("xhr.upload.progress", uploadEventListener);
      }

      if (this.abortController.signal.aborted) {
        return;
      }

      if (!partResult.ETag) {
        throw new Error(
          `Part ${dataPart.partNumber} is missing ETag in UploadPart response. Missing Bucket CORS configuration for ETag header?`,
        );
      }

      this.uploadedParts.push({
        PartNumber: dataPart.partNumber,
        ETag: partResult.ETag,
        ...(partResult.ChecksumCRC32 && { ChecksumCRC32: partResult.ChecksumCRC32 }),
        ...(partResult.ChecksumCRC32C && { ChecksumCRC32C: partResult.ChecksumCRC32C }),
        ...(partResult.ChecksumSHA1 && { ChecksumSHA1: partResult.ChecksumSHA1 }),
        ...(partResult.ChecksumSHA256 && { ChecksumSHA256: partResult.ChecksumSHA256 }),
      });

      if (eventEmitter === null) {
        this.bytesUploadedSoFar += partSize;
      }

      this.__notifyProgress({
        loaded: this.bytesUploadedSoFar,
        total: this.totalBytes,
        part: dataPart.partNumber,
        Key: this.params.Key,
        Bucket: this.params.Bucket,
      });
    } catch (e) {
      // Failed to create multi-part or put
      if (!this.uploadId) {
        throw e;
      }
      // on leavePartsOnError throw an error so users can deal with it themselves,
      // otherwise swallow the error.
      if (retryCount < S3_RETRY_COUNT) {
        await this.concurrentUploadRetryWrapper(dataPart, retryCount+1)
      } else if (this.leavePartsOnError) {
        throw e;
      }
    }
  }

  private async __doMultipartUpload(): Promise<CompleteMultipartUploadCommandOutput> {
    // Set up data input chunks.
    const dataFeeder = getChunk(this.params.Body, this.partSize);

    // Create and start concurrent uploads.
    for (let index = 0; index < this.queueSize; index++) {
      const currentUpload = this.__doConcurrentUpload(dataFeeder);
      this.concurrentUploaders.push(currentUpload);
    }

    // Create and start concurrent uploads.
    await Promise.all(this.concurrentUploaders);
    if (this.abortController.signal.aborted) {
      throw Object.assign(new Error("Upload aborted."), { name: "AbortError" });
    }

    let result;
    if (this.isMultiPart) {

      this.uploadedParts.sort((a, b) => a.PartNumber! - b.PartNumber!);

      const uploadCompleteParams = {
        ...this.params,
        Body: undefined,
        UploadId: this.uploadId,
        MultipartUpload: {
          Parts: this.uploadedParts,
        },
      };
      result = await this.completeMultipartUploadRetryWrapper(uploadCompleteParams, 0);
    } else {
      result = this.singleUploadResult!;
    }

    // Add tags to the object after it's completed the upload.
    if (this.tags.length) {
      await this.client.send(
        new PutObjectTaggingCommand({
          ...this.params,
          Tagging: {
            TagSet: this.tags,
          },
        })
      );
    }

    return result;
  }

  private async completeMultipartUploadRetryWrapper(uploadCompleteParams: CompleteMultipartUploadCommandInput, retryCount: number) {
    let result: CompleteMultipartUploadCommandOutput = null
    try {
      result = await this.client.send(new CompleteMultipartUploadCommand(uploadCompleteParams));
      if (result.$metadata.httpStatusCode === HttpStatusCode.Ok || retryCount >= S3_RETRY_COUNT) {
        return result
      } else {
        return await this.completeMultipartUploadRetryWrapper(uploadCompleteParams, retryCount+1)
      }
    } catch (error) {
      this.logger.error(`Error completing multipart upload on attempt ${retryCount}:`)
      this.logger.error(error)
      if (retryCount < S3_RETRY_COUNT) {
        return await this.completeMultipartUploadRetryWrapper(uploadCompleteParams, retryCount+1)
      }
    }
  }

  private __notifyProgress(progress: Progress): void {
    if (this.uploadEvent) {
      this.emit(this.uploadEvent, progress);
    }
  }

  private async __abortTimeout(abortSignal: AbortSignal): Promise<AbortMultipartUploadCommandOutput> {
    return new Promise((resolve, reject) => {
      abortSignal.onabort = () => {
        const abortError = new Error("Upload aborted.");
        abortError.name = "AbortError";
        reject(abortError);
      };
    });
  }

  private __validateInput(): void {
    if (!this.params) {
      throw new Error(`InputError: Upload requires params to be passed to upload.`);
    }

    if (!this.client) {
      throw new Error(`InputError: Upload requires a AWS client to do uploads with.`);
    }

    if (this.partSize < MIN_PART_SIZE) {
      throw new Error(
        `EntityTooSmall: Your proposed upload partsize [${this.partSize}] is smaller than the minimum allowed size [${MIN_PART_SIZE}] (5MB)`
      );
    }

    if (this.queueSize < 1) {
      throw new Error(`Queue size: Must have at least one uploading queue.`);
    }
  }
}