astpicotts/helpers.py

114 lines
3.3 KiB
Python
Raw Normal View History

2018-01-22 01:39:24 +01:00
#!/usr/bin/env python2
import re
import subprocess
import logging
import time
def get_path(soft):
    """Locate an executable using the ``which`` command.

    :param soft: name of the program to look for
    :return: the stripped output of ``which <soft>``, or ``False`` when
             the program cannot be located
    """
    try:
        return subprocess.check_output(['which', soft]).strip()
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError : ``which`` exited with a non-zero status.
        # OSError : ``which`` itself could not be executed; the lookup
        # failed all the same, so keep the "path or False" contract.
        logging.fatal('%s not found' % soft)
        return False
def get_sox_ver(sox_path=None):
    """Guess the major version of sox (12 or 14) and cache it.

    The guess relies on whether ``sox --version`` exits successfully:
    success is considered version 14, a non-zero exit status version 12.
    The result is stored in the module-level ``sox_ver`` global and
    reused by later calls.

    :param sox_path: path of the sox executable; looked up with
                     ``get_path('sox')`` when not provided
    :return: 14 or 12, or ``None`` when sox can not be found or executed
             (nothing is cached in that case)
    """
    global sox_ver
    cached = globals().get('sox_ver')
    if cached:
        return cached
    if not sox_path:
        sox_path = get_path('sox')
    if not sox_path:
        # get_path() already logged the failure; do not hand ``False``
        # to subprocess, which would raise an unrelated TypeError.
        return None
    try:
        subprocess.check_output([sox_path, '--version'])
    except subprocess.CalledProcessError:
        logging.debug('Sox not accept --version parameter : consider as version 12')
        sox_ver = 12
    except OSError as err:
        # The executable is missing or not runnable: no version to report.
        logging.error('Fail to run sox (%s) : %s' % (sox_path, err))
        return None
    else:
        logging.debug('Sox accept --version parameter : consider as version 14')
        sox_ver = 14
    return sox_ver
def enable_simulate_mode():
    """Turn on simulate mode by setting the module global SIMULATE_MODE to True."""
    globals()['SIMULATE_MODE'] = True
def check_simulate_mode():
    """Return the module global SIMULATE_MODE, or False when it was never set."""
    return globals().get('SIMULATE_MODE', False)
def get_var(asterisk_agi, varname):
    """Return the value of the `varname` variable.

    In simulate mode the value is asked on standard input; otherwise it
    is read with ``asterisk_agi.get_full_variable(varname)``.
    """
    if not check_simulate_mode():
        return asterisk_agi.get_full_variable(varname)
    prompt = 'Simulate mode : please enter "%s" variable value =>> ' % varname
    return raw_input(prompt)
def set_var(asterisk_agi, varname, value):
    """Set the `varname` variable to `value`.

    In simulate mode the assignment is only printed; otherwise it is
    logged and forwarded to ``asterisk_agi.set_variable``.
    """
    simulated = check_simulate_mode()
    if simulated:
        print('Simulate mode : set variable %s to "%s"' % (varname, value))
        return
    logging.info('Set variable %s to "%s"' % (varname, value))
    asterisk_agi.set_variable(varname, value)
