【Title】Cloud Dataflow - how does Dataflow do parallelism?
【Posted】2018-07-05 04:36:46
【Question】

My question is: under the hood, for a per-element Beam DoFn (ParDo), how does Cloud Dataflow parallelize the workload? For example, in my ParDo I send one HTTP request to an external server for each element. I use 30 workers, each with 4 vCPUs.

  1. Does that mean there will be at most 4 threads per worker?
  2. Does that mean each worker needs only 4 HTTP connections, or that 4 can be established per worker if I keep them alive for best performance?
  3. How can I tune the degree of parallelism other than using more vCPUs or more workers?
  4. With my current setup (30 workers × 4 vCPUs), I can establish about 120 HTTP connections to the HTTP server, but resource usage on both the server and the workers is very low. Basically I want to make them work much harder by sending more requests per second. What should I do...
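The 120-connection figure above follows from the batch-worker default of roughly one processing thread per vCPU, with each thread holding one synchronous connection at a time. A sketch of that arithmetic, under that assumption:

```java
public class ConnectionBudget {
    // Assumption: Dataflow batch workers run ~1 processing thread per vCPU,
    // and each thread holds at most one synchronous HTTP connection.
    static int maxConcurrentConnections(int workers, int vCpusPerWorker) {
        return workers * vCpusPerWorker;
    }

    public static void main(String[] args) {
        // 30 workers x 4 vCPUs => ~120 concurrent synchronous requests
        System.out.println(maxConcurrentConnections(30, 4)); // prints 120
    }
}
```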

A code snippet to illustrate my work:

public class NewCallServerDoFn extends DoFn<PreparedRequest,KV<PreparedRequest,String>> {


private static final Logger Logger = LoggerFactory.getLogger(NewCallServerDoFn.class);

private static PoolingHttpClientConnectionManager _ConnManager = null;
private static CloseableHttpClient _HttpClient = null;
private static HttpRequestRetryHandler _RetryHandler = null;
private static  String[] _MapServers = MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.server_host").split(",");

@Setup
public void setupHttpClient(){

    Logger.info("Setting up HttpClient");

   //Question: the value of maxConnection below is actually 10, but with 30 worker machines, I can only see 115 TCP connections established on the server side. So this setting doesn't really take effect as I expected.....

    int maxConnection = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.max_connection");
    int timeout = MapServerBatchBeamApplication.CONFIG.getInt("mapserver.client.config.timeout");

    _ConnManager = new PoolingHttpClientConnectionManager();

    for (String mapServer : _MapServers) {
        HttpHost serverHost = new HttpHost(mapServer,80);
        _ConnManager.setMaxPerRoute(new HttpRoute(serverHost),maxConnection);
    }

    // config timeout
    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(timeout)
            .setConnectionRequestTimeout(timeout)
            .setSocketTimeout(timeout).build();

    // config retry
    _RetryHandler = new HttpRequestRetryHandler() {

        public boolean retryRequest(
                IOException exception,
                int executionCount,
                HttpContext context) {

            Logger.info(exception.toString());
            Logger.info("try request: " + executionCount);

            if (executionCount >= 5) {
                // Do not retry if over max retry count
                return false;
            }
            if (exception instanceof InterruptedIOException) {
                // Timeout
                return false;
            }
            if (exception instanceof UnknownHostException) {
                // Unknown host
                return false;
            }
            if (exception instanceof ConnectTimeoutException) {
                // Connect timeout
                return false;
            }
            if (exception instanceof SSLException) {
                // SSL handshake exception
                return false;
            }
            return true;
        }

    };

    _HttpClient = HttpClients.custom()
                            .setConnectionManager(_ConnManager)
                            .setDefaultRequestConfig(requestConfig)
                            .setRetryHandler(_RetryHandler)
                            .build();

    Logger.info("Setting up HttpClient is done.");

}

@Teardown
public void tearDown(){
    Logger.info("Tearing down HttpClient and Connection Manager.");
    try {
        _HttpClient.close();
        _ConnManager.close();
    }catch (Exception e){
        Logger.warn(e.toString());
    }
    Logger.info("HttpClient and Connection Manager have been torn down.");
}




@ProcessElement
public void processElement(ProcessContext c) {

    PreparedRequest request = c.element();

    if(request == null)
        return;

    String response="{\"my_error\":\"failed to get response from map server with retries\"}";


    String chosenServer = _MapServers[request.getHardwareId() % _MapServers.length];

    String parameter;
    try {
        parameter = URLEncoder.encode(request.getRequest(),"UTF-8");
    } catch (UnsupportedEncodingException e) {
        Logger.error(e.toString());

        return;
    }

    StringBuilder sb = new StringBuilder().append(MapServerBatchBeamApplication.CONFIG.getString("mapserver.client.config.api_path"))
            .append("?coordinates=")
            .append(parameter);

    HttpGet getRequest = new HttpGet(sb.toString());
    HttpHost host = new HttpHost(chosenServer,80,"http");
    // try-with-resources closes the response even when the entity is null
    // or an exception is thrown (the original leaked the response in both cases)
    try (CloseableHttpResponse httpRes = _HttpClient.execute(host, getRequest)) {
        HttpEntity entity = httpRes.getEntity();
        if (entity != null) {
            response = EntityUtils.toString(entity);
        }
    } catch (Exception e) {
        Logger.warn("failed to get response from map server with retries for " + request.getRequest());
    }

    c.output(KV.of(request, response));

}
}

【Discussion】

    Tags: google-cloud-dataflow apache-beam


    【Solution 1】:
    1. Yes, based on this answer.
    2. No, you can establish more connections. Based on my answer, you can use an asynchronous HTTP client to issue more concurrent requests. As that answer describes, you need to collect the results of those async calls and output them synchronously, in either @ProcessElement or @FinishBundle.
    3. See 2.
    4. Since your resource usage is low, it indicates the workers spend most of their time waiting for responses. With the approach above you can utilize resources much better, and you can achieve the same throughput with far fewer workers.
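The pattern from point 2 can be sketched without the Beam classes: the processElement analogue fires an async call and buffers the future, and the finishBundle analogue joins all in-flight calls and emits their results synchronously. A minimal plain-Java sketch of that idea, using CompletableFuture in place of a real async HTTP client (fetchAsync is a hypothetical stand-in):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncBufferSketch {
    // Futures buffered across processElement calls within one bundle.
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    // Stand-in for an async HTTP call; a real DoFn would use an async client.
    static CompletableFuture<String> fetchAsync(String request) {
        return CompletableFuture.supplyAsync(() -> "response:" + request);
    }

    // Analogue of @ProcessElement: start the request, do not block.
    public void processElement(String request) {
        pending.add(fetchAsync(request));
    }

    // Analogue of @FinishBundle: wait for every in-flight call, emit results.
    public List<String> finishBundle() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : pending) {
            results.add(f.join());
        }
        pending.clear();
        return results;
    }

    public static void main(String[] args) {
        AsyncBufferSketch fn = new AsyncBufferSketch();
        fn.processElement("a");
        fn.processElement("b");
        System.out.println(fn.finishBundle()); // [response:a, response:b]
    }
}
```

In a real DoFn the buffered outputs must be emitted via the bundle's output receiver before @FinishBundle returns, so Beam can commit the bundle only after all requests complete.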

    【Comments】:

    • It took me some time, but I think I learned the idea from that Spotify library you provided. It is very well designed! I ended up implementing my own AsyncDoFn based on the same idea. Thanks a lot! I'll let you know how it performs if I run into anything else.