astpicotts/helpers.py
2019-06-07 16:12:49 +02:00

144 lines
4.8 KiB
Python

#!/usr/bin/env python2
import re
import subprocess
import logging
import time
def get_path(soft):
try:
path = subprocess.check_output(['which', soft]).strip()
return path
except subprocess.CalledProcessError, e:
logging.fatal('%s not found' % soft)
return False
def get_sox_ver(sox_path=None):
global sox_ver
if 'sox_ver' not in globals() or not sox_ver:
if not sox_path:
sox_path = get_path('sox')
try:
result = subprocess.check_output([sox_path, '--version'])
logging.debug('Sox accept --version parameter : consider as version 14')
sox_ver = 14
except subprocess.CalledProcessError:
logging.debug('Sox not accept --version parameter : consider as version 12')
sox_ver = 12
return sox_ver
def enable_simulate_mode():
global SIMULATE_MODE
SIMULATE_MODE = True
def check_simulate_mode():
return 'SIMULATE_MODE' in globals() and SIMULATE_MODE
def get_var(asterisk_agi, varname):
if check_simulate_mode():
result = raw_input('Simulate mode : please enter "%s" variable value =>> ' % varname)
else:
result = asterisk_agi.get_full_variable(varname)
return result
def get_env_var(asterisk_agi, varname):
if check_simulate_mode():
result = raw_input('Simulate mode : please enter "%s" environnement variable value =>> ' % varname)
else:
result = asterisk_agi.env.get(varname)
return result
def set_var(asterisk_agi, varname, value):
if check_simulate_mode():
print('Simulate mode : set variable %s to "%s"' % (varname, value))
else:
logging.info('Set variable %s to "%s"' % (varname, value))
asterisk_agi.set_variable(varname, value)
def detect_format(asterisk_agi):
if check_simulate_mode():
return ("sln", 8000)
nativeformat = asterisk_agi.get_full_variable('${CHANNEL(audionativeformat)}')
logging.debug('Native audio format : %s' % nativeformat)
if re.match('(silk|sln)12', nativeformat):
return ("sln12", 12000)
elif re.match('(speex|slin|silk)16|g722|siren7', nativeformat):
return ("sln16", 16000)
elif re.match('(speex|slin|celt)32|siren14', nativeformat):
return ("sln32", 32000)
elif re.match('(celt|slin)44', nativeformat):
return ("sln44", 44100)
elif re.match('(celt|slin)48', nativeformat):
return ("sln48", 48000)
else:
return ("sln", 8000)
def remove_ext(filepath):
return re.sub('\.[a-zA-Z0-9]+$','',filepath)
any_intkeys = "0123456789#*"
def playback(asterisk_agi, filepath, simulate_play=False, read=False, read_timeout=3000, read_maxdigits=20, intkey=None):
if check_simulate_mode():
logging.debug('Simulate mode : Play file %s' % filepath)
try:
mplayer_path = get_path('mplayer')
subprocess.check_output([mplayer_path, filepath])
except Exception, e:
logging.warning('Fail to play %s file : %s' % (filepath, e))
if read:
result = raw_input('=>> ')
else:
# Simulate empty result
result = ''
else:
play_filepath = remove_ext(filepath)
logging.debug('Asterisk play file path : %s' % play_filepath)
if read:
result = asterisk_agi.get_data(play_filepath, timeout=read_timeout, max_digits=read_maxdigits)
else:
if intkey == "any":
global any_intkeys
intkey = any_intkeys
result = asterisk_agi.stream_file(play_filepath, escape_digits=intkey)
logging.debug('User enter "%s"' % result)
return result
def beep(asterisk_agi):
if check_simulate_mode():
logging.debug('Beep')
else:
playback(asterisk_agi, 'beep')
def check_answered(asterisk_agi):
if check_simulate_mode():
print('Simulate mode : Channel answered')
else:
status = asterisk_agi.channel_status()
if status == 4:
logging.debug('Call is riging. Answer it and wait one second.')
asterisk_agi.answer()
time.sleep(1)
def hangup(asterisk_agi):
if check_simulate_mode():
print('Simulate mode : Hangup')
else:
asterisk_agi.hangup()
def record_file(asterisk_agi, filepath, fileformat='wav', escape_digits='#', max_duration=-1, simulate_record=False, simulate_record_duration=5):
if check_simulate_mode():
if not simulate_record:
raw_input('Simulate mode : record %s file "%s" until you press on touch [enter]' % (fileformat, filepath))
else:
assert fileformat == 'wav', "Only wav file format in support in simulate record mode"
logging.debug('Simulate mode : Record file %s', filepath)
raw_input('The record will start after you press [entre] key. In this mode, the record will stop after %d seconds.' % simulate_record_duration)
try:
arecord_path = get_path('arecord')
subprocess.check_output([arecord_path, '-vv', '-fdat', '-d %d' % simulate_record_duration, filepath])
except Exception, e:
logging.warning('Fail to record %s file', filepath, exc_info=True)
else:
logging.debug("Record %s file '%s' until user press [%s] key", fileformat, filepath, escape_digits)
return asterisk_agi.record_file(filepath, format=fileformat, escape_digits=escape_digits, timeout=max_duration)