How to create an unbounded input for Beam in Go?

I'm trying to use the Go Beam SDK to create a pipeline that processes Pub/Sub messages:

github.com/apache/beam/sdks/v2/go/pkg/beam

I understand that the pubsubio connector is implemented as an external transform that only works on the Dataflow runner.

What if I want to test my pipeline locally? How would you do that?
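A minimal sketch of one way to test locally, assuming the per-message logic can be pulled out of the DoFn: keep that logic in a plain Go function and feed it fixed in-memory payloads, so neither a runner nor a Pub/Sub connection is involved. `Event`, `parseEvent` and the `key=value` payload format are hypothetical. Inside a Beam test, the same payloads would usually come from `beam.Create` rather than from the subscription.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Event is a hypothetical decoded form of a Pub/Sub payload.
type Event struct {
	Key   string
	Value string
}

// parseEvent holds the per-message logic that would otherwise live inside
// ProcessElement. It depends only on the raw payload, so it can be tested
// without a runner or a subscription. The "key=value" format is an
// assumption made for this sketch.
func parseEvent(data []byte) (Event, error) {
	k, v, ok := strings.Cut(string(data), "=")
	if !ok || k == "" {
		return Event{}, errors.New("malformed payload")
	}
	return Event{Key: k, Value: v}, nil
}

func main() {
	// Fixed, in-memory inputs stand in for messages read from the subscription.
	inputs := [][]byte{[]byte("user=alice"), []byte("garbage")}
	for _, in := range inputs {
		ev, err := parseEvent(in)
		fmt.Printf("%q -> %+v, err=%v\n", in, ev, err)
	}
}
```

For end-to-end checks, the `cloud.google.com/go/pubsub` client also honours the `PUBSUB_EMULATOR_HOST` environment variable, which points it at the local Pub/Sub emulator instead of the real service.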

I need to understand what prevents me from writing my own unbounded Pub/Sub source. (I may not understand how Beam works under the hood, e.g. how it serializes user-defined code to send it to the runner.)
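On the serialization question, as far as I understand the Go SDK, the runner never receives Go source code: the same compiled binary runs on the workers, registered DoFn types are looked up by name, and the DoFn struct's exported fields are encoded (with `encoding/json`) and decoded there. Unexported fields such as a `client` are not carried over, which is why clients are usually created in a `Setup` method. The sketch below only imitates that round trip with the standard library; `readFn`, `pubsubClient` and `roundTrip` here are stand-ins, not Beam types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pubsubClient is a placeholder for *pubsub.Client in this sketch.
type pubsubClient struct{ project string }

// readFn mimics the DoFn from the question: exported configuration plus an
// unexported client.
type readFn struct {
	ProjectID string
	TopicName string
	client    *pubsubClient // unexported: ignored by encoding/json
}

// Setup mirrors the role of a DoFn Setup method: rebuild non-serializable
// state on the worker after the struct has been decoded.
func (fn *readFn) Setup() {
	fn.client = &pubsubClient{project: fn.ProjectID}
}

// roundTrip imitates shipping the DoFn to a worker as JSON.
func roundTrip(fn *readFn) (*readFn, error) {
	data, err := json.Marshal(fn)
	if err != nil {
		return nil, err
	}
	var out readFn
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

func main() {
	orig := &readFn{ProjectID: "my-project", TopicName: "my-topic"}
	orig.Setup() // the client exists at construction time...

	worker, err := roundTrip(orig)
	if err != nil {
		panic(err)
	}
	// ...but is nil after decoding, until Setup runs again.
	fmt.Println(worker.ProjectID, worker.client == nil)
	worker.Setup()
	fmt.Println(worker.client != nil)
}
```

If this model is right, the `fn.client` in the code below would be nil on a worker unless `pubsubFn` creates it in a `Setup` method.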

I tried something like this:

package pubsubio

import (
    "context"
    "fmt"

    cloud_pubsub "cloud.google.com/go/pubsub"
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
)

func init() {
    register.DoFn3x1[context.Context, string, func(*cloud_pubsub.Message), error](&readFn{})
    register.Emitter1[*cloud_pubsub.Message]()
}

type ReadConfig struct {
    ProjectID        string
    TopicName        string
    SubscriptionName string
}

func Read(
    scope beam.Scope,
    cfg ReadConfig,
) beam.PCollection {
    scope = scope.Scope("pubsubio.Read")
    // Seed the read ParDo with a single element: the subscription name.
    col := beam.Create(scope, cfg.SubscriptionName)
    return beam.ParDo(scope, newReadFn(cfg.ProjectID, cfg.TopicName), col)
}

// pubsubFn (not shown) holds ProjectID and the client used below as fn.client.
type readFn struct {
    pubsubFn
    TopicName string
}

func newReadFn(projectID, topicName string) *readFn {
    return &readFn{
        pubsubFn: pubsubFn{
            ProjectID: projectID,
        },
        TopicName: topicName,
    }
}

func (fn *readFn) ProcessElement(
    ctx context.Context,
    subscriptionName string,
    emit func(message *cloud_pubsub.Message),
) error {
    log.Info(ctx, "[pubsubio.ProcessElement] Reading from pubsub")

    _, err := pubsubx.EnsureTopic(ctx, fn.client, fn.TopicName)
    if err != nil {
        return fmt.Errorf("cannot get topic: %w", err)
    }

    sub, err := pubsubx.EnsureSubscription(ctx, fn.client, fn.TopicName, subscriptionName)
    if err != nil {
        return fmt.Errorf("cannot get subscription: %w", err)
    }

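    // Receive blocks until ctx is done or the service returns a non-retryable
    // error, so this ProcessElement call never returns in normal operation.
    return sub.Receive(ctx, func(ctx context.Context, message *cloud_pubsub.Message) {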
        emit(message)
        log.Debugf(ctx, "[pubsubio.ProcessElement] Emit msg: %s", message.ID)
        message.Ack()
    })
}

So basically I created a read DoFn that never returns, but the rest of my pipeline is never triggered (I must be missing something).
