Gcp pub/sub consume messages into websockets route

525 Views Asked by At

In my view after I receive post with data I sent it to gcp pub/sub topic. I've got websockets where I'd like to read that data send earlier:

@sockets.route('/ws/<string:_id>/some_url/')
def some_function(ws, _id):
    if mine_id not in config.MINES:
        ws.close()
    while not ws.closed:
        ws.send(json.dumps({
            'type': 'some type',
            'data': get_data(_id, 'some type')
        }))
        time.sleep(100)

it gets data from generics.ListAPIView

def get_data(_id, data_type):
    if data_type in config.URLS:
        url = data_type
    else:
        return {'error': 'URL is wrong'}
    try:
        response = requests.get(
            '/'.join([config.API_URL, f"{_id}/" + url + "/"]),
            headers={'Authorization': ' '.join(['Token', config.TOKEN])}
        )
        return response.json()

    except HTTPError:
        return {'error': 'HTTPError'}
    except RequestException:
        return {'error': 'RequestException'}

Here's consumers code:

def consume():
    try:
        project_id = 'some project'
        subscription_name = 'some subscription'

        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(
            project_id, subscription_name)

        def callback(message):
            print('Received message: {}'.format(message.data))
            if message.attributes:
                print('Attributes:')
                for key in message.attributes:
                    value = message.attributes.get(key)
                    print('{}: {}'.format(key, value))
            message.ack()

        future = subscriber.subscribe(
            subscription_path, callback=callback)
        try:
            data = future.result()
        except Exception as e:
            logger.exception(
                'Listening for some type messages on {} threw an Exception: {}.'.format(
                    subscription_name, e))

    except ConnectionError:
        return {'error': 'ConnectionError'}

    return data

I've tried websockets and producer\consumer and they both work on their own. How can I make them befriend each otherand use consume() or data returned from it in the ws.send call ?

0

There are 0 best solutions below