Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
264 views
in Technique[技术] by (71.8m points)

python - Interoperating with Django/Celery From Java

Our company has a Python based web site and some Python based worker nodes which communicate via Django/Celery and RabbitMQ. I have a Java based application which needs to submit tasks to the Celery based workers. I can send jobs to RabbitMQ from Java just fine, but the Celery based workers are never picking up the jobs. From looking at the packet captures of both types of job submissions, there are differences, but I cannot fathom how to account for them because a lot of it is binary that I cannot find documentation about decoding. Does anyone here have any reference or experience with having Java/RabbitMQ and Celery working together?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

I found the solution. The Java library for RabbitMQ refers to exchanges/queues/routekeys. In Celery, the queue name is actually mapping to the exchange referred to in the Java library. By default, the queue for Celery is simply "celery". If your Django settings define a queue called "myqueue" using the following syntax:

CELERY_ROUTES = {
    'mypackage.myclass.runworker'      : {'queue':'myqueue'},
}

Then the Java based code needs to do something like the following:

        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null ;
        try {
            connection = factory.newConnection(mqHost, mqPort);
        } catch (IOException ioe) {
            log.error("Unable to create new MQ connection from factory.", ioe) ;
        }

        Channel channel = null ;
        try {
            channel = connection.createChannel();
        } catch (IOException ioe) {
            log.error("Unable to create new channel for MQ connection.", ioe) ;
        }

        try {
            channel.queueDeclare("celery", false, false, false, true, null);
        } catch (IOException ioe) {
            log.error("Unable to declare queue for MQ channel.", ioe) ;
        }

        try {
            channel.exchangeDeclare("myqueue", "direct") ;
        } catch (IOException ioe) {
            log.error("Unable to declare exchange for MQ channel.", ioe) ;
        }

        try {
            channel.queueBind("celery", "myqueue", "myqueue") ;
        } catch (IOException ioe) {
            log.error("Unable to bind queue for channel.", ioe) ;
        }

            // Generate the message body as a string here.

        try {
            channel.basicPublish(mqExchange, mqRouteKey, 
                new AMQP.BasicProperties("application/json", "ASCII", null, null, null, null, null, null, null, null, null, "guest", null, null),
                messageBody.getBytes("ASCII"));
        } catch (IOException ioe) {
            log.error("IOException encountered while trying to publish task via MQ.", ioe) ;
        }

It turns out that it is just a difference in terminology.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...