【问题标题】:Buffer size in Erlang / Golang port exampleErlang / Golang 端口示例中的缓冲区大小
【发布时间】:2015-08-25 09:29:54
【问题描述】:

我有一个粗略的 Erlang 到 Golang 端口示例,将数据从 Erlang 传递到 Golang 并回显响应。

问题是我可以传输的数据量似乎限制为 2^8 字节(见下文)。我认为问题可能出在 Golang 方面(没有创建足够大的缓冲区),但是用 bufio.NewReaderSize 替换 bufio.NewReader 没有用。所以我现在认为问题可能出在 Erlang 方面。

我需要做些什么来增加缓冲区大小/能够回显大于 2^8 字节的消息?

TIA

justin@justin-ThinkPad-X240:~/work/erlang_golang_port$ erl -pa ebin
Erlang/OTP 17 [erts-6.4.1] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]

Eshell V6.4.1  (abort with ^G)
1> port:start("./echo").
<0.35.0>
2> port:ping(65000).
65000
3> port:ping(66000).
** exception error: bad argument
     in function  port:call_port/1 (port.erl, line 20)
4> port:start("./echo").
<0.40.0>
5> port:ping(66000).    
65536

package main

import (
    "bufio"
    "os"
)

const Delimiter = '\n'

func main() {
    // reader := bufio:NewReader(os.Stdin)
    reader := bufio.NewReaderSize(os.Stdin, 1677216) // 2**24;
    bytes, _ := reader.ReadBytes(Delimiter)
    os.Stdout.Write(bytes[:len(bytes)-1])
}

二郎

-module(port).

-export([start/1, stop/0, init/1]).

-export([ping/1]).

-define(DELIMITER, [10]).

start(ExtPrg) ->
    spawn(?MODULE, init, [ExtPrg]).

stop() ->
    myname ! stop.

ping(N) ->
    Msg=[round(65+26*random:uniform()) || _ <- lists:seq(1, N)],
    call_port(Msg).

call_port(Msg) ->
    myname ! {call, self(), Msg},
    receive
    {myname, Result} ->
        length(Result)
    end.

init(ExtPrg) ->
    register(myname, self()),
    process_flag(trap_exit, true),
    Port = open_port({spawn, ExtPrg}, []),
    loop(Port).

loop(Port) ->
    receive
    {call, Caller, Msg} ->
        Port ! {self(), {command, Msg++?DELIMITER}},
        receive
        {Port, {data, Data}} ->
            Caller ! {myname, Data}
        end,
        loop(Port);
    stop ->
        Port ! {self(), close},
        receive
        {Port, closed} ->
            exit(normal)
        end;
    {'EXIT', Port, _Reason} ->
        exit(port_terminated)
    end.

