How to deploy Google Cloud Dataflow with connection to PostgreSQL (beam-nuggets) from Google Cloud Functions


I'm trying to create an ETL pipeline in GCP that reads part of the data from PostgreSQL and puts it in a suitable form into BigQuery. I was able to perform this task by deploying Dataflow from my computer, but I failed to make it dynamic, so that it reads the last transferred record and transfers the next 100. So I decided to create the Dataflow jobs from a Cloud Function. Everything was working OK; reading/writing to BigQuery works like a charm, but I'm stuck on the package required for PostgreSQL: beam-nuggets.

In the function I'm creating pipe arguments:

pipe_arguments = [
    '--project={0}'.format(PROJECT),
    '--staging_location=gs://xxx.appspot.com/staging/',
    '--temp_location=gs://xxx.appspot.com/temp/',
    '--runner=DataflowRunner',
    '--region=europe-west4',
    '--setup_file=./setup.py'
]

pipeline_options = PipelineOptions(pipe_arguments)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

Then create pipeline:

pipeline = beam.Pipeline(argv=pipe_arguments)

and run it:

pipeline.run()
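Putting the pieces together, the argument assembly can be sketched as a small helper (PROJECT and BUCKET are hypothetical placeholders; toggling `use_setup_file` reproduces the working and failing variants):

```python
# Sketch of assembling the Dataflow pipeline arguments inside the
# Cloud Function. PROJECT and BUCKET are hypothetical placeholders.
PROJECT = "my-project"
BUCKET = "xxx.appspot.com"

def build_pipe_arguments(use_setup_file=True):
    """Return the argv list handed to beam.Pipeline()."""
    args = [
        "--project={0}".format(PROJECT),
        "--staging_location=gs://{0}/staging/".format(BUCKET),
        "--temp_location=gs://{0}/temp/".format(BUCKET),
        "--runner=DataflowRunner",
        "--region=europe-west4",
    ]
    if use_setup_file:
        # This flag is what triggers the sdist build inside the
        # Cloud Function sandbox.
        args.append("--setup_file=./setup.py")
    return args
```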

If I omit:

    '--setup_file=./setup.py'

everything is fine, except Dataflow cannot use PostgreSQL, because the import:

from beam_nuggets.io import relational_db

fails.
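For context, this is roughly how that import is used in the pipeline, per beam-nuggets' documented `relational_db` API; the connection details are hypothetical, and the import is deferred into the function so the sketch stands on its own:

```python
def read_from_postgres(pipeline, conn):
    # Deferred import so this sketch doesn't require beam-nuggets to load.
    from beam_nuggets.io import relational_db

    source_config = relational_db.SourceConfiguration(
        drivername="postgresql+pg8000",
        host=conn["host"],
        port=conn["port"],
        username=conn["user"],
        password=conn["password"],
        database=conn["database"],
    )
    # Returns a PCollection of rows from the given table.
    return pipeline | relational_db.ReadFromDB(
        source_config=source_config,
        table_name=conn["table"],
    )

# Hypothetical connection details.
conn = {"host": "10.0.0.5", "port": 5432, "user": "etl",
        "password": "secret", "database": "appdb", "table": "records"}
```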

When I add

    '--setup_file=./setup.py'

line, testing the function from the GCP Cloud Functions web portal returns:

Error: function terminated. Recommended action: inspect logs for termination reason. Details:
Full trace: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/apache_beam/utils/processes.py", line 85, in check_output
    out = subprocess.check_output(*args, **kwargs)
  File "/opt/python3.7/lib/python3.7/subprocess.py", line 411, in check_output
    **kwargs).stdout
  File "/opt/python3.7/lib/python3.7/subprocess.py", line 512, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/env/bin/python3.7', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpxdvj0ulx']' returned non-zero exit status 1.
,          output of the failed child process b'running sdist\nrunning egg_info\ncreating example.egg-info\n'

Running

python setup.py sdist --dist-dir ./tmp/

from my local computer works fine.

setup.py is deployed to the Cloud Function along with the function code (main.py) and requirements.txt.

requirements.txt is used during function deployment and looks like this:

beam-nuggets==0.15.1
google-cloud-bigquery==1.17.1
apache-beam==2.19.0
google-cloud-dataflow==2.4.0
google-apitools==0.5.31

setup.py looks like this:

from setuptools import find_packages
from setuptools import setup

REQUIRED_PACKAGES = ['beam-nuggets>=0.15.1']

setup(
    name='example',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    description='example desc'
)

I've been stuck for a couple of days; I tried different setup.py approaches and tried using requirements.txt instead of setup.py, with no luck.

The log just says:

 {
 insertId: "000000-88232bc6-6122-4ec8-a4f3-90e9775e89f6"  
 
labels: {
  execution_id: "78ml14shfolv"   
 }
 logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"  
 receiveTimestamp: "2020-07-13T12:08:35.898729649Z"  
 
resource: {
  
labels: {
   function_name: "xxx"    
   project_id: "xxx"    
   region: "europe-west6"    
  }
  type: "cloud_function"   
 }
 severity: "INFO"  
 textPayload: "Executing command: ['/env/bin/python3.7', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpxdvj0ulx']"  
 timestamp: "2020-07-13T12:08:31.639Z"  
 trace: "projects/xxx/traces/c9f1b1f68ed869f187e04ea672c487a4"  
}
 {
 insertId: "000000-3dfb239a-4067-4f9d-bd5f-bae5174e9dc7"  
 
labels: {
  execution_id: "78ml14shfolv"   
 }
 logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"  
 receiveTimestamp: "2020-07-13T12:08:35.898729649Z"  
 
resource: {
  
labels: {
   function_name: "xxx"    
   project_id: "xxx"    
   region: "europe-west6"    
  }
  type: "cloud_function"   
 }
 severity: "DEBUG"  
 textPayload: "Function execution took 7798 ms, finished with status: 'crash'"  
 timestamp: "2020-07-13T12:08:35.663674738Z"  
 trace: "projects/xxx/traces/c9f1b1f68ed869f187e04ea672c487a4"  
}

Supplementary info:

If I use

'--requirements_file=./requirements.txt'

instead of

'--setup_file=./setup.py'

I get:

Error: memory limit exceeded.

in the GCP Cloud Functions web portal while running the test function.

After I increased the memory to 2 GB, it says:

Error: function terminated. Recommended action: inspect logs for termination reason. Details:
Full traceback: Traceback (most recent call last):
  File "/env/local/lib/python3.7/site-packages/apache_beam/utils/processes.py", line 85, in check_output
    out = subprocess.check_output(*args, **kwargs)
  File "/opt/python3.7/lib/python3.7/subprocess.py", line 411, in check_output
    **kwargs).stdout
  File "/opt/python3.7/lib/python3.7/subprocess.py", line 512, in run
    output=stdout, stderr=stderr)
subprocess.CalledProcessError: Command '['/env/bin/python3.7', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--exists-action', 'i', '--no-binary', ':all:']' returned non-zero exit status 1. 
 Pip install failed for package: -r         
 Output from execution of subprocess: b'Collecting beam-nuggets==0.15.1  
 Downloading beam-nuggets-0.15.1.tar.gz (17 kB)
  Saved /tmp/dataflow-requirements-cache/beam-nuggets-0.15.1.tar.gz
Collecting google-cloud-bigquery==1.17.1
  Downloading google-cloud-bigquery-1.17.1.tar.gz (228 kB)
  Saved /tmp/dataflow-requirements-cache/google-cloud-bigquery-1.17.1.tar.gz
Collecting apache-beam==2.19.0
  Downloading apache-beam-2.19.0.zip (1.9 MB)
  Saved /tmp/dataflow-requirements-cache/apache-beam-2.19.0.zip
Collecting google-cloud-dataflow==2.4.0
  Downloading google-cloud-dataflow-2.4.0.tar.gz (5.8 kB)
  Saved /tmp/dataflow-requirements-cache/google-cloud-dataflow-2.4.0.tar.gz
Collecting google-apitools==0.5.31
  Downloading google-apitools-0.5.31.tar.gz (173 kB)
  Saved /tmp/dataflow-requirements-cache/google-apitools-0.5.31.tar.gz
Collecting SQLAlchemy<2.0.0,>=1.2.14
  Downloading SQLAlchemy-1.3.18.tar.gz (6.0 MB)
  Saved /tmp/dataflow-requirements-cache/SQLAlchemy-1.3.18.tar.gz
Collecting sqlalchemy-utils<0.34,>=0.33.11
  Downloading SQLAlchemy-Utils-0.33.11.tar.gz (128 kB)
  Saved /tmp/dataflow-requirements-cache/SQLAlchemy-Utils-0.33.11.tar.gz
Collecting pg8000<2.0.0,>=1.12.4
  Downloading pg8000-1.16.0.tar.gz (75 kB)
  Saved /tmp/dataflow-requirements-cache/pg8000-1.16.0.tar.gz
Collecting PyMySQL<2.0.0,>=0.9.3
  Downloading PyMySQL-0.9.3.tar.gz (75 kB)
  Saved /tmp/dataflow-requirements-cache/PyMySQL-0.9.3.tar.gz
Collecting kafka>===1.3.5
  Downloading kafka-1.3.5.tar.gz (227 kB)
  Saved /tmp/dataflow-requirements-cache/kafka-1.3.5.tar.gz
Collecting google-cloud-core<2.0dev,>=1.0.0
 Downloading google-cloud-core-1.3.0.tar.gz (32 kB)
  Saved /tmp/dataflow-requirements-cache/google-cloud-core-1.3.0.tar.gz
Collecting google-resumable-media<0.5.0dev,>=0.3.1
  Downloading google-resumable-media-0.4.1.tar.gz (2.1 MB)
  Saved /tmp/dataflow-requirements-cache/google-resumable-media-0.4.1.tar.gz
Collecting protobuf>=3.6.0
  Downloading protobuf-3.12.2.tar.gz (265 kB)
  Saved /tmp/dataflow-requirements-cache/protobuf-3.12.2.tar.gz
Collecting crcmod<2.0,>=1.7
  Downloading crcmod-1.7.tar.gz (89 kB)
  Saved /tmp/dataflow-requirements-cache/crcmod-1.7.tar.gz
Collecting dill<0.3.2,>=0.3.1.1
  Downloading dill-0.3.1.1.tar.gz (151 kB)
  Saved /tmp/dataflow-requirements-cache/dill-0.3.1.1.tar.gz
Collecting fastavro<0.22,>=0.21.4
  Downloading fastavro-0.21.24.tar.gz (496 kB)
  Saved /tmp/dataflow-requirements-cache/fastavro-0.21.24.tar.gz
Collecting future<1.0.0,>=0.16.0
  Downloading future-0.18.2.tar.gz (829 kB)
  Saved /tmp/dataflow-requirements-cache/future-0.18.2.tar.gz
Collecting grpcio<2,>=1.12.1
  Downloading grpcio-1.30.0.tar.gz (19.7 MB)
    ERROR: Command errored out with exit status 1:
     command: /env/bin/python3.7 -c \'import sys, setuptools, tokenize; sys.argv[0] = \'"\'"\'/tmp/pip-download-yjpzrbur/grpcio/setup.py\'"\'"\'; __file__=\'"\'"\'/tmp/pip-download-yjpzrbur/grpcio/setup.py\'"\'"\';f=getattr(tokenize, \'"\'"\'open\'"\'"\', open)(__file__);code=f.read().replace(\'"\'"\'\\r\
\'"\'"\', \'"\'"\'\
\'"\'"\');f.close();exec(compile(code, __file__, \'"\'"\'exec\'"\'"\'))\' egg_info --egg-base /tmp/pip-download-yjpzrbur/grpcio/pip-egg-info
         cwd: /tmp/pip-download-yjpzrbur/grpcio/
    Complete output (11 lines):
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-download-yjpzrbur/grpcio/setup.py", line 196, in <module>
        if check_linker_need_libatomic():
      File "/tmp/pip-download-yjpzrbur/grpcio/setup.py", line 156, in check_linker_need_libatomic
        stderr=PIPE)
      File "/opt/python3.7/lib/python3.7/subprocess.py", line 800, in __init__
        restore_signals, start_new_session)
      File "/opt/python3.7/lib/python3.7/subprocess.py", line 1551, in _execute_child
        raise child_exception_type(errno_num, err_msg, err_filename)
    FileNotFoundError: [Errno 2] No such file or directory: \'cc\': \'cc\'
    ----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
WARNING: You are using pip version 20.0.2; however, version 20.1.1 is available.
You should consider upgrading via the \'/env/bin/python3.7 -m pip install --upgrade pip\' command.
'

Logs in this case:

 {
 insertId: "000000-5e4c10f4-d542-4631-8aaa-b9306d1390fd"  
 
labels: {
  execution_id: "15jww0sd8uyz"   
 }
 logName: "projects/xxx/logs/cloudfunctions.googleapis.com%2Fcloud-functions"  
 receiveTimestamp: "2020-07-13T14:01:33.505683371Z"  
 
resource: {
  
labels: {
   function_name: "xxx"    
   project_id: "xxx"    
   region: "europe-west6"    
  }
  type: "cloud_function"   
 }
 severity: "DEBUG"  
 textPayload: "Function execution took 18984 ms, finished with status: 'crash'"  
 timestamp: "2020-07-13T14:01:32.953194652Z"  
 trace: "projects/xxx/traces/262224a3d230cd9a66b1eebba3d7c3e0"  
}

Deploying Dataflow from my local machine works fine.

The command from the logs:

python -m pip download --dest ./tmp -r ./requirements.txt --exists-action i --no-binary :all:

also works fine locally, although it seems to download half the internet for a couple of minutes, even if I reduce requirements.txt to beam-nuggets==0.15.1 only.

It gets stuck on

grpcio-1.30.0.tar.gz (19.7 MB)

exactly during setup of this package, in the function:

def check_linker_need_libatomic():
    """Test if linker on system needs libatomic."""
    code_test = (b'#include <atomic>\n' +
                 b'int main() { return std::atomic<int64_t>{}; }')
    cc_test = subprocess.Popen(['cc', '-x', 'c++', '-std=c++11', '-'],
                               stdin=PIPE,
                               stdout=PIPE,
                               stderr=PIPE)
    cc_test.communicate(input=code_test)
    return cc_test.returncode != 0

Accepted answer:

I also tried GCP App Engine instead of Cloud Functions, with the same result; however, it pointed me towards the proper solution. Thanks to this and this, I was able to build an external package from beam-nuggets and include it using --extra_package instead of --requirements_file or --setup_file.

The problem with the grpcio compilation (forced by the non-configurable --no-binary :all:) remains. The weird setup.py error also remains.

But deployment from Cloud Functions to Dataflow (with dependencies) is working, so the problem is closed for me.
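For anyone landing here, the workaround boils down to building a source distribution locally and pointing the pipeline at the resulting tarball via --extra_package. The snippet below builds a throwaway package in a temp directory just to illustrate the mechanics (the package name, module, and paths are all hypothetical):

```python
import os
import subprocess
import sys
import tempfile

# Build a minimal sdist the same way Dataflow expects an extra package.
# In the real setup, the resulting tarball is then passed to the pipeline:
#   --extra_package=./dist/example-0.1.tar.gz
tmp = tempfile.mkdtemp()

with open(os.path.join(tmp, "setup.py"), "w") as f:
    f.write(
        "from setuptools import setup\n"
        "setup(name='example', version='0.1', py_modules=['mymod'],\n"
        "      install_requires=['beam-nuggets>=0.15.1'])\n"
    )
with open(os.path.join(tmp, "mymod.py"), "w") as f:
    f.write("def parse_row(row):\n    return dict(row)\n")

# Equivalent to running `python setup.py sdist --dist-dir dist` by hand.
subprocess.check_call(
    [sys.executable, "setup.py", "sdist", "--dist-dir", "dist"], cwd=tmp)

tarball = os.path.join(tmp, "dist", "example-0.1.tar.gz")
```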

Update:

Just after that I was hit by this problem:

in _import_module return __import__(import_name) ModuleNotFoundError: No module named 'main'

As I wasn't using any 'main' module, it was hard to figure out that I also had to pack into the external package every function defined in my main.py file (hence the module name). So the extra_package file now contains all the external dependencies plus my own module in which my functions are stored.
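In other words, nothing the pipeline workers call may live only in main.py. A hypothetical layout that avoids the error (all names are made up):

```python
# Hypothetical layout: everything the pipeline workers call lives in a
# module inside the packaged tarball, and main.py only imports it.
#
#   example/              <- packaged as example-0.1.tar.gz (--extra_package)
#       setup.py
#       mytransforms.py   <- all DoFns / functions used in the pipeline
#   main.py               <- Cloud Function entry point
#
# main.py then does:
#   from mytransforms import parse_row   # hypothetical name

def parse_row(row):
    """Example transform that must be importable on the workers,
    i.e. defined in mytransforms.py rather than in main.py."""
    return dict(row)
```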