Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


How to connect a consumer to a Kafka topic with MassTransit when using a state machine

There are two services:

Service 1 hosts the state machine and produces a message on a topic. Service 2 should consume this message. How do I set up Service 2 properly so that it consumes the messages?

With the following configuration, it doesn't work:

services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));

    mt.AddRider(rider =>
    {
        rider.UsingKafka((ctx, kafka) =>
        {
            kafka.Host("kafka_url");
        });

        rider.AddConsumer<OrderConsumer>()
            .Endpoint(e =>
            {
                e.Name = "queue_name";
                e.Temporary = false;
                e.ConcurrentMessageLimit = 8;
            });
    });
});

When I configure it like this instead, it throws System.ArgumentException: 'The consumer type was not found: OrderConsumer':

services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));

    mt.AddRider(rider =>
    {
        rider.UsingKafka((ctx, kafka) =>
        {
            kafka.Host("kafka_url");

            kafka.TopicEndpoint<Null, OrderMessage>("queue_name", "group_id", cfg =>
            {
                cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                cfg.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
});


1 Answer


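You need a mix of both samples you posted: register the consumer on the rider with AddConsumer, then bind it to the topic endpoint with ConfigureConsumer: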

services.AddMassTransit(mt =>
{
    mt.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context, SnakeCaseEndpointNameFormatter.Instance));

    mt.AddRider(rider =>
    {
        rider.AddConsumer<OrderConsumer>();

        rider.UsingKafka((ctx, kafka) =>
        {
            kafka.Host("kafka_url");

            kafka.TopicEndpoint<Null, OrderMessage>("queue_name", "group_id", cfg =>
            {
                cfg.AutoOffsetReset = AutoOffsetReset.Earliest;
                cfg.ConfigureConsumer<OrderConsumer>(ctx);
            });
        });
    });
});
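
For completeness, here is a minimal sketch of what the consumer side might look like. The real OrderMessage and OrderConsumer are not shown in the question, so the OrderId property and the logging body below are assumptions for illustration only; the IConsumer<T> interface and its Consume method are the standard MassTransit consumer contract.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message contract: the actual OrderMessage shape is not shown
// in the question, so OrderId is an assumed property.
public record OrderMessage
{
    public Guid OrderId { get; init; }
}

// The consumer type registered with rider.AddConsumer<OrderConsumer>() and
// bound to the topic endpoint via cfg.ConfigureConsumer<OrderConsumer>(ctx).
public class OrderConsumer : IConsumer<OrderMessage>
{
    public Task Consume(ConsumeContext<OrderMessage> context)
    {
        // Placeholder handling: write the received order id to the console.
        Console.WriteLine($"Received order {context.Message.OrderId}");
        return Task.CompletedTask;
    }
}
```

The message type used in TopicEndpoint<Null, OrderMessage> has to match the type the consumer implements IConsumer<> for, otherwise the endpoint has nothing to dispatch to.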
