How to pull data from a paginated JSON API using kedro (APIDataSet)?

The problem: I would like to retrieve data from a paginated API that sends JSON responses. Using kedro.extras.datasets.api.APIDataSet, I can query the API and retrieve the initial response. However, if there are more results than the per-request size limit, I need to traverse the pagination links in the JSON responses. Has anybody successfully done this already?

Should I subclass APIDataSet for this and put the link traversal logic in the _execute_request() method? The provided APIDataSet returns requests.Response objects. Should a subclassed APIDataSet return (or yield) the results directly?

I tried this approach (see the code below), and it does retrieve the data. But I am unsure whether this is the "kedro way" to do it. Should the traversal logic live in a node instead?

import copy
from typing import Any, Dict, Iterable, List, Union

import dpath.util
import requests
from kedro.extras.datasets.api import APIDataSet
from kedro.io.core import DataSetError
from requests.auth import AuthBase

class PaginatedJSONAPIDataSet(APIDataSet):
    def __init__(
        self,
        url: str,
        method: str = "GET",
        data: Any = None,
        params: Dict[str, Any] = None,
        headers: Dict[str, Any] = None,
        auth: Union[Iterable[str], AuthBase] = None,
        json: Union[List, Dict[str, Any]] = None,
        timeout: int = 60,
        credentials: Union[Iterable[str], AuthBase] = None,
        items_path: str = None,
        next_link_path: str = None,  # multiple keys possible to access next link in nested json, separate with "/", like "key1/key2"
    ):
        super().__init__(
            url, method, data, params, headers, auth, json, timeout, credentials
        )
        self.items_path = items_path
        self.next_link_path = next_link_path

    def _execute_request(self) -> List[Dict[str, Any]]:
        # initial request
        try:
            response = requests.request(**self._request_args)
            response.raise_for_status()
        except requests.exceptions.HTTPError as exc:
            raise DataSetError("Failed to fetch data", exc) from exc
        except OSError as exc:
            raise DataSetError("Failed to connect to the remote server") from exc

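        request_args = copy.deepcopy(self._request_args)
        # follow-up requests use the next link as-is, so the initial query params
        # are dropped (this assumes the link already carries its own query string)
        request_args.pop("params", None)
        hits = []
        # pagination traversal
        while True:
            payload = response.json()  # parse each page only once
            hits.extend(dpath.util.get(payload, self.items_path))
            try:
                next_link = dpath.util.get(payload, self.next_link_path)
            # next link key is not present in json response
            except KeyError:
                break
            # next link key is present, but value is null / None
            if next_link is None:
                break
            request_args["url"] = next_link
            # apply the same error handling to follow-up pages as to the first one
            try:
                response = requests.request(**request_args)
                response.raise_for_status()
            except requests.exceptions.HTTPError as exc:
                raise DataSetError("Failed to fetch data", exc) from exc
            except OSError as exc:
                raise DataSetError("Failed to connect to the remote server") from exc
        return hits

# toy example with a paginated API, to demonstrate pagination traversal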
data_set = PaginatedJSONAPIDataSet(
    url="https://pokeapi.co/api/v2/pokemon",
    items_path="results",
    next_link_path="next",
    params={
        "limit": 500
    }
)
data = data_set.load()
print(type(data)) # <class 'list'>
print(len(data)) # 1126
print(data[0]) # {'name': 'bulbasaur', 'url': 'https://pokeapi.co/api/v2/pokemon/1/'}

If you have done something similar, could you give me a tip or point me to a best-practice example? I could not find one.

There are 1 best solutions below

You would have to define a custom dataset. It should be easy to take the existing implementation and extend or override it to handle the pagination part.
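For what it's worth, the pagination part can be kept separate from the HTTP call, so the same logic could be used from an overridden dataset method or from a node. The following is only a minimal sketch, not Kedro API: `get_path`, `iter_items`, `fetch_json` and the fake pages are hypothetical names and data made up for illustration, and it assumes each response is a JSON object whose next link is either missing or null on the last page.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def get_path(payload: Dict[str, Any], path: str) -> Any:
    """Resolve a "/"-separated key path such as "links/next" (KeyError if a key is missing)."""
    value: Any = payload
    for key in path.split("/"):
        value = value[key]
    return value


def iter_items(
    first_url: str,
    fetch_json: Callable[[str], Dict[str, Any]],
    items_path: str,
    next_link_path: str,
) -> Iterator[Any]:
    """Yield items page by page until the next link is missing or null."""
    url: Optional[str] = first_url
    while url is not None:
        payload = fetch_json(url)
        yield from get_path(payload, items_path)
        try:
            url = get_path(payload, next_link_path)
        except KeyError:
            # next link key is not present in this response: last page
            url = None


# Stand-in for an HTTP call: two fake pages keyed by URL (hypothetical data).
FAKE_PAGES = {
    "page-1": {"results": [{"name": "a"}, {"name": "b"}], "links": {"next": "page-2"}},
    "page-2": {"results": [{"name": "c"}], "links": {"next": None}},
}

items = list(iter_items("page-1", FAKE_PAGES.__getitem__, "results", "links/next"))
print(items)  # [{'name': 'a'}, {'name': 'b'}, {'name': 'c'}]
```

Against a real API, `fetch_json` could be something like `lambda url: requests.get(url, timeout=60).json()`, with whatever error handling the dataset needs wrapped around it. Because the function is a generator, the caller decides whether to collect everything into a list or process items lazily.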

We'd love a PR back into the main project, as I think this would be useful for other users. Amazingly, to my knowledge, it hasn't come up before.