Pubsublite message acknowledgement not working

721 Views Asked by At

I'm using Google pubsublite. Small dummy topic with single partition and a few messages. Python client lib. Doing the standard SubscriberCluent.subscribe with callback. The callback places message in a queue. When the msg is taken out of the queue for consumption, its ack is called. When I want to stop, I call subscribe_future.cancel(); subscriber_future.result() and discard unconsumed messages in the queue.

Say I know the topic has 30 messages. I consume 10 of them before stopping. Then I restart a new SubscriberClient in the same subscription and receive messages. I expect to get starting with the 11th message, but I got starting with the first. So the precious subscriber has ack'd the first 10, but it's as if server did not receive the acknowledgement.

I thought maybe the ack needs some time to reach the server. So I waited 2 minutes before starting the second subscribe. Didn't help.

Then u thought maybe the subscriber object manages the ack calls, and I need to "flush" them before cancelling, but I found another about that.

What am I missing?

Here's the code. If you have pubsublite account, the code is executable after you fill in credentials. The code shows two issues, one is the subject of this question; the other is asked at here

# Using python 3.8

from __future__ import annotations

import logging
import pickle
import queue
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Union, Optional

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite import AdminClient, PubSubMessage
from google.cloud.pubsublite import Reservation as GCPReservation
from google.cloud.pubsublite import Subscription as GCPSubscription
from google.cloud.pubsublite import Topic as GCPTopic
from google.cloud.pubsublite.cloudpubsub import (PublisherClient,
                                                 SubscriberClient)
from google.cloud.pubsublite.types import (BacklogLocation, CloudZone,
                                           LocationPath,
                                           ReservationPath, SubscriptionPath,
                                           TopicPath,
                                           )
from google.cloud.pubsublite.types import FlowControlSettings
from google.oauth2.service_account import Credentials


logging.getLogger('google.cloud').setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

FORMAT = '[%(asctime)s.%(msecs)03d %(name)s]  %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')


class Account:
    def __init__(self,
                 project_id: str,
                 region: str,
                 zone: str,
                 credentials: Credentials,
                 ):
        self.project_id = project_id
        self.region = region
        self.zone = CloudZone.parse(zone)
        self.credentials = credentials
        self.client = AdminClient(region=region, credentials=credentials)

    def location_path(self) -> LocationPath:
        return LocationPath(self.project_id, self.zone)

    def reservation_path(self, name: str) -> ReservationPath:
        return ReservationPath(self.project_id, self.region, name)

    def topic_path(self, name: str) -> TopicPath:
        return TopicPath(self.project_id, self.zone, name)

    def subscription_path(self, name: str) -> SubscriptionPath:
        return SubscriptionPath(self.project_id, self.zone, name)

    def create_reservation(self, name: str, *, capacity: int = 32) -> None:
        path = self.reservation_path(name)
        reservation = GCPReservation(name=str(path),
                                     throughput_capacity=capacity)
        self.client.create_reservation(reservation)
        # logger.info('reservation %s created', name)

    def create_topic(self,
                     name: str,
                     *,
                     partition_count: int = 1,
                     partition_size_gib: int = 30,
                     reservation_name: str = 'default') -> Topic:
        # A topic name can not be reused within one hour of deletion.
        top_path = self.topic_path(name)
        res_path = self.reservation_path(reservation_name)

        topic = GCPTopic(
            name=str(top_path),
            partition_config=GCPTopic.PartitionConfig(count=partition_count),
            retention_config=GCPTopic.RetentionConfig(
                per_partition_bytes=partition_size_gib * 1024 * 1024 * 1024),
            reservation_config=GCPTopic.ReservationConfig(
                throughput_reservation=str(res_path)))

        self.client.create_topic(topic)
        # logger.info('topic %s created', name)

        return Topic(name, self)

    def delete_topic(self, name: str) -> None:
        path = self.topic_path(name)
        self.client.delete_topic(path)
        # logger.info('topic %s deleted', name)

    def get_topic(self, name: str) -> Topic:
        return Topic(name, self)


