#!/usr/bin/env python2 import re import subprocess import logging import time def get_path(soft): try: path = subprocess.check_output(['which', soft]).strip() return path except subprocess.CalledProcessError, e: logging.fatal('%s not found' % soft) return False def get_sox_ver(sox_path=None): global sox_ver if 'sox_ver' not in globals() or not sox_ver: if not sox_path: sox_path = get_path('sox') try: result = subprocess.check_output([sox_path, '--version']) logging.debug('Sox accept --version parameter : consider as version 14') sox_ver = 14 except subprocess.CalledProcessError: logging.debug('Sox not accept --version parameter : consider as version 12') sox_ver = 12 return sox_ver def enable_simulate_mode(): global SIMULATE_MODE SIMULATE_MODE = True def check_simulate_mode(): return 'SIMULATE_MODE' in globals() and SIMULATE_MODE def get_var(asterisk_agi, varname): if check_simulate_mode(): result = raw_input('Simulate mode : please enter "%s" variable value =>> ' % varname) else: result = asterisk_agi.get_full_variable(varname) return result def set_var(asterisk_agi, varname, value): if check_simulate_mode(): print('Simulate mode : set variable %s to "%s"' % (varname, value)) else: logging.info('Set variable %s to "%s"' % (varname, value)) asterisk_agi.set_variable(varname, value) def detect_format(asterisk_agi): if check_simulate_mode(): return ("sln", 8000) nativeformat = asterisk_agi.get_full_variable('${CHANNEL(audionativeformat)}') logging.debug('Native audio format : %s' % nativeformat) if re.match('(silk|sln)12', nativeformat): return ("sln12", 12000) elif re.match('(speex|slin|silk)16|g722|siren7', nativeformat): return ("sln16", 16000) elif re.match('(speex|slin|celt)32|siren14', nativeformat): return ("sln32", 32000) elif re.match('(celt|slin)44', nativeformat): return ("sln44", 44100) elif re.match('(celt|slin)48', nativeformat): return ("sln48", 48000) else: return ("sln", 8000) def remove_ext(filepath): return re.sub('\.[a-zA-Z0-9]+$','',filepath) any_intkeys = "0123456789#*" def playback(asterisk_agi, filepath, simulate_play=False, read=False, read_timeout=3000, read_maxdigits=20, intkey=None): if check_simulate_mode(): logging.debug('Simulate mode : Play file %s' % filepath) try: mplayer_path = get_path('mplayer') subprocess.check_output([mplayer_path, filepath]) except Exception, e: logging.warning('Fail to play %s file : %s' % (filepath, e)) if read: result = raw_input('=>> ') else: # Simulate empty result result = '' else: play_filepath = remove_ext(filepath) logging.debug('Asterisk play file path : %s' % play_filepath) if read: result = asterisk_agi.get_data(play_filepath, timeout=read_timeout, max_digits=read_maxdigits) else: if intkey == "any": global any_intkeys intkey = any_intkeys result = asterisk_agi.stream_file(play_filepath, escape_digits=intkey) logging.debug('User enter "%s"' % result) return result def beep(asterisk_agi): if check_simulate_mode(): logging.debug('Beep') else: playback(asterisk_agi, 'beep') def check_answered(asterisk_agi): if check_simulate_mode(): print('Simulate mode : Channel answered') else: status = asterisk_agi.channel_status() if status == 4: logging.debug('Call is riging. Answer it and wait one second.') asterisk_agi.answer() time.sleep(1) def hangup(asterisk_agi): if check_simulate_mode(): print('Simulate mode : Hangup') else: asterisk_agi.hangup() def record_file(asterisk_agi, filepath, fileformat='wav', escape_digits='#', max_duration=-1, simulate_record=False, simulate_record_duration=5): if check_simulate_mode(): if not simulate_record: raw_input('Simulate mode : record %s file "%s" until you press on touch [enter]' % (fileformat, filepath)) else: assert fileformat == 'wav', "Only wav file format in support in simulate record mode" logging.debug('Simulate mode : Record file %s', filepath) raw_input('The record will start after you press [entre] key. In this mode, the record will stop after %d seconds.' % simulate_record_duration) try: arecord_path = get_path('arecord') subprocess.check_output([arecord_path, '-vv', '-fdat', '-d %d' % simulate_record_duration, filepath]) except Exception, e: logging.warning('Fail to record %s file', filepath, exc_info=True) else: return asterisk_agi.record_file(filepath, format=fileformat, escape_digits=escape_digits, timeout=max_duration)