Error moving BigQuery table snapshot to Google Cloud Storage

I'm attempting to generate snapshots for my BigQuery tables and transfer them to Google Cloud Storage. While I can successfully create the snapshots, I encounter an error when attempting to move them to GCS.

google.api_core.exceptions.BadRequest: 400 melodic-map-408207:kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28 is not allowed for this operation because it is currently a SNAPSHOT.; reason: invalid, message: melodic-map-408207:kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28 is not allowed for this operation because it is currently a SNAPSHOT

Below is my code:

import os
import base64
import datetime
import json
from dotenv import load_dotenv
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession

load_dotenv()

# Access environment variables
SERVICE_ACCOUNT_FILE = 'credentials.json'
PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = os.getenv("DATASET_ID")
FILE_EXTENSION = os.getenv("FILE_EXTENSION")

# Construct the URL for the API endpoint
BASE_URL = "https://bigquery.googleapis.com"

credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

def move_snapshots_to_gcs(env_vars):

    # Access environment variables passed in from the scheduler
    BUCKET_NAME = env_vars.get("BUCKET_NAME")
    LOCATION = env_vars.get("LOCATION")

    authed_session = AuthorizedSession(credentials)

    # Generate a timestamp string
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    # Generate the folder name with the current date
    folder_name = f'SNAPSHOT_{timestamp.split("_")[0]}'

    # Initialize a GCS client
    storage_client = storage.Client(credentials=credentials)

    # Get the bucket
    bucket = storage_client.bucket(BUCKET_NAME)

    # Check if the folder exists in the bucket
    folder_blob = bucket.blob(folder_name + "/")

    if not folder_blob.exists():
        # Create the folder
        folder_blob.upload_from_string("")
        print(f"Folder {folder_name} created.")

    # Check if the flag file exists in the folder of that bucket
    flag_blob = bucket.blob(f"{folder_name}/backup_flag")

    if flag_blob.exists():
        print("Backup has already been performed. Exiting...")
        return

    # Construct a BigQuery client object with the service account credentials
    client = bigquery.Client(credentials=credentials)

    # List tables in the specified dataset
    tables = client.list_tables(DATASET_ID)
 
    print("Tables contained in '{}':".format(DATASET_ID))
    for table in tables:
        
        TABLE_ID = table.table_id
        SNAPSHOT_TABLE_ID = f"GCS_SNAPSHOT_{TABLE_ID}_{timestamp.split('_')[0]}"

        # Create the table snapshots via the BigQuery jobs API
        response_status_code = create_table_snapshot()
        print(response_status_code)
        if response_status_code == 200:


            # Define the filename with the timestamp and the configured file extension
            filename = f"{table.table_id.split('.')[-1]}_{timestamp}.{FILE_EXTENSION}"

            # Format the destination URI
            destination_uri = f"gs://{BUCKET_NAME}/{folder_name}/{filename}"
            dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
            table_ref = dataset_ref.table(SNAPSHOT_TABLE_ID)

            extract_job = client.extract_table(
                table_ref,
                destination_uri,
                # Location must match that of the source table.
                location=LOCATION,
            )  # API request
            extract_job.result()  # Waits for job to complete.

            print(
                "Exported {}:{}.{} to {}".format(table.project, table.dataset_id, table.table_id, destination_uri)
            )


def build_json_body(TABLE_ID, SNAPSHOT_TABLE_ID):
    json_body = f'''
            {{
                "configuration": {{
                    "copy": {{
                    "sourceTables": [
                        {{
                            "projectId": "{PROJECT_ID}",
                            "datasetId": "{DATASET_ID}",
                            "tableId": "{TABLE_ID}"
                        }}
                    ],
                    "destinationTable": 
                    {{
                        "projectId": "{PROJECT_ID}",
                        "datasetId": "{DATASET_ID}",
                        "tableId": "{SNAPSHOT_TABLE_ID}"
                    }},
                    "operationType": "SNAPSHOT",
                    "writeDisposition": "WRITE_EMPTY"
                    }}
                }}
            }}
    '''
    return json.loads(json_body)

def create_table_snapshot():
    # Generate a timestamp string
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    authed_session = AuthorizedSession(credentials)

    # Construct a BigQuery client object with the service account credentials
    client = bigquery.Client(credentials=credentials)

    # List tables in the specified dataset
    tables = client.list_tables(DATASET_ID)
    
    last_response_status = None  # Variable to store the last response status code

    for table in tables:

        TABLE_ID = table.table_id
        SNAPSHOT_TABLE_ID = f"GCS_SNAPSHOT_{TABLE_ID}_{timestamp.split('_')[0]}"

        json_body = build_json_body(TABLE_ID, SNAPSHOT_TABLE_ID)

        # Make the POST request to create snapshot
        response = authed_session.post(
            url=f'{BASE_URL}/bigquery/v2/projects/{PROJECT_ID}/jobs',
            json=json_body,
            headers={'Content-Type': 'application/json'}
        )

        print(response.json())

        last_response_status = response.status_code  # Update the status code

    return last_response_status

if __name__ == "__main__":
    move_snapshots_to_gcs({'BUCKET_NAME': 'kebron_daily_backup_bucket', 'LOCATION': 'US'})

I attempted to do it manually, but encountered a similar error.

Shouldn't this work? Why is the extract job failing?

There is 1 answer below.

Answer from Mel:

Based on your use case, you want to "Export table data to Cloud Storage". The documentation can guide you through it, either in code or via the console UI.

Make sure to review the limitations, required permissions, and location considerations mentioned by @Kolban. In particular, the 400 error in the question occurs because the extract job points at the snapshot table, and BigQuery does not allow extract jobs on a table whose type is SNAPSHOT; point the export at the base table instead (or restore the snapshot to a standard table first, as sketched below).
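
If you specifically need the snapshot's point-in-time contents in GCS (rather than the base table's current contents), one possible workaround is to first restore the snapshot into a standard table with a copy job whose operation type is RESTORE, and then extract that restored table. Below is a minimal Python sketch of that idea, assuming a reasonably recent google-cloud-bigquery client (one whose CopyJobConfig supports operation_type) and reusing the credentials object from the question; the RESTORED_ destination table name is a hypothetical placeholder.

from google.cloud import bigquery

client = bigquery.Client(credentials=credentials)  # credentials as in the question

# Fully-qualified table ids. The snapshot id matches the one in the error
# message; the RESTORED_ destination table is a hypothetical placeholder.
snapshot_id = "melodic-map-408207.kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28"
restored_id = "melodic-map-408207.kebron_dataset.RESTORED_customer_2024-02-28"

# Step 1: restore the snapshot into a standard table. Extract jobs are
# rejected for SNAPSHOT tables, but they are allowed on the restored copy.
restore_job = client.copy_table(
    snapshot_id,
    restored_id,
    job_config=bigquery.CopyJobConfig(operation_type="RESTORE"),
)
restore_job.result()  # wait for the restore to finish

# Step 2: export the restored (standard) table to GCS.
extract_job = client.extract_table(
    restored_id,
    "gs://kebron_daily_backup_bucket/backup/customer_2024-02-28.parquet",
    location="US",  # must match the dataset location
    job_config=bigquery.ExtractJobConfig(destination_format="PARQUET"),
)
extract_job.result()

Once the export completes, the restored table can be dropped with client.delete_table(restored_id) so you are not billed for duplicate storage.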

Sample Code:

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import org.threeten.bp.Duration;

public class ExtractTableToCsv {

  public static void runExtractTableToCsv() {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "bigquery-public-data";
    String datasetName = "samples";
    String tableName = "shakespeare";
    String bucketName = "my-bucket";
    String destinationUri = "gs://" + bucketName + "/path/to/file";
    // For more information on export formats available see:
    // https://cloud.google.com/bigquery/docs/exporting-data#export_formats_and_compression_types
    // For more information on Job see:
    // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html

    String dataFormat = "CSV";
    extractTableToCsv(projectId, datasetName, tableName, destinationUri, dataFormat);
  }

  // Exports datasetName:tableName to destinationUri as raw CSV
  public static void extractTableToCsv(
      String projectId,
      String datasetName,
      String tableName,
      String destinationUri,
      String dataFormat) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      TableId tableId = TableId.of(projectId, datasetName, tableName);
      Table table = bigquery.getTable(tableId);

      Job job = table.extract(dataFormat, destinationUri);

      // Blocks until this job completes its execution, either failing or succeeding.
      Job completedJob =
          job.waitFor(
              RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
              RetryOption.totalTimeout(Duration.ofMinutes(3)));
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to extract due to an error: \n" + completedJob.getStatus().getError());
        return;
      }
      System.out.println(
          "Table export successful. Check in GCS bucket for the " + dataFormat + " file.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Table extraction job was interrupted. \n" + e.toString());
    }
  }
}
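
For the question's Python setup, a rough equivalent of the Java sample above could look like the sketch below. This is not the documentation's own code: it simply points the extract job at the base table, because, as the 400 error above reports, extract jobs are not allowed on tables of type SNAPSHOT. The project, dataset, table, and bucket names are the ones that appear in the question.

from google.cloud import bigquery

def extract_table_to_gcs(project_id, dataset_id, table_id, destination_uri, location="US"):
    """Export a standard (non-snapshot) BigQuery table to GCS as CSV."""
    client = bigquery.Client(project=project_id)

    extract_job = client.extract_table(
        bigquery.DatasetReference(project_id, dataset_id).table(table_id),
        destination_uri,
        location=location,  # must match the table's location
        job_config=bigquery.ExtractJobConfig(destination_format="CSV"),
    )
    extract_job.result()  # block until the extract job completes
    print(f"Exported {project_id}:{dataset_id}.{table_id} to {destination_uri}")

if __name__ == "__main__":
    extract_table_to_gcs(
        "melodic-map-408207",
        "kebron_dataset",
        "customer",  # the base table, not GCS_SNAPSHOT_customer_...
        "gs://kebron_daily_backup_bucket/backup/customer.csv",
    )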