【问题标题】:Impala queries are not executed in async mannerImpala 查询不以异步方式执行
【发布时间】:2018-09-03 18:49:36
【问题描述】:

基本上,有一个小的 aiohttp 应用程序,它接收 Impala 查询列表,然后将它们发送给 Impala。然而,有些查询可能需要很长时间才能完成,因此决定以异步/并行方式进行。得到了一个线程工作的解决方案,但很想看看是否有可能只使用 asyncio/tornado 来达到相同的速度。

我的代码如下:

async def run(self, queries):
    # Here I validate queries
    query_list    = await self.build_query_list(split_queries)        # Format: [[queries for connection_1], [queries for connection_2], ...]

    start         = time.time()
    # Assing group of queries to each connection and wait results
    result_queue = deque()
    await multi([self.execute_impala_query(connection.connection, query_list[index], result_queue) for index, connection in enumerate(connection_list)])

    # Close all connections
    [await self.impala_connect_advance_pool.release_connection(connection) for connection in connection_list]

    # Wait for Impala responses
    while len(result_queue) < connect_limit: 
        continue

    # Send results back


async def execute_impala_query(self, impala_connect, queries, queue):
    return await multi([self.impala_response_to_json_response(impala_connect.cursor(), query, queue) for query in queries])

async def impala_response_to_json_response(self, impala_cursor, query, queue):
    self.logger.info('execute query: {}'.format(query))
    print ('execute query: {}'.format(query))

    def get_results():
        impala_cursor.execute(query)
        results = as_pandas(impala_cursor)
        impala_cursor.close()
        self.logger.info('{} completed'.format(query))
        print ('{} completed'.format(query))
        queue.append(results.to_json(orient='records'))

    IOLoop.current().spawn_callback(get_results)

发生的情况是,一旦它运行,我可以在标准输出中看到“执行查询:查询”消息,我假设它们都被触发并正在执行,但是,它需要 2(或更多) 只要是带有 Threads 的版本。我是把整个异步概念弄错了,还是在方法的某个地方犯了一些愚蠢的错误?

【问题讨论】:

    标签: python-3.6 tornado python-asyncio impyla


    【解决方案1】:

    整个异步概念错误 是的,仅仅通过使用 spawn_callback 调用函数不会使其异步:您的数据库连接器应该支持异步 IO。正如我所看到的:我建议你看看execute_async 方法。然后你需要编写你自己的等待函数,比如Impyla's _wait_to_finish,但是用tornado.gen.sleep而不是time.sleep()

    【讨论】:

    • 哦,我明白了。我知道我需要做类似的事情,但希望是最好的:) 届时会试一试。感谢您为我指出这一点:)
    猜你喜欢
    • 2013-09-29
    • 1970-01-01
    • 2011-08-22
    • 1970-01-01
    • 2014-04-03
    • 1970-01-01
    • 1970-01-01
    • 2017-09-09
    • 1970-01-01
    相关资源
    最近更新 更多