class Topic:

    def __init__(self, name: str, account: Account):
        self.account = account
        self.name = name
        self._path = self.account.topic_path(name)

    def create_subscription(self,
                            name: str,
                            *,
                            pos: str = None) -> Subscription:
        path = self.account.subscription_path(name)

        if pos is None or pos == 'beginning':
            starting_offset = BacklogLocation.BEGINNING
        elif pos == 'end':
            starting_offset = BacklogLocation.END
        else:
            raise ValueError(
                'Argument start only accepts one of two values - "beginning" or "end"'
            )

        Conf = GCPSubscription.DeliveryConfig
        subscription = GCPSubscription(
            name=str(path),
            topic=str(self._path),
            delivery_config=Conf(delivery_requirement=Conf.DeliveryRequirement.DELIVER_IMMEDIATELY))

        self.account.client.create_subscription(subscription, starting_offset)
        # logger.info('subscription %s created for topic %s', name, self.name)

        return Subscription(name, self)

    def delete_subscription(self, name: str) -> None:
        path = self.account.subscription_path(name)
        self.account.client.delete_subscription(path)
        # logger.info('subscription %s deleted from topic %s', name, self.name)

    def get_subscription(self, name: str):
        return Subscription(name, self)

    @contextmanager
    def get_publisher(self, **kwargs):
        with Publisher(self, **kwargs) as pub:
            yield pub


class Publisher:
    def __init__(self, topic: Topic, *, batch_size: int = 100):
        self.topic = topic
        self._batch_config = {
                'max_bytes': 3 * 1024 * 1024,  # 3 Mb; must be < 4 max_bytes
                'max_latency': 0.05,  # 50 ms
                'max_messages': batch_size,  # default is 1000
                }
        self._messages = queue.Queue(1000)
        self._nomore = object()
        self._pool = None
        self._worker = None

    def __enter__(self):
        def _publish():
            path = self.topic._path
            dumps = pickle.dumps
            nomore = self._nomore
            q = self._messages
            with PublisherClient(
                    credentials=self.topic.account.credentials,
                    per_partition_batching_settings=BatchSettings(**self._batch_config),
                    ) as publisher:
                while True:
                    x = q.get()
                    if x is nomore:
                        break
                    future = publisher.publish(path, dumps(x))
                    future.result()

        self._pool = ThreadPoolExecutor(1)
        self._worker = self._pool.submit(_publish)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._messages.put(self._nomore)
        self._worker.result()
        self._pool.shutdown()

    def put(self, data) -> None:
        self._messages.put(data)


class Subscription:

    def __init__(self, name: str, topic: Topic):
        self.topic = topic
        self.name = name
        self._path = topic.account.subscription_path(name)

    @contextmanager
    def get_subscriber(self, *, backlog=None):
        with Subscriber(self, backlog=backlog) as sub:
            yield sub


class Subscriber:
    def __init__(self, subscription: Subscription, backlog: int = None):
        self.subscription = subscription
        self._backlog = backlog or 100
        self._cancel_requested: bool = None
        self._messages: queue.Queue = None
        self._pool: ThreadPoolExecutor = None
        self._NOMORE = object()
        self._subscribe_task = None

    def __enter__(self):
        self._pool = ThreadPoolExecutor(1).__enter__()
        self._messages = queue.Queue(self._backlog)
        messages = self._messages

        def callback(msg: PubSubMessage):
            logger.info('got %s', pickle.loads(msg.data))
            messages.put(msg)

        def _subscribe():
            flowcontrol = FlowControlSettings(
                    messages_outstanding=self._backlog,
                    bytes_outstanding=1024 * 1024 * 10)

            subscriber = SubscriberClient(credentials=self.subscription.topic.account.credentials)
            with subscriber:
                fut = subscriber.subscribe(self.subscription._path, callback, flowcontrol)
                logger.info('subscribe sent to gcp')

                while True:
                    if self._cancel_requested:
                        fut.cancel()
                        fut.result()
                        while True:
                            while not messages.empty():
                                try:
                                    _ = messages.get_nowait()
                                except queue.Empty:
                                    break
                            try:
                                messages.put_nowait(self._NOMORE)
                                break
                            except queue.Full:
                                continue
                        break
                    time.sleep(0.003)

        self._subscribe_task = self._pool.submit(_subscribe)
        return self

    def __exit__(self, *args, **kwargs):
        if self._pool is not None:
            if self._subscribe_task is not None:
                self._cancel_requested = True
                while True:
                    z = self._messages.get()
                    if z is self._NOMORE:
                        break
                self._subscribe_task.result()
                self._subscribe_task = None
                self._messages = None
            self._pool.__exit__(*args, **kwargs)
            self._pool = None

    def get(self, timeout=None):
        if timeout is not None and timeout == 0:
            msg = self._messages.get_nowait()
        else:
            msg = self._messages.get(block=True, timeout=timeout)
        data = pickle.loads(msg.data)
        msg.ack()
        return data


