• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Chronicle-Queue: Micro second messaging that stores everything to disk

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

Chronicle-Queue

开源软件地址:

https://gitee.com/chaotic1988/Chronicle-Queue

开源软件介绍:

Chronicle Queue

badgeGitHubLobby

Queue line

Contents

Table of Contents

What is Chronicle Queue

What is Chronicle Queue

  • You could consider Chronicle Queue to be similar to a low latency broker less durable/persisted JVM topic.

Chronicle Queue is a distributed unbounded persisted queue.Chronicle Queue:

  • supports asynchronous RMI and Publish/Subscribe interfaces with microsecond latencies.

  • passes messages between JVMs in under a microsecond (in optimised examples)

  • passes messages between JVMs on different machines via replication in under 10 microseconds (in optimised examples)

  • provides stable, soft, real-time latencies into the millions of messages per second for a single thread to one queue; with total ordering of every event.

When publishing 40-byte messages, a high percentage of the time we achieve latencies under 1 microsecond.The 99th percentile latency is the worst 1 in 100, and the 99.9th percentile is the worst 1 in 1000 latency.

Table 1. Latency to send/receive on the same machine.
Batch Size10 million events per minute60 million events per minute100 million events per minute

99%ile

0.78 µs

0.78 µs

1.2 µs

99.9%ile

1.2 µs

1.3 µs

1.5 µs

Table 2. Latency to send/receive on a second machine.
Batch Size10 million events per minute60 million events per minute100 million events per minute

99%ile

20 µs

28 µs

176 µs

99.9%ile

901 µs

705 µs

5,370 µs

Note
100 million events per minute is sending an event every 660 nanoseconds; replicated and persisted.
Important
This performance is not achieved using a large cluster of machines.This is using one thread to publish, and one thread to consume.

Downloading Chronicle Queue

Releases are available on Maven Central as:

<dependency>  <groupId>net.openhft</groupId>  <artifactId>chronicle-queue</artifactId>  <version><!--replace with the latest version, see below--></version></dependency>

Click here to get the Latest Version Number

Snapshots are available on https://oss.sonatype.org

Overview

Ths project covers the Java version of Chronicle Queue.Chronicle Queue is a persisted low-latency messaging framework for high performance and critical applications.A C++ version of this project is also available and can be evaluated upon request.If you are interested in looking at the C++ version please contact [email protected].

At first glance Chronicle Queue can be seen as simply another queue implementation.However, it has major design choices that should be emphasised.

Using non-heap storage options (RandomAccessFile), Chronicle Queue provides a processing environment where applications do not suffer from Garbage Collection (GC).When implementing high-performance and memory-intensive applications (you heard the fancy term "bigdata"?) in Java, one of the biggest problems is garbage collection.

Garbage collection may slow down your critical operations non-deterministically at any time.In order to avoid non-determinism, and escape from garbage collection delays, off-heap memory solutions are ideal.The main idea is to manage your memory manually so it does not suffer from garbage collection.Chronicle Queue behaves like a management interface over off-heap memory so you can build your own solutions over it.

Chronicle Queue uses RandomAccessFiles while managing memory and this choice brings lots of possibilities. RandomAccessFiles permit non-sequential, or random, access to a file’s contents.To access a file randomly, you open the file, seek a particular location, and read from or write to that file.RandomAccessFiles can be seen as "large" C-type byte arrays that you can access at any random index "directly" using pointers.File portions can be used as ByteBuffers if the portion is mapped into memory.

This memory mapped file is also used for exceptionally fast interprocess communication (IPC) without affecting your system performance.There is no garbage collection as everything is done off-heap.

Chronicle Queue (Enterprise Edition)

Chronicle Queue (Enterprise Edition) is a commercially supported version of our successful open source Chronicle Queue.

The open source documentation is extended by this document to describe the additional features that are available when you are licenced for Enterprise Edition.These are:

  • Encryption of message queues and messages.For more information see Encryption.

  • TCP/IP Replication between hosts to ensure real-time backup of all your queue data.For more information see Replication, the protocol is covered here.

  • Timezone support for daily queue rollover scheduling.For more information see Timezone support.

  • Ring Buffer support to give improved performance at high throughput on slower filesystems.For more information see Ring Buffer and also performance.

In addition, you will be fully supported by our technical experts.

For more information on Chronicle Queue (Enterprise Edition), please contact [email protected].

