【问题标题】:Using Cowboy Websocket Client for Testing with Elixir使用 Cowboy Websocket 客户端使用 Elixir 进行测试
【发布时间】:2019-11-04 03:25:42
【问题描述】:

首先,完全缺乏 Cowboy 的文档,尤其是 Websockets,但总的来说,一旦解密,它就可以很好地使用。然后将这些信息从 Erlang 获取到 Elixir 是另一个步骤。感谢this post by 7stud,我能够获得一个功能正常的 websocket 用于测试目的,但我无法让它同时收听和选择性地发送消息。我认为这是因为接收阻塞了需要发送的线程,这本质上与 websocket 连接相关联,因此它在等待接收时无法发送。也许这种理解是有缺陷的。我很想得到纠正。我尝试生成无济于事,这就是为什么我认为接收阻塞了 websocket 线程。

def ws do
    localhost = 'localhost'
    path = '/ws/app/1'
    port = 5000

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, conn_pid} = :gun.open(localhost, port, connect_opts)
    IO.inspect(conn_pid, label: "conn_pid")
    {:ok, protocol} = :gun.await_up(conn_pid)
    IO.inspect(protocol, label: "protocol")
    # Set custom header with cookie for device id
    stream_ref = :gun.ws_upgrade(conn_pid, path, [{"cookie", "device_id=1235"}])
    IO.inspect(stream_ref, label: "stream_ref")
    receive do
      {:gun_upgrade, ^conn_pid, ^stream_ref, ["websocket"], headers} ->
              upgrade_success(conn_pid, headers, stream_ref)
      {:gun_response, ^conn_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _conn_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    :ok
  end

  def upgrade_success(conn_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(conn_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")

    IO.inspect(self(), label: "upgrade self")
    # This one runs and message is received
    run_test(conn_pid)
    # This should spawn and therefore not block
    listen(conn_pid, stream_ref)
    # This never runs
    run_test(conn_pid)
  end

  def listen(conn_pid, stream_ref) do
    spawn receive_messages(conn_pid, stream_ref)
  end
  def receive_messages(conn_pid, stream_ref) do
    IO.inspect conn_pid, label: "conn_pid!"
    IO.inspect stream_ref, label: "stream_ref!"
    IO.inspect(self(), label: "self pid")
    receive do
      {:gun_ws, ^conn_pid, ^stream_ref, {:text, msg} } ->
          IO.inspect(msg, label: "Message from websocket server:")
      other_messages ->
        IO.inspect(other_messages, label: "Other messages")
    after 5000 ->
      IO.puts "Receive timed out"
    end
    receive_messages(conn_pid, stream_ref)
  end

  def send_message(message, conn_pid) do
    :gun.ws_send(conn_pid, {:text, message})
  end

  def run_test(conn_pid) do
    IO.puts "Running test"
    message = "{\"type\":\"init\",\"body\":{\"device_id\":1234}}"
    send_message(message, conn_pid)
  end

  def stop(conn_pid) do
    :gun.shutdown(conn_pid)
  end

【问题讨论】:

  • 我认为这是因为接收阻塞了需要发送的线程,这本质上与 websocket 连接相关联,因此它在等待接收时无法发送。也许这种理解是有缺陷的。 -- 是的,我认为它一定是有缺陷的,因为这里所说的:ninenines.eu/docs/en/cowboy/2.1/guide/ws_protocol使用Websocket,客户端和服务器都可以随时发送帧,没有任何限制。
  • 我能够让一个正常运行的 websocket 用于测试目的,但我无法让它同时监听和发送消息。 -- @ 是什么意思987654325@那句话指的是什么?服务器还是客户端?
  • @7stud 我已经有一个服务器正在运行。现在我正在尝试获得一个有效的客户。 “服务器”是指等待从各种客户端接收 websocket 连接的程序。
  • 对于任何感兴趣的人我已经完成了这个,如果你正在寻找一个更完整的例子,你可以看看这个 repo。 github.com/GamgeeNL/websocket-client

标签: websocket erlang elixir client


【解决方案1】:

来自gun docs

接收数据

Gun 为每个 Websocket 向 所有者进程 发送 Erlang 消息 它收到的消息。

and:

连接

...

枪连接

...

Gun 连接是一个 Erlang 进程,它管理一个连接到 远程端点。此 Gun 连接由一个用户进程拥有,该用户进程 被称为连接的所有者,并由 枪应用程序的监督树。

所有者进程通过调用与 Gun 连接进行通信 模块枪的功能。所有功能执行各自的 异步操作。 Gun 连接将发送 Erlang 在需要时向所有者进程发送消息。

虽然文档中没有特别提到,但我很确定所有者进程是调用gun:open()的进程。我的尝试还显示所有者进程必须调用gun:ws_send()。换句话说,所有者进程既要向服务器发送消息,又要从服务器接收消息。

以下代码使用gen_server 操作gun,这样gen_server 既向服务器发送消息,又从服务器接收消息。

当gun从牛仔http服务器接收到消息时,gun将消息,即Pid ! Msg,发送到所有者进程。在下面的代码中,gen_serverinit/1 回调中创建了连接,这意味着枪会在gen_server 处接收到来自牛仔的消息(!)。 gen_server 使用handle_info() 处理直接发送到其邮箱的消息。

handle_cast() 中,gen_server 使用 gun 向牛仔发送请求。因为handle_cast() 是异步的,这意味着您可以向cowboy 发送异步消息。而且,当 gun 收到来自牛仔的消息时,gun 将消息发送(!)到 gen_server,然后 gen_server 的 handle_info() 函数处理该消息。在handle_info() 内部,调用gen_server:reply/2 将消息中继到gen_server 客户端。因此,gen_server 客户端可以在想要检查从 gun 发送的服务器消息时跳转到接收子句。

-module(client).
-behavior(gen_server).
-export([start_server/0, send_sync/1, send_async/1, get_message/2, go/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
-export([terminate/2, code_change/3]).  %%% client functions
-export([sender/1]).

%%% client functions
%%%

start_server() ->
    gen_server:start({local, ?MODULE}, ?MODULE, [], []).

send_sync(Requ) ->
    gen_server:call(?MODULE, Requ).

send_async(Requ) -> 
    gen_server:cast(?MODULE, {websocket_request, Requ}).

get_message(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Inside get_message(): Ref = ~w~n", [ClientRef]),
            io:format("Client received gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end.

receive_loop(WebSocketPid, ClientRef) ->
    receive
        {ClientRef, {gun_ws, WebSocketPid, {text, Msg} }} ->
            io:format("Client received Gun message: ~s~n", [Msg]);
        Other ->
            io:format("Client received other message: ~w~n", [Other])
    end,
    receive_loop(WebSocketPid, ClientRef).

go() ->
    {ok, GenServerPid} = start_server(),
    io:format("[ME]: Inside go(): GenServerPid=~w~n", [GenServerPid]),

    [{conn_pid, ConnPid}, {ref, ClientRef}] = send_sync(get_conn_pid),
    io:format("[ME]: Inside go(): ConnPid=~w~n", [ConnPid]),

    ok = send_async("ABCD"),
    get_message(ConnPid, ClientRef),

    spawn(?MODULE, sender, [1]),

    ok = send_async("XYZ"),
    get_message(ConnPid, ClientRef),

    receive_loop(ConnPid, ClientRef).

sender(Count) -> %Send messages to handle_info() every 3 secs
    send_async(lists:concat(["Hello", Count])),
    timer:sleep(3000),
    sender(Count+1).

%%%%%% gen_server callbacks
%%%

init(_Arg) ->
    {ok, {no_client, ws()}}.

handle_call(get_conn_pid, From={_ClientPid, ClientRef}, _State={_Client, WebSocketPid}) ->
    io:format("[ME]: Inside handle_call(): From = ~w~n", [From]),
    {reply, [{conn_pid, WebSocketPid}, {ref, ClientRef}], _NewState={From, WebSocketPid} };
handle_call(stop, _From, State) ->
    {stop, normal, shutdown_ok, State}; %Calls terminate()
handle_call(_Other, _From, State) ->
    {ok, State}.

handle_cast({websocket_request, Msg}, State={_From, WebSocketPid}) ->
    gun:ws_send(WebSocketPid, {text, Msg}), %{text, "It's raining!"}),
    {noreply, State}.

handle_info(Msg, State={From, _WebSocketPid}) ->
    io:format("[ME]: Inside handle_info(): Msg=~w~n", [Msg]),
    gen_server:reply(From, Msg),
    {noreply, State}.

terminate(_Reason, _State={_From, WebSocketPid}) -> 
    gun:shutdown(WebSocketPid).


code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%%% private functions
%%%

ws() ->
    {ok, _} = application:ensure_all_started(gun),
    {ok, ConnPid} = gun:open("localhost", 8080),
    {ok, _Protocol} = gun:await_up(ConnPid),

    gun:ws_upgrade(ConnPid, "/please_upgrade_to_websocket"),

    receive
        {gun_ws_upgrade, ConnPid, ok, Headers} ->
            io:format("[ME]: Inside gun_ws_upgrade receive clause: ~w~n", 
                      [ConnPid]),
            upgrade_success_handler(ConnPid, Headers);
        {gun_response, ConnPid, _, _, Status, Headers} ->
            exit({ws_upgrade_failed, Status, Headers});
        {gun_error, _ConnPid, _StreamRef, Reason} ->
            exit({ws_upgrade_failed, Reason})
    after 1000 ->
        exit(timeout)
    end.


upgrade_success_handler(ConnPid, _Headers) ->
    io:format("[ME]: Inside upgrade_success_handler(): ~w~n", [ConnPid]),  
    ConnPid.

=======

哎呀,下面的答案展示了如何让服务器将数据推送到客户端。

好的,我知道了——在 erlang 中。这个例子有点折磨。你需要做几件事:

1) 需要获取运行websocket_*函数的进程的pid,与请求的pid不一样:

升级后初始化

Cowboy 有单独的进程来处理连接和 要求。因为 Websocket 接管了连接,所以 Websocket 协议处理发生在与请求不同的进程中 处理。

这反映在 Websocket 处理程序具有的不同回调中。 init/2 回调是从临时请求进程中调用的,并且 连接过程中的 websocket_ 回调。

这意味着无法从 init/2 进行某些初始化。 任何需要当前 pid 或绑定到当前 pid,将无法按预期工作。可选的 websocket_init/1 可以是 用于[获取运行 websocket_ 回调的进程的 pid]:

https://ninenines.eu/docs/en/cowboy/2.6/guide/ws_handlers/

这是我使用的代码:

init(Req, State) ->
    {cowboy_websocket, Req, State}.  %Perform websocket setup

websocket_init(State) ->
    io:format("[ME]: Inside websocket_init"),
    spawn(?MODULE, push, [self(), "Hi, there"]),
    {ok, State}.

push(WebSocketHandleProcess, Greeting) ->
    timer:sleep(4000),
    WebSocketHandleProcess ! {text, Greeting}.

websocket_handle({text, Msg}, State) ->
    timer:sleep(10000), %Don't respond to client request just yet.
    {
     reply, 
     {text, io_lib:format("Server received: ~s", [Msg]) },
     State
    };
websocket_handle(_Other, State) ->  %Ignore
    {ok, State}.

这将在客户端等待对客户端先前发送到服务器的请求的回复时向客户端推送消息。

2) 如果您向正在运行websocket_* 函数的进程发送消息:

Pid ! {text, Msg}

那么该消息将由websocket_info() 函数处理——而不是websocket_handle() 函数:

websocket_info({text, Text}, State) ->
    {reply, {text, Text}, State};
websocket_info(_Other, State) ->
    {ok, State}.

websocket_info() 函数的返回值与websocket_handle() 函数的返回值一样。

因为你的gun客户端现在接收到多条消息,所以gun客户端需要循环接收:

upgrade_success_handler(ConnPid, Headers) ->
    io:format("Upgraded ~w. Success!~nHeaders:~n~p~n", 
              [ConnPid, Headers]),

    gun:ws_send(ConnPid, {text, "It's raining!"}),

    get_messages(ConnPid).  %Move the receive clause into a recursive function

get_messages(ConnPid) ->
    receive
        {gun_ws, ConnPid, {text, "Greeting: " ++ Greeting} } ->
            io:format("~s~n", [Greeting]),
            get_messages(ConnPid);

        {gun_ws, ConnPid, {text, Msg} } ->
            io:format("~s~n", [Msg]),
            get_messages(ConnPid)
    end.

【讨论】:

  • 谢谢!修改反映在我的回答中。
  • @GenericJam,翻译工作做得很好!如果您不介意,请编辑您的问题并添加erlanggun 标签——这将使erlangers 更容易找到信息。
  • 我添加了erlangerlang-gun,因为gun 指的是枪数据库。我删除了cowboy,因为它不是严格意义上的牛仔,我不得不将其保留为 5 个标签。
【解决方案2】:

感谢 7stud 提供示例代码和如下所示的编辑:

这是我对 Elixir 的解释,为 gun 提供一个基本的 WebSocket 客户端:

defmodule WebsocketTester.Application do

  use Application

  def start(_type, _args) do

    path = '/ws/app/1'

    port = 5000

    host = 'localhost'

    args = %{path: path, port: port, host: host}

    children = [
      { WebSocket.Client, args }
    ]
    Supervisor.start_link(children, strategy: :one_for_one, name: WebsocketTester.Supervisor)
  end
end

defmodule WebSocket.Client do

  use GenServer

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :worker,
      restart: :permanent,
      shutdown: 500
    }
  end

  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  # GenServer callbacks

  def init(args) do
    # Set up the websocket connection
    # get > upgrade
    # Initial state with gun_pid and stream_ref
    # %{gun_pid: gun_pid, stream_ref: stream_ref} = ws(args)
    {:ok, init_ws(args)}
  end

  # Give back gun_pid from state
  def handle_call(:get_conn, from, %{gun_pid: gun_pid, stream_ref: stream_ref}) do
    IO.inspect(gun_pid, label: "handle call gun pid")
    {:reply, %{gun_pid: gun_pid, stream_ref: stream_ref}, %{from: from, gun_pid: gun_pid} }
  end
  # Everything else
  def handle_call(other, from, state) do
    IO.inspect(other, label: "other call")
    IO.inspect(from, label: "from")
    {:ok, state}
  end
  # Client sends message to server.
  def handle_cast({:websocket_request, message}, %{gun_pid: gun_pid} = state) do
    IO.puts message
    IO.inspect(gun_pid, label: "gun_pid")
    :gun.ws_send(gun_pid, {:text, message})
    {:noreply, state}
  end

  def handle_info(message, %{from: from} = state) do
    IO.inspect(message, label: "Inside handle_info(): ")
    GenServer.reply(from, message)
    {:noreply, state}
  end

  def terminate(reason, _state) do
    IO.puts "Terminated due to #{reason}."
    :ok
  end


  def code_change(_old_version, state, _extra) do
    {:ok, state}
  end

  ## Client functions
  # Used for getting gun_pid from state
  def send_sync(request) do
    GenServer.call(__MODULE__, request)
  end

  # Send a message async
  def send_async(request) do
    GenServer.cast(__MODULE__, {:websocket_request, request})
  end

  # Receive a single message
  def get_message(stream_ref, gun_pid) do
      receive do
          {^stream_ref, {:gun_ws, ^gun_pid, {:text, message} }} ->
              IO.puts("Client received gun message: #{message}")
          other ->
            IO.inspect(other, label: "Client received other message")
      end
  end

  # Receive all messages recursively
  def receive_loop(stream_ref, gun_pid) do
    IO.puts "Listening"
      get_message(stream_ref, gun_pid)
      receive_loop(stream_ref, gun_pid)
  end

  def go() do
    # Get the gun_pid from state
    %{gun_pid: gun_pid, stream_ref: stream_ref} = send_sync(:get_gun_pid)
    IO.inspect(gun_pid, label: "Inside go(): gun_pid=")
    # Send messages manually
    :ok = send_async(Jason.encode!(%{type: "info", greet: "yo"}))
    # Or to send just text
    # :ok = send_async("yo")

    # Receive messages manually
    get_message(stream_ref, gun_pid)

    # Start sending loop
    spawn sender 1

    # Start listening
    receive_loop(stream_ref, gun_pid)
  end

  # Send messages to handle_info() every 3 secs
  def sender(count) do
      send_async("count is #{count}")
      :timer.sleep(3000)
      sender(count+1)
  end

  ## End of client functions

  # Initialize the websocket connection
  def init_ws(args) do

    %{ path: path, port: port, host: host} = args

    {:ok, _} = :application.ensure_all_started(:gun)
    connect_opts = %{
      connect_timeout: :timer.minutes(1),
      retry: 10,
      retry_timeout: 100
    }

    {:ok, gun_pid} = :gun.open(host, port, connect_opts)
    {:ok, _protocol} = :gun.await_up(gun_pid)
    # Set custom header with cookie for device id - set_headers can be left out if you don't want custom headers
    stream_ref = :gun.ws_upgrade(gun_pid, path, set_headers("I like cookies"))
    receive do
      {:gun_upgrade, ^gun_pid, ^stream_ref, ["websocket"], headers} ->
            upgrade_success(gun_pid, headers, stream_ref)
      {:gun_response, ^gun_pid, _, _, status, headers} ->
              exit({:ws_upgrade_failed, status, headers})
      {:gun_error, _gun_pid, _stream_ref, reason} ->
              exit({:ws_upgrade_failed, reason})
      whatever ->
        IO.inspect(whatever, label: "Whatever")
      # More clauses here as needed.
    after 5000 ->
        IO.puts "Took too long!"
        :erlang.exit("barf!")
    end
    # stop(gun_pid)
  end


  def set_headers(cookie_value) do
    [{"cookie", "my_cookie=#{cookie_value}"}]
  end

  # This just returns the gun_pid for further reference which gets stored in the GenServer state.
  def upgrade_success(gun_pid, headers, stream_ref) do
    IO.puts("Upgraded #{inspect(gun_pid)}. Success!\nHeaders:\n#{inspect(headers)}\n")
    %{stream_ref: stream_ref, gun_pid: gun_pid}
  end

  # To stop gun
  def stop(gun_pid) do
    :gun.shutdown(gun_pid)
  end

end

要使用这个:

iex -S mix
iex> WebSocket.Client.go

【讨论】:

  • 对不起,我不得不做一些改变。 gen_server:call() 创建一个唯一的 ref 来标识客户端,gen_server:reply/2 将返回它的消息。我忽略了客户端接收子句中的 Ref,但最好对 Ref 进行模式匹配,以便客户端知道消息来自 gen_server 而不是其他进程。现在,handle_call() 同时返回了 ConnPid 和 ClientRef,这意味着客户端可以在接收中使用 ClientRef 来确保消息来自 gen_server。
猜你喜欢
  • 1970-01-01
  • 2014-09-27
  • 1970-01-01
  • 1970-01-01
  • 2020-03-24
  • 2018-05-12
  • 1970-01-01
  • 2017-12-13
  • 1970-01-01
相关资源
最近更新 更多