【问题标题】:Celery to manage java tasks芹菜管理java任务
【发布时间】:2016-03-02 05:15:59
【问题描述】:

我有一个 java 客户端,它使用 rabbitmq 与 celery 通信,并将任务发送到 celery 服务器以添加 2 个数字 xy

String QUEUE_NAME = "celery";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
String x = "5";
String y = "10";
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String ID = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "{\"id\":\""+ID+"\", \"task\": \"tasks.add\", \"args\": ["+x+","+y+"], \"kwargs\": {}, \"retries\": 0, \"eta\": \"2009-11-17T12:30:56.527191\"}";
channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder()
        .contentType("application/json").contentEncoding("utf-8")
        .build(), message.getBytes("utf-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();

我们有一个 python api add 来添加这些数字,它由 celery 管理。

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

我想在 java 中编写这个 add(x,y) 函数,芹菜以某种方式识别 java add(x,y) method 并管理它。

注意:我正在寻找没有 webhook 的解决方案。

提前致谢。

【问题讨论】:

  • 如何使用 [celery.execute.delay_tas] (docs.celeryproject.org/en/2.1-archived/reference/…)。使用它,您可以使用任务名称调用任务
  • celery 怎么知道这个任务名是给这个 java 函数的?这是原来的问题
  • 所以你想要一些 java 代码将任务发送到 rabbitmq,并且你想让 celery 将任务识别为用 java 编写并运行适当的 java 代码?
  • 我已经有 java 代码向 celery 发送任务,它是一个 json 任务并且有要执行的方法名称,现在我希望 celery 读取方法名称并将其链接到相应的 java 方法和执行那个方法。

标签: java python rabbitmq celery


【解决方案1】:

查看https://crabhi.github.io/celery-java/

这是 Celery 客户端和 JVM 工作者的(目前非常 alpha)实现。

您可以注释您的任务类:

 import org.sedlakovi.celery.Task;

 @Task
 public class TestTask {

     public int sum(int x, int y) {
         return x + y;
     }
 }

然后像这样调用任务:

Client client = new Client(rabbitConnectionChannel, rabbitBackend);

Integer result = TestTaskProxy.with(client).sum(1, 7).get();

【讨论】:

    【解决方案2】:
    import json
    
    from celery import Celery
    from celery import bootsteps
    from kombu import Consumer, Exchange, Queue
    
    queue = Queue("input.queue", Exchange("default"), "input.key")
    
    app = Celery(broker="amqp://")
    
    
    # Decalring the general input message handler
    class InputMessageHandler(object):
        def handle(self, body):
            body_json = json.loads(body)
            _type = body_json["type"]
            if _type == "ETL":
                ETLMessageHandler().handle(body_json)
    
    
    # Declaring the ETL message handler
    class ETLMessageHandler(object):
        def handle(self, body):
            print("Working on ETL for message: {0}".format(body))
            # Calling out your Celery tasks here
    
    
    # Declaring the bootstep for our purposes
    class InputMessageConsumerStep(bootsteps.ConsumerStep):
        def get_consumers(self, channel):
            return [Consumer(channel,
                             queues=[queue],
                             callbacks=[self.handle_message],
                             accept=["json"])]
    
        def handle_message(self, body, message):
            InputMessageHandler().handle(body)
            message.ack()
    
    
    app.steps["consumer"].add(InputMessageConsumerStep)
    
    
    if __name__ == "__main__":
        app.start()
    

    【讨论】:

      猜你喜欢
      • 2014-10-02
      • 2014-05-29
      • 2017-11-06
      • 1970-01-01
      • 2014-12-04
      • 1970-01-01
      • 1970-01-01
      • 2012-12-01
      • 2014-04-16
      相关资源
      最近更新 更多