How to sample a dlt source that takes its data from an API?

I want to test a dlt source that reads from an API, but I don't want to exceed my API limits. How can I set up the source so that it produces only a sample of the data?

I can go into the source code and add a counter, for example:

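import requests


def get_page(endpoint, headers, params):
    # headers must be passed by keyword: the second positional
    # argument of requests.get is params
    res = requests.get(endpoint, headers=headers, params=params).json()
    count_max = 10  # maximum number of pages to yield
    count = 0
    while res is not None:
        yield res["result"]
        count += 1
        if count >= count_max:  # stop after exactly count_max pages
            return
        has_more = res.get("paging", {}).get("next", None)
        if has_more:
            next_url = has_more["link"]
            res = requests.get(next_url, headers=headers).json()
        else:
            res = None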

But I don't want to go inside the source definition each time I need to test something.

There is 1 best solution below

If the page links follow a clear pattern, you can keep the run mode of your pipeline in a dlt.config value and, in test mode, request only a random sample of pages.

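import random

import requests

page_nos = range(1, 100)  # candidate page numbers; adjust to the API's real range


def build_link(endpoint, page_no):
    return f"{endpoint}/{page_no}"  # replace with the observed URL pattern


def get_samples_of_page(endpoint, num_samples):
    # random.sample draws num_samples distinct page numbers
    return [
        build_link(endpoint, page_no)
        for page_no in random.sample(page_nos, num_samples)
    ]


def get_page_test_mode(endpoint, headers, params):
    for page_url in get_samples_of_page(endpoint, 5):
        try:
            response = requests.get(page_url, headers=headers)
            response.raise_for_status()
        except requests.RequestException:
            # some sampled page links may return a 404 Not Found; skip them
            continue
        yield response.json()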


def get_page_live_mode(endpoint, headers, params):
    # your original pagination code goes here;
    # the placeholder below yields nothing
    yield from ()



def get_page(endpoint, headers, params, run_mode):
    if run_mode == 'TEST':
        yield from get_page_test_mode(endpoint, headers, params)
    else:
        yield from get_page_live_mode(endpoint, headers, params)
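
The run-mode switch can also be applied from outside the generator, so the source code itself stays untouched. What follows is only a minimal sketch under a few assumptions: the run mode is read from an environment variable called RUN_MODE (a hypothetical name; in a dlt project the value would come from dlt.config instead), fake_pages is a made-up stand-in for the real paginated generator, and limit_pages is an illustrative helper, not part of dlt.

```python
import os
from itertools import islice


def fake_pages(total=20):
    """Hypothetical stand-in for the real API generator: yields `total` pages."""
    for page_no in range(1, total + 1):
        yield {"page": page_no, "result": [page_no * 10, page_no * 10 + 1]}


def limit_pages(pages, run_mode, max_pages=5):
    """Yield at most max_pages pages in TEST mode; pass everything through otherwise."""
    if run_mode == "TEST":
        # islice stops pulling from the generator after max_pages items,
        # so no further requests would be made by a real source
        yield from islice(pages, max_pages)
    else:
        yield from pages


# RUN_MODE is an assumed variable name; default to TEST to stay on the safe side
run_mode = os.environ.get("RUN_MODE", "TEST")
sample = list(limit_pages(fake_pages(), run_mode, max_pages=3))
print([page["page"] for page in sample])
```

Because generators are lazy, capping the iteration from the outside also caps the number of requests: pages beyond the limit are never fetched.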

If the page links follow no clear pattern, you can use the VCR.py library (the vcrpy package) to record the HTTP responses the first time you run your tests; subsequent test runs are then served from the saved responses instead of hitting the API.

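import vcr


def get_page(endpoint, headers, params, run_mode):
    # 'once' records on the first run and replays afterwards;
    # 'all' always calls the API and re-records the responses
    record_mode = 'once' if run_mode == 'TEST' else 'all'
    # keep the cassette open while the generator runs,
    # so that every request it makes is captured or replayed
    with vcr.use_cassette(
        'fixtures/vcr_cassettes/results.yaml',
        record_mode=record_mode,
    ):
        yield from get_page_live_mode(endpoint, headers, params)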
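
To make the record-once, replay-afterwards idea concrete, here is a hand-rolled sketch using only the standard library. It is not VCR.py and does not intercept HTTP traffic; it just caches what a fetch function returns, keyed by URL, in a JSON file. The names make_replaying_fetch and fake_fetch, the example URL, and the file name are all hypothetical and chosen for illustration.

```python
import json
import tempfile
from pathlib import Path


def make_replaying_fetch(fetch, cassette_path):
    """Wrap fetch(url) so each URL is fetched once and then replayed from disk."""
    cassette_path = Path(cassette_path)
    # load previously recorded responses, if a cassette file already exists
    recorded = json.loads(cassette_path.read_text()) if cassette_path.exists() else {}

    def replaying_fetch(url):
        if url not in recorded:
            recorded[url] = fetch(url)  # the only place the real API would be hit
            cassette_path.write_text(json.dumps(recorded))
        return recorded[url]

    return replaying_fetch


calls = []  # tracks how often the "API" is actually called


def fake_fetch(url):
    """Hypothetical stand-in for requests.get(url).json()."""
    calls.append(url)
    return {"url": url, "result": [1, 2, 3]}


cassette = Path(tempfile.mkdtemp()) / "results.json"
url = "https://api.example.com/items/1"

fetch = make_replaying_fetch(fake_fetch, cassette)
first = fetch(url)   # recorded: calls the fake API
second = fetch(url)  # replayed from the recorded responses
print(len(calls))
```

A second wrapper created later with the same cassette file also replays from disk, which is what keeps repeated test runs from consuming the API quota.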