def get_account() -> Account:
    return Account(project_id='--fill-in-proj-id--',
                   region='us-central1',
                   zone='us-central1-a',
                   credentials='--fill-in-creds--')


# This test shows that it takes extremely long to get the first messsage
# in `subscribe`.
def test1(account):
    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    try:
        with topic.get_publisher() as p:
            p.put(1)
            p.put(2)
            p.put(3)

        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                t0 = time.time()
                logger.info('getting the first message')
                z = s.get()
                t1 = time.time()
                logger.info('  got the first message')
                print(z)
            print('getting the first msg took', t1 - t0, 'seconds')
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)


def test2(account):
    name = 'test-' + str(uuid.uuid4())
    topic = account.create_topic(name)
    N = 30
    try:
        with topic.get_publisher(batch_size=1) as p:
            for i in range(N):
                p.put(i)

        sub = topic.create_subscription(name)
        try:
            with sub.get_subscriber() as s:
                for i in range(10):
                    z = s.get()
                    assert z == i

            # The following block shows that the subscriber
            # resets to the first message, not as expected
            # that it picks up where the last block left.

            with sub.get_subscriber() as s:
                for i in range(10, 20):
                    z = s.get()
                    try:
                        assert z == i
                    except AssertionError as e:
                        print(z, '!=', i)
                        return
        finally:
            topic.delete_subscription(name)
    finally:
        account.delete_topic(name)


if __name__ == '__main__':
    a = get_account()
    try:
        a.create_reservation('default')
    except AlreadyExists:
        pass

    test1(a)
    print('')
    test2(a)
2

There are 2 best solutions below

0
On BEST ANSWER

I found a solution. Before cancelling the "subscribe" future, I need to sleep a little bit to allow acknowledgements to be flushed (i.e. sent out). In particular, google.cloud.pubsublite.cloudpubsub.internal.make_subscriber._DEFAULT_FLUSH_SECONDS (value 0.1) appears to be the time to watch. Need to sleep a little longer than this to be sure.

This is a bug in the google package. "Cancelling" the future means abandon unprocessed messages, whereas submitted acknowledgements should be sent out. This bug may have gone unnoticed because duplicate message delivery is not an error.

9
On

I was not able to recreate your issue but I think you should check the way its being handled on the official documentation about using cloud pubsublite.

This is the code I extract and update from Receiving messages sample and It works as intended, it will get the message from the lite-topic and acknowledge to avoid getting it again. if rerun, I will only get the data if there is data to pull. I added the code so you can check if something may differ from your code.

consumer.py

from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
    MessageMetadata, 
)
from google.cloud.pubsub_v1.types import PubsubMessage

# TODO(developer):
project_number = project-number
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "sub-id"
timeout = 90

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

per_partition_flow_control_settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}.")
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

The only way I hit your scenario is when I use different subscriptions. But on that regard, when different subscriptions get message from the topic each one will receive the same stored messages as explained on Receiving messages from Lite subscriptions.

Consider this: