【问题标题】:websocket relay with Autobahn python(使用 Autobahn Python 实现 websocket 中继)
【发布时间】:2021-04-23 16:21:18
【问题描述】:

我正在尝试使用 Autobahn Python 构建一个 websocket 服务器,充当 IBM Watson 语音转文本(Speech to Text)服务的中间人或中继。我已经能够借助队列从客户端接收流式音频并转发给 Watson,也能在服务器上收到 Watson 以 JSON 数据形式返回的转录假设,但我不确定如何把这些 JSON 数据再转发给客户端。Watson 转录端的回调和 Autobahn 客户端的回调似乎各自独立存在:我既无法在一个回调中调用另一个回调里的例程,也无法在一个回调中访问另一个回调里的数据。

我需要设置某种共享的文本消息队列吗?我确信这应该很简单,但我认为问题可能在于我对“self”关键字缺乏理解,它似乎把这两个例程隔离开了。如果有任何帮助理解“self”的资源,我也将不胜感激。

# For Watson
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For Autobahn
from autobahn.twisted.websocket import WebSocketServerProtocol, \
    WebSocketServerFactory
from twisted.internet import reactor

try:
    from Queue import Queue, Full
except ImportError:
    from queue import Queue, Full

###############################################
#### Initialize queue to store the recordings #
###############################################
CHUNK = 1024
BUF_MAX_SIZE = CHUNK * 10
# Capacity of the audio buffer: the number of CHUNK-sized slots that fit
# in BUF_MAX_SIZE
QUEUE_SLOTS = int(round(BUF_MAX_SIZE / CHUNK))
# Bounded buffer holding the incoming audio
q = Queue(maxsize=QUEUE_SLOTS)
# Wrap the queue in an AudioSource for the recognizer to read from
audio_source = AudioSource(q, is_recording=True, is_buffer=True)

###############################################
#### Prepare Speech to Text Service ########
###############################################

# Build the speech to text client; the API key string below looks like a
# placeholder and presumably has to be replaced with a real one
authenticator = IAMAuthenticator('secretapikeycanttellyou')
speech_to_text = SpeechToTextV1(authenticator=authenticator)

