【Question Title】: QuickFIX/J Server with Vert.x
【Posted】: 2021-03-16 13:38:26
【Question】:

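I am trying to create a verticle that starts a QuickFIX/J acceptor (a TCP FIX server). Once started, the acceptor runs on its own threads, which Vert.x knows nothing about (so the event loop is not blocked). I can still access the event bus from the acceptor thread and pass messages to other verticles.

The question is: is this good practice?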

package com.millenniumit.fixgateway.service.impl.quickfix;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.MessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.*;
import quickfix.fix42.NewOrderSingle;

public class FIXServerVerticle extends AbstractVerticle {

    private DynamicSessionProviderConfigHelper dynamicSessionProviderConfig;
    private ThreadedSocketAcceptor threadedSocketAcceptor;
    private static final Logger LOGGER = LoggerFactory.getLogger(FIXServerVerticle.class);

    /**
     * Blocking in start() while waiting for the TCP server to bind would break the Golden Rule
     * (never block the event loop). To avoid this, implement the asynchronous start method,
     * which takes a Promise as a parameter. The verticle is not considered deployed when the
     * method returns, only once the promise is completed.
     * @param startPromise completed when the acceptor has started, failed otherwise
     */
    @Override
    public void start(Promise<Void> startPromise) {
        Application serverApplication = new Application() {
            @Override
            public void onCreate(SessionID sessionID) {
                LOGGER.info("Session Created : " + sessionID);
            }

            @Override
            public void onLogon(SessionID sessionID) {

            }

            @Override
            public void onLogout(SessionID sessionID) {

            }

            @Override
            public void toAdmin(Message message, SessionID sessionID) {

            }

            @Override
            public void fromAdmin(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon {

            }

            @Override
            public void toApp(Message message, SessionID sessionID) throws DoNotSend {

            }

            @Override
            public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
                // fromApp is invoked on a QuickFIX/J thread, not on the event loop
                LOGGER.info("Received on QuickFIX/J thread: " + message);
                // Hand the processing logic over to a Vert.x worker thread
                getVertx().executeBlocking(future -> {
                    //blocking code, run on the worker thread
                    LOGGER.info("Processing in worker thread: " + message);
                    //processing logic
                    future.complete(message);
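                }, res -> {
                    // non-blocking code running on a Vert.x context
                    getVertx().eventBus().request("in.message", res.result(), ar -> {
                        // lookupSession returns null if the session no longer exists
                        Session session = Session.lookupSession(sessionID);
                        if (ar.succeeded() && session != null) {
                            session.send((Message) ar.result().body());
                        } else {
                            LOGGER.warn("No reply sent for session " + sessionID, ar.cause());
                        }
                    });
                });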
            }
        };
        //Offload acceptor initialization from event loop
        getVertx().executeBlocking(future -> {
            //blocking code, run on the worker thread
            MessageStoreFactory messageStoreFactory = new NoopStoreFactory();
            MessageFactory messageFactory = new DefaultMessageFactory();
            dynamicSessionProviderConfig = new DynamicSessionProviderConfigHelper();
            try {
                SessionSettings sessionSettings = new SessionSettings("acceptor-config");
                threadedSocketAcceptor = new ThreadedSocketAcceptor(serverApplication, messageStoreFactory, sessionSettings, messageFactory);
                dynamicSessionProviderConfig.configure(threadedSocketAcceptor, serverApplication, messageStoreFactory, sessionSettings, messageFactory);
                threadedSocketAcceptor.start();
                future.complete();
            } catch (ConfigError | FieldConvertError configError) {
                LOGGER.error("Failed to start FIX acceptor", configError);
                // fail with the exception itself so the cause is preserved
                future.fail(configError);
            }
        }, res -> {
            //non blocking code running on the event loop thread
            if(res.succeeded()){
                startPromise.complete();
            }else{
                startPromise.fail(res.cause().getMessage());
            }
        });
    }

    @Override
    public void stop(Promise<Void> stopPromise) {
        //Offload acceptor stop method from event loop
        getVertx().executeBlocking(future -> {
            //blocking code, run on the worker thread
            threadedSocketAcceptor.stop();
            future.complete();
        }, res -> {
            //non blocking code running on the event loop thread
            if(res.succeeded()){
                stopPromise.complete();
            }else{
                stopPromise.fail(res.cause().getMessage());
            }
        });
    }

}

【Comments】:

    Tags: vert.x quickfixj


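    【Answer 1】:

    I'm not familiar with Vert.x, but in general, offloading incoming messages to a separate thread or queue is common practice in QuickFIX/J applications when higher throughput is needed.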
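
The offloading pattern described above can be sketched in plain Java. This is only a minimal illustration under assumptions: the class `InboundOffloadSketch` and its methods are hypothetical names, messages are modeled as strings, and `onIncoming` merely stands in for a callback such as `fromApp`; no QuickFIX/J or Vert.x types are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: hand incoming messages to a separate worker thread.
public class InboundOffloadSketch {

    // A single worker thread with an internal queue keeps arrival order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final List<String> handled = new ArrayList<>();

    // Stand-in for the engine callback: it only enqueues and returns at once,
    // so the thread that reads from the socket is never held up.
    public void onIncoming(String rawMessage) {
        worker.submit(() -> handle(rawMessage));
    }

    // Runs on the worker thread; slow business logic would go here.
    private void handle(String rawMessage) {
        synchronized (handled) {
            handled.add("handled:" + rawMessage);
        }
    }

    // Stops accepting work, waits for queued messages, returns what was handled.
    public List<String> drain() {
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }

    public static void main(String[] args) {
        InboundOffloadSketch sketch = new InboundOffloadSketch();
        for (String msg : Arrays.asList("order-1", "order-2", "order-3")) {
            sketch.onIncoming(msg);
        }
        System.out.println(sketch.drain());
    }
}
```

A single-threaded executor is used here because it preserves message order; a larger pool would raise throughput but would need its own ordering guarantees per session.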

    【Comments】:

      【Answer 2】:

      This approach looks fine to me; I don't see any conflict with how Vert.x works.

      By the way, you can simplify the code a little.

      This:

      getVertx().executeBlocking(
        future -> {},
        res -> {
                  if(res.succeeded()){
                      startPromise.complete();
                  }else{
                      startPromise.fail(res.cause().getMessage());
                  }
              }
      )
      

      can be replaced with:

      getVertx().executeBlocking(
        future -> {},
        startPromise
      )
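
Why the shorter form behaves the same can be modeled in plain Java. The types below (`Result`, `Handler`, `Promise`, `executeBlocking`) are simplified, hypothetical stand-ins written for this sketch, not the Vert.x API; the assumption being illustrated is that in Vert.x 3.8+/4.x a promise is itself a handler of async results, which is what allows it to be passed directly. Unlike the real call, this stand-in runs synchronously.

```java
import java.util.function.Supplier;

// Hypothetical model of "a promise is also a result handler".
public class PromiseAsHandlerSketch {

    // Stand-in for an async result: either a value or a failure cause.
    static final class Result<T> {
        final T value;
        final Throwable cause;
        Result(T value, Throwable cause) { this.value = value; this.cause = cause; }
        boolean succeeded() { return cause == null; }
    }

    // Stand-in for a result handler.
    interface Handler<E> { void handle(E event); }

    // Stand-in promise: because it implements Handler, it can be passed
    // wherever a result handler is expected.
    static final class Promise<T> implements Handler<Result<T>> {
        boolean completed;
        Throwable failure;

        void complete() { completed = true; }
        void fail(Throwable cause) { failure = cause; }

        @Override
        public void handle(Result<T> result) {
            // Same branching as the hand-written lambda in the longer form.
            if (result.succeeded()) { complete(); } else { fail(result.cause); }
        }
    }

    // Stand-in for executeBlocking: runs the code, then reports to the handler.
    // Synchronous here for simplicity; the real call is asynchronous.
    static <T> void executeBlocking(Supplier<T> code, Handler<Result<T>> resultHandler) {
        Result<T> result;
        try {
            result = new Result<>(code.get(), null);
        } catch (RuntimeException e) {
            result = new Result<>(null, e);
        }
        resultHandler.handle(result);
    }

    public static void main(String[] args) {
        Promise<Void> ok = new Promise<>();
        executeBlocking(() -> null, ok);

        Promise<Void> failed = new Promise<>();
        executeBlocking(() -> { throw new IllegalStateException("bind failed"); }, failed);

        System.out.println(ok.completed + " / " + failed.failure);
    }
}
```

One difference worth noting: the longer form fails the promise with `res.cause().getMessage()`, a plain string, whereas forwarding the result passes the original exception along, so the stack trace is kept.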
      

      【Comments】:
