I am migrating data from Google BigQuery (GCP) to Amazon S3 (AWS). For this, I'm following the AWS Glue solution in [1], which uses the Spark BigQuery Connector [2]. This approach works fine for full loads, but I want to modify the job to load data incrementally based on date partitions. When I pass the 'datePartition' property documented in [2] to the Glue job as an additional connection option, I get the following error:
'An error occurred while calling o94.getDynamicFrame. com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: request failed: table decorators are not supported in standard SQL'
Can anyone help me circumvent this, or point me towards a different way to implement incremental loads? It would help me out tremendously. My current Glue script is below, followed by a sketch of the alternative I've been considering.
[1] https://aws.amazon.com/blogs/big-data/migrating-data-from-google-bigquery-to-amazon-s3-using-aws-glue-custom-connectors/
[2] https://github.com/GoogleCloudDataproc/spark-bigquery-connector
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node Google BigQuery Connector 0.24.2 for AWS Glue 3.0
GoogleBigQueryConnector0242forAWSGlue30_node1 = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options={
            "datePartition": "20230501",
            "parentProject": "project_name",
            "dataset": "dataset_name",
            "table": "table_name",
            "connectionName": "connection_name",
        },
        transformation_ctx="GoogleBigQueryConnector0242forAWSGlue30_node1",
    )
)
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
    frame=GoogleBigQueryConnector0242forAWSGlue30_node1,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "bucket_name",
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="S3bucket_node3",
)
job.commit()
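In case it helps frame the question, here is the alternative I was considering instead of 'datePartition'. It is only an untested sketch: it assumes the table is partitioned on a DATE column (I've called it partition_date here, purely hypothetically) and that the connector's 'filter' read option from [2] is passed through Glue's connection_options unchanged.

# Sketch: incremental read via a pushed-down filter instead of a table decorator.
# Assumes a hypothetical DATE partition column "partition_date" and that the
# connector's "filter" option survives Glue's connection_options pass-through.
incremental_node = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
        "parentProject": "project_name",
        "dataset": "dataset_name",
        "table": "table_name",
        "connectionName": "connection_name",
        # Standard-SQL predicate instead of the "datePartition" decorator
        "filter": "partition_date = DATE '2023-05-01'",
    },
    transformation_ctx="incremental_node",
)

If this does push the predicate down on the BigQuery side, I could parameterize the date through a job argument, but I'm not sure whether 'filter' avoids the same table-decorator code path that 'datePartition' hits.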