AWS AppSync GraphQL API with Python

I want to access an AWS AppSync API from Python code, and I am confused about how to do it with the requests library.

Auth mode is Cognito user pool. My questions are:

  1. How do I get access tokens from a Cognito user pool?

  2. How do I make queries and mutations, and handle subscriptions?

I first tried it with the API key auth mode, but the code below returns the error shown after it.

import requests
import json

URL = "https://vtcarmq7zzeadnkwzcgfr24irm.appsync-api.us-east-1.amazonaws.com/graphql"

headers = {"x-api-key":"da2-bwuyzqchhfgyxemcmdinjegb7e"}

data = json.dumps({
    "query": '''
    
  listTodos(filter:{
    title:{
      contains:"g"
    }     
  }   )     {
    items{
      id title duedate     
    }   
  }
'''

} )

r = requests.request("POST", URL , data = data , headers = headers)

print(r.text)

{ "errors" : [ { "message" : "Unable to parse GraphQL query.", "errorType" : "MalformedHttpRequestException" } ] }

I have watched this video: https://www.youtube.com/watch?v=2U4RsbFO4bA&t=1172s. For authentication with a Cognito user pool, the presenter says to call the user pool, get the tokens, and pass them to AWS AppSync in the request headers.

I am new to AWS and to the Python requests module, and I am trying to write Python code that does what the video describes.

There is 1 best solution below

graphql-python/gql has supported AWS AppSync since version 3.0.0rc0.

It supports queries, mutations, and even subscriptions on the realtime endpoint.
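For the subscription case, a minimal sketch might look like the following. It assumes gql 3.x installed with its websockets dependencies, an API key auth mode, and a hypothetical `onCreateMessage` subscription field; substitute whatever your schema defines. The helper name `api_host` and the function `listen` are illustrative, not part of gql.

```python
from urllib.parse import urlparse


def api_host(url: str) -> str:
    """Return the host part of the AppSync HTTPS endpoint (used for auth)."""
    return urlparse(url).netloc


# Hypothetical subscription; the field name must exist in your schema.
SUBSCRIPTION = """
subscription onCreateMessage {
  onCreateMessage {
    id
    message
  }
}
"""


async def listen(url: str, api_key: str) -> None:
    """Print every event delivered by the subscription until cancelled."""
    # Imported here so the helpers above can be used without gql installed.
    from gql import Client, gql
    from gql.transport.appsync_auth import AppSyncApiKeyAuthentication
    from gql.transport.appsync_websockets import AppSyncWebsocketsTransport

    auth = AppSyncApiKeyAuthentication(host=api_host(url), api_key=api_key)
    # Pass the regular HTTPS endpoint; as far as I know the transport
    # derives the realtime (websocket) URL from it.
    transport = AppSyncWebsocketsTransport(url=url, auth=auth)

    async with Client(transport=transport) as session:
        async for result in session.subscribe(gql(SUBSCRIPTION)):
            print(result)
```

To run it, call `asyncio.run(listen(url, api_key))` with your own endpoint and key. The loop only ends when the connection closes or the task is cancelled.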

It supports the IAM, API key and JWT authentication methods.
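For the Cognito user pool mode, the JWT has to come from Cognito first. One way to get it is sketched below with boto3's `initiate_auth`. This is only a sketch under assumptions: the app client has the `USER_PASSWORD_AUTH` flow enabled and no client secret, and the user has no pending challenge (MFA, forced password change), in which case the response carries a `ChallengeName` instead of tokens. The function names are illustrative.

```python
from typing import Dict


def fetch_cognito_tokens(region: str, client_id: str,
                         username: str, password: str) -> Dict[str, str]:
    """Sign in to a Cognito user pool and return its AuthenticationResult."""
    # Imported lazily so the header helper below works without boto3.
    import boto3

    client = boto3.client("cognito-idp", region_name=region)
    response = client.initiate_auth(
        ClientId=client_id,
        AuthFlow="USER_PASSWORD_AUTH",  # must be enabled on the app client
        AuthParameters={"USERNAME": username, "PASSWORD": password},
    )
    # On success this dict holds AccessToken, IdToken and RefreshToken.
    return response["AuthenticationResult"]


def appsync_jwt_headers(token: str) -> Dict[str, str]:
    """Headers for a plain HTTPS call; the JWT is sent as-is, with no prefix."""
    return {"Content-Type": "application/json", "Authorization": token}
```

With gql, the token would be passed as `AppSyncJWTAuthentication(host=host, jwt=tokens["AccessToken"])` in place of the API key authentication class.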

The gql documentation covers the AppSync transports and authentication classes in more detail.

Here is an example of a mutation using API key authentication:

import asyncio
import os
import sys
from urllib.parse import urlparse

from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.appsync_auth import AppSyncApiKeyAuthentication

# Uncomment the following lines to enable debug output
# import logging
# logging.basicConfig(level=logging.DEBUG)


async def main():

    # Should look like:
    # https://XXXXXXXXXXXXXXXXXXXXXXXXXX.appsync-api.REGION.amazonaws.com/graphql
    url = os.environ.get("AWS_GRAPHQL_API_ENDPOINT")
    api_key = os.environ.get("AWS_GRAPHQL_API_KEY")

    if url is None or api_key is None:
        print("Missing environment variables")
        sys.exit()

    # Extract host from url
    host = str(urlparse(url).netloc)

    auth = AppSyncApiKeyAuthentication(host=host, api_key=api_key)

    transport = AIOHTTPTransport(url=url, auth=auth)

    async with Client(
        transport=transport, fetch_schema_from_transport=False,
    ) as session:

        query = gql(
            """
mutation createMessage($message: String!) {
  createMessage(input: {message: $message}) {
    id
    message
    createdAt
  }
}"""
        )

        variable_values = {"message": "Hello world!"}

        result = await session.execute(query, variable_values=variable_values)
        print(result)


asyncio.run(main())
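
As for the `Unable to parse GraphQL query` error in the question: the string sent under `"query"` starts directly with `listTodos(...)`, which is not a complete GraphQL document. The selection has to be wrapped in `query { ... }` (or bare braces). Below is a minimal sketch of the corrected request. It uses only the standard library so it runs without extra packages; the same payload should also work with `requests.post(url, json=payload, headers=headers)`. The field names come from the question, and `build_request` and `run_query` are illustrative helper names.

```python
import json
import urllib.request

# Same selection as in the question, now wrapped in a named query operation.
LIST_TODOS = """
query ListTodos {
  listTodos(filter: {title: {contains: "g"}}) {
    items {
      id
      title
      duedate
    }
  }
}
"""


def build_request(url, api_key, query, variables=None):
    """Build the POST request AppSync expects for the API key auth mode."""
    payload = {"query": query, "variables": variables or {}}
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "x-api-key": api_key},
        method="POST",
    )


def run_query(url, api_key, query, variables=None):
    """Send the request and return the decoded JSON response."""
    request = build_request(url, api_key, query, variables)
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Calling `run_query(url, api_key, LIST_TODOS)` with your own endpoint and key should return a dict with a `data` key, or an `errors` list if the schema does not match.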