Originating Asterisk AMI calls with Python for a whole group of contacts at a time

238 Views Asked by At

I want to originate calls to a group of contacts (suppose I have 1000 contacts in a group) read from a database. For example, I have 30 concurrent channels: I want the code to originate calls to 30 numbers at the same time, and whenever some channels become free, new calls should be originated to match the number of free channels, and so on until the group is exhausted.

What I have achieved with the code below: it originates calls, but only one by one. The next call is originated only after the previous call is answered or dropped, which is not what I want.

import time
import asterisk.manager
import pymysql

# Assuming you have pymysql library installed, you can install it with: pip install pymysql

# MySQL connection settings, passed to pymysql.connect() in connect_to_db().
# NOTE(review): the user/password/db values look like placeholders - confirm
# and consider loading real credentials from the environment instead.
DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASSWORD = 'XXXX'
DB_NAME = 'abcd'

# Asterisk Manager Interface (AMI) settings, used by connect_to_ami() for
# Manager.connect() and Manager.login().
AMI_USERNAME = 'admin'
AMI_PASSWORD = 'XYZ'
AMI_HOST = '127.0.0.1'
AMI_PORT = 5038

# Upper bound on the number of calls a single initiate_calls() run may place;
# also the default for its total_channels_limit parameter.
TOTAL_CHANNELS_LIMIT = 30  # Set your total channel limit here

def connect_to_db():
    """Open and return a pymysql connection built from the DB_* constants.

    The connection is created with ``pymysql.cursors.DictCursor`` as its
    cursor class, so the query helpers in this file read rows by column name.
    """
    return pymysql.connect(
        host=DB_HOST,
        user=DB_USER,
        password=DB_PASSWORD,
        db=DB_NAME,
        cursorclass=pymysql.cursors.DictCursor,
    )

def connect_to_ami():
    """Create an AMI manager, connect to AMI_HOST:AMI_PORT and log in.

    Returns:
        The ``asterisk.manager.Manager`` instance after ``connect`` and
        ``login`` have been called on it.
    """
    ami = asterisk.manager.Manager()
    ami.connect(AMI_HOST, AMI_PORT)
    ami.login(AMI_USERNAME, AMI_PASSWORD)
    return ami

def check_active_channels(manager):
    """Run ``core show channels concise`` and summarise its output.

    Args:
        manager: Object exposing ``command(str)`` whose result has a ``data``
            string attribute.

    Returns:
        A list holding the first whitespace-delimited token of every
        non-blank line of the command output, in output order.
    """
    output = manager.command('core show channels concise').data
    first_tokens = []
    for raw_line in output.split('\n'):
        # Blank / whitespace-only lines carry no channel information.
        if not raw_line.strip():
            continue
        first_tokens.append(raw_line.split()[0])
    return first_tokens

def get_active_campaigns(connection):
    """Fetch every campaign row whose status is 'active'.

    Args:
        connection: DB connection whose ``cursor()`` is usable as a context
            manager and yields rows addressable by column name.

    Returns:
        A list of dicts with the keys ``name``, ``fixed_channels`` and
        ``call_group``, one per row returned by the query.
    """
    campaigns = []
    with connection.cursor() as cur:
        cur.execute("SELECT name as campaign_name, call_group, fixed_channels FROM vb_schedule_play WHERE status = 'active'")
        for record in cur.fetchall():
            campaigns.append({
                'name': record['campaign_name'],
                'fixed_channels': record['fixed_channels'],
                'call_group': record['call_group'],
            })
    return campaigns

def get_contacts_for_campaign(connection, campaign_name, group_name='sohub_group'):
    """Return the next not-yet-dialled phone number of a contact group.

    A contact counts as not yet dialled when its ``status`` column is NULL or
    the empty string.

    Args:
        connection: DB connection whose ``cursor()`` is usable as a context
            manager and yields rows addressable by column name.
        campaign_name: Name of the campaign the contact is wanted for. It is
            accepted for interface compatibility; the query itself filters by
            ``group_name`` only.
        group_name: Value matched against ``contact_list.group_name``.
            Defaults to ``'sohub_group'``, the group that used to be
            hard-coded in the query.

    Returns:
        The phone number of one matching contact, or ``None`` when no contact
        is left.
    """
    # The group is bound as a query parameter rather than formatted into the
    # SQL text, so any group name is quoted correctly by the driver.
    sql = (
        "SELECT phone as contact_number FROM contact_list "
        "WHERE group_name = %s AND (status IS NULL OR status = '') LIMIT 1"
    )
    with connection.cursor() as cursor:
        cursor.execute(sql, (group_name,))
        row = cursor.fetchone()
    return row['contact_number'] if row else None

