I have requirement to load data from BQ to PostGreSQL using GCP Workflows.
Basically i am doing a HTTP Post call to PostGreSQL and invoking "importContext" method to perfrom the import operation.
steps:
- callImport:
call: http.post
args:
url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/" + instance + "/import"}
auth:
type: OAuth2
body:
importContext:
uri: ${file}
database: ${databaseschema}
fileType: CSV
csvImportOptions:
table: ${importtable}
columns : [a,b,c,b]
result: operation
In my flow i am calling HTTP GET method to check if the import operation is DONE or NOT , strangely in case of any ERROR while import the response is getting status code as DONE along with the Error tag in the body response which suppose to be status:ERROR .
Successful body Response:
{
"body":{
"endTime":"2023-07-26T12:15:55.629Z",
"importContext":{
"csvImportOptions":{
"columns":[strings
],
"table":"table_name"
},
"database":"postgres",
"fileType":"CSV",
"kind":"sql#importContext",
"uri":"gs://workflow/2023-07-26000000000000.csv"
},
"insertTime":"2023-07-26T12:15:44.791Z",
"kind":"sql#operation",
"name":"af439df3-21ea-45b2-a92c-a4de00000024",
"operationType":"IMPORT",
"selfLink":"https://sqladmin.googleapis.com/v1/projects/",
"startTime":"2023-07-26T12:15:45.027Z",
"status":"DONE",
"targetId":"pricing-dev-master",
"targetLink":"https://sqladmin.googleapis.com/v1/projects/",
"targetProject":"project-dev",
"user":"[email protected]"
},
"code":200,
"headers":{
"Alt-Svc":"h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
"Cache-Control":"private",
"Content-Length":"1179",
"Content-Type":"application/json; charset=UTF-8",
"Date":"Wed, 26 Jul 2023 12:16:00 GMT",
"Server":"ESF",
"Vary":"Origin, X-Origin, Referer",
"X-Content-Type-Options":"nosniff",
"X-Frame-Options":"SAMEORIGIN",
"X-Xss-Protection":"0"
}
}
Error response Body :
{
"body":{
"endTime":"2023-07-26T10:19:49.527Z",
"error":{
"errors":[
{
"code":"ERROR_RDBMS",
"kind":"sql#operationError",
"message":"generic::failed_precondition: ERROR: invalid input syntax for type integer: \"2023-07-11\"\nCONTEXT: COPY table_name, line 1, column id: \"2023-07-11\"\n"
}
],
"kind":"sql#operationErrors"
},
"importContext":{
"csvImportOptions":{
"table":"table_name"
},
"database":"postgres",
"fileType":"CSV",
"kind":"sql#importContext",
},
"uri":"gs://workflow/2023-07-26000000000000.csv"
"insertTime":"2023-07-26T10:19:38.945Z",
"kind":"sql#operation",
"name":"375f1c18-c2ac-4b1e-800c-650f00000024",
"operationType":"IMPORT",
"selfLink":"https://sqladmin.googleapis.com/v1/projects/",
"startTime":"2023-07-26T10:19:39.118Z",
**"status":"DONE",**
"targetId":"pricing-dev-master",
"targetLink":"https://sqladmin.googleapis.com/v1/projects/",
"targetProject":"project-dev",
"user":"[email protected]"
},
**"code":200**,
"headers":{
"Alt-Svc":"h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000",
"Cache-Control":"private",
"Content-Length":"1290",
"Content-Type":"application/json; charset=UTF-8",
"Date":"Wed, 26 Jul 2023 10:19:49 GMT",
"Server":"ESF",
"Vary":"Origin, X-Origin, Referer",
"X-Content-Type-Options":"nosniff",
"X-Frame-Options":"SAMEORIGIN",
"X-Xss-Protection":"0"
}
}
To handle this situtation i wanted to write a logic to make the workflow Fail in case of ERROR in import process. I tried to capture the body response in "maps" but then its not working. Can you please suggest is this the right way to append a map on every HTTP Get calls?
- chekoperation:
switch:
- condition: ${operation.body.status != "DONE"}
next: wait
- condition: ${"sql#operationError" in progress_Map}
raise: "Error in the load"
next : completed
- completed:
return: "done"
- wait:
call: sys.sleep
args:
seconds: 5
next: getoperation
- getoperation:
call: http.get
args:
url: ${operation.body.selfLink}
auth:
type: OAuth2
result: operation
next: trackprogress
# Creating Maps to add the response body into progress_Map
- trackprogress:
assign:
- progress_Map[file]: ${operation.body}
raise : $(progress_Map)
- returnoutput:
return: ${progress_Map}
next: chekoperation
I have the below code to handle '''switch''' to control the exeution but somehow its not working.
- chekoperation:
switch:
- condition: ${operation.body.status != "DONE"}
# Tried this as well
#- condition: ${not("DONE" in progress_Map)}
next: wait
- condition: ${"sql#operationError" in progress_Map}
raise: "Error in the load"
next : completed
If there is no "status" key in the body in case of error, this condition will fail:
To prevent that you can have another condition (or replace it) at the top to check if that node is there like this:
Or was I getting something wrong?