【问题讨论】:

    标签: go erlang


    【解决方案1】:

    如果你改用start_link,你会看到端口在第一个命令后崩溃:

    1> port:start('go run port.go').
    <0.118.0>
    2> port:ping(65000).
    65000
    ** exception error: port_terminated
    

    如果将 Go 代码更改为循环运行,则可以避免这种崩溃:

    func main() {
        for {
            // reader := bufio:NewReader(os.Stdin)
            reader := bufio.NewReaderSize(os.Stdin, 1677216) // 2**24;
            bytes, _ := reader.ReadBytes(Delimiter)
            os.Stdout.Write(bytes[:len(bytes)-1])
        }
    }
    

    现在我们可以看到另一个有趣的结果:

    33> c(port).
    {ok,port}
    40> port:ping(66000).
    65536
    41> port:ping(66000).
    464
    42> port:ping(66000).
    65536
    43> port:ping(66000).
    464
    

    现在我们可以看到实际上没有数据丢失,只是在端口中缓冲。由于您没有指定帧协议(使用{packet, N}{line, N},您自己负责收集数据。似乎Erlang 端口的内部缓冲区大小为64K(尽管我没有找到这方面的文档,也没有方法来改变它)。

    如果您在返回之前更改接收以获取所有数据,您将每次接收每个字节:

    loop(Port) ->
        receive
        {call, Caller, Msg} ->
            Port ! {self(), {command, Msg++?DELIMITER}},
            Caller ! {myname, receive_all(Port, 10)},
            loop(Port);
        stop ->
            Port ! {self(), close},
            receive
            {Port, closed} ->
                exit(normal)
            end;
        {'EXIT', Port, _Reason} ->
            exit(port_terminated)
        end.
    
    receive_all(Port, Timeout) -> receive_all(Port, Timeout, []).
    
    receive_all(Port, Timeout, Data) ->
        receive
        {Port, {data, New}} ->
            receive_all(Port, Timeout, [New|Data])
        after Timeout ->
            lists:flatten(lists:reverse(Data))
        end.
    

    运行这个,我们得到:

    1> c(port).
    {ok,port}
    2>
    3> port:start('go run port.go').
    <0.311.0>
    4> port:ping(66000).
    66000
    5> port:ping(66000).
    66000
    6> port:ping(66000).
    66000
    

    【讨论】:

    • 您的 receice_all/3 是 O(N^2) 到期 Data ++ New。这不是一个好习惯。
    【解决方案2】:
    1. 2^8 是 256,而不是 65536,它是 2^16(或 2 个字节)。
    2. 要排除 golang 程序,您可以简单地将 echo 替换为 GNU cat
    3. 端口通信的默认消息最大大小为 64k,因此当您的端口接收消息时,第一个在字符串的前导 64k。您可以再次读取端口以获取剩余数据,但只需将它们放入代码中即可。
    4. 如果你真的想通过基于线路的协议进行通信,你应该配置你的端口accordingly

    {line, L}

    消息是按行传递的。每一行 (由依赖于操作系统的换行符序列分隔)在一个中交付 单个消息。消息数据格式为{Flag, Line},其中Flag为 eol 或 noeol 和 Line 是交付的实际数据(没有 换行符)。

    L 指定最大行长度(以字节为单位)。比这更长的行 将在多条消息中传递,标志设置为 noeol 除了最后一条消息之外的所有消息。如果在任何地方遇到文件结尾 除了紧跟换行符序列之外,最后一行将 也可以在标志设置为 noeol 的情况下交付。在所有其他情况下, 行交付时将 Flag 设置为 eol。

    {packet, N}{line, L} 设置是互斥的。

    所以你的代码是

    Port = open_port({spawn, ExtPrg}, [{line, ?PACKET_SIZE]),
    %%...
    {call, Caller, Msg} ->
        Port ! {self(), {command, Msg++?DELIMITER}},
        D = read_data(Port, []),
        Caller ! {myname, D},
        loop(Port);
    %%...
    read_data(Port, Prefix) ->
    receive
        {Port, {data, {noeol, Data}}} ->
            read_data(Port, Prefix ++ Data);
        {Port, {data, {eol, Data}}} ->
            Prefix ++ Data
    end.
    

    【讨论】:

      【解决方案3】:

      我一直在努力解决类似的问题。 这里是管道模块的完整代码。

      它允许发送文本数据到端口并读取所有回复。

      -module(apr_pipe).
      
      -export([open_pipe/2,send/2,close/1]).
      
      -export([loop/1,status/1,init/1]).
      
      -include_lib("kernel/include/logger.hrl").
      
      -define(MAX_LINE_LEN,4096).
      
      open_pipe(Path,Cmd) ->
         State = #{path => Path, cmd => Cmd},
          Pid = spawn(?MODULE,init,[State]),
          Pid.
      
      init(State) ->
          #{path := Path,cmd := Cmd} = State,
          FullFn = filename:join(Path,Cmd),
          Settings = [{line,?MAX_LINE_LEN},use_stdio,stderr_to_stdout,hide,binary,exit_status],
          Port = erlang:open_port({spawn_executable,FullFn},Settings),
          State2 = State#{port => Port, data => #{}},
          loop(State2).
      
      
      send(Pid,Data)  -> Pid!{self(),send,Data}.
      close(Pid)      -> Pid!{self(),send,close}.
      status(Pid)     -> Pid!{self(),status}.
      
      get_eol() -> <<"\n">>.
      
      loop(State) ->
          receive
              {_Pid,send,close} -> 
                          ?LOG(notice,"got cmd: Close",[]),
                          Port = maps:get(port,State),
                          port_close(Port),
                          exit(normal);
              {Pid,send,Data} ->
                          ?LOG(notice,"Send Data ...",[]),
                          Port = maps:get(port,State),
                          port_command(Port,Data),
                          port_command(Port,get_eol()),
                          State2 = State#{status => data_sent, client => Pid},
                          loop(State2);
               {Pid,status} -> 
                          Port = maps:get(port,State),
                          ?LOG(notice,"Status: Port: ~p State: ~p",[Port,State]),
                          Pid!{status,Port,State},
                          loop(State);
              % port messages.
              {Port, {data,{noeol,Data}}} ->
                      ?LOG(notice,"Port: ~p Data: ~p",[Port,Data]),
                      CurData = maps:get(cur_data,State,[]),
                      State2 = State#{cur_data => [Data | CurData]},
                      loop(State2);
      
              {Port, {data, {eol,Data}}} ->
                      ?LOG(notice,"Port: ~p Data: ~p",[Port,Data]),
                      CurData = [Data | maps:get(cur_data,State,[])],
                      CurData2 = lists:reverse(CurData),
                      Reply    = list_to_binary(CurData2),
                      Client = maps:get(client,State,undefined),
                      State2 = State#{cur_data => [], client => undefined},
                      case Client of
                          undefined -> ?LOG(error,"can not sent reply. Client: ~p Reply: ~p", [Client,Reply]),
                                       loop(State2);
                          _ -> Client!{reply,Reply},
                               loop(State2)
                       end;
              {_Port, closed} ->
                      ?LOG(warning, "Port: ~p closed",[]),
                      exit(normal);
              {'EXIT',  Port, Reason} ->
                       ?LOG(notice,"Port: ~p exit. Reason: ~p",[Port,Reason]),
                       exit(Reason);
              _Other -> ?LOG(error,"unexpected message: ~p",[_Other]),
                        exit({error,{unexpected_message,_Other}})
          end.
      

      【讨论】:

        猜你喜欢
        • 2015-07-01
        • 1970-01-01
        • 2015-07-16
        • 1970-01-01
        • 1970-01-01
        • 2013-05-25
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多