def update_contact_status(connection, contact_number, campaign_name, status):
    """Set the ``status`` column of the contact with the given phone number.

    Args:
        connection: DB connection whose ``cursor()`` is usable as a context
            manager and which exposes ``commit()``.
        contact_number: Value matched against ``contact_list.phone``.
        campaign_name: Accepted for interface compatibility; not used by the
            statement.
        status: New value written to ``contact_list.status``.
    """
    # Values are bound as parameters instead of being formatted into the SQL
    # text: this prevents SQL injection and keeps the statement valid when a
    # value contains a quote character.
    sql = "UPDATE contact_list SET status = %s WHERE phone = %s"
    with connection.cursor() as cursor:
        cursor.execute(sql, (status, contact_number))
        connection.commit()

def initiate_calls(manager, campaign_name, fixed_channels, connection, total_channels_limit=TOTAL_CHANNELS_LIMIT):
    """Originate outbound calls for one campaign without waiting on each call.

    The number of calls placed in this run is:

    * ``min(fixed_channels, total_channels_limit)`` when the campaign has a
      positive ``fixed_channels`` value;
    * otherwise the number of currently free channels, i.e.
      ``total_channels_limit`` minus the channels reported by
      ``check_active_channels`` (never less than zero).

    Each contacted number is marked with status ``'calling'`` right after its
    originate request is sent, so the next lookup returns a different contact.

    Args:
        manager: Logged-in AMI manager exposing ``originate`` and ``command``.
        campaign_name: Campaign name, forwarded to the contact helpers and
            used in log output.
        fixed_channels: Per-campaign channel count; ``None`` is treated as 0.
        connection: DB connection handed to the contact helpers.
        total_channels_limit: Upper bound on calls placed by this run.
    """
    fixed_channels = int(fixed_channels) if fixed_channels is not None else 0

    if fixed_channels > 0:
        # Use fixed_channels for the specified campaign
        channels_to_use = min(fixed_channels, total_channels_limit)
    else:
        # Dynamic allocation: only the channels that are NOT already busy can
        # take a new call, so subtract the active ones from the limit.
        active_channels = check_active_channels(manager)
        channels_to_use = max(total_channels_limit - len(active_channels), 0)

    for _ in range(channels_to_use):
        contact_number = get_contacts_for_campaign(connection, campaign_name)
        if not contact_number:
            print(f"No more contacts available for campaign {campaign_name}")
            # Nothing left to dial; further iterations would only repeat the
            # same empty lookup.
            break

        manager.originate(
            channel=f'PJSIP/{contact_number}',
            exten='s',
            context='ami-contact-center',
            priority=1,
            caller_id='0123456789',
            variables={
                'CallerID': '0123456789',
                'CALLERID(all)': '0123456789',
                'play_type': 'tts',
                'play': 'hello this is a test call',
            },
            timeout=30000,
            # Asynchronous originate: the AMI acknowledges the request
            # immediately instead of blocking until the call is answered or
            # fails, so several calls can ring at the same time.
            # NOTE(review): the keyword is ``run_async`` in pyst2 >= 0.5;
            # confirm against the installed library version.
            run_async=True,
        )
        update_contact_status(connection, contact_number, campaign_name, 'calling')
        time.sleep(1)  # Add a delay to avoid overwhelming the system

def main():
    """Connect to the database and the AMI, then dial every active campaign.

    The AMI session is logged off and the database connection closed even
    when fetching campaigns or originating calls raises.
    """
    db_connection = connect_to_db()
    try:
        ami_manager = connect_to_ami()
        try:
            for campaign in get_active_campaigns(db_connection):
                initiate_calls(
                    ami_manager,
                    campaign['name'],
                    campaign['fixed_channels'],
                    db_connection,
                    total_channels_limit=TOTAL_CHANNELS_LIMIT,
                )
        finally:
            # Always end the AMI session, including on errors.
            ami_manager.logoff()
    finally:
        db_connection.close()

# Run the dialer only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()

1

There is 1 best solution below

0
arheops On

There are many ways to do it.

  1. Make concurrent connections.

  2. Use call files.

  3. Use async originate version.

     Action: Originate
     Parameters:
    
     Channel: Channel on which to originate the call (The same as you specify in the Dial application command)
     Context: Context to use on connect (must use Exten & Priority with it)
     Exten: Extension to use on connect (must use Context & Priority with it)
     Priority: Priority to use on connect (must use Context & Exten with it)
     Timeout: Timeout (in milliseconds) for the originating connection to happen(defaults to 30000 milliseconds)
     CallerID: CallerID to use for the call
     Variable: Channels variables to set (max 32). Variables will be set for both channels (local and connected).
     Account: Account code for the call
     Application: Application to use on connect (use Data for parameters)
     Data : Data if Application parameter is used
     **Async**: For the origination to be asynchronous (allows multiple calls to be generated without waiting for a response)
     ActionID: The request identifier. It allows you to identify the response to this request. You may use a number or a string. Useful when you make several simultaneous requests.
    

    Sequence of events: first the Channel is rung. Then, when that answers, the Extension is dialled within the Context to initiate the other end of the call.

The most reliable way is to use call files.