anhaibo/2023-02
实时质检或坐席辅助系统设计的一般原理是将客户和坐席通话的两轨实时语音流送到 ASR 识别后经 NLU 意图识别等完成。一般套装软件如 Avaya,Genesys 等使用网络镜像方式抓取语音包,FreeSWITCH 因为是开源系统一般使用 media_bug 旁路语音流的方式抓取语音包再进行后续处理。因为使用公司自研的 ASR,需要通过 socket udp 或 websocket 方式将语音流发送到 ASR Proxy 再将识别/NLU 后的文字结果回传坐席或质检员。如果是类似阿里的 ASR 则可以直接在 FreeSWITCH 本机中调用阿里的 nlusdk 进行识别,本文介绍 ASR Proxy 模式。
系统架构如下:
还是比较简单的,就 3 个组件:FreeSWITCH、websocketserver(asrproxy)、asrserver
下面重点介绍一下怎么写 1 个 FreeSWITCH 的 mod_mediabugwebsocket
(上次写线上 c 程序还是 2003 年的泰康人寿 95522 呼叫中心项目,因为 FreeSWITCH 的 mod 只能用 c/c++实现,所以又需要 baidu/google/知乎/csdn 里找寻相关技能了,好在现在有了 github 翻墙后能找到很多优秀的源码,本文参考了好几个 git 的代码,lightwebsocket client 也是 git 上找的!!!)
基本上是 3 步:
1,mod_load 初始化,自定义 1 个 struct,初始化 websocket 指针等,传递相关参数;
2,media_bug 获取 frame data;并使用 websocket 将每个 frame data 送到 websocket server;
3,关闭 websocket,mod_shutdown.
下面详细介绍如上部分:
mod 的框架都是一样的,可以参考 FreeSWITCH 自带的#src#/mod/sdk/autotools/src/mod_example.c
或者杜老师的 mod_aac:https://github.com/rts-cn/mod_aac
#include <switch.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <time.h>
#include <assert.h>
#include <string>
#include <g711.h>
//github上下载的1个websocketclient,不过close那里有个坑,后面我说下怎么fix:
#include "light_websocket_client.hpp" //链接:https://github.com/hank4187yan/light-websocket-client
using namespace lightws;
//Per-call state that mediabugudp_start_function fills in and hands to the media bug callback as user_data.
typedef struct {
CWebsocket* ws_cli; //websocket client instance
char ws_url[200]; //websocket url
char uuid[40]; //call UUID (the a-leg UUID is used)
char userCode[20]; //agent ID
char userExt[20]; //agent extension number
char direct[2]; //audio direction 1: customer 2: agent
char mediaType[2]; //media type 0: PCMU 8: PCMA
char udp_key[30]; //name of the media bug event being handled: SWITCH_ABC_TYPE_INIT/SWITCH_ABC_TYPE_CLOSE/SWITCH_ABC_TYPE_READ
unsigned long long seq; //frame sequence number, handy for debugging
time_t timestamp; //frame timestamp
#ifdef _TEMP_LOCALFILE
FILE *fp; //temporary local file for debugging, to check that the captured packets are correct
#endif
} udp_info_t;
SWITCH_MODULE_LOAD_FUNCTION(mod_mediabugwebsocket_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mediabugwebsocket_shutdown);
//Module definition: registers the module load and module shutdown (unload) handlers
SWITCH_MODULE_DEFINITION(mod_mediabugwebsocket, mod_mediabugwebsocket_load, mod_mediabugwebsocket_shutdown, NULL);
//Start / stop application functions
SWITCH_STANDARD_APP(mediabugudp_start_function);
SWITCH_STANDARD_APP(mediabugudp_stop_function);
//Media bug callback function
static switch_bool_t mediabugudp_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type);
//The three websocket helper functions: connect, send one frame, close
static int socket_init(udp_info_t *udp_info);
static void socket_send_framedata(udp_info_t *udp_info,switch_frame_t *frame);
static void socket_close(udp_info_t *udp_info);
//Module load: creates the module interface, logs the module version and registers the two
//dialplan applications "mediabugwebsocket_start" and "mediabugwebsocket_stop".
SWITCH_MODULE_LOAD_FUNCTION(mod_mediabugwebsocket_load)
{
switch_application_interface_t *app_interface;
*module_interface = switch_loadable_module_create_module_interface(pool, modname);
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "mediabugudp mod loaded version[20230202_v1.08].\n");
SWITCH_ADD_APP(app_interface, "mediabugwebsocket_start", "mediabugwebsocket", "mediabugwebsocket", mediabugudp_start_function, "", SAF_NONE);
SWITCH_ADD_APP(app_interface, "mediabugwebsocket_stop" , "mediabugwebsocket", "mediabugwebsocket", mediabugudp_stop_function , "", SAF_NONE);
return SWITCH_STATUS_SUCCESS;
}
//Module shutdown (unload): only logs a message; no module-level resources are released here.
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mediabugwebsocket_shutdown)
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "mediabugudp shutdown\n");
return SWITCH_STATUS_SUCCESS;
}
//mediabugudp 执行函数
SWITCH_STANDARD_APP(mediabugudp_start_function)
{
switch_media_bug_t *bug;
switch_status_t status;
switch_channel_t *channel;
udp_info_t *udp_info;
//传入参数
int argc;
char *lbuf;
char *argv[5] = { 0 };
lbuf = switch_core_session_strdup(session, data);
argc=switch_separate_string(lbuf, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
if( zstr(data) || (lbuf==NULL) || (argc < 5) )
{
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[[mediabugudp_start_function]]argv error!!!\n");
return;
}
switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,"mediabugudp_start_function!\n");
udp_info = (udp_info_t *)malloc(sizeof(udp_info_t)); //动态分配一块内存,保存websocket,fp等指针;
if(udp_info==NULL) return;
//传参操作
udp_info->ws_cli=NULL;
memset(udp_info->ws_url,0x00,sizeof(udp_info->ws_url));
memset(udp_info->uuid,0x00,sizeof(udp_info->uuid));
memset(udp_info->userCode,0x00,sizeof(udp_info->userCode));
memset(udp_info->userExt,0x00,sizeof(udp_info->userExt));
memset(udp_info->direct,0x00,sizeof(udp_info->direct));
memset(udp_info->mediaType,0x00,sizeof(udp_info->mediaType));
memset(udp_info->udp_key,0x00,sizeof(udp_info->udp_key));
strcpy(udp_info->ws_url,argv[0]);
strcpy(udp_info->uuid,argv[1]);
strcpy(udp_info->userCode,argv[2]);
strcpy(udp_info->userExt,argv[3]);
strcpy(udp_info->direct,argv[4]);
udp_info->seq=0;
time(&udp_info->timestamp);
//临时文件操作
#ifdef _TEMP_LOCALFILE
char filename[256];
memset(filename,0x00,sizeof(filename));
sprintf(filename,"/usr/local/freeswitch/recordings/%ld_%s_%s_%s.pcm",udp_info->timestamp,udp_info->uuid,udp_info->userCode,udp_info->direct);
udp_info->fp=fopen(filename,"ab+");
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "filename=[%s][%p]\n",filename,udp_info->fp);
#endif
channel = switch_core_session_get_channel(session);
if (session == NULL)
return;
//Has the media bug started?
bug = (switch_media_bug_t *) switch_channel_get_private(channel, "_mediabugudp_");
if (bug != NULL)
{
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "[[mediabugudp_start_function]] bug is already running,retun!!!\n");
return;
}
//添加media bug
status = switch_core_media_bug_add(session, "_mediabugudp_", 0, mediabugudp_callback, udp_info, 0,
SMBF_READ_STREAM|SWITCH_ABC_TYPE_READ_REPLACE|SMBF_NO_PAUSE,&bug);
if (status != SWITCH_STATUS_SUCCESS) {
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "[[mediabugudp_start_function]] bug add failed!\n");
return;
}
//设置变量, 用于检测重复
switch_channel_set_private(channel, "_mediabugudp_", bug);
return;
}
//mediabugwebsocket_stop application: detaches the media bug recorded in the channel private
//"_mediabugudp_", if there is one, and clears that private.
//Author's note: in practice this function is not reached, because the bug, channel and session
//are all destroyed automatically once the call hangs up.
SWITCH_STANDARD_APP(mediabugudp_stop_function)
{
    if (session != NULL) {
        switch_channel_t *chan = switch_core_session_get_channel(session);
        switch_media_bug_t *active_bug = (switch_media_bug_t *) switch_channel_get_private(chan, "_mediabugudp_");

        if (active_bug != NULL) {
            //Clear the marker first, then remove the media bug
            switch_channel_set_private(chan, "_mediabugudp_", NULL);
            switch_core_media_bug_remove(session, &active_bug);
        }
        switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[[mediabugudp stop function]]\n");
    }
}
//Media bug callback: streams the audio captured on one channel to the websocket server.
//  bug       - the media bug this callback is attached to
//  user_data - the udp_info_t that was passed to switch_core_media_bug_add()
//  type      - the media bug event being delivered
//Returns SWITCH_FALSE when the bug must not run (missing user_data, read codec that is neither
//PCMA nor PCMU, websocket setup failure); SWITCH_TRUE in every other case.
static switch_bool_t mediabugudp_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type)
{
    udp_info_t *udp_info = (udp_info_t *)user_data;
    switch_core_session_t *session;
    switch_channel_t *channel;

    if (udp_info == NULL)
        return SWITCH_FALSE;

    session = switch_core_media_bug_get_session(bug);
    channel = switch_core_session_get_channel(session);

    switch (type) {
    case SWITCH_ABC_TYPE_INIT:
        {
            //The channel uuid is only needed for logging, so it is looked up here and in CLOSE
            //instead of on every audio frame; a missing variable is logged as an empty string.
            const char *uuid_this = switch_channel_get_variable(channel,"uuid");
            switch_codec_t *read_codec = switch_core_session_get_read_codec(session);
            const char *codec_name = (read_codec != NULL && read_codec->implementation != NULL)
                                         ? read_codec->implementation->iananame : NULL;

            //An unknown codec is treated like an unsupported one (falls into the error branch below).
            if (codec_name == NULL)
                codec_name = "";

            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,
                "[[mediabugudp_callback-->SWITCH_ABC_TYPE_INIT]] init,|uuid_in[%s]|uuid_this[%s]\n",udp_info->uuid,uuid_this ? uuid_this : "");
            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,
                "read_codec=[%s]!\n",codec_name);
            ////###############################################################
            if(strcmp(codec_name,"PCMA")==0 || strcmp(codec_name,"pcma")==0)
            {
                strcpy(udp_info->mediaType,"8");
            }
            else if(strcmp(codec_name,"PCMU")==0 || strcmp(codec_name,"pcmu")==0)
            {
                strcpy(udp_info->mediaType,"0");
            }
            else
            {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,
                    "[[mediabugudp_callback-->SWITCH_ABC_TYPE_INIT]]read_codec is[%s] neither PCMA nor PCMU,mediabug will end!\n",
                    codec_name);
                return SWITCH_FALSE;
            }
            if(socket_init(udp_info) < 0){
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR,
                    "mediabugudp function start and socket created failed!\n");
                return SWITCH_FALSE;
            }
            udp_info->seq=0;
            time(&udp_info->timestamp);
            memset(udp_info->udp_key,0x00,sizeof(udp_info->udp_key));
            strcpy(udp_info->udp_key,"SWITCH_ABC_TYPE_INIT");
            ////###############################################################
        }
        break;
    case SWITCH_ABC_TYPE_READ:
        {
            //The frame buffer is only needed for READ events, so it lives in this scope.
            uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE];
            switch_frame_t frame = { 0 };
            frame.data = data;
            frame.buflen = sizeof(data);
            if (switch_core_media_bug_read(bug, &frame, SWITCH_FALSE) != SWITCH_STATUS_FALSE) {
                ////###############################################################
                udp_info->seq++;
                time(&udp_info->timestamp);
                memset(udp_info->udp_key,0x00,sizeof(udp_info->udp_key));
                strcpy(udp_info->udp_key,"SWITCH_ABC_TYPE_READ");
                //Per-frame debug log, disabled:
                //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,"[[SWITCH_ABC_TYPE_READ]]:data_len=[%d],rate[%d],channels[%d],samples[%d],seq[%llu],timestamp[%ld]\n",
                //                  frame.datalen,frame.rate,frame.channels,frame.samples,udp_info->seq,(long)udp_info->timestamp);
                socket_send_framedata(udp_info,&frame);
                ////##############################################################
            }
        }
        break;
    case SWITCH_ABC_TYPE_READ_REPLACE:
        {
            //Not used; debug code kept disabled:
            //frame = *switch_core_media_bug_get_read_replace_frame(bug);
            //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"[[mediabugudp_callback-->SWITCH_ABC_TYPE_READ_REPLACE]]: data_len=[%d], rate[%d], channels[%d], samples[%d] \n",
            //                  frame.datalen, frame.rate, frame.channels, frame.samples);
        }
        break;
    case SWITCH_ABC_TYPE_WRITE_REPLACE:
        {
            //Not used; debug code kept disabled:
            //frame = *switch_core_media_bug_get_write_replace_frame(bug);
            //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_INFO,"[[mediabugudp_callback-->SWITCH_ABC_TYPE_WRITE_REPLACE]]: data_len=[%d], rate[%d], channels[%d], samples[%d] \n",
            //                  frame.datalen, frame.rate, frame.channels, frame.samples);
        }
        break;
    case SWITCH_ABC_TYPE_CLOSE:
        {
            const char *uuid_this = switch_channel_get_variable(channel,"uuid");

            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING,
                "[[mediabugudp_callback-->SWITCH_ABC_TYPE_CLOSE]],uuid_in[%s]|uuid_this[%s]\n",udp_info->uuid,uuid_this ? uuid_this : "");
            ////##################################################################
            udp_info->seq=0;
            time(&udp_info->timestamp);
            memset(udp_info->udp_key,0x00,sizeof(udp_info->udp_key));
            strcpy(udp_info->udp_key,"SWITCH_ABC_TYPE_CLOSE");
            usleep(10000);//sleep 10 milliseconds before closing the websocket
            socket_close(udp_info);
            ////#################################################################
        }
        break;
    default:
        break;
    }
    return SWITCH_TRUE;
}
咨询了业界大佬鼎鼎通(江湖上神一样的存在,自己开个公司,自己玩,据说年入 300-500 个),他说 SWITCH_ABC_TYPE_READ_REPLACE 会导致 cpu 2 次调用、影响性能;我还没做压测,先按照大佬的建议写吧:
mediabug 中直接获取的 frame->data 是 L16 的 PCM,无压缩的,一般的 ASR 是直接可以识别的,而且效果更好,但是我司的一堆科学家搞的 ASR 就必须是 PCMA 或 PCMU,所以就从 switch_pcm.c 中抄了一个转码函数:
//Encodes 16-bit linear PCM (L16) samples into G.711 A-law or mu-law, one output byte per sample.
//The output buffer must therefore hold at least decoded_data_len / sizeof(short) bytes.
//Returns 0 on success; -1 when a pointer argument is NULL or targetpcm names an unsupported
//codec (in which case *encoded_data_len is left untouched).
static int L16Encode_To_G711(
    char *targetpcm,            //target codec name: "PCMA"/"pcma" or "PCMU"/"pcmu"
    void *decoded_data,         //input: buffer of L16 samples to encode
    uint32_t decoded_data_len,  //input length in bytes (e.g. 320)
    void *encoded_data,         //output: buffer receiving the alaw/ulaw bytes
    uint32_t *encoded_data_len  //output: number of alaw/ulaw bytes written
    )
{
    const short *dbuf = (const short *)decoded_data;
    unsigned char *ebuf = (unsigned char *)encoded_data;
    //A trailing odd byte, if any, is ignored: only whole 16-bit samples are encoded.
    const uint32_t samples = decoded_data_len / (uint32_t)sizeof(short);
    uint32_t i;

    if (targetpcm == NULL || decoded_data == NULL || encoded_data == NULL || encoded_data_len == NULL)
        return -1;

    if (strcmp(targetpcm,"PCMA")==0 || strcmp(targetpcm,"pcma")==0)
    {
        for (i = 0; i < samples; i++)
            ebuf[i] = linear_to_alaw(dbuf[i]);
    }
    else if (strcmp(targetpcm,"PCMU")==0 || strcmp(targetpcm,"pcmu")==0)
    {
        for (i = 0; i < samples; i++)
            ebuf[i] = linear_to_ulaw(dbuf[i]);
    }
    else
    {
        return -1;
    }

    *encoded_data_len = samples;
    return 0;
}
感觉有点粗暴,但是能用,就先这么着吧,问了杜老师也说能用就行:
我在想这个 payload 为嘛起这个名, iana 安娜是否是原代码开发者的媳妇或小姨子吧!
websocket 就不描述了,很简单,只是 lightwebsocket client 的 close 有个坑,等着 server 端发 bye 消息,不能自己中断,我这么粗暴的人怎么忍得了这个,改下:light_websocket_client.cpp
// Forced close: queues a websocket close frame in the transmit buffer, then closes the
// socket right away and marks the connection CLOSED instead of waiting for the server.
// Does nothing when the connection is already CLOSING or CLOSED. Always returns LIGHTWS_NO_ERR.
// NOTE(review): the queued frame is presumably never transmitted, since the socket is closed
// immediately after it is appended to m_txbuf - confirm against the library's send path.
int CWebsocket::close() {
    const bool already_closing = (m_ws_state == CLOSING) || (m_ws_state == CLOSED);

    if (!already_closing) {
        m_ws_state = CLOSING;

        // Close frame header; the last 4 bytes are a masking key.
        const uint8_t close_frame[6] = {0x88, 0x80, 0x00, 0x00, 0x00, 0x00};
        m_txbuf.insert(m_txbuf.end(), close_frame, close_frame + 6);

        // Added by the article's author: force the socket shut and finish the state transition.
        closesocket(m_sock_fd);
        m_ws_state = CLOSED;
    }
    return LIGHTWS_NO_ERR;
}
[root@anhaibo002 mod_mediabugwebsocket]# more Makefile
# Builds mod_mediabugwebsocket.so from the module source and the bundled light websocket client,
# and installs it into the FreeSWITCH module directory.
FREESWITCH_MOD_PATH=/usr/local/freeswitch/mod
FREESWITCH_INCLUDE=/usr/local/freeswitch/include/freeswitch
ROCKET_WEBSOCKET_INCLUDE=./
MODNAME = mod_mediabugwebsocket.so
MODOBJ = mod_mediabugwebsocket.o light_websocket_client.o
MODCFLAGS = -Wall -Wno-unused-function -I$(FREESWITCH_INCLUDE) -I$(ROCKET_WEBSOCKET_INCLUDE)
#MODLDFLAGS = -lfreeswitch
CC = g++
CFLAGS = -fPIC -g -ggdb $(MODCFLAGS)
CPPFLAGS = -fPIC -std=c++11 -g -ggdb $(MODCFLAGS)
LDFLAGS = $(MODLDFLAGS)
LDLIBS = -lstdc++

