
《FreeSWITCH案例大全》

1.15 Streaming media bug audio from FreeSWITCH to an ASR Proxy over WebSocket for real-time quality inspection and agent assistance

anhaibo/2023-02

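Real-time quality inspection and agent-assistance systems generally work the same way: the two audio tracks of a call (customer and agent) are streamed in real time to an ASR engine, and the recognized text then goes through NLU intent recognition and similar steps. Commercial suites such as Avaya and Genesys typically capture the voice packets by network mirroring. FreeSWITCH, being open source, usually taps the audio with a media bug (media_bug) and processes it from there. Because we use an ASR engine developed in-house, the audio has to be sent over a UDP socket or a WebSocket to an ASR Proxy, which then returns the recognized/NLU-processed text to the agent or the quality inspector. With an ASR service such as Alibaba's, you could instead call its nlusdk directly on the FreeSWITCH host. This article covers the ASR Proxy approach.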

The system architecture is as follows:

It is fairly simple, with just three components: FreeSWITCH, a WebSocket server (the ASR Proxy), and the ASR server.

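The rest of this article focuses on how to write a FreeSWITCH module, mod_mediabugwebsocket.

(The last time I wrote production C code was in 2003, for the Taikang Life 95522 call center project. FreeSWITCH modules can only be written in C/C++, so I had to go digging through Baidu, Google, Zhihu, and CSDN again to brush up. Fortunately there is GitHub now; once past the firewall you can find plenty of excellent source code. This article borrows from several Git repositories, and the light-websocket-client library also comes from GitHub.)

There are basically three steps:

1. In mod_load, initialize the module: define a custom struct, initialize the WebSocket pointer and so on, and pass in the relevant parameters.

2. Use the media bug to obtain frame data, and send each frame to the WebSocket server over the WebSocket.

3. Close the WebSocket, then mod_shutdown.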
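The three steps can be sketched without FreeSWITCH at all. The following is a minimal illustration under stated assumptions, not the module's code: `BugEvent`, `FakeTransport`, `StreamState`, and `on_bug_event` are hypothetical names standing in for the media bug callback types, the WebSocket client, and the per-call struct.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for the media bug callback types used in this article.
enum class BugEvent { Init, Read, Close };

// Hypothetical transport that records frames instead of opening a real WebSocket.
struct FakeTransport {
    bool open = false;
    std::vector<std::vector<uint8_t>> sent;
    bool connect() { open = true; return true; }
    void send(const std::vector<uint8_t>& frame) { if (open) sent.push_back(frame); }
    void close() { open = false; }
};

// Per-call state, playing the role of the custom struct handed to the callback.
struct StreamState {
    FakeTransport ws;
    unsigned long long seq = 0;  // frame counter, handy when debugging
};

// Returns false to ask for the bug to be removed, true to keep it running.
bool on_bug_event(StreamState& st, BugEvent ev, const std::vector<uint8_t>& frame) {
    switch (ev) {
    case BugEvent::Init:   // step 1: open the connection once per call
        st.seq = 0;
        return st.ws.connect();
    case BugEvent::Read:   // step 2: forward every frame as it arrives
        ++st.seq;
        st.ws.send(frame);
        return true;
    case BugEvent::Close:  // step 3: tear the connection down at hangup
        st.ws.close();
        return true;
    }
    return true;
}
```

The point of the shape is that connection setup and teardown each happen once per call, while the read path does nothing but count and forward.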

The following sections describe these parts in detail:

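1.15.1 mod_load initialization: defining a struct, initializing the WebSocket pointer, and passing parameters

All modules share the same skeleton. You can start from the mod_example.c that ships with FreeSWITCH, at #src#/mod/sdk/autotools/src/mod_example.c,

or from Du's mod_aac: https://github.com/rts-cn/mod_aac

1.15.1.1 Header files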

#include <switch.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <time.h>
#include <assert.h>
#include <string>
#include <g711.h>
// A WebSocket client downloaded from GitHub. Its close() has a pitfall; the fix is described later.
#include "light_websocket_client.hpp"   // https://github.com/hank4187yan/light-websocket-client
using namespace lightws;

1.15.1.2 Defining a struct to carry the parameters

typedef struct {
    CWebsocket*        ws_cli;
    char               ws_url[200];  // WebSocket URL
    char               uuid[40];     // call UUID (the a-leg UUID is used)
    char               userCode[20]; // agent ID
    char               userExt[20];  // agent extension
    char               direct[2];    // audio direction, 1: customer, 2: agent
    char               mediaType[2]; // media type, 0: PCMU, 8: PCMA
    char               udp_key[30];  // SWITCH_ABC_TYPE_INIT/SWITCH_ABC_TYPE_CLOSE/SWITCH_ABC_TYPE_READ
    unsigned long long seq;          // frame sequence number, handy for debugging
    time_t             timestamp;    // frame timestamp
#ifdef  _TEMP_LOCALFILE
    FILE               *fp;          // temporary local file for debugging, to check that the captured audio is correct
#endif
} udp_info_t;
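Every string field in this struct has a fixed size, and `direct` and `mediaType` hold a single character plus the terminator, so anything copied in has to be bounded. Below is a minimal sketch of a bounded copy using only the standard library; `copy_field` and `call_fields_t` are hypothetical names introduced for illustration, not part of the module.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <cstring>

// Copy src into a fixed-size char field, truncating if needed and always
// NUL-terminating. Returns true when the whole string fit.
bool copy_field(char* dst, std::size_t dst_size, const char* src) {
    assert(dst != nullptr && dst_size > 0);
    if (src == nullptr) src = "";
    int written = std::snprintf(dst, dst_size, "%s", src);
    return written >= 0 && static_cast<std::size_t>(written) < dst_size;
}

// Hypothetical cut-down version of the struct above, for illustration only.
struct call_fields_t {
    char uuid[40];   // a 36-character UUID plus terminator fits
    char direct[2];  // room for one character plus terminator
};
```

With this helper, passing a value such as "inbound" for `direct` is truncated to "i" and reported through the return value, instead of writing past the end of the field as `strcpy` would.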

1.15.1.3 Module and callback declarations

SWITCH_MODULE_LOAD_FUNCTION(mod_mediabugwebsocket_load);
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mediabugwebsocket_shutdown);
// Module definition: the load and shutdown handlers (no runtime handler)
SWITCH_MODULE_DEFINITION(mod_mediabugwebsocket, mod_mediabugwebsocket_load, mod_mediabugwebsocket_shutdown, NULL);
// Start and stop applications
SWITCH_STANDARD_APP(mediabugudp_start_function);
SWITCH_STANDARD_APP(mediabugudp_stop_function);
// Media bug callback
static switch_bool_t mediabugudp_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type);
// The three WebSocket helpers
static int  socket_init(udp_info_t *udp_info);
static void socket_send_framedata(udp_info_t *udp_info,switch_frame_t *frame);
static void socket_close(udp_info_t *udp_info);

1.15.1.4 Module functions

// Module load
SWITCH_MODULE_LOAD_FUNCTION(mod_mediabugwebsocket_load)
{
    switch_application_interface_t *app_interface;
    *module_interface = switch_loadable_module_create_module_interface(pool, modname);
    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "mediabugudp mod loaded version[20230202_v1.08].\n");
    SWITCH_ADD_APP(app_interface, "mediabugwebsocket_start", "mediabugwebsocket", "mediabugwebsocket", mediabugudp_start_function, "", SAF_NONE);
    SWITCH_ADD_APP(app_interface, "mediabugwebsocket_stop" , "mediabugwebsocket", "mediabugwebsocket", mediabugudp_stop_function , "", SAF_NONE);
    return SWITCH_STATUS_SUCCESS;
}
// Module unload
SWITCH_MODULE_SHUTDOWN_FUNCTION(mod_mediabugwebsocket_shutdown)
{
    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "mediabugudp shutdown\n");
    return SWITCH_STATUS_SUCCESS;
}
// mediabugwebsocket_start application
SWITCH_STANDARD_APP(mediabugudp_start_function)
{
    switch_media_bug_t *bug;
    switch_status_t status;
    switch_channel_t *channel;
    udp_info_t *udp_info;
    // Arguments: ws_url uuid userCode userExt direct
    int argc = 0;
    char *lbuf = NULL;
    char *argv[5] = { 0 };

    // Check the session before it is used for anything
    if (session == NULL)
        return;
    channel = switch_core_session_get_channel(session);

    if (!zstr(data) && (lbuf = switch_core_session_strdup(session, data)) != NULL)
        argc = switch_separate_string(lbuf, ' ', argv, (sizeof(argv) / sizeof(argv[0])));
    if (argc < 5)
    {
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "[[mediabugudp_start_function]]argv error!!!\n");
        return;
    }
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "mediabugudp_start_function!\n");

    // Has the media bug started already? Check before allocating anything.
    bug = (switch_media_bug_t *) switch_channel_get_private(channel, "_mediabugudp_");
    if (bug != NULL)
    {
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "[[mediabugudp_start_function]] bug is already running, return!!!\n");
        return;
    }

    // Allocate from the session pool: the memory is zero-filled and released together
    // with the session, so the struct (WebSocket pointer, fp, ...) cannot leak
    udp_info = (udp_info_t *) switch_core_session_alloc(session, sizeof(udp_info_t));
    if (udp_info == NULL) return;
    // Copy the arguments; switch_copy_string() truncates rather than overflowing the fixed-size fields
    switch_copy_string(udp_info->ws_url, argv[0], sizeof(udp_info->ws_url));
    switch_copy_string(udp_info->uuid, argv[1], sizeof(udp_info->uuid));
    switch_copy_string(udp_info->userCode, argv[2], sizeof(udp_info->userCode));
    switch_copy_string(udp_info->userExt, argv[3], sizeof(udp_info->userExt));
    switch_copy_string(udp_info->direct, argv[4], sizeof(udp_info->direct));
    udp_info->seq=0;
    time(&udp_info->timestamp);
    // Temporary dump file for debugging
#ifdef  _TEMP_LOCALFILE
    char filename[256];
    snprintf(filename, sizeof(filename), "/usr/local/freeswitch/recordings/%ld_%s_%s_%s.pcm",
             (long) udp_info->timestamp, udp_info->uuid, udp_info->userCode, udp_info->direct);
    udp_info->fp=fopen(filename,"ab+");
    switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_WARNING, "filename=[%s][%p]\n",filename,(void *) udp_info->fp);
#endif
    // Add the media bug. Only SMBF_* values belong in the flags argument;
    // SWITCH_ABC_TYPE_* values are callback event types, not flags.
    status = switch_core_media_bug_add(session, "_mediabugudp_", 0, mediabugudp_callback, udp_info, 0,
                                       SMBF_READ_STREAM | SMBF_NO_PAUSE, &bug);
    if (status != SWITCH_STATUS_SUCCESS) {
        switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "[[mediabugudp_start_function]] bug add failed!\n");
        return;
    }
    // Remember the bug on the channel so that a duplicate start can be detected
    switch_channel_set_private(channel, "_mediabugudp_", bug);
}
// In practice this function is never reached: after hangup the bug, channel, and session are destroyed automatically
SWITCH_STANDARD_APP(mediabugudp_stop_function)
{
    switch_media_bug_t *bug;
    switch_channel_t *channel;
    if (session == NULL)
        return;
    channel = switch_core_session_get_channel(session);
    bug = (switch_media_bug_t *) switch_channel_get_private(channel, "_mediabugudp_");
    if (bug != NULL)
    {
        switch_channel_set_private(channel, "_mediabugudp_", NULL);
        switch_core_media_bug_remove(session, &bug); // remove the media bug
    }
    switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[[mediabugudp stop function]]\n");
}
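The start application takes its five parameters as one space-separated string. As a rough illustration of that contract, here is a plain C++ stand-in; `split_args` and `has_required_args` are hypothetical helpers, the sample values in the usage note are placeholders, and the real module relies on switch_separate_string(), which this sketch only approximates (it does no quote handling).

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Split an application data string on whitespace. Stand-in for illustration
// only; it is not a reimplementation of switch_separate_string().
std::vector<std::string> split_args(const std::string& data) {
    std::istringstream in(data);
    std::vector<std::string> out;
    std::string token;
    while (in >> token) out.push_back(token);
    return out;
}

// The start application needs at least these five fields, in this order:
// ws_url uuid userCode userExt direct
bool has_required_args(const std::vector<std::string>& args) {
    return args.size() >= 5;
}
```

For example, `split_args("ws://example.invalid/ws some-uuid 1001 8001 1")` yields five tokens, while a string with fewer fields is rejected by `has_required_args` before anything is allocated.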

1.15.2 The callback function

static switch_bool_t mediabugudp_callback(switch_media_bug_t *bug, void *user_data, switch_abc_type_t type)
{
    char uuid_1[80],uuid_2[80];
    switch_core_session_t *session;
    switch_codec_t *read_codec;
    const char *codec_name;
    udp_info_t *udp_info = (udp_info_t *)user_data;
    uint8_t data[SWITCH_RECOMMENDED_BUFFER_SIZE];
    switch_frame_t frame = { 0 };

    frame.data = data;
    frame.buflen = sizeof(data);
    session = switch_core_media_bug_get_session(bug);
    // uuid_1: the UUID passed in as an argument; uuid_2: the UUID of the session the bug is attached to.
    // switch_core_session_get_uuid() avoids copying from a channel variable that might be NULL.
    switch_copy_string(uuid_1, udp_info->uuid, sizeof(uuid_1));
    switch_copy_string(uuid_2, switch_core_session_get_uuid(session), sizeof(uuid_2));

    switch (type) {
    case SWITCH_ABC_TYPE_INIT:
        {
            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[[mediabugudp_callback-->SWITCH_ABC_TYPE_INIT]] init,|uuid_in[%s]|uuid_this[%s]\n",uuid_1,uuid_2);
            read_codec = switch_core_session_get_read_codec(session);
            codec_name = (read_codec && read_codec->implementation) ? read_codec->implementation->iananame : "";
            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "read_codec=[%s]!\n",codec_name);
            if (strcasecmp(codec_name,"PCMA") == 0) {
                switch_copy_string(udp_info->mediaType, "8", sizeof(udp_info->mediaType));
            } else if (strcasecmp(codec_name,"PCMU") == 0) {
                switch_copy_string(udp_info->mediaType, "0", sizeof(udp_info->mediaType));
            } else {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "[[mediabugudp_callback-->SWITCH_ABC_TYPE_INIT]]read_codec is[%s] neither PCMA nor PCMU,mediabug will end!\n",
                                  codec_name);
                return SWITCH_FALSE;
            }
            if (socket_init(udp_info) < 0) {
                switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_ERROR, "mediabugudp function start and socket created failed!\n");
                return SWITCH_FALSE;
            }
            udp_info->seq=0;
            time(&udp_info->timestamp);
            switch_copy_string(udp_info->udp_key, "SWITCH_ABC_TYPE_INIT", sizeof(udp_info->udp_key));
        }
        break;
    case SWITCH_ABC_TYPE_READ:
        {
            // Forward the frame only when the read actually succeeded
            if (switch_core_media_bug_read(bug, &frame, SWITCH_FALSE) == SWITCH_STATUS_SUCCESS) {
                udp_info->seq++;
                time(&udp_info->timestamp);
                switch_copy_string(udp_info->udp_key, "SWITCH_ABC_TYPE_READ", sizeof(udp_info->udp_key));
                //switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[[SWITCH_ABC_TYPE_READ]]:data_len=[%d],rate[%d],channels[%d],samples[%d],seq[%lld],timestamp[%ld]\n",
                //                   frame.datalen,frame.rate,frame.channels,frame.samples,udp_info->seq,udp_info->timestamp);
                socket_send_framedata(udp_info,&frame);
            }
        }
        break;
    case SWITCH_ABC_TYPE_READ_REPLACE:
        {
            // Not used; see 1.15.2.1
            //frame = *switch_core_media_bug_get_read_replace_frame(bug);
        }
        break;
    case SWITCH_ABC_TYPE_WRITE_REPLACE:
        {
            // Not used
            //frame = *switch_core_media_bug_get_write_replace_frame(bug);
        }
        break;
    case SWITCH_ABC_TYPE_CLOSE:
        {
            switch_log_printf(SWITCH_CHANNEL_LOG, SWITCH_LOG_WARNING, "[[mediabugudp_callback-->SWITCH_ABC_TYPE_CLOSE]],uuid_in[%s]|uuid_this[%s]\n",uuid_1,uuid_2);
            udp_info->seq=0;
            time(&udp_info->timestamp);
            switch_copy_string(udp_info->udp_key, "SWITCH_ABC_TYPE_CLOSE", sizeof(udp_info->udp_key));
            usleep(10000); // sleep for 10 milliseconds
            socket_close(udp_info);
        }
        break;
    default:
        break;
    }
    return SWITCH_TRUE;
}

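1.15.2.1 Why SWITCH_ABC_TYPE_READ rather than SWITCH_ABC_TYPE_READ_REPLACE

I asked Dingdingtong, a well-known figure in the industry (something of a legend: he runs his own company on his own terms and reportedly makes 3 to 5 million a year). According to him, SWITCH_ABC_TYPE_READ_REPLACE makes the CPU do the work twice and hurts performance. I have not load-tested this yet, so for now I am simply following his advice.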

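1.15.2.2 About L16, PCMA, and PCMU

The frame->data obtained directly from the media bug is L16 PCM, uncompressed. Most ASR engines accept it as is, and recognition quality is better that way. But the ASR built by our company's scientists insists on PCMA or PCMU, so I copied a transcoding function from switch_pcm.c: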

static int L16Encode_To_G711(
    char *targetpcm,             // target encoding: "PCMA" or "PCMU"
    void *decoded_data,          // input buffer: the L16 (linear PCM) samples to encode
    uint32_t decoded_data_len,   // input length in bytes, e.g. 320
    void *encoded_data,          // output buffer for the A-law/u-law bytes
    uint32_t *encoded_data_len   // output length in bytes (one byte per input sample)
    )
{
    short *dbuf;
    unsigned char *ebuf;
    uint32_t i;
    dbuf = (short *)decoded_data;
    ebuf = (unsigned char *)encoded_data;
    if (strcasecmp(targetpcm,"PCMA") == 0)
    {
        for (i = 0; i < decoded_data_len / sizeof(short); i++)
            ebuf[i] = linear_to_alaw(dbuf[i]);
    }
    else if (strcasecmp(targetpcm,"PCMU") == 0)
    {
        for (i = 0; i < decoded_data_len / sizeof(short); i++)
            ebuf[i] = linear_to_ulaw(dbuf[i]);
    }
    else
        return -1;
    *encoded_data_len = i;
    return 0;
}

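It feels a bit crude, but it works, so I am leaving it at that for now. I asked Du as well, and he agreed that working is good enough.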
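To make the size arithmetic concrete: G.711 produces one byte per 16-bit sample, so a 320-byte L16 frame (160 samples) becomes 160 bytes. The sketch below is a textbook mu-law encoder written from the G.711 companding scheme, not the `linear_to_ulaw` from FreeSWITCH's g711.h; the names `linear16_to_ulaw` and `encode_frame_ulaw` are made up for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Encode one 16-bit linear PCM sample as a G.711 mu-law byte.
uint8_t linear16_to_ulaw(int16_t pcm) {
    const int kBias = 0x84;   // 132, added before the segment search
    const int kClip = 32635;  // largest magnitude that still fits after the bias
    int sample = pcm;         // widen first so that -32768 can be negated safely
    int sign = 0;
    if (sample < 0) { sample = -sample; sign = 0x80; }
    if (sample > kClip) sample = kClip;
    sample += kBias;
    // Segment (exponent): position of the highest set bit among bits 14..7.
    int exponent = 7;
    for (int mask = 0x4000; (sample & mask) == 0 && exponent > 0; mask >>= 1) --exponent;
    int mantissa = (sample >> (exponent + 3)) & 0x0F;
    // mu-law bytes are stored bit-inverted.
    return static_cast<uint8_t>(~(sign | (exponent << 4) | mantissa));
}

// Encode a whole L16 frame: one output byte per input sample.
std::vector<uint8_t> encode_frame_ulaw(const std::vector<int16_t>& l16) {
    std::vector<uint8_t> out;
    out.reserve(l16.size());
    for (int16_t s : l16) out.push_back(linear16_to_ulaw(s));
    return out;
}
```

A frame of 160 zero samples encodes to 160 bytes of 0xFF, which is how silence looks in mu-law; this is a quick sanity check when inspecting the temporary dump file.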

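1.15.2.3 The codec name in the frame: read_codec->implementation->iananame

I wondered why the payload name is called this; maybe Iana is the wife or sister-in-law of the original developer! (In fact, IANA is the Internet Assigned Numbers Authority, which maintains the registry of RTP payload names.)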

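1.15.3 The light-websocket-client library

I won't describe WebSocket itself, it is simple enough. The one pitfall is in the close() of light-websocket-client: it waits for the server to send its closing message and cannot terminate the connection on its own. Someone as blunt as me can't put up with that, so I changed light_websocket_client.cpp: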

int CWebsocket::close() {
    int ret = LIGHTWS_NO_ERR;
    if ((m_ws_state == CLOSING) || (m_ws_state == CLOSED)) {
        return LIGHTWS_NO_ERR;
    }
    m_ws_state = CLOSING;
    // last 4 bytes are a masking key
    uint8_t closeFrame[6] = {0x88, 0x80, 0x00, 0x00, 0x00, 0x00};
    std::vector<uint8_t> header(closeFrame, closeFrame+6);
    m_txbuf.insert(m_txbuf.end(), header.begin(), header.end());
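    // Added: close the socket right away instead of waiting for the server
    closesocket(m_sock_fd);
    m_ws_state = CLOSED;
    // The two statements above were added to force the socket closed. As written here,
    // the close frame queued in m_txbuf is not flushed before the socket is closed.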
    return ret;
}
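The six bytes in `closeFrame` follow the WebSocket framing rules in RFC 6455: 0x88 is FIN plus the Close opcode, 0x80 is the MASK bit with a payload length of zero, and the last four bytes are the masking key. As a sketch of how such a frame is put together, the hypothetical helper below (not part of light-websocket-client) builds a close frame, optionally carrying a status code such as 1000 for a normal closure.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Build a masked client-to-server WebSocket Close frame (RFC 6455).
// With has_status == false the payload is empty, as in the patch above.
std::vector<uint8_t> build_close_frame(const std::array<uint8_t, 4>& mask_key,
                                       bool has_status = false,
                                       uint16_t status = 1000) {
    std::vector<uint8_t> payload;
    if (has_status) {  // status code goes first, in network byte order
        payload.push_back(static_cast<uint8_t>(status >> 8));
        payload.push_back(static_cast<uint8_t>(status & 0xFF));
    }
    std::vector<uint8_t> frame;
    frame.push_back(0x88);  // FIN = 1, opcode = 0x8 (Close)
    frame.push_back(static_cast<uint8_t>(0x80 | payload.size()));  // MASK = 1, 7-bit length
    frame.insert(frame.end(), mask_key.begin(), mask_key.end());
    for (std::size_t i = 0; i < payload.size(); ++i) {
        // Frames sent by a client are masked by XOR with the 4-byte key.
        frame.push_back(static_cast<uint8_t>(payload[i] ^ mask_key[i % 4]));
    }
    return frame;
}
```

With an all-zero key and no status code this reproduces the six bytes used in the patch. RFC 6455 asks clients to pick a fresh, unpredictable masking key per frame; with an empty payload there is nothing to mask, so the fixed key has no practical effect here.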

1.15.4 Building the module

[root@anhaibo002 mod_mediabugwebsocket]# more Makefile
FREESWITCH_MOD_PATH=/usr/local/freeswitch/mod
FREESWITCH_INCLUDE=/usr/local/freeswitch/include/freeswitch
ROCKET_WEBSOCKET_INCLUDE=./
MODNAME = mod_mediabugwebsocket.so
MODOBJ = mod_mediabugwebsocket.o light_websocket_client.o
MODCFLAGS = -Wall -Wno-unused-function -I$(FREESWITCH_INCLUDE) -I$(ROCKET_WEBSOCKET_INCLUDE)
#MODLDFLAGS = -lfreeswitch
CC = g++
CFLAGS = -fPIC -g -ggdb  $(MODCFLAGS)
CPPFLAGS = -fPIC -std=c++11 -g -ggdb  $(MODCFLAGS)
LDFLAGS = $(MODLDFLAGS)
LDLIBS = -lstdc++
.PHONY: all
all: $(MODNAME)
$(MODNAME): $(MODOBJ)
    @$(CC) -shared $(CPPFLAGS) -o $@ $(MODOBJ) $(LDFLAGS)
.c.o: $<
    @$(CC) $(CFLAGS) -o $@ -c $<
.PHONY: clean
clean:
    rm -f $(MODNAME) $(MODOBJ)
install: $(MODNAME)
    install -d $(FREESWITCH_MOD_PATH)
    install $(MODNAME) $(FREESWITCH_MOD_PATH)
light_websocket_client.o:   light_websocket_client.cpp      light_websocket_client.hpp

1.15.5 Loading the module in fs_cli

freeswitch@ivrin> reload mod_mediabugwebsocket
+OK Reloading XML
+OK module unloaded
+OK module loaded
2023-02-02 23:10:44.795247 [INFO] switch_xml.c:1373 No files to include at /usr/local/freeswitch/conf/autoload_configs/../sip_profiles/external/*.xml
2023-02-02 23:10:44.835247 [NOTICE] switch_loadable_module.c:1263 Deleting Application 'mediabugwebsocket_start'
2023-02-02 23:10:44.835247 [DEBUG] switch_loadable_module.c:1265 Write lock interface 'mediabugwebsocket_start' to wait for existing references.
2023-02-02 23:10:44.835247 [NOTICE] switch_loadable_module.c:1263 Deleting Application 'mediabugwebsocket_stop'
2023-02-02 23:10:44.835247 [DEBUG] switch_loadable_module.c:1265 Write lock interface 'mediabugwebsocket_stop' to wait for existing references.
2023-02-02 23:10:44.835247 [CONSOLE] switch_loadable_module.c:2396 Stopping: mod_mediabugwebsocket
2023-02-02 23:10:44.835247 [WARNING] mod_mediabugwebsocket.cpp:99 mediabugudp shutdown
2023-02-02 23:10:44.835247 [CONSOLE] switch_loadable_module.c:2416 mod_mediabugwebsocket unloaded.
2023-02-02 23:10:44.835247 [WARNING] mod_mediabugwebsocket.cpp:88 mediabugudp mod loaded version[20230202_v1.08].
2023-02-02 23:10:44.835247 [CONSOLE] switch_loadable_module.c:1803 Successfully Loaded [mod_mediabugwebsocket]
2023-02-02 23:10:44.835247 [NOTICE] switch_loadable_module.c:350 Adding Application 'mediabugwebsocket_start'
2023-02-02 23:10:44.835247 [NOTICE] switch_loadable_module.c:350 Adding Application 'mediabugwebsocket_stop'

1.15.6 Calling the module from the dialplan

  <extension name="mod_websocket1004">
    <condition field="destination_number" expression="^(1004)$">
        <action application="export" data="nolocal:execute_on_answer=lua mediabug/mediabugwebsocket.lua ws://liveasr.test.xxx.com/websocket? 26544309 800196 inbound"/>
        <action application="bridge" data="user/$1"/>
    </condition>
  </extension>

  <extension name="9xxxxxxxxxxx">
    <condition field="destination_number" expression="^9(\d{11})$">
        <action application="export" data="nolocal:execute_on_answer=lua mediabug/mediabugwebsocket.lua ws://liveasr.test.xxx.com/websocket? 26544309 800196 outbound"/>
        <action application="bridge" data="{origination_caller_id_number=0571xxxxxx}sofia/external/135xxxxxxxx@1.1.1.1:5060"/>
    </condition>
  </extension>

The Lua script in between looks like this (the command is executed once for each leg):

cmd="uuid_broadcast "..uuid_this.." mediabugwebsocket_start::'"..ws_url.." "..uuid_this.." "..userCode.." "..userExt.." "..direct_this.."'".." aleg";
ddd_debug(cmd);
api:executeString(cmd);

cmd="uuid_broadcast "..uuid_this.." mediabugwebsocket_start::'"..ws_url.." "..uuid_this.." "..userCode.." "..userExt.." "..direct_that.."'".." bleg";
ddd_debug(cmd);
api:executeString(cmd);

1.15.7 Result



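Copyright © Du Jinfang (杜金房) and contributors, 2016-2023. For online reading only; reproduction in any form is declined. This book is still being written and is updated continuously. If you would like to contribute a few words, you are welcome to join us.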