How does Chronicle Queue work

Terminology

  • Messages are grouped by topics.A topic can contain any number of sub-topics which are logically stored together under the queue/topic.

  • An appender is the source of messages.

  • A tailer is a receiver of messages.

  • Chronicle Queue is broker-less by default.You can use Chronicle Datagrid to act as a broker for remote access.

Note
We deliberately avoid the term consumer as messages are not consumed/destroyed by reading.

At a high level:

  • appenders write to the end of a queue.There is no way to insert, or delete excerpts.

  • tailers read the next available message each time they are called.

By using Chronicle Datagrid, a Java or C# client can publish to a queue to act as a remote appender, and you subscribe to a queue to act as a remote tailer.

Important note

Chronicle Queue does not support operating off any network file system, be it NFS, AFS, SAN-based storage or anything else.The reason for this is those file systems do not provide all the required primitives for memory-mapped files Chronicle Queue uses.

If any networking is needed (e.g. to make the data accessible to multiple hosts), the only supported way is Chronicle Queue Replication (Enterprise feature).

Topics and Queue files

Each topic is a directory of queues.There is a file for each roll cycle.If you have a topic called mytopic, the layout could look like this:

mytopic/    20160710.cq4    20160711.cq4    20160712.cq4    20160713.cq4

To copy all the data for a single day (or cycle), you can copy the file for that day on to your development machine for replay testing.

File Retention

You can add a StoreFileListener to notify you when a file is added, or no longer used.This can be used to delete files after a period of time.However, by default, files are retained forever.Our largest users have over 100 TB of data stored in queues.

The only thing each tailer retains is an index which is composed from:

  • a cycle number.For example, days since epoch, and

  • a sequence number within that cycle.

    In the case of a DAILY cycle, the sequence number is 32 bit and the index = ((long) cycle << 32) | sequenceNumber providing up to 4 billion entries per day.if more messages per day are anticipated, the XLARGE_DAILY cycle, for example, provides up 4 trillion entries per day using a 48-bit sequence number.

    Printing the index in hexadecimal is common in our libraries, to make it easier to see these two components.

Appenders and tailers are cheap as they don’t even require a TCP connection; they are just a few Java objects.

Rather than partition the queue files across servers, we support each server, storing as much data as you have disk space.This is much more scalable than being limited to the amount of memory space that you have.You can buy a redundant pair of 6TB of enterprise disks very much more cheaply than 6TB of memory.

Restrictions on topics and messages.

Topics are limited to being strings which can be used as directory names.Within a topic, you can have sub-topics which can be any data type that can be serialized.Messages can be any serializable data.

Chronicle Queue supports:

  • Serializable objects, though this is to be avoided as it is not efficient

  • Externalizable objects is preferred if you wish to use standard Java APIs.

  • byte[] and String

  • Marshallable; a self describing message which can be written as YAML, Binary YAML, or JSON.

  • BytesMarshallable which is low-level binary, or text encoding.

Every Tailer sees every message.

An abstraction can be added to filter messages, or assign messages to just one message processor.However, in general you only need one main tailer for a topic, with possibly, some supporting tailers for monitoring etc.

As Chronicle Queue doesn’t partition its topics, you get total ordering of all messages within that topic.Across topics, there is no guarantee of ordering; if you want to replay deterministically from a system which consumes from multiple topics, we suggest replaying from that system’s output.

Replaying from the output, not the input.

It is common practice to replay a state machine from its inputs.To do this, there are two assumptions that you have to make; these are difficult to implement;

  • you have either just one input, or you can always determine the order the inputs were consumed,

  • you have not changed the software (or all the software is stored in the queue).

You can see from this that if you want to be able to upgrade your system, then you’ll want to replay from the output.

Replaying from the output means that;

  • you have a record of the order of the inputs that you processed

  • you have a record of all the decisions your new system is committed to; even if the new code would have made different decisions.

Guarantees

Chronicle Queue provides the following guarantees;

  • for each appender, messages are written in the order the appender wrote them.Messages by different appenders are interleaved,

  • for each tailer, it will see every message for a topic in the same order as every other tailer,

  • when replicated, every replica has a copy of every message.

Replication has three modes of operation;

  • replicate as soon as possible; < 1 millisecond in as many as 99.9% of cases,

  • a tailer will only see messages which have been replicated,

  • an appender doesn’t return until a replica has acknowledged it has been received.