# Callback handlers for events reported by the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    """Print every event reported by the speech to text websocket.

    Nothing is relayed to the websocket client here; on_hypothesis and
    on_data mark the places where forwarding is still an open question.
    """

    def __init__(self):
        super(MyRecognizeCallback, self).__init__()

    # --- connection lifecycle -------------------------------------------

    def on_connected(self):
        print('Connection was successful')

    def on_listening(self):
        print('Service is listening')

    def on_close(self):
        print("Connection closed")

    # --- recognition results --------------------------------------------

    def on_transcription(self, transcript):
        print(transcript)

    def on_hypothesis(self, hypothesis):
        print(hypothesis)
        #self.sendMessage(hypothesis, isBinary = false)
        # HOW TO FORWARD THIS TO CLIENT?

    def on_data(self, data):
        print(data)
        #self.sendMessage(data, isBinary = false)
        # HOW TO FORWARD THIS TO CLIENT?

    # --- failures -------------------------------------------------------

    def on_error(self, error):
        message = 'Error received: {}'.format(error)
        print(message)

    def on_inactivity_timeout(self, error):
        message = 'Inactivity timeout: {}'.format(error)
        print(message)

# define callback for client-side websocket in Autobahn
class MyServerProtocol(WebSocketServerProtocol):
    """Server-side handler for a websocket client connection.

    Binary frames are treated as audio and buffered in the module-level
    queue ``q``; text frames are only printed. Every received frame is
    echoed back to the sender.
    """

    def onConnect(self, request):
        """Log the peer address of the connecting client."""
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        """Start the recognizer once the websocket connection is open."""
        print("WebSocket connection open.")
        # Run the recognizer on its own thread; daemon=True so the thread
        # does not keep the process alive on exit.
        recognize_thread = Thread(target=recognize_using_weboscket, args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onMessage(self, payload, isBinary):
        """Buffer binary audio, print text messages, then echo the frame.

        :param payload: raw message bytes received from the client
        :param isBinary: True for a binary frame, False for a text frame
        """
        if isBinary:
            # A plain q.put() blocks while the bounded queue is full, which
            # would stall this callback (and with it the reactor, assuming
            # Twisted's usual single-threaded dispatch). Drop the chunk
            # instead of waiting.
            try:
                q.put_nowait(payload)
            except Full:
                pass  # buffer full: discard this audio chunk
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

        # echo back message verbatim
        self.sendMessage(payload, isBinary)

    def onClose(self, wasClean, code, reason):
        """Log why the websocket connection was closed."""
        print("WebSocket connection closed: {0}".format(reason))
        
def recognize_using_weboscket(*args):
    """Initiate the recognize service and pass in the AudioSource.

    Positional arguments are accepted but ignored. Events from the service
    are delivered to a newly created MyRecognizeCallback instance.
    """
    session_options = {
        'audio': audio_source,
        'content_type': 'audio/l16; rate=16000',
        'recognize_callback': MyRecognizeCallback(),
        'interim_results': True,
    }
    speech_to_text.recognize_using_websocket(**session_options)

if __name__ == '__main__':
    # Build the websocket factory and tell it which protocol class to use
    server_factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    server_factory.protocol = MyServerProtocol

    # Listen on TCP port 9001, then hand control to the reactor
    reactor.listenTCP(9001, server_factory)
    reactor.run()

看来我需要弥合 MyRecognizeCallback() 与 MyServerProtocol() 之间的差距。如果我的实现方式对于想要完成的目标来说很糟糕,也请告诉我。我知道有更简单的方法来中继 websocket 数据,但我想熟悉 websocket API、音频流和文本消息,因为最终我想把 Watson 从方案中去掉,改用我自己的转录算法。

【问题讨论】:

    标签: python json audio websocket autobahn


    【解决方案1】:

    根据 here 的答案,我在 main 中调用 MyServerProtocol().sendMessage(u"this is a message2".encode('utf8')) 的做法,实际上是创建了一个新的、与现有连接无关的 MyServerProtocol 实例,而不是把消息送入已有的连接。我使用 here 描述的方法,成功地把新消息发送到了已打开的 websocket 连接。

    这是我的最终代码,仍有待完善,其中关键的定义是 broadcast_message。要让这种方法生效,还需要在 onConnect 中自行“订阅”websocket 连接,并在 onClose 中“取消订阅”:

    from ibm_watson import SpeechToTextV1
    from ibm_watson.websocket import RecognizeCallback, AudioSource
    from threading import Thread
    from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
    # For autobahn
    import json
    from autobahn.twisted.websocket import WebSocketServerProtocol, \
        WebSocketServerFactory
    from twisted.internet import reactor
    
    try:
        from Queue import Queue, Full
    except ImportError:
        from queue import Queue, Full
    
    ###############################################
    #### Initialize queue to store the recordings #
    ###############################################
    CHUNK = 1024
    # Note: the intent is to discard audio when the queue is full because its
    # consumer can't keep up, so increase the max size as per your choice
    BUF_MAX_SIZE = CHUNK * 10
    # Capacity of the audio buffer: the number of CHUNK-sized slots that fit
    # in BUF_MAX_SIZE
    QUEUE_SLOTS = int(round(BUF_MAX_SIZE / CHUNK))
    # Bounded buffer holding the incoming audio
    q = Queue(maxsize=QUEUE_SLOTS)
    # Wrap the queue in an AudioSource for the recognizer to read from
    audio_source = AudioSource(q, is_recording=True, is_buffer=True)

    ###############################################
    #### Prepare Speech to Text Service ########
    ###############################################

    # Build the speech to text client; the API key string below looks like a
    # placeholder and presumably has to be replaced with a real one
    authenticator = IAMAuthenticator('secretapikey')
    speech_to_text = SpeechToTextV1(authenticator=authenticator)
    
    # Callback handlers for events reported by the speech to text service
    class MyRecognizeCallback(RecognizeCallback):
        """Relay speech to text events to the websocket clients.

        Results, errors, timeouts and the close notification are passed to
        MyServerProtocol.broadcast_message; connection status is only printed.
        """

        def __init__(self):
            super(MyRecognizeCallback, self).__init__()

        @staticmethod
        def _forward(message):
            # Single place where events are handed over to the websocket side
            MyServerProtocol.broadcast_message(message)

        # --- connection lifecycle ---------------------------------------

        def on_connected(self):
            print('Connection was successful')

        def on_listening(self):
            print('Service is listening')

        def on_close(self):
            print("Connection closed")
            self._forward("Connection closed")

        # --- recognition results ----------------------------------------

        def on_transcription(self, transcript):
            self._forward(transcript)

        def on_hypothesis(self, hypothesis):
            self._forward(hypothesis)

        def on_data(self, data):
            self._forward(data)

        # --- failures ---------------------------------------------------

        def on_error(self, error):
            message = 'Error received: {}'.format(error)
            self._forward(message)

        def on_inactivity_timeout(self, error):
            message = 'Inactivity timeout: {}'.format(error)
            self._forward(message)
    
    class MyServerProtocol(WebSocketServerProtocol):
        """Websocket endpoint that buffers client audio and relays results.

        Binary frames from a client are put on the module-level queue ``q``;
        text frames are only printed. ``broadcast_message`` sends a JSON
        message to every registered client and is meant to be called from
        outside the reactor thread (it goes through reactor.callFromThread).
        """

        # Shared by all instances: the currently registered client connections
        connections = list()

        def onConnect(self, request):
            """Register this connection and start a recognizer thread."""
            print("Client connecting: {0}".format(request.peer))
            self.connections.append(self)
            # Start recognizer on connection; daemon=True so the thread does
            # not keep the process alive on exit.
            recognize_thread = Thread(target=recognize_using_weboscket, args=())
            recognize_thread.daemon = True
            recognize_thread.start()

        def onOpen(self):
            """Log that the websocket connection is open."""
            print("WebSocket connection open.")

        def onMessage(self, payload, isBinary):
            """Queue binary audio for the recognizer; print text messages.

            :param payload: raw message bytes received from the client
            :param isBinary: True for a binary frame, False for a text frame
            """
            if isBinary:
                # Put incoming audio into the queue. A plain q.put() blocks
                # while the queue is full and never raises Full, so the
                # discard branch could not run; put_nowait() raises at once.
                try:
                    q.put_nowait(payload)
                except Full:
                    pass  # discard
            else:
                print("Text message received: {0}".format(payload.decode('utf8')))

        @classmethod
        def broadcast_message(cls, data):
            """Serialize ``data`` as UTF-8 JSON and send it to all clients.

            The actual sending is scheduled on the reactor thread, so the
            ``connections`` list is only read on the same thread that
            appends to and removes from it.

            :param data: any value accepted by json.dumps
            :raises TypeError: if ``data`` is not JSON serializable
            """
            payload = json.dumps(data, ensure_ascii=False).encode('utf8')
            reactor.callFromThread(cls._send_to_all, payload)

        @classmethod
        def _send_to_all(cls, payload):
            # Send an already encoded payload to every registered connection.
            # Iterate over a copy so a connection closing mid-loop cannot
            # change the list being iterated.
            for connection in list(cls.connections):
                connection.sendMessage(payload)

        def onClose(self, wasClean, code, reason):
            """Log the close reason and unregister this connection."""
            print("WebSocket connection closed: {0}".format(reason))
            # list.remove() raises ValueError for a missing item; tolerate a
            # connection that was never registered or was already removed.
            try:
                self.connections.remove(self)
            except ValueError:
                pass
      
    def recognize_using_weboscket(*args):
        """Initiate the recognize service and pass in the AudioSource.

        Positional arguments are accepted but ignored. Events from the service
        are delivered to a newly created MyRecognizeCallback instance.
        """
        session_options = {
            'audio': audio_source,
            'content_type': 'audio/l16; rate=16000',
            'recognize_callback': MyRecognizeCallback(),
            'interim_results': True,
        }
        speech_to_text.recognize_using_websocket(**session_options)
    
    if __name__ == '__main__':
        # Build the websocket factory and tell it which protocol class to use
        factory = WebSocketServerFactory("ws://127.0.0.1:9001")
        factory.protocol = MyServerProtocol

        # Listen on TCP port 9001, then hand control to the reactor
        reactor.listenTCP(9001, factory)
        reactor.run()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多