【问题标题】:How to send requests while receiving responses in IB API?如何在 IB API 中接收响应时发送请求?
【发布时间】:2017-06-05 01:49:19
【问题描述】:

我不太明白如何使用 IB API Wrapper 和客户端流。我知道我正在通过客户端流向 IB 服务器发送请求,并通过 Wrapper 流接收响应,但我希望能够为用户实现一个菜单,例如“Press 'p' for open positions , ETC。”随后显示服务器响应并提示其他操作。

from ibapi import wrapper
from ibapi.client import EClient
from ibapi.utils import iswrapper

# types
from ibapi.common import *
from ibapi.contract import *


class TestClient(EClient):
    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)


class TestWrapper(wrapper.EWrapper):
    def __init__(self):
        wrapper.EWrapper.__init__(self)


class TestApp(TestWrapper, TestClient):
    def __init__(self):
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)

    def position(self, account: str, contract: Contract, position: float,
                 avgCost: float):
        """This event returns real-time positions for all accounts in
        response to the reqPositions() method."""

        super().position(account, contract, position, avgCost)
        print("Position.", account, "Symbol:", contract.symbol, "SecType:",
              contract.secType, "Currency:", contract.currency,
              "Position:", position, "Avg cost:", avgCost)


def main():
    app = TestApp()
    usr_in = ''

    app.connect("127.0.0.1", 7497, clientId=0)
    print("serverVersion:%s connectionTime:%s" % (app.serverVersion(),
                                              app.twsConnectionTime()))

    while(usr_in != 'exit'):
         usr_in = input('What to do next?')
         if usr_in == 'p':
             app.reqPositions()

    app.run()

if __name__ == '__main__':
    main()

现在最终发生的是程序要求输入,但在我退出循环之前不显示服务器响应,之后它会显示响应。我知道我可以在 position() 函数中包含一些逻辑,但这非常有限,必须有更好的方法来与两个流交互。此外,非常感谢您提供有关流主题的任何进一步阅读的指针,例如 IB API 中使用的流。

【问题讨论】:

    标签: python api stream


    【解决方案1】:

    我想通了!所有需要做的就是启动两个线程。一个将处理 EClient 的 run() 函数,另一个将运行应用程序的循环函数。下面是代码的样子:

    from ibapi import wrapper
    from ibapi.client import EClient
    from ibapi.contract import *
    
    import threading
    import time
    
    
    class TestApp(wrapper.EWrapper, EClient):
        def __init__(self):
            wrapper.EWrapper.__init__(self)
            EClient.__init__(self, wrapper=self)
    
            self.connect("127.0.0.1", 7497, clientId=0)
            print(f"serverVersion:{self.serverVersion()}
                  connectionTime:{self.twsConnectionTime()}")
    
        def position(self, account: str, contract: Contract, position: float,
                     avgCost: float):
            """This event returns real-time positions for all accounts in
            response to the reqPositions() method."""
    
            super().position(account, contract, position, avgCost)
            print("Position.", account, "Symbol:", contract.symbol, "SecType:",
                  contract.secType, "Currency:", contract.currency,
                  "Position:", position, "Avg cost:", avgCost)
    
        def loop(self):
            while True:
                self.reqPositions()
                time.sleep(15)
    
    
    def main():
        app = TestApp()
    
        reader_thread = threading.Thread(target=app.run)
        loop_thread = threading.Thread(target=app.loop)
    
        reader_thread.start()
        loop_thread.start()
    
    if __name__ == '__main__':
        main()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-05-05
      • 1970-01-01
      • 1970-01-01
      • 2014-07-13
      • 2011-03-03
      • 2018-04-17
      • 2012-03-11
      • 2011-06-07
      相关资源
      最近更新 更多