Use Cases

Chronicle Queue is most often used for producer-centric systems where you need to retain a lot of data for days or years.

What is a producer-centric system?

Most messaging systems are consumer-centric.Flow control is implemented to avoid the consumer ever getting overloaded; even momentarily.A common example is a server supporting multiple GUI users.Those users might be on different machines (OS and hardware), different qualities of network (latency and bandwidth), doing a variety of other things at different times.For this reason it makes sense for the client consumer to tell the producer when to back off, delaying any data until the consumer is ready to take more data.

Chronicle Queue is a producer-centric solution and does everything possible to never push back on the producer, or tell it to slow down.This makes it a powerful tool, providing a big buffer between your system, and an upstream producer over which you have little, or no, control.

Market Data

Market data publishers don’t give you the option to push back on the producer for long; if at all.A few of our users consume data from CME OPRA. This produces peaks of 10 million events per minute, sent as UDP packets without any retry.If you miss, or drop a packet, then it is lost.You have to consume and record those packets as fast as they come to you, with very little buffering in the network adapter.

For market data in particular, real time means in a few microseconds; it doesn’t mean intra-day (during the day).

Chronicle Queue is fast and efficient, and has been used to increase the speed that data is passed between threads.In addition, it also keeps a record of every message passed allowing you to significantly reduce the amount of logging that you need to do.

Compliance Systems

Compliance Systems are required by more and more systems these days.Everyone has to have them, but no one wants to be slowed down by them.By using Chronicle Queue to buffer data between monitored systems and the compliance system, you don’t need to worry about the impact of compliance recording for your monitored systems.

Again, Chronicle Queue can support millions of events per-second, per-server, and access data which has been retained for years.

Latency Sensitive Micro-services

Chronicle Queue supports low latency IPC (Inter Process Communication) between JVMs on the same machine in the order of magnitude of 1 microsecond; as well as between machines with a typical latency of 10 microseconds for modest throughputs of a few hundred thousands.Chronicle Queue supports throughputs of millions of events per second, with stable microsecond latencies.

Metrics

Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics, in real time (that is, within microseconds of the event triggering it).

Log Replacement

As Chronicle Queue can be used to build state machines.All the information about the state of those components can be reproduced externally, without direct access to the components, or to their state.This significantly reduces the need for additional logging.

However, any logging you do need can be recorded in great detail.This makes enabling DEBUG logging in production practical.This is because the cost of logging is very low; less than 10 microseconds.Logs can be replicated centrally for log consolidation.

Chronicle Queue is being used to store 100+ TB of data, which can be replayed from any point in time.

Lambda Stream Processing

Non-batching streaming components are highly performant, deterministic, and reproducible.You can reproduce bugs which only show up after a million events played in a particular order, with accelerated realistic timings.

This makes using Stream Processing attractive for systems which need a high degree of quality outcomes.

Using Chronicle Queue

Chronicle Queue is designed to be driven from code.You can easily add an interface which suits your needs.

Note
Due to fairly low-level operation, Chronicle Queue read/write operations can throw unchecked exceptions.In order to prevent thread death, it might be practical to catch `RuntimeException`s and log/analyze them as appropriate.

Writing to a Queue

In Chronicle Queue we refer to the act of writing your data to the Chronicle queue, as storing an excerpt.This data could be made up from any data type, including text, numbers, or serialised blobs.Ultimately, all your data, regardless of what it is, is stored as a series of bytes.

Just before storing your excerpt, Chronicle Queue reserves a 4-byte header.Chronicle Queue writes the length of your data into this header.This way, when Chronicle Queue comes to read your excerpt, it knows how long each blob of data is.We refer to this 4-byte header, along with your excerpt, as a document.So strictly speaking Chronicle Queue can be used to read and write documents.

Note
Within this 4-byte header we also reserve a few bits for a number of internal operations, such as locking, to make Chronicle Queue thread-safe across both processors and threads.The important thing to note is that because of this, you can’t strictly convert the 4 bytes to an integer to find the length of your data blob.

To write data to a Chronicle-Queue, you must first create an Appender

try (ChronicleQueue queue = ChronicleQueue.singleBuilder(path + "/trades").build()) {   final ExcerptAppender appender = queue.acquireAppender();}

