在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:rabbitmq/rabbitmq-stream-dotnet-client开源软件地址:https://github.com/rabbitmq/rabbitmq-stream-dotnet-client开源编程语言:C# 97.1%开源软件介绍:RabbitMQ client for the stream protocol#@ Table of Contents OverviewDotnet client for RabbitMQ Stream Queues Installing via NuGetThe client is distributed via NuGet. Getting startedA rapid getting started var config = new StreamSystemConfig
{
UserName = "guest",
Password = "guest",
VirtualHost = "/"
};
// Connect to the broker
var system = await StreamSystem.Create(config);
const string stream = "my_first_stream";
// Create the stream. It is important to put some retention policy
// in this case is 200000 bytes.
await system.CreateStream(new StreamSpec(stream)
{
MaxLengthBytes = 200000,
});
var producer = await system.CreateProducer(
new ProducerConfig
{
Reference = Guid.NewGuid().ToString(),
Stream = stream,
// Here you can receive the messages confirmation
// it means the message is stored on the server
ConfirmHandler = conf =>
{
Console.WriteLine($"message: {conf.PublishingId} - confirmed");
}
});
// Publish the messages and set the publishingId that
// should be sequential
for (ulong i = 0; i < 100; i++)
{
var message = new Message(Encoding.UTF8.GetBytes($"hello {i}"));
await producer.Send(i, message);
}
// not mandatory. Just to show the confirmation
Thread.Sleep(TimeSpan.FromSeconds(1));
// Create a consumer
var consumer = await system.CreateConsumer(
new ConsumerConfig
{
Reference = Guid.NewGuid().ToString(),
Stream = stream,
// Consume the stream from the beginning
// See also other OffsetSpec
OffsetSpec = new OffsetTypeFirst(),
// Receive the messages
MessageHandler = async (consumer, ctx, message) =>
{
Console.WriteLine($"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())} - consumed");
await Task.CompletedTask;
}
});
Console.WriteLine($"Press to stop");
Console.ReadLine();
await producer.Close();
await consumer.Close();
await system.DeleteStream(stream);
await system.Close(); UsageConnectvar config = new StreamSystemConfig
{
UserName = "myuser",
Password = "mypassword",
VirtualHost = "myhost",
Endpoints = new List<EndPoint> {new IPEndPoint(IPAddress.Parse("<<brokerip>>"), 5552)},
} Multi Hostvar config = new StreamSystemConfig
{
UserName = "myuser",
Password = "mypassword",
VirtualHost = "myhost",
Endpoints = new List<EndPoint>
{
new IPEndPoint(IPAddress.Parse("<<brokerip1>>"), 5552),
new IPEndPoint(IPAddress.Parse("<<brokerip2>>"), 5552),
new IPEndPoint(IPAddress.Parse("<<brokerip3>>"), 5552)
},
}; TLSvar config = new StreamSystemConfig
{
UserName = "guest",
Password = "guest",
VirtualHost = "/",
Ssl = new SslOption()
{
Enabled = true
}, Load Balancervar lbAddressResolver = new AddressResolver(new IPEndPoint(IPAddress.Parse("<<loadBalancerIP>>"), 5552));
var config = new StreamSystemConfig
{
UserName = "guest",
Password = "guest",
VirtualHost = "/",
AddressResolver = lbAddressResolver,
Endpoints = new List<EndPoint> {addressResolver.EndPoint},
} Manage Streamsawait system.CreateStream(new StreamSpec(stream));
It is possible to set up the retention policy when creating the stream, based on size or time: await system.CreateStream(new StreamSpec(stream)
{
MaxLengthBytes = 200000,
MaxAge = TimeSpan.FromHours(8),
}); Set a policy is highly recommended. RabbitMQ does not store the whole stream in a single file, but splits it in segment files.
This is also used for truncate the stream: when a stream reaches his maximum size, an entire segment file is deleted. For this reason await system.CreateStream(new StreamSpec(stream)
{
MaxLengthBytes = 20_000,
MaxSegmentSizeBytes = 1000
}); ProducerA Producer instance is created from the var producer = await system.CreateProducer(
new ProducerConfig
{
Stream = "my_stream",
}); Consider a Producer instance like a long-lived object, do not create one to send just one message.
Publish MessagesStandard publish var publishingId = 0;
var message = new Message(Encoding.UTF8.GetBytes("hello"));
await producer.Send(publishingId, message);
Sub Entries BatchingA sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency. var subEntryMessages = List<Messages>();
for (var i = 1; i <= 500; i++)
{
var message = new Message(Encoding.UTF8.GetBytes($"SubBatchMessage_{i}"));
subEntryMessages.Add(message);
}
var publishingId = 1;
await producer.Send(publishingId, subEntryMessages, CompressionType.Gzip);
messages.Clear(); Not all the compressions are implemented by defaults, to avoid to many dependencies. See the table:
You can add missing codecs with DeduplicationSee here for more details Set a producer reference to enable the deduplication: var producer = await system.CreateProducer(
new ProducerConfig
{
Reference = "my_producer",
Stream = "my_stream",
}); then: var publishingId = 0;
var message = new Message(Encoding.UTF8.GetBytes($"my deduplicate message {i}"));
await producer.Send(publishingId, message); Consume MessagesDefine a consumer: var consumer = await system.CreateConsumer(
new ConsumerConfig
{
Reference = "my_consumer",
Stream = stream,
MessageHandler = async (consumer, ctx, message) =>
{
Console.WriteLine(
$"message: {Encoding.Default.GetString(message.Data.Contents.ToArray())}");
await Task.CompletedTask;
}
}); Offset TypesThere are five types of Offset and they can be set by the var consumerOffsetTypeFirst = await system.CreateConsumer(
new ConsumerConfig
{
Reference = "my_consumer_offset_first",
Stream = stream,
OffsetSpec = new OffsetTypeFirst(),
MessageHandler = async (consumer, ctx, message) =>
{
await Task.CompletedTask;
}
}); The five types are:
var offsetTypeFirst = new OffsetTypeFirst();
var offsetTypeLast = new OffsetTypeLast();
var offsetTypeNext = new OffsetTypeNext()
ulong iWantToStartFromPubId = 10;
var offsetTypeOffset = new OffsetTypeOffset(iWantToStartFromPubId);
var anHourAgo = (long)DateTime.UtcNow.AddHours(-1).Subtract(new DateTime(1970, 1, 1)).TotalSeconds;
var offsetTypeTimestamp = new OffsetTypeTimestamp(anHourAgo); Track OffsetThe server can store the current delivered offset given a consumer with var messagesConsumed = 0;
var consumer = await system.CreateConsumer(
new ConsumerConfig
{
Reference = "my_consumer",
Stream = stream,
MessageHandler = async (consumer, ctx, message) =>
{
if (++messagesConsumed % 1000 == 0)
{
await consumer.StoreOffset(ctx.Offset);
} Note: Avoid to store the offset for each single message, it can reduce the performances. It is possible to retrieve the offset with var trackedOffset = await system.QueryOffset("my_consumer", stream);
var consumer = await system.CreateConsumer(
new ConsumerConfig
{
Reference = "my_consumer",
Stream = stream,
OffsetSpec = new OffsetTypeOffset(trackedOffset), OBS. if don't have stored an offset for the consumer's reference on the stream you get an OffsetNotFoundException exception. Handle CloseProducers/Consumers raise and event when the client is disconnected: new ProducerConfig/ConsumerConfig
{
ConnectionClosedHandler = s =>
{
Console.WriteLine($"Connection Closed: {s}");
return Task.CompletedTask;
}, Handle Metadata UpdateStream metadata update is raised when the stream topology changes or the stream is deleted.
You can use new ProducerConfig/ConsumerConfig
{
MetadataHandler = update =>
{
......
},
} Reliable
Reliable ProducerReliable Producer is a smart layer built up of the standard The main features are:
Provide publishingID automaticallyReliable Producer retrieves the last publishingID given the producer name. Zero(0) is the default value in case there is no a publishingID. Auto-ReconnectReliable Producer restores the TCP connection in case the Producer is disconnected for some reason. During the reconnection it continues to store the messages in a local-list. The user will receive back the confirmed or un-confirmed messages. Trace sent and received messagesReliable Producer keeps in memory each sent message and remove from the memory when the message is confirmed or goes in timout.
ConfirmationHandler = confirmation =>
{
if (confirmation.Status == ConfirmationStatus.Confirmed)
{
// OK
}
else
{
// Some problem
}
} Invalidate messagesIf the client doesn't receive a confirmation within 2 seconds Reliable Producer removes the message from the internal messages cache.
The user will receive Handle the metadata UpdateIf the streams changes the topology (ex:Stream deleted or add/remove follower), the client receives an Send APIReliable Producer implements two
Reconnection StrategyBy default Reliable Producer uses an 全部评论
专题导读
上一篇:Esri/visibility-addin-dotnet: ArcGIS Add-in provides the capability to quickly d ...发布时间:2022-05-15下一篇:jxnkwlp/DotnetSpiderLite: 轻量级爬虫框架。源于DotnetSpider发布时间:2022-05-15热门推荐
热门话题
阅读排行榜
|
请发表评论