.PHONY: all clean install
all: $(MODNAME)

$(MODNAME): $(MODOBJ)
	@$(CC) -shared $(CPPFLAGS) -o $@ $(MODOBJ) $(LDFLAGS) $(LDLIBS)

# Both sources are C++ (.cpp), so compile them with an explicit pattern rule.
# (The former ".c.o: $<" suffix rule never applied: suffix rules cannot have prerequisites
# and there are no .c sources.)
%.o: %.cpp
	@$(CC) $(CPPFLAGS) -o $@ -c $<

clean:
	rm -f $(MODNAME) $(MODOBJ)

install: $(MODNAME)
	install -d $(FREESWITCH_MOD_PATH)
	install $(MODNAME) $(FREESWITCH_MOD_PATH)

# Header dependencies: rebuild both objects when the websocket client header changes.
mod_mediabugwebsocket.o: mod_mediabugwebsocket.cpp light_websocket_client.hpp
light_websocket_client.o: light_websocket_client.cpp light_websocket_client.hpp
freeswitch@ivrin> reload mod_mediabugwebsocket
+OK Reloading XML
+OK module unloaded
+OK module loaded
2023-02-02 23:10:44.795247 [INFO] switch_xml.c:1373 No files to include at /usr/local/freeswitch/conf/autoload_configs/../sip_profiles/external/*.xml
2023-02-02 23:10:44.835247 [NOTICE] switch_loadable_module.c:1263 Deleting Application 'mediabugwebsocket_start'
2023-02-02 23:10:44.835247 [DEBUG] switch_loadable_module.c:1265 Write lock interface 'mediabugwebsocket_start' to wait for existing references.
2023-02-02 23:10:44.835247 [NOTICE] switch_loadable_module.c:1263 Deleting Application 'mediabugwebsocket_stop'
2023-02-02 23:10:44.835247 [DEBUG] switch_loadable_module.c:1265 Write lock interface 'mediabugwebsocket_stop' to wait for existing references.
2023-02-02 23:10:44.835247 [CONSOLE] switch_loadable_module.c:2396 Stopping: mod_mediabugwebsocket
2023-02-02 23:10:44.835247 [WARNING] mod_mediabugwebsocket.cpp:99 mediabugudp shutdown
2023-02-02 23:10:44.835247 [CONSOLE] switch_loadable_module.c:2416 mod_mediabugwebsocket unloaded.
2023-02-02 23:10:44.835247 [WARNING] mod_mediabugwebsocket.cpp:88 mediabugudp mod loaded version[20230202_v1.08].
2023-02-02 23:10:44.835247 [CONSOLE] switch_loadable_module.c:1803 Successfully Loaded [mod_mediabugwebsocket]
2023-02-02 23:10:44.835247 [NOTICE] switch_loadable_module.c:350 Adding Application 'mediabugwebsocket_start'
2023-02-02 23:10:44.835247 [NOTICE] switch_loadable_module.c:350 Adding Application 'mediabugwebsocket_stop'
<!-- Inbound test extension: on answer, run the Lua script that starts the websocket media bug, then bridge to user 1004. -->
<extension name="mod_websocket1004">
  <condition field="destination_number" expression="^(1004)$">
    <action application="export" data="nolocal:execute_on_answer=lua mediabug/mediabugwebsocket.lua ws://liveasr.test.xxx.com/websocket? 26544309 800196 inbound"/>
    <action application="bridge" data="user/$1"/>
  </condition>
</extension>
<!-- Outbound extension (9 + 11 digits): on answer, run the Lua script that starts the websocket media bug, then bridge to the external gateway. -->
<extension name="9xxxxxxxxxxx">
  <condition field="destination_number" expression="^9(\d{11})$">
    <action application="export" data="nolocal:execute_on_answer=lua mediabug/mediabugwebsocket.lua ws://liveasr.test.xxx.com/websocket? 26544309 800196 outbound"/>
    <action application="bridge" data="{origination_caller_id_number=0571xxxxxx}sofia/external/135xxxxxxxx@1.1.1.1:5060"/>
  </condition>
</extension>
中间的 lua 这么写:(两个 leg 分别执行)
-- Start the media bug on both legs with uuid_broadcast: the two commands share everything
-- up to the direction argument, so build that prefix once and append direction + leg.
local broadcast_prefix = "uuid_broadcast "..uuid_this.." mediabugwebsocket_start::'"..ws_url.." "..uuid_this.." "..userCode.." "..userExt.." ";

-- a-leg: this side's direction
cmd = broadcast_prefix..direct_this.."' aleg";
ddd_debug(cmd);
api:executeString(cmd);

-- b-leg: the other side's direction
cmd = broadcast_prefix..direct_that.."' bleg";
ddd_debug(cmd);
api:executeString(cmd);