For Google Cloud Dataflow, is it possible to start another pipeline from a pipeline?


I am trying to set up a Google Cloud Dataflow pipeline (streaming mode) that reads Pub/Sub topic messages, extracts information (the name of an object in Google Cloud Storage) from each published message, and then starts another pipeline (batch mode) to process that object in Google Cloud Storage.

Is it possible to start another pipeline from within a pipeline?


2 Answers

Answer 1:

There is no technical reason barring this. You would need to keep your Pipeline objects separate and have sufficient Compute Engine quota to launch all the jobs you need.
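For example, the streaming parent described in the question could hand each Pub/Sub message to a DoFn that launches the batch job. This is only a minimal sketch, assuming Dataflow SDK 1.x; the topic name and LaunchBatchJobFn (a DoFn along the lines of the one in the next answer) are hypothetical:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class StreamingParent {
    public static void main(String[] args) {
        // Run with --runner=DataflowPipelineRunner --project=... --stagingLocation=...
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().as(DataflowPipelineOptions.class);
        options.setStreaming(true);

        Pipeline p = Pipeline.create(options);
        p.apply(PubsubIO.Read.topic("projects/my-project/topics/gcs-notifications")) // hypothetical topic
                // LaunchBatchJobFn is hypothetical: a DoFn<String, String> that builds
                // and submits a separate batch Pipeline, as in the answer below.
                .apply("Launch batch job", ParDo.of(new LaunchBatchJobFn()));
        p.run();
    }
}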

Answer 2:

We got it to work by doing this:

// Dataflow SDK 1.x imports. This DoFn is nested inside a larger pipeline class;
// BcDatastoreReadFactory, FindEntities and TransformEntities are our own classes
// (not shown), EntityOptions and BasicOptions extend PipelineOptions (see below),
// and LOG is the enclosing class's logger.
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.datastore.DatastoreIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

private static class ExecuteUpdateTaskForNamespace extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        String namespace = c.element();
        LOG.info("Processing namespace: " + namespace);

        // Clone the enclosing pipeline's options so the child job inherits
        // the project and staging location.
        BasicOptions options = c.getPipelineOptions().cloneAs(BasicOptions.class);

        EntityOptions entityOptions = PipelineOptionsFactory.as(EntityOptions.class); // important to NOT use .create()
        entityOptions.setNamespace(namespace);
        entityOptions.setProject(options.getProject());
        entityOptions.setRunner(DataflowPipelineRunner.class);
        entityOptions.setStagingLocation(options.getStagingLocation());
        entityOptions.setKind("DocsAsset");
        try {
            // Build and submit a separate batch pipeline; with DataflowPipelineRunner,
            // run() returns as soon as the job has been submitted.
            Pipeline p = Pipeline.create(entityOptions);
            p.apply("Read from Datastore", BcDatastoreReadFactory.getEntitySource(entityOptions))
                    .apply("Find Old Site Entities", ParDo.of(new FindEntities()))
                    .apply("Transform Entities", ParDo.of(new TransformEntities()))
                    .apply("Save", DatastoreIO.v1().write().withProjectId(entityOptions.getProject()));
            p.run();

            LOG.info("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);
            c.output("Submitted UpdateAssetsSitesMimeType job for namespace: " + namespace);

        } catch (Exception e) {
            LOG.warn("Unable to create pipeline for namespace: " + namespace, e);
        }
    }
}

Issues: you can't spawn more than 25 jobs at a time without hitting quota. To work around this you can change setRunner(DataflowPipelineRunner.class) to setRunner(BlockingDataflowPipelineRunner.class), but note that BlockingDataflowPipelineRunner is removed in 2.0.0.
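For what it's worth, in SDK 1.x the swap is a one-line change on the child pipeline's options, and in 2.x the usual way to get the same blocking behavior is to wait on the pipeline result; a sketch, reusing the entityOptions and p variables from the code above:

// Dataflow SDK 1.x: submit the child job and wait for it to finish,
// so only one child job at a time counts against the concurrent-job quota.
entityOptions.setRunner(BlockingDataflowPipelineRunner.class);

// Beam / Dataflow 2.x: BlockingDataflowPipelineRunner is gone; block on
// the pipeline result instead.
p.run().waitUntilFinish();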

EntityOptions and BasicOptions are extensions of PipelineOptions.
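The actual interfaces aren't shown in the answer, but the getters and setters used above imply something roughly like this hypothetical reconstruction of EntityOptions (extending DataflowPipelineOptions, which supplies setProject, setRunner and setStagingLocation):

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;

// Hypothetical sketch of the poster's custom options interface.
public interface EntityOptions extends DataflowPipelineOptions {
    String getNamespace();
    void setNamespace(String value);

    String getKind();
    void setKind(String value);
}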