I want to generate call to a group of contacts(suppose i have 1000 contacts in a group) from database at a time. Here for example i have 30 concurrent channels, i want that the code should originate call to 30 numbers at the same time, and if some channel is free than again call will generate equal to the number of free channel and the process continues.
what i have achieved in the below code: it generates call but one by one, when a call is answered or drops than a next call is generated, but i dont want it that way.
import time
import asterisk.manager
import pymysql
# Assuming you have pymysql library installed, you can install it with: pip install pymysql
DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASSWORD = 'XXXX'
DB_NAME = 'abcd'
AMI_USERNAME = 'admin'
AMI_PASSWORD = 'XYZ'
AMI_HOST = '127.0.0.1'
AMI_PORT = 5038
TOTAL_CHANNELS_LIMIT = 30 # Set your total channel limit here
def connect_to_db():
connection = pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, db=DB_NAME, cursorclass=pymysql.cursors.DictCursor)
return connection
def connect_to_ami():
manager = asterisk.manager.Manager()
manager.connect(AMI_HOST, AMI_PORT)
manager.login(AMI_USERNAME, AMI_PASSWORD)
return manager
def check_active_channels(manager):
response = manager.command('core show channels concise')
active_channels = [line.split()[0] for line in response.data.split('\n') if line.strip()]
return active_channels
def get_active_campaigns(connection):
with connection.cursor() as cursor:
cursor.execute("SELECT name as campaign_name, call_group, fixed_channels FROM vb_schedule_play WHERE status = 'active'")
result = cursor.fetchall()
active_campaigns = [{'name': row['campaign_name'], 'fixed_channels': row['fixed_channels'], 'call_group': row['call_group']} for row in result]
return active_campaigns
def get_contacts_for_campaign(connection, campaign_name):
with connection.cursor() as cursor:
sql = f"SELECT phone as contact_number FROM contact_list WHERE group_name = 'sohub_group' AND (status IS NULL OR status = '') LIMIT 1"
cursor.execute(sql)
result = cursor.fetchone()
if result:
return result['contact_number']
else:
return None
def update_contact_status(connection, contact_number, campaign_name, status):
with connection.cursor() as cursor:
sql = f"UPDATE contact_list SET status = '{status}' WHERE phone = '{contact_number}'"
cursor.execute(sql)
connection.commit()
def initiate_calls(manager, campaign_name, fixed_channels, connection, total_channels_limit=TOTAL_CHANNELS_LIMIT):
if fixed_channels is None:
fixed_channels = 0
fixed_channels = int(fixed_channels) if fixed_channels is not None else 0
if fixed_channels > 0:
# Use fixed_channels for the specified campaign
channels_to_use = min(fixed_channels, total_channels_limit)
else:
# Dynamic channel allocation for campaigns with fixed_channels equal to 0
available_channels = check_active_channels(manager)
channels_to_use = min(len(available_channels), total_channels_limit)
total_channels_used = 0 # Initialize the total channels used
for _ in range(channels_to_use):
if total_channels_used >= total_channels_limit:
print(f"Reached the total channel limit ({total_channels_limit}). Stopping further calls.")
break
contact_number = get_contacts_for_campaign(connection, campaign_name)
if contact_number:
full_channel = f'PJSIP/{contact_number}'
manager.originate(
channel=full_channel,
exten='s',
context='ami-contact-center',
priority=1,
caller_id='0123456789',
variables={
'CallerID': '0123456789',
'CALLERID(all)': '0123456789',
'play_type': 'tts',
'play': 'hello this is a test call',
},
timeout=30000
)
update_contact_status(connection, contact_number, campaign_name, 'calling')
total_channels_used += 1 # Increment the total channels used
time.sleep(1) # Add a delay to avoid overwhelming the system
else:
print(f"No more contacts available for campaign {campaign_name}")
def main():
db_connection = connect_to_db()
ami_manager = connect_to_ami()
active_campaigns = get_active_campaigns(db_connection)
for campaign in active_campaigns:
campaign_name = campaign['name']
fixed_channels = campaign['fixed_channels']
initiate_calls(ami_manager, campaign_name, fixed_channels, db_connection, total_channels_limit=TOTAL_CHANNELS_LIMIT)
ami_manager.logoff()
db_connection.close()
if __name__ == '__main__':
main()
There are many ways to do it.
make concurrent connections.
use call files
Use async originate version.
Sequence of events: first the Channel is rung. Then, when that answers, the Extension
Most reliable way is call-files.