Read huge JSON line by line from Google Cloud Storage with Python


I know I should have some code already, but I have nothing useful yet.

There is a ~300 GB JSON file in my GCS bucket at gs://path/listings_all.json. Ultimately I'm trying to import it into BigQuery, but the file has a problematic data structure (I produced it with mongoexport from MongoDB):

invalid field name "$date". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 128 characters long

So my current approach is to somehow read the source file from GCS line by line, process each line, and upload the processed lines to BigQuery using the Python API.

Below is a simple reader I have put together to test with a sample of 100 lines from the original huge file:

import json
from pprint import pprint

with open('schema_in_10.json') as f:
    for line in f:
        j_content = json.loads(line)

        # print(j_content['id'], j_content['city'], j_content['country'], j_content['state'], j_content['country_code'], j_content['smart_location'], j_content['address'], j_content['market'], j_content['neighborhood'])
        # geo: { lat, lng }
        print('------')
        pprint(j_content['is_location_exact'])
        pprint(j_content['zipcode'])
        pprint(j_content['name'])

Can you please help me with how to read or stream a huge JSON file line by line from Google Cloud Storage with Python 3?

4 Answers

Accepted Answer

Reading it line by line on your local machine and then streaming into BigQuery won't scale to 300 GB, and to be honest you'll struggle to get it working.

There are a couple of scalable options:

  1. Write a Cloud Dataflow pipeline to read your file from GCS (it will scale and read in parallel for you), correct the field name, and then write to BigQuery. See here.
  2. Load it directly into BigQuery using CSV instead of JSON as the format, with a delimiter that doesn't appear in your data. Each record will be loaded into a single STRING column, and you can then use BigQuery's JSON functions to extract what you need. See here.
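A rough sketch of option 2 with the modern google-cloud-bigquery client (the delimiter choice, table ID `mydataset.raw_listings`, and the two extracted fields are my assumptions, not from the original thread):

```python
def extract_query(table):
    """Pull typed columns back out of the raw JSON with BigQuery's JSON functions."""
    return (
        "SELECT JSON_EXTRACT_SCALAR(raw, '$.name') AS name, "
        "JSON_EXTRACT_SCALAR(raw, '$.zipcode') AS zipcode "
        "FROM `{}`".format(table)
    )

def load_raw(uri, table):
    """Load each line of the file at `uri` into a one-column STRING table."""
    # third-party dependency: pip install google-cloud-bigquery
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter=u'\u00fe',  # any character that never occurs in the data
        quote_character='',         # disable quoting so JSON quotes pass through
        schema=[bigquery.SchemaField('raw', 'STRING')],
    )
    client.load_table_from_uri(uri, table, job_config=job_config).result()

# load_raw('gs://path/listings_all.json', 'mydataset.raw_listings')
# then run extract_query('mydataset.raw_listings') in BigQuery
```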
Answer

Here is an example implementation in GCP Dataflow of the first suggestion in the accepted answer. You'll need to implement the JSON correction in the json_processor function. You can run this code in a Datalab notebook.

# Datalab might need an older version of pip
# !pip install pip==9.0.3

import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

project_id = 'my-project'
bigquery_dataset_name = 'testdataset' # needs to exist 
table_name = 'testtable'
bucket_name = 'my-bucket'
json_file_gcs_path = 'gs://path/to/my/file.json'
schema = "name:STRING,zipcode:STRING"

def json_processor(row):
    import json
    d = json.loads(row)
    return {'name': d['name'], 'zipcode': d['zipcode']}

options = beam.options.pipeline_options.PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = "myjob"
google_cloud_options.staging_location = 'gs://{}/binaries'.format(bucket_name)
google_cloud_options.temp_location = 'gs://{}/temp'.format(bucket_name)
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options.region = "europe-west1"

p = beam.Pipeline(options=options)

(p | "read_from_gcs" >> beam.io.ReadFromText(json_file_gcs_path)
   | "json_processor" >> beam.Map(json_processor)
   | "write_to_bq" >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table=table_name, 
                                                       dataset=bigquery_dataset_name, 
                                                       project=project_id, 
                                                       schema=schema, 
                                                       create_disposition='CREATE_IF_NEEDED',
                                                       write_disposition='WRITE_EMPTY'))
)

p.run()
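The json_processor above leaves the actual correction unimplemented. One possible implementation, a sketch that recursively strips the leading "$" from mongoexport keys (the renaming rule is my assumption; adapt it to your data, and only name and zipcode are kept to match the schema above):

```python
import json

def clean_keys(obj):
    """Recursively rename keys like "$date" that BigQuery rejects."""
    if isinstance(obj, dict):
        return {k.lstrip('$'): clean_keys(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [clean_keys(v) for v in obj]
    return obj

def json_processor(row):
    d = clean_keys(json.loads(row))
    return {'name': d['name'], 'zipcode': d['zipcode']}
```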
Answer

Parsing a JSON file line by line with the built-in json parser is not going to work (unless it's actually a JSON Lines document, of course), so you want a streaming parser instead.

But while that solves the memory-use issue, it won't fix the invalid JSON. Your best bet is to first fix the invalid JSON source as a pure text file, either in Python or with sed or a similar tool, and then use the incremental parser on the corrected content.

def fixfile(sourcepath, destpath):
    with open(sourcepath) as source, open(destpath, "w") as dest:
        for line in source:
            # you may want to use a regexp if this simple solution
            # breaks something else
            line = line.replace("$date", "date")
            dest.write(line)
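As the comment in fixfile hints, a plain replace can clobber values that happen to contain the text. A regexp variant that only rewrites "$date" when it appears as a JSON object key (a quoted token followed by a colon) might look like this:

```python
import re

# Match "$date" only in key position, i.e. immediately followed by a colon,
# so string *values* containing $date are left untouched.
KEY_RE = re.compile(r'"\$date"(\s*:)')

def fix_line(line):
    return KEY_RE.sub(r'"date"\1', line)
```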
Answer

smart_open now has support for streaming GCS files.

from smart_open import open

# stream from GCS
with open('gs://my_bucket/my_file.txt') as fin:
    for line in fin:
        print(line)

# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt', 'wb') as fout:
    fout.write(b'hello world')
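Putting the pieces together, a minimal end-to-end sketch of what the question asks for: stream the file from GCS with smart_open, fix each line, and stream batches of rows into BigQuery. The "$date" rename, the table ID format, and the batch size are assumptions; this needs smart_open and google-cloud-bigquery installed and is not tuned for 300 GB of data:

```python
import json

def fix_record(line):
    """Rename the "$date" key that BigQuery rejects; adjust to your data."""
    record = json.loads(line)
    if '$date' in record:
        record['date'] = record.pop('$date')
    return record

def stream_to_bigquery(gcs_path, table_id, batch_size=500):
    # third-party dependencies: pip install smart_open google-cloud-bigquery
    from smart_open import open as gcs_open
    from google.cloud import bigquery

    client = bigquery.Client()
    batch = []
    with gcs_open(gcs_path) as fin:
        for line in fin:
            batch.append(fix_record(line))
            if len(batch) >= batch_size:
                errors = client.insert_rows_json(table_id, batch)
                assert not errors, errors
                batch = []
    if batch:
        client.insert_rows_json(table_id, batch)

# stream_to_bigquery('gs://path/listings_all.json', 'mydataset.listings')
```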