Invoke Api Gateway Websockets API through HTTP

241 Views Asked by At

AWS provides an SDK for interacting with WebSocket APIs through the calls post_to_connection, delete_connection, and get_connection.

But how can we invoke these APIs over plain HTTP, for example with the Python requests library?

What I tried?

I'm struggling to find an example of how to sign a request for an API Gateway service.

1

There is 1 best solution below

0
On BEST ANSWER

Here is the code to invoke web socket APIs using python-requests:

In the code below, replace the <aws_region>, <api_id>, and <api_stage> placeholders with your own values.

You can use get_connection, post_message, and delete_connection to get connection details, post a message to a connection, and delete a connection, respectively.

#!/usr/bin/python
import os
from time import sleep

import requests
import json
from urllib.parse import quote, urlparse
import hmac
import hashlib
from datetime import datetime


def get_canonical_path(url):
    """
    Return the canonical URI of `url` for request signing: the path
    component, percent-encoded, or '/' when the URL has no path.
    """
    # An empty path is canonicalised to the root path.
    raw_path = urlparse(url).path or '/'

    # The safe-character set is adapted from boto's use of urllib.parse.quote:
    # https://github.com/boto/boto/blob/d9e5cfe900e1a58717e393c76a6e3580305f217a/boto/auth.py#L393
    return quote(raw_path, safe='/-_.~')


def get_canonical_querystring(url):
    """
    Build the canonical query string of `url` for AWS Signature Version 4.

    The query string is split into parameters, which are sorted by
    parameter name (parameters sharing a name are ordered by value) and
    re-joined as `name=value` pairs separated by '&'. A parameter without
    '=' gets an empty value; a parameter with an empty name is dropped.

    This function does NOT percent-encode anything: the names and values
    in `url` must already be URL-encoded (space=%20) when it is called,
    otherwise AWS may reject the request because the signature does not
    match.

    Returns '' when the URL has no query string.
    """
    pairs = []
    for query_param in urlparse(url).query.split('&'):
        # Split on the first '=' only, so values may themselves contain '='.
        key, _, val = query_param.partition('=')
        if key:
            pairs.append((key, val))

    # Sort on (name, value) tuples rather than on the joined "name=value"
    # text: sorting the joined text would place "a1=2" before "a=1"
    # because '1' < '=', which is not ordering by parameter name.
    pairs.sort()

    return '&'.join(f'{key}={val}' for key, val in pairs)


def sign(key, msg):
    """
    Return the raw HMAC-SHA256 digest (bytes) of `msg` keyed with `key`.

    `key` is passed to hmac as-is; `msg` is a str and is UTF-8 encoded
    before hashing. Based on
    https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
    """
    message_bytes = msg.encode('utf-8')
    mac = hmac.new(key, message_bytes, digestmod=hashlib.sha256)
    return mac.digest()


def get_signature_key(secret_key, date_stamp, region_name, service_name):
    """
    Derive the Signature Version 4 signing key.

    Starting from 'AWS4' + secret_key, the key is re-keyed with `sign`
    over the date stamp, the region, the service name and finally the
    literal 'aws4_request'. The last digest (bytes) is returned. Based on
    https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
    """
    derived_key = ('AWS4' + secret_key).encode('utf-8')
    # Each step uses the previous digest as the HMAC key for the next part.
    for scope_part in (date_stamp, region_name, service_name, 'aws4_request'):
        derived_key = sign(derived_key, scope_part)
    return derived_key


def get_headers(aws_host: str, url: str, request_body: 'bytes | str | None', aws_region: str, service: str,
                aws_access_key: str, aws_secret_access_key: str, method: str) -> dict:
    """
    Build the HTTP headers for an AWS Signature Version 4 signed request.

    Parameters:
        aws_host: value signed as the `host` header; it has to be the host
            the request is actually sent to.
        url: full request URL; its path and query string are signed.
        request_body: payload that will be sent. bytes are hashed as-is,
            a str is UTF-8 encoded first, and None is treated as an empty
            payload. The caller must send exactly these bytes.
        aws_region: region used in the credential scope.
        service: service name used in the credential scope.
        aws_access_key: access key id placed in the Authorization header.
        aws_secret_access_key: secret key the signing key is derived from.
        method: HTTP method, in any letter case.

    Returns:
        A dict with the 'Authorization', 'x-amz-date',
        'x-amz-content-sha256' and 'Content-Type' headers. Only `host` and
        `x-amz-date` are part of the signature.
    """
    # Function-scope import so the block does not depend on the module's
    # import list providing `timezone`.
    from datetime import timezone

    method = method.upper()

    # hashlib only accepts bytes-like input, so normalise the payload once.
    if request_body is None:
        request_body = b''
    elif isinstance(request_body, str):
        request_body = request_body.encode('utf-8')

    # One timestamp feeds both the x-amz-date header and the credential
    # scope. An aware UTC time replaces the deprecated datetime.utcnow()
    # and formats identically.
    now = datetime.now(timezone.utc)
    amzdate = now.strftime('%Y%m%dT%H%M%SZ')
    date_stamp = now.strftime('%Y%m%d')  # Date w/o time for credential_scope

    canonical_uri = get_canonical_path(url)
    canonical_querystring = get_canonical_querystring(url)

    # Canonical headers: lowercase names in ASCII order, one per line,
    # each line (including the last) terminated by '\n'.
    canonical_headers = ('host:' + aws_host + '\n' +
                         'x-amz-date:' + amzdate + '\n')

    # Names of the headers listed in canonical_headers, ';'-delimited and
    # in the same order.
    signed_headers = 'host;x-amz-date'

    payload_hash = hashlib.sha256(request_body).hexdigest()

    # Canonical request: method, URI, query string, headers, an empty line,
    # signed header names and the payload hash, separated by newlines.
    canonical_request = '\n'.join([
        method,
        canonical_uri,
        canonical_querystring,
        canonical_headers,  # already ends with '\n', which yields the empty line
        signed_headers,
        payload_hash,
    ])

    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = '/'.join([date_stamp, aws_region, service, 'aws4_request'])
    string_to_sign = '\n'.join([
        algorithm,
        amzdate,
        credential_scope,
        hashlib.sha256(canonical_request.encode('utf-8')).hexdigest(),
    ])

    # Derive the signing key and sign the string-to-sign with it.
    signing_key = get_signature_key(secret_key=aws_secret_access_key, date_stamp=date_stamp,
                                    region_name=aws_region, service_name=service)
    signature = hmac.new(signing_key,
                         string_to_sign.encode('utf-8'),
                         hashlib.sha256).hexdigest()

    # The signing information is carried in the Authorization header
    # (rather than in query string parameters).
    authorization_header = (algorithm + ' ' + 'Credential=' + aws_access_key +
                            '/' + credential_scope + ', ' + 'SignedHeaders=' +
                            signed_headers + ', ' + 'Signature=' + signature)

    return {
        'Authorization': authorization_header,
        'x-amz-date': amzdate,
        'x-amz-content-sha256': payload_hash,
        'Content-Type': 'application/json',
    }