So, Chronicle Queue uses an Appender to write to the queue and a Tailer to read from the queue.Unlike other java queuing solutions, messages are not lost when they are read with a Tailer.This is covered in more detail in the section below on "Reading from a Queue".

Chronicle Queue uses the following low-level interface to write the data:

try (final DocumentContext dc = appender.writingDocument()) {      dc.wire().write().text(your text data);}

The close on the try-with-resources, is the point when the length of the data is written to the header.You can also use the DocumentContext to find out the index that your data has just been assigned (see below).You can later use this index to move-to/look up this excerpt.Each Chronicle Queue excerpt has a unique index.

try (final DocumentContext dc = appender.writingDocument()) {    dc.wire().write().text(your text data);    System.out.println("your data was store to index="+ dc.index());}

The high-level methods below such as writeText() are convenience methods on calling appender.writingDocument(), but both approaches essentially do the same thing.The actual code of writeText(CharSequence text) looks like this:

/** * @param text to write a message */void writeText(CharSequence text) {    try (DocumentContext dc = writingDocument()) {        dc.wire().bytes().append8bit(text);    }}

So you have a choice of a number of high-level interfaces, down to a low-level API, to raw memory.

This is the highest-level API which hides the fact you are writing to messaging at all.The benefit is that you can swap calls to the interface with a real component, or an interface to a different protocol.

// using the method writer interface.RiskMonitor riskMonitor = appender.methodWriter(RiskMonitor.class);final LocalDateTime now = LocalDateTime.now(Clock.systemUTC());riskMonitor.trade(new TradeDetails(now, "GBPUSD", 1.3095, 10e6, Side.Buy, "peter"));

You can write a "self-describing message".Such messages can support schema changes.They are also easier to understand when debugging or diagnosing problems.

// writing a self describing messageappender.writeDocument(w -> w.write("trade").marshallable(        m -> m.write("timestamp").dateTime(now)                .write("symbol").text("EURUSD")                .write("price").float64(1.1101)                .write("quantity").float64(15e6)                .write("side").object(Side.class, Side.Sell)                .write("trader").text("peter")));

You can write "raw data" which is self-describing.The types will always be correct; position is the only indication as to the meaning of those values.

// writing just dataappender.writeDocument(w -> w        .getValueOut().int32(0x123456)        .getValueOut().int64(0x999000999000L)        .getValueOut().text("Hello World"));

You can write "raw data" which is not self-describing.Your reader must know what this data means, and the types that were used.

// writing raw dataappender.writeBytes(b -> b        .writeByte((byte) 0x12)        .writeInt(0x345678)        .writeLong(0x999000999000L)        .writeUtf8("Hello World"));

Below, the lowest level way to write data is illustrated.You get an address to raw memory and you can write whatever you want.

// Unsafe low levelappender.writeBytes(b -> {    long address = b.address(b.writePosition());    Unsafe unsafe = UnsafeMemory.UNSAFE;    unsafe.putByte(address, (byte) 0x12);    address += 1;    unsafe.putInt(address, 0x345678);    address += 4;    unsafe.putLong(address, 0x999000999000L);    address += 8;    byte[] bytes = "Hello World".getBytes(StandardCharsets.ISO_8859_1);    unsafe.copyMemory(bytes, Jvm.arrayByteBaseOffset(), null, address, bytes.length);    b.writeSkip(1 + 4 + 8 + bytes.length);});

You can print the contents of the queue.You can see the first two, and last two messages store the same data.

// dump the content of the queueSystem.out.println(queue.dump());

prints:

# position: 262568, header: 0--- !!data #binarytrade: {  timestamp: 2016-07-17T15:18:41.141,  symbol: GBPUSD,  price: 1.3095,  quantity: 10000000.0,  side: Buy,  trader: peter}# position: 262684, header: 1--- !!data #binarytrade: {  timestamp: 2016-07-17T15:18:41.141,  symbol: EURUSD,  price: 1.1101,  quantity: 15000000.0,  side: Sell,  trader: peter}# position: 262800, header: 2--- !!data #binary!int 1193046168843764404224Hello World# position: 262830, header: 3--- !!data #binary000402b0       12 78 56 34 00 00  90 99 00 90 99 00 00 0B   ·xV4·· ········000402c0 48 65 6C 6C 6F 20 57 6F  72 6C 64                Hello Wo rld# position: 262859, header: 4--- !!data #binary000402c0                                               12                 ·000402d0 78 56 34 00 00 90 99 00  90 99 00 00 0B 48 65 6C xV4····· ·····Hel000402e0 6C 6F 20 57 6F 72 6C 64                          lo World

