Error when posting payload data to HubSpot using an AWS Lambda Python API call


I recently uploaded contact records to HubSpot using Postman. Here is the POST URL and a raw JSON body that I use to successfully create a contact:

https://api.hubapi.com/crm/v3/objects/contacts?hapikey={{hapikey}}
{"properties": {
    "smbapi": "yes",
    "email": "[email protected]",
    "business_name":"Forest City Grinding Inc",
    "srvc_address_1":"3544 Stenstrom Rd",
    "srvc_city_1":"",
    "srvc_state_1":"IL",
    "srvc_zip_1":"61109",
    "proposal_date":"2021-12-07",
    "proposal_start_date": "2022-01-01",
    "udc_code_1": "COMED",
    "eog":"electric",
    "fixedprice_1_gas_mcf": 6.63,
    "fixedprice_2_gas_mcf": 6.11,
    "fixedprice_3_gas_mcf": 5.9,
    "term_1": 12,
    "term_2": 24,
    "term_3": 36,
    "smb_bdm_name": "Timothy Chin",
    "smb_bdm_phone": "833-999-9999",
    "smb_bdm_email": "[email protected]"
  }
}
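For reference, the same Postman body can be built in Python as a nested dictionary and serialized once with `json.dumps()`. This is a minimal standard-library sketch; only a few of the custom contact properties from the request above are included.

```python
import json

# Nested dict mirroring the Postman body: "properties" maps to an object, not a string.
contact = {
    "properties": {
        "smbapi": "yes",
        "business_name": "Forest City Grinding Inc",
        "srvc_state_1": "IL",
        "srvc_zip_1": "61109",  # kept as a string so leading zeros survive
        "proposal_date": "2021-12-07",
        "fixedprice_1_gas_mcf": 6.63,
        "term_1": 12,
    }
}

# Serialize the whole structure once; the result is the JSON text Postman sends.
body = json.dumps(contact)
print(body)
```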

Next, I created a Python Lambda function to automate this process, because we want to ingest CSV files that may contain many records. I constructed the dictionary to look the same as the JSON body above, which works fine in Postman. However, when I make a POST request to HubSpot with my dictionary as the payload, I get this error:

Invalid input JSON : Cannot build ObjectSchemaEgg, Invalid input JSON on line 1, column 2: Cannot build ObjectSchemaEgg, some of required attributes are not set [name, labels]

Here is the payload that my code constructed for the API call (printed as a Python dictionary):

{'properties': '{"smbapi": "yes", "business_name": "Forest City Grinding Inc", "srvc_address_1": "4844 Stenstrom Rd", "srvc_state_1": "IL", "srvc_zip_1": "61109", "proposal_date": "2021-12-07", "proposal_start_date": "2022-01-01", "udc_code_1": "COMED", "fixedprice_1": "6.63", "fixedprice_2": "6.11", "fixedprice_3": "5.9", "term_2": "24", "term_3": "36", "smb_bdm_name": "Gary Wysong", "smb_bdm_phone": "833-389-0881", "smb_bdm_email": "[email protected]"}'}
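Unlike the Postman body, the value of `'properties'` here is a string (note the single quotes around it), because `json.dumps()` was applied to the inner dictionary only. This short standard-library sketch, with the record trimmed for brevity, shows what a JSON parser on the receiving side would see in each case, even if the outer dictionary were serialized as well.

```python
import json

record = {"smbapi": "yes", "business_name": "Forest City Grinding Inc", "srvc_state_1": "IL"}

# What post_to_hubspot() builds: the inner dict is already a JSON string.
double_encoded = {"properties": json.dumps(record)}
# What the Postman request sends: the inner dict stays a dict.
nested = {"properties": record}

for label, payload in (("double-encoded", double_encoded), ("nested", nested)):
    parsed = json.loads(json.dumps(payload))
    # In the double-encoded case "properties" comes back as a str, not an object.
    print(label, type(parsed["properties"]).__name__)
# prints "double-encoded str" then "nested dict"
```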

Here is my Lambda code in full (pay particular attention to the call to post_to_hubspot() and to the post_to_hubspot() function itself). The code that loads the DynamoDB table works correctly:

import boto3
import json
import decimal
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr
import re
import pandas as pd
import numpy as np
import os
import datetime
from os import urandom
import email
import base64
import requests
from datetime import datetime, timedelta, timezone
import mailparser
import calendar
global payload_data

landing_zone_bucket_name = str(os.environ['BUCKETNAME'])
s3 = boto3.resource('s3')
landing_zone_bucket = s3.Bucket(landing_zone_bucket_name )
s3r = boto3.client('s3')
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table(str(os.environ['DYNAMOTABLE']))
unprocessed_records_table = dynamodb.Table(str(os.environ['UNPROCESSEDTABLE']))
email_table = dynamodb.Table(str(os.environ['EMAILSTATUSTABLE']))
endpoint_url=os.environ['ENDPOINT_URL']
access_key = os.environ['ACCESSKEY']
now = datetime.now()
today_date = datetime.strftime(now,'%d')
today_month = datetime.strftime(now,'%m')
today_year = datetime.strftime(now,'%Y')
time_stamp = datetime.now().strftime('%Y%m%d%H%M%S')
payload_data = {}

#WRITE RECORDS TO DYNAMO
def dynamoPut(dObj,table_name=None):
    try:
        for each in list(dObj['Information']):
            if dObj['Information'][each]:
                dObj['Information'][each] = str(dObj['Information'][each])
            else:
                del dObj['Information'][each]
        dObj['Month'] =  today_month
        dObj['Year'] =  today_year
        dObj['Day'] = today_date
        for each in list(dObj):
            if dObj[each] != '':
                dObj[each] = dObj[each]
            else:
                del dObj[each]
        if table_name != None:
            response = unprocessed_records_table.put_item(Item = dObj)
        else:
            response = table.put_item(Item = dObj)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return True
        else:
            return False
    except Exception as e:
        print(e)
        return False

def dynamoPutFileName(filename,source_type):
    try:
        dObj = {}
        dObj['id'] =  urandom(20).hex()
        dObj['CreatedAt'] =  str(datetime.now())
        dObj['FileName'] = filename
        dObj['Type'] = source_type
        dObj['EmailSent'] = False
        response = email_table.put_item(Item = dObj)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return True
        else:
            return False
    except Exception as e:
        print(e)
        return False


def parse_csv_hubspot(event, obj):
    #parsing CSV file to write to dynamo
    try:
        def auto_truncate(val):
            return val[:255 ]
        print('<<  IN PARSE CSV HUBSPOT >>')
        print(event)
        csv = pd.read_csv(obj['Body'], encoding = "ISO-8859-1") 
        csv_nn = csv.replace(np.nan, 'null', regex=True)
        d = csv_nn.to_dict(orient='records')
        source_id = urandom(20).hex()
        file_name = event['file_path'].split('/')[-1] 
        print('<< FILE NAME >>', file_name)
        for each in d:
            try:
                dbObj = {}
                #PASSING THE EXTERNAL KEY
                UniqueKey = ''
                if 'smbapi' in each and each['smbapi'] != 'null':
                    dbObj['smbapi' ] = each['smbapi']
                    print('<< SMB API>>', dbObj['smbapi' ])

                if 'business_name' in each and each['business_name'] != 'null':
                    dbObj['business_name'] = each['business_name']
                    print('<< BUSINESS NAME >>', dbObj['business_name'])

                if 'srvc_address_1' in each and each['srvc_address_1'] != 'null':
                    dbObj['srvc_address_1'] = each['srvc_address_1']
                    print('<< ADDRESS 1 >>', dbObj['srvc_address_1'])

                if 'srvc_city_1' in each and each['srvc_city_1'] != 'null':
                    dbObj['srvc_city_1'] = each['srvc_city_1']

                if 'srvc_state_1' in each and each['srvc_state_1'] != 'null':
                    dbObj['srvc_state_1'] = each['srvc_state_1']

              
                if 'srvc_zip_1' in each and each['srvc_zip_1'] != 'null':
                    dbObj['srvc_zip_1']= str(each['srvc_zip_1']).zfill(5)
                
                if 'proposal_date' in each and each['proposal_date'] != 'null':
                    dbObj['proposal_date']= try_parsing_date(each['proposal_date']).date().isoformat()

                if 'proposal_start_date' in each and each['proposal_start_date'] != 'null':
                    dbObj['proposal_start_date']= try_parsing_date(each['proposal_start_date']).date().isoformat()   

                if 'udc_code_1' in each and each['udc_code_1'] != 'null':
                    dbObj['udc_code_1']= each['udc_code_1']

                if 'eog' in each and each['eog'] != 'null':
                    dbObj['eog']= each['eog']

                if 'fixedprice_1' in each and each['fixedprice_1'] != 'null':
                    dbObj['fixedprice_1']= each['fixedprice_1']

                if 'fixedprice_2' in each and each['fixedprice_2'] != 'null':
                    dbObj['fixedprice_2']= each['fixedprice_2']      
                
                if 'fixedprice_3' in each and each['fixedprice_3'] != 'null':
                    dbObj['fixedprice_3']= each['fixedprice_3']

                if 'fixedprice_1_gas_therm' in each and each['fixedprice_1_gas_therm'] != 'null':
                    dbObj['fixedprice_1_gas_therm']= each['fixedprice_1_gas_therm']

                if 'fixedprice_2_gas_therm' in each and each['fixedprice_2_gas_therm'] != 'null':
                    dbObj['fixedprice_2_gas_therm']= each['fixedprice_2_gas_therm']

                if 'fixedprice_3_gas_therm' in each and each['fixedprice_3_gas_therm'] != 'null':
                    dbObj['fixedprice_3_gas_therm']= each['fixedprice_3_gas_therm']

                if 'fixedprice_1_gas_ccf' in each and each['fixedprice_1_gas_ccf'] != 'null':
                    dbObj['fixedprice_1_gas_ccf']= each['fixedprice_1_gas_ccf']

                if 'fixedprice_2_gas_ccf' in each and each['fixedprice_2_gas_ccf'] != 'null':
                    dbObj['fixedprice_2_gas_ccf']= each['fixedprice_2_gas_ccf'] 

                if 'fixedprice_3_gas_ccf' in each and each['fixedprice_3_gas_ccf'] != 'null':
                    dbObj['fixedprice_3_gas_ccf']= each['fixedprice_3_gas_ccf']

                if 'fixedprice_1_gas_dth' in each and each['fixedprice_1_gas_dth'] != 'null':
                    dbObj['fixedprice_1_gas_dth']= each['fixedprice_1_gas_dth']

                if 'fixedprice_2_gas_dth' in each and each['fixedprice_2_gas_dth'] != 'null':
                    dbObj['fixedprice_2_gas_dth']= each['fixedprice_2_gas_dth']

                if 'fixedprice_3_gas_dth' in each and each['fixedprice_3_gas_dth'] != 'null':
                    dbObj['fixedprice_3_gas_dth']= each['fixedprice_3_gas_dth']

                if 'fixedprice_1_gas_mcf' in each and each['fixedprice_1_gas_mcf'] != 'null':
                    dbObj['fixedprice_1_gas_mcf']= each['fixedprice_1_gas_mcf']

                if 'fixedprice_2_gas_mcf' in each and each['fixedprice_2_gas_mcf'] != 'null':
                    dbObj['fixedprice_2_gas_mcf']= each['fixedprice_2_gas_mcf']

                if 'fixedprice_3_gas_mcf' in each and each['fixedprice_3_gas_mcf'] != 'null':
                    dbObj['fixedprice_3_gas_mcf']= each['fixedprice_3_gas_mcf']

                if 'term_1' in each and each['term_1'] != 'null':
                    dbObj['term_1']= each['term_1']

                if 'term_2' in each and each['term_2'] != 'null':
                    dbObj['term_2']= each['term_2']

                if 'term_3' in each and each['term_3'] != 'null':
                    dbObj['term_3']= each['term_3']
               
                if 'smb_bdm_name' in each and each['smb_bdm_name'] != 'null':
                    dbObj['smb_bdm_name']= each['smb_bdm_name']

                if 'smb_bdm_phone' in each and each['smb_bdm_phone'] != 'null':
                    if '.' in str(each['smb_bdm_phone']):
                            dbObj['smb_bdm_phone']= str(int(float(each['smb_bdm_phone'])))
                    else:
                            dbObj['smb_bdm_phone']= str(each['smb_bdm_phone'])

                if 'smb_bdm_email' in each and each['smb_bdm_email'] != 'null' and each['smb_bdm_email'].strip() != '' and each['smb_bdm_email'] != None:
                    dbObj['smb_bdm_email']= each['smb_bdm_email']
                print('<< OBJ >> ',dbObj)
                
                N = urandom(20).hex()
                now = str(datetime.now())
                #<< END of HUBSPOT INGESTION >>
                # table.put_item(
                Item =  {
                    'CogId' : str(N),
                    'CreatedAt': now,
                    'ExternalId': UniqueKey,
                    'Information' : dbObj,
                    'SourceBucket': landing_zone_bucket_name,
                    'SourcePath' : event['file_path'],
                    'Source' : 'HubSpot',
                    'SourceId' : source_id,
                    'SourceFileName': time_stamp + '_' + file_name
                }
                #WRITE-TO-DYNAMO
                files_processing = dynamoPut(Item)
                if not files_processing:
                    print('Writing {} record to dynamodb Failed'.format(Item))

            except Exception as e:
                print(e)
                N = urandom(20).hex()
                Item =  {
                    'CogId' : str(N),
                    'CreatedAt': str(datetime.now()),
                    'Information' : each,
                    'SourceBucket': landing_zone_bucket_name,
                    'SourcePath' : event['file_path'],
                    'Source' : 'HubSpot',
                    'message': str(e),
                    'SourceId' : source_id,
                    'ExternalId': UniqueKey
                }
                files_processing = dynamoPut(Item,'Fail')
                pass
        temp_file_name = time_stamp + '_' + file_name
        isert_file_name = dynamoPutFileName(temp_file_name,'HubSpot')
        post_to_hubspot(dbObj)
        return True
    except Exception as e:
        print(e)
        new_folder_path = os.environ['CSV_NEW_FOLDER_HUBSPOT']
        unprocessed_folder_path = os.environ['CSV_ERROR_FOLDER_HUBSPOT']
        # MOVING PROCESSED FILES FROM NEW TO UNPROCESSED FOLDER
        move_file_to_processed = moving_files_new_to_processed(event, new_folder_path,unprocessed_folder_path)
        return False 


def try_parsing_date(text):
    for fmt in ('%m/%d/%Y','%Y-%m-%dT%H:%M:%S-%f', '%m/%d/%y', '%Y-%m-%d', '%m.%d.%Y','%Y-%m-%dT%I', '%Y-%m-%dT%I%p', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S.%f+','%Y-%m-%dT%H:%M:%S'):#2018-11-20T08:05:54-0500
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            print('in except')
            pass
    raise ValueError('no valid date format found')

def post_to_hubspot(list_contacts):
    print('<< IN POST-To-HUBSPOT >>')
    data_string = json.dumps(list_contacts)
    payload_data = {"properties": data_string}
    print('<< dbOBJ LIST >> ',payload_data)
    
    response = requests.request("POST", endpoint_url+access_key, headers={'Content-Type': 'application/json'}, data=payload_data)
    token_response=json.loads(response.text)
    print('<< TOKEN RESPONSE >>',token_response)

def moving_files_new_to_processed(event, new_folder,processed_folder):
    #MOVING-FILES-TO-PROCESSED
    try:
        copy_source = {
            'Bucket': landing_zone_bucket_name,
                'Key': event['file_path']
            }
        path = event['file_path']
        processed_folder = processed_folder + time_stamp + '_'
        new_key = path.replace(new_folder, processed_folder)
        new_obj = landing_zone_bucket.Object(new_key)
        new_obj.copy(copy_source)
        s3.Object(landing_zone_bucket_name, event['file_path']).delete()
        return True
    except Exception as e:
        print(e)
        return False


def lambda_handler(event,context):
    print("Starting to Push Records to Dynamo Lambda")
    print(event) 
    try:
        parse_flag = False
        new_folder_path = ''
        processed_folder_path = ''
        #Gets file path and calls required function to parse it out
        key = str(os.environ['CSV_NEW_FOLDER_HUBSPOT'])
        obj = s3r.get_object(Bucket=landing_zone_bucket_name, Key=event['file_path'])
        
        print('after obj')
        print(os.environ['CSV_NEW_FOLDER_HUBSPOT']) 
        print('in HubSpot parse_csv')
        parse_csv_func = parse_csv_hubspot(event, obj)
        # Checks if parse_csv return empty dictionary
        if parse_csv_func:
            parse_flag = True
            new_folder_path = os.environ['CSV_NEW_FOLDER_HUBSPOT']
            processed_folder_path = os.environ['CSV_PROCESSED_FOLDER_HUBSPOT']
        else:
            print('File Format not Supported for {}'.format(event['file_path']))
        if parse_flag:
            # UPLOADING CONTACT.MOVING PROCESSED FILES FROM NEW TO PROCESSED FOLDER
            #print('<< PAYLOAD >> ',payload)
            #response = requests.request("POST", "https://api.hubapi.com/crm/v3/schemas/?hapikey="+access_key, headers={'Content-Type': 'application/json'}, data=json.dumps(str(payload)))
            #token_response=json.loads(response.text)
            #print('<< TOKEN RESPONSE >>',token_response)
            #MOVING PROCESSED FILES FROM NEW TO PROCESSED FOLDER
            move_file_to_processed = moving_files_new_to_processed(event, new_folder_path,processed_folder_path)
            if move_file_to_processed:
                print('File {} moved Successfully from {} to {}'.format(event['file_path'],new_folder_path,processed_folder_path))
            else:
                print('Moving {} file from new to processing folder Failed'.format(event['file_path']))
            
    except Exception as e:
        print(e) 
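
A second detail in post_to_hubspot(): the dictionary is passed as `data=`. When `requests` receives a dict for `data`, it form-encodes it into `key=value` pairs with `urllib.parse.urlencode`, regardless of the hand-set `Content-Type: application/json` header, so the server receives `properties=%7B...` rather than JSON. This standard-library sketch reproduces that body for illustration; it does not call `requests` itself.

```python
import json
from urllib.parse import urlencode

record = {"smbapi": "yes", "srvc_state_1": "IL"}
payload_data = {"properties": json.dumps(record)}  # same shape as in post_to_hubspot()

# requests builds the body for a dict passed as data= with urlencode(); this mirrors it.
form_body = urlencode(payload_data, doseq=True)
print(form_body)

# A JSON body instead comes from serializing the full nested dict in one step.
json_body = json.dumps({"properties": record})
print(json_body)
```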

What could be the problem? Thanks for your help.


The problem was caused by two issues:

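  1. The whole payload dictionary should have been serialized with json.dumps() when doing the POST, instead of serializing only the inner properties dictionary and nesting the resulting string. That way the dictionary keeps the same nested structure as the Postman body. Here's the response from the POST after the fix: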

    << TOKEN RESPONSE >> { "id": "135120801", "properties": { "business_name": "Millers Brand Oats", "createdate": "2021-12-21T02:31:12.452Z", "fixedprice_1": "6.63", "fixedprice_2": "6.11", "fixedprice_3": "5.9", "hs_all_contact_vids": "135120801", "hs_is_contact": "true", "hs_is_unworked": "true", "hs_marketable_until_renewal": "false", "hs_object_id": "135120801", "hs_pipeline": "contacts-lifecycle-pipeline", "lastmodifieddate": "2021-12-21T02:31:12.452Z", "proposal_date": "2021-12-07", "proposal_start_date": "2022-01-01", "smb_bdm_email": "[email protected]", "smb_bdm_name": "Tim Chu", "smb_bdm_phone": "833-999-9999", "smbapi": "yes", "srvc_address_1": "4844 Stenstrom Rd", "srvc_state_1": "IL", "srvc_zip_1": "61109", "term_2": "24", "term_3": "36", "udc_code_1": "COMED" }, "createdAt": "2021-12-21T02:31:12.452Z", "updatedAt": "2021-12-21T02:31:12.452Z", "archived": false }

  2. I was using the wrong endpoint:

https://api.hubapi.com/crm/v3/schemas/

instead of:

https://api.hubapi.com/crm/v3/objects/contacts/
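Putting both fixes together, a corrected post_to_hubspot() could look like the sketch below. It posts to the contacts endpoint and lets `requests` serialize the nested dictionary through `json=`, which is equivalent to `data=json.dumps(payload)` with a JSON `Content-Type`. Assumptions: the helper `build_contact_payload()`, the `api_key` parameter, and the timeout value are illustrative additions; the `hapikey` query parameter mirrors the Postman URL in the question, but HubSpot has since retired `hapikey` API keys in favor of private app access tokens sent in an `Authorization: Bearer` header, so the authentication part may need adapting.

```python
import json

CONTACTS_URL = "https://api.hubapi.com/crm/v3/objects/contacts"


def build_contact_payload(properties):
    """Hypothetical helper: wrap flat contact properties in the shape the endpoint expects."""
    # Keep "properties" as a nested dict; do NOT json.dumps() it separately.
    return {"properties": dict(properties)}


def post_to_hubspot(properties, api_key):
    # Imported here so build_contact_payload() works even without requests installed.
    import requests

    payload = build_contact_payload(properties)
    response = requests.post(
        CONTACTS_URL,
        params={"hapikey": api_key},  # as in the question; newer setups use a Bearer token
        json=payload,  # requests serializes the dict and sets Content-Type: application/json
        timeout=30,
    )
    # Raise on 4xx/5xx so a failed create is not silently printed and ignored.
    response.raise_for_status()
    return response.json()


if __name__ == "__main__":
    example = {"smbapi": "yes", "srvc_state_1": "IL"}
    print(json.dumps(build_contact_payload(example)))
```

In the Lambda, this would also need to be called once per CSV row inside the loop in parse_csv_hubspot(); the original call after the loop only posts the last row's dbObj.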

Now I just need to find out why POSTs from the AWS Lambda function create duplicate contacts in HubSpot, while POSTs from Postman do not.
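One visible difference between the two requests: the Postman body includes an `email` property, but the payload built by the Lambda has none, because parse_csv_hubspot() never copies an `email` column into dbObj. HubSpot deduplicates contacts by email address, so a create request without one has nothing to match on, which likely explains the duplicates. A possible fix is to carry the email through and skip rows that lack it. The sketch below assumes the CSV has an `email` column; `row_to_contact_properties()` is a hypothetical helper that copies every non-null column and omits the ZIP and date normalization the original code performs.

```python
def row_to_contact_properties(row, null_marker="null"):
    """Hypothetical helper: turn one CSV row dict into HubSpot contact properties.

    Returns None when the row has no usable email, since HubSpot cannot
    deduplicate such a contact.
    """
    raw_email = row.get("email")
    email = str(raw_email).strip() if raw_email is not None else ""
    if not email or email == null_marker:
        return None
    properties = {"email": email}
    for key, value in row.items():
        # Skip the email (already added) and the 'null' placeholder the original code uses for NaN.
        if key == "email" or value == null_marker:
            continue
        properties[key] = value
    return properties
```

Rows for which it returns None could be written to the unprocessed-records table instead of being posted.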