The problem: I would like to retrieve data from a paginated API that sends JSON responses.
Using kedro.extras.datasets.api.APIDataSet, I can query the API and retrieve the initial response. However, if there are more results than the size limit per API request, I need to traverse the pagination links in the JSON responses. Has anybody done this successfully?
Should I subclass APIDataSet for this and put the link traversal logic in the _execute_request() method? The provided APIDataSet returns requests.Response objects. Should a subclassed APIDataSet return (or yield) the results directly?
I tried this approach and it retrieves the data, but I am unsure whether this is the "kedro way" to do it. Should the traversal logic live in a node instead (see the node-based sketch after the toy example below)?
import copy
from typing import Any, Dict, Iterable, List, Union

import dpath.util
import requests
from kedro.extras.datasets.api import APIDataSet
from kedro.io.core import DataSetError
from requests.auth import AuthBase


class PaginatedJSONAPIDataSet(APIDataSet):
    def __init__(
        self,
        url: str,
        method: str = "GET",
        data: Any = None,
        params: Dict[str, Any] = None,
        headers: Dict[str, Any] = None,
        auth: Union[Iterable[str], AuthBase] = None,
        json: Union[List, Dict[str, Any]] = None,
        timeout: int = 60,
        credentials: Union[Iterable[str], AuthBase] = None,
        items_path: str = None,
        next_link_path: str = None,  # multiple keys possible to access next link in nested json, separate with "/", like "key1/key2"
    ):
        super().__init__(
            url, method, data, params, headers, auth, json, timeout, credentials
        )
        self.items_path = items_path
        self.next_link_path = next_link_path

    def _execute_request(self) -> List[Dict[str, Any]]:
        # initial request
        try:
            response = requests.request(**self._request_args)
            response.raise_for_status()
        except requests.exceptions.HTTPError as exc:
            raise DataSetError("Failed to fetch data", exc) from exc
        except OSError as exc:
            raise DataSetError("Failed to connect to the remote server") from exc

        request_args = copy.deepcopy(self._request_args)
        request_args.pop("params")

        hits = []
        # pagination traversal
        while True:
            hits.extend(dpath.util.get(response.json(), self.items_path))
            try:
                next_link = dpath.util.get(response.json(), self.next_link_path)
            # next link key is not present in json response
            except KeyError:
                break
            # next link key is present, but value is null / None
            if next_link is None:
                break
            request_args["url"] = next_link
            response = requests.request(**request_args)
        return hits
# toy example with a paginated API, to demonstrate pagination traversal
data_set = PaginatedJSONAPIDataSet(
    url="https://pokeapi.co/api/v2/pokemon",
    items_path="results",
    next_link_path="next",
    params={
        "limit": 500
    },
)

data = data_set.load()
print(type(data))  # <class 'list'>
print(len(data))  # 1126
print(data[0])  # {'name': 'bulbasaur', 'url': 'https://pokeapi.co/api/v2/pokemon/1/'}
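For comparison, here is a rough sketch of the node-based alternative I have in mind: the stock APIDataSet stays untouched and returns the first requests.Response, and a node follows the pagination links (the function name and the "results" / "next" keys are placeholders for this sketch):

from typing import Any, Dict, List

import requests


def traverse_pagination(response: requests.Response) -> List[Dict[str, Any]]:
    # collect the items from the first response, then follow "next" links until exhausted
    payload = response.json()
    hits: List[Dict[str, Any]] = list(payload["results"])
    next_link = payload.get("next")
    while next_link:
        page = requests.get(next_link, timeout=60)
        page.raise_for_status()
        payload = page.json()
        hits.extend(payload["results"])
        next_link = payload.get("next")
    return hits

This keeps the dataset generic and makes the pagination logic testable pipeline code, but the catalog entry alone no longer describes the full extraction.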
If anyone has done something similar, can you give me a tip or point me to a best-practice example? I could not find one.
You would have to define a custom dataset; it should be easy to take the existing implementation and extend/override it to handle the pagination part.
We'd love a PR back into the main project, as I think this would be useful for other users; amazingly (to my knowledge), it hasn't come up before.
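For example, once the custom class lives somewhere importable in your project, it can be registered like any other dataset; a minimal programmatic sketch reusing the arguments from your toy example (the dataset name "pokemon" is arbitrary):

from kedro.io import DataCatalog

catalog = DataCatalog(
    {
        "pokemon": PaginatedJSONAPIDataSet(
            url="https://pokeapi.co/api/v2/pokemon",
            items_path="results",
            next_link_path="next",
            params={"limit": 500},
        )
    }
)

pokemon = catalog.load("pokemon")  # the flattened list of results, as in your example

In catalog.yml the same entry would reference the class by its full import path via the type key.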