Finding the index at the end of a Chronicle Queue

Chronicle Queue appenders are thread-local.In fact when you ask for:

final ExcerptAppender appender = queue.acquireAppender();

the acquireAppender() uses a thread-local pool to give you an appender which will be reused to reduce object creation.

As such, the method call to:

long index = appender.lastIndexAppended();

will only give you the last index appended by this appender; not the last index appended by any appender.

If you wish to find the index of the last record written, then you have to call:

long index = queue.createTailer().toEnd().index() - 1;

(note that tailer.toEnd() puts the tailer just past the last index written)

How to determine how many messages are between two indexes

to count the number of messages between two indexes you can use:

((SingleChronicleQueue)queue).countExcerpts(<firstIndex>,<lastIndex>);
Note
You should avoid calling this method on latency sensitive code, because if the indexes are in different cycles this method may have to access the .cq4 files from the file system.

for more information on this see :

net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts

How to write 10 messages, then move to the 5th message to read it.

@Testpublic void read5thMessageTest() {    try (final ChronicleQueue queue = singleBuilder(getTmpDir()).build()) {        final ExcerptAppender appender = queue.acquireAppender();        int i = 0;        for (int j = 0; j < 10; j++) {            try (DocumentContext dc = appender.writingDocument()) {                dc.wire().write("hello").text("world " + (i++));                long indexWritten = dc.index();            }        }        // get the current cycle        int cycle;        final ExcerptTailer tailer = queue.createTailer();        try (DocumentContext documentContext = tailer.readingDocument()) {            long index = documentContext.index();            cycle = queue.rollCycle().toCycle(index);        }        long index = queue.rollCycle().toIndex(cycle, 5);        tailer.moveToIndex(index);        try (DocumentContext dc = tailer.readingDocument()) {            System.out.println(dc.wire().read("hello").text());        } }}

Dumping a Chronicle Queue, cq4 file as text to the Command Line

Chronicle Queue stores its data in binary format, with a file extension of cq4:

\�@πheader∂SCQStoreÇE��»wireType∂WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂SCQSRollÇ*���∆length¶ÄÓ6�∆formatÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•�ÃindexSpacingÀindex2Indexé����ß��������…lastIndexé����ß��������fllastAcknowledgedIndexReplicatedé�����ߡˇˇˇˇˇˇˇ»recovery∂TimedStoreRecoveryÇ���…timeStampèèèß����������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������

This can often be a bit difficult to read, so it is better to dump the cq4 files as text.This can also help you fix your production issues, as it gives you the visibility as to what has been stored in the queue, and in what order.

The example below shows how to read a cq4 file from the command line:

You have to use the chronicle-queue.jar, from any version 4.5.3 or later, and set up the dependent files in the class path.To find out which version of jars to include please refer to the chronicle-bom.

Once you have the dependent jars on the class path (like below):

$ ls -ltrtotal 9920-rw-r--r--  1 robaustin  staff   112557 28 Jul 14:52 chronicle-queue-4.5.5.jar-rw-r--r--  1 robaustin  staff   209268 28 Jul 14:53 chronicle-bytes-1.7.3.jar-rw-r--r--  1 robaustin  staff   136434 28 Jul 14:56 chronicle-core-1.7.3.jar-rw-r--r--  1 robaustin  staff    33562 28 Jul 15:03 slf4j-api-1.7.14.jar-rw-r--r--  1 robaustin  staff   324302 28 Jul 15:04 chronicle-wire-1.7.5.jar-rw-r--r--  1 robaustin  staff    35112 28 Jul 15:05 chronicle-threads-1.7.2.jar-rw-r--r--  1 robaustin  staff  4198400 28 Jul 15:05 19700101-02.cq4

you can run, from the command line

$ java -cp chronicle-queue-4.5.5.jar net.openhft.chronicle.queue.DumpQueueMain 19700101-02.cq4

this will dump the 19700101-02.cq4 file out as text, as shown below:

!!meta-data #binaryheader: !SCQStore {  wireType: !WireType BINARY,  writePosition: 0,  roll: !SCQSRoll {    length: !int 3600000,    format: yyyyMMdd-HH,
                      

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap