Load parquet into TigerGraph from S3 using pyTigerGraph


I have this working GSQL script:

use graph test_graph
drop job test_job
drop data_source deep_disco_test

create data_source S3 deep_disco_test for graph test_graph

set my_source = "/home/ubuntu/s3.config"

CREATE LOADING JOB test_job for GRAPH test_graph {
       DEFINE FILENAME vertex1= "$deep_disco_test:{\"file.uris\":\"s3://signals-output/db_to_tg/identities/test_identities.parquet/\",\"file.reader.type\": \"parquet\",\"file.regexp\":\".parquet\"}";
       DEFINE FILENAME edge1=   "$deep_disco_test:{\"file.uris\":\"s3://signals-output/db_to_tg/relations/test_relations.parquet/\",\"file.reader.type\": \"parquet\",\"file.regexp\":\".parquet\"}";

       LOAD vertex1 to VERTEX identity VALUES($"dd_id",$"names",$"identity_type") USING JSON_FILE = "true";
       LOAD edge1 to EDGE relation VALUES($"src",$"dst",$"edge_type") USING JSON_FILE = "true";

}

run loading job test_job using EOF = "true"

Now I want to be able to run this from Databricks using the pyTigerGraph conn class and conn.uploadFile().
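
For context, conn below is assumed to be a standard pyTigerGraph connection created roughly like this (host, credentials and the token secret are placeholders, not taken from the original post):

import pyTigerGraph as tg

# Placeholder connection details -- substitute your own host, graph and credentials
conn = tg.TigerGraphConnection(
    host="https://your-instance.i.tgcloud.io",
    graphname="dd_subgraph_test",
    username="tigergraph",
    password="********",
)
# Most REST endpoints need a token; getToken() exchanges a secret for one
# conn.getToken("<secret>")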

These are my steps:

  1. Create a string that creates the two loading jobs:
load_job = f'''

use graph {graph}
drop job {test_load_vertices}
drop job {test_load_edges}
drop data_source {data_source}
create data_source S3 {data_source} for graph {graph}

set {data_source} = "/home/ubuntu/s3.config"

CREATE LOADING JOB {test_load_vertices} FOR GRAPH {graph} {{
       DEFINE FILENAME MyDataSource;
       LOAD MyDataSource to VERTEX identity VALUES($"dd_id",$"names",$"identity_type") USING JSON_FILE = "true";}}
                                       
CREATE LOADING JOB {test_load_edges} FOR GRAPH {graph} {{
       DEFINE FILENAME MyDataSource;
       LOAD MyDataSource to EDGE relation VALUES($"src", $"dst", $"edge_type") USING JSON_FILE = "true";}}
'''
  2. Load the jobs:
conn.gsql(load_job)

Which outputs the following:

"Using graph 'dd_subgraph_test'
Successfully dropped jobs on the graph 'dd_subgraph_test': [test_load_vertices_job].
Successfully dropped jobs on the graph 'dd_subgraph_test': [test_load_edges_job].
Successfully dropped data sources: [deep_disco_test].
Successfully created data sources: [deep_disco_test].
Data source 'deep_disco_test' has been updated.
Successfully created loading jobs: [test_load_vertices_job].
Successfully created loading jobs: [test_load_edges_job]."
  3. Test-run one of the loading jobs:
print(conn.uploadFile(
  filePath="s3://signals-output/db_to_tg/identities/test_identities.parquet", 
  fileTag='MyDataSource', 
  jobName="test_load_vertices_job"))

Which prints None.

Also, nothing happens in GraphStudio.

What am I doing wrong?

1 Answer

Well, you are using the wrong function. The purpose of uploadFile()¹ is literally to upload a file from your client's file system (the client here being the machine running the Python code) to the TigerGraph server's file system, to be picked up by the loading job. My understanding is that a temporary file is created in the server's file system and the loading job is called like this:

RUN LOADING JOB MyLoadingJob USING MyDataSource = "/path/to/temporary.file"
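
For comparison, a local-file load through pyTigerGraph would look roughly like the sketch below (file path, separator and job name are illustrative; runLoadingJobWithFile() is the current name of uploadFile(), see footnote 1):

# Sketch only: pushing a file from the *client's* file system to the server.
# This is not what you want for S3 data; it is shown just to illustrate the contrast.
result = conn.runLoadingJobWithFile(
    filePath="/local/path/to/data.csv",   # a file on the machine running Python
    fileTag="MyDataSource",
    jobName="test_load_vertices_job",
    sep=",",                              # column separator of the uploaded file
)
print(result)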

Since you intend to load data from an S3 bucket, you do not need to upload anything to the database server's file system, as the server can directly pull data from the bucket. This will require reworking your loading jobs a bit. You will need to specify the bucket and the object in DEFINE FILENAME². E.g.:

CREATE LOADING JOB test_load_vertices_job FOR GRAPH dd_subgraph_test {
  DEFINE FILENAME MyDataSource = "$deep_disco_test:{'file.uris':'s3://signals-output/db_to_tg/identities/test_identities.parquet'}";
  LOAD MyDataSource to VERTEX …

See more on this in the documentation.
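
Put together with the rest of your script, the reworked vertex job could be created from Python along these lines (a sketch reusing the graph, data source, bucket and attribute names from the question; verify before running):

# Sketch: embed the S3 URI in DEFINE FILENAME so no file upload is needed.
create_vertex_job = '''
USE GRAPH dd_subgraph_test
DROP JOB test_load_vertices_job

CREATE LOADING JOB test_load_vertices_job FOR GRAPH dd_subgraph_test {
  DEFINE FILENAME MyDataSource = "$deep_disco_test:{'file.uris':'s3://signals-output/db_to_tg/identities/test_identities.parquet/','file.reader.type':'parquet'}";
  LOAD MyDataSource TO VERTEX identity VALUES($"dd_id", $"names", $"identity_type") USING JSON_FILE = "true";
}
'''
print(conn.gsql(create_vertex_job))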

Right now we do not have proper Pythonista functionality to run loading jobs (without uploading a file), but we will soon (at least I have a prototype that works). In the meantime, you can use the gsql() function to run a loading job just as you would from the gsql command-line tool:

conn.gsql('RUN LOADING JOB test_load_vertices_job')

You might want to use -noprint to reduce the output (i.e. to eliminate the multiple lines of progress bar that will be printed).
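
Combined with the EOF flag from the original command-line run, the call from Python could look like this sketch (graph and job names taken from the question):

# Sketch: run the S3-backed loading job, mirroring the original
# 'run loading job test_job using EOF = "true"' command.
print(conn.gsql('''
USE GRAPH dd_subgraph_test
RUN LOADING JOB test_load_vertices_job USING EOF="true"
'''))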

¹ uploadFile() has been renamed to runLoadingJobWithFile(), which is a more fitting name.

² You can specify the bucket/object configuration details in an external JSON file or directly as an embedded JSON document. The same applies to the data source definition. Also, in the JSON you can use single quotes ' instead of double quotes " to eliminate the escaping, making the code easier to read.