# Host name of the WebSocket API. It is signed as the `host` header, so it
# has to be the host the requests are actually sent to.
aws_host = '<api_id>.execute-api.<aws_region>.amazonaws.com'

# Base URL of the @connections endpoints, built from aws_host so that the
# signed host and the request URL cannot drift apart.
connections_base_url = f'https://{aws_host}/<api_stage>/@connections'

# Message posted to a connection, kept as bytes so the very same bytes are
# hashed for the signature and sent on the wire.
body = "Hey everyone From HTTP"

body = body.encode('utf-8')

aws_region = '<aws_region>'

# Service name used in the credential scope.
service = 'execute-api'

aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")

# AWS_SECRET_ACCESS_KEY is the variable name used by the AWS CLI and SDKs;
# the name this script originally read, AWS_SECRET_ACCESS_KEY_ID, is kept
# as a fallback so existing setups keep working.
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") or os.getenv("AWS_SECRET_ACCESS_KEY_ID")

def delete_connection(connection_id, timeout=10):
    """
    Send a signed DELETE request for `connection_id` to the @connections
    endpoint and print the response text and status code.

    Parameters:
        connection_id: id appended to connections_base_url.
        timeout: seconds passed to requests as the request timeout, so an
            unresponsive endpoint cannot block the script forever.
    """
    url = f'{connections_base_url}/{connection_id}'

    # A DELETE carries no payload, so the empty body is what gets signed
    # and nothing is passed as `data`.
    headers = get_headers(aws_host=aws_host, url=url, request_body=b'', service=service,
                          aws_region=aws_region, aws_access_key=aws_access_key,
                          aws_secret_access_key=aws_secret_access_key, method="delete")

    response = requests.delete(url=url, headers=headers, timeout=timeout)

    print(f"Deleted {connection_id} connection: {response.text}", response.status_code)


def post_message(connection_id, message=None, timeout=10):
    """
    Send a signed POST request carrying a message for `connection_id` to
    the @connections endpoint and print the response text and status code.

    Parameters:
        connection_id: id appended to connections_base_url.
        message: payload to send, as bytes or str (a str is UTF-8
            encoded). When omitted, the module-level `body` is sent.
        timeout: seconds passed to requests as the request timeout, so an
            unresponsive endpoint cannot block the script forever.
    """
    url = f'{connections_base_url}/{connection_id}'

    # The signed payload hash must cover exactly the bytes that are sent,
    # so the payload is resolved to bytes once and used for both.
    if message is None:
        payload = body
    elif isinstance(message, str):
        payload = message.encode('utf-8')
    else:
        payload = message

    headers = get_headers(aws_host=aws_host, url=url, request_body=payload, service=service,
                          aws_region=aws_region, aws_access_key=aws_access_key,
                          aws_secret_access_key=aws_secret_access_key, method="post")

    response = requests.post(url=url, headers=headers, data=payload, timeout=timeout)

    print(f"Reponse for send msg to {connection_id}: {response.text}", response.status_code)


def get_connection(connection_id, timeout=10):
    """
    Send a signed GET request for `connection_id` to the @connections
    endpoint and print the response text and status code.

    Parameters:
        connection_id: id appended to connections_base_url.
        timeout: seconds passed to requests as the request timeout, so an
            unresponsive endpoint cannot block the script forever.
    """
    url = f'{connections_base_url}/{connection_id}'

    # A GET carries no payload, so the empty body is what gets signed and
    # nothing is passed as `data`.
    headers = get_headers(aws_host=aws_host, url=url, request_body=b'', service=service,
                          aws_region=aws_region, aws_access_key=aws_access_key,
                          aws_secret_access_key=aws_secret_access_key, method="get")

    response = requests.get(url=url, headers=headers, timeout=timeout)

    print(f"Status: {response.text}", response.status_code)


# Sample usage: look the connection up, post the message to it, then
# delete it. Replace the placeholder below with a real connection id.
connection_id = "connection_id"

get_connection(connection_id)
post_message(connection_id=connection_id)
delete_connection(connection_id=connection_id)