def detect_format(asterisk_agi):
    """Pick the signed-linear file format matching the channel native audio format.

    The ``${CHANNEL(audionativeformat)}`` value is matched, from its
    beginning, against known codec names.

    :param asterisk_agi: AGI object providing ``get_full_variable``
    :return: ``(format_name, sample_rate)`` tuple; ``("sln", 8000)`` in
             simulate mode, when the native format is unknown, or when
             the variable is empty
    """
    default = ("sln", 8000)
    if check_simulate_mode():
        return default
    nativeformat = asterisk_agi.get_full_variable('${CHANNEL(audionativeformat)}')
    logging.debug('Native audio format : %s' % nativeformat)
    if not nativeformat:
        # No value to inspect: re.match() would raise on None.
        return default
    # Order matters: the first matching pattern wins.
    # "slin12" is accepted next to the historical "sln12" spelling so the
    # 12 kHz case agrees with the "slin" prefix used by every other entry.
    known_formats = (
        (r'(silk|sln|slin)12', ("sln12", 12000)),
        (r'(speex|slin|silk)16|g722|siren7', ("sln16", 16000)),
        (r'(speex|slin|celt)32|siren14', ("sln32", 32000)),
        (r'(celt|slin)44', ("sln44", 44100)),
        (r'(celt|slin)48', ("sln48", 48000)),
    )
    for pattern, audio_format in known_formats:
        if re.match(pattern, nativeformat):
            return audio_format
    return default
def remove_ext(filepath):
    """Return `filepath` without its trailing alphanumeric extension.

    Only a final ``.<letters/digits>`` suffix is removed; a path without
    such a suffix is returned unchanged.
    """
    # Raw string: "\." is an invalid escape sequence in a plain literal.
    return re.sub(r'\.[a-zA-Z0-9]+$', '', filepath)
# Digits passed as escape_digits to stream_file when playback() is called
# with intkey="any".
any_intkeys = "0123456789#*"
def playback(asterisk_agi, filepath, simulate_play=False, read=False, read_timeout=3000, read_maxdigits=20, intkey=None):
    """Play an audio file and return what the user entered.

    In simulate mode the file is played locally with mplayer (best
    effort) and, when `read` is set, the answer is asked on standard
    input. Otherwise the file is played through the AGI object, using
    `filepath` stripped of its extension.

    :param asterisk_agi: AGI object providing ``get_data``/``stream_file``
    :param filepath: path of the audio file to play
    :param simulate_play: not used by this function
    :param read: when true, collect input with ``get_data``
                 instead of just streaming the file
    :param read_timeout: ``timeout`` value given to ``get_data``
    :param read_maxdigits: ``max_digits`` value given to ``get_data``
    :param intkey: ``escape_digits`` value given to ``stream_file``;
                   the special value ``"any"`` stands for ``any_intkeys``
    :return: the user input ('' in simulate mode when `read` is false)
    """
    if check_simulate_mode():
        logging.debug('Simulate mode : Play file %s' % filepath)
        try:
            mplayer_path = get_path('mplayer')
            subprocess.check_output([mplayer_path, filepath])
        except Exception as e:
            # Local playback is only a convenience: warn and carry on.
            logging.warning('Fail to play %s file : %s' % (filepath, e))
        if read:
            result = raw_input('=>> ')
        else:
            # Simulate empty result
            result = ''
    else:
        play_filepath = remove_ext(filepath)
        logging.debug('Asterisk play file path : %s' % play_filepath)
        if read:
            result = asterisk_agi.get_data(play_filepath, timeout=read_timeout, max_digits=read_maxdigits)
        else:
            if intkey == "any":
                # Read-only access to the module constant: no ``global`` needed.
                intkey = any_intkeys
            result = asterisk_agi.stream_file(play_filepath, escape_digits=intkey)
    logging.debug('User enter "%s"' % result)
    return result
def check_answered(asterisk_agi):
    """Answer the channel when its status is 4.

    In simulate mode a message is printed and the AGI object is not
    used. Otherwise, a channel status of 4 (the state the log message
    describes as ringing) triggers ``answer()`` followed by a
    one-second pause.
    """
    if check_simulate_mode():
        print('Simulate mode : Channel answered')
        return
    if asterisk_agi.channel_status() != 4:
        return
    logging.debug('Call is riging. Answer it and wait one second.')
    asterisk_agi.answer()
    time.sleep(1)
def hangup(asterisk_agi):
    """Hang up through the AGI object, or only print a message in simulate mode."""
    if not check_simulate_mode():
        asterisk_agi.hangup()
        return
    print('Simulate mode : Hangup')