Open-source project: Chronicle-Queue
Project URL: https://gitee.com/chaotic1988/Chronicle-Queue

Chronicle Queue
What is Chronicle Queue
Chronicle Queue is a distributed, unbounded, persisted queue.

When publishing 40-byte messages, a high percentage of the time we achieve latencies under 1 microsecond. The 99th percentile latency is the worst 1 in 100, and the 99.9th percentile is the worst 1 in 1,000.
Downloading Chronicle Queue

Releases are available on Maven Central as:

```xml
<dependency>
    <groupId>net.openhft</groupId>
    <artifactId>chronicle-queue</artifactId>
    <version><!--replace with the latest version, see below--></version>
</dependency>
```

Snapshots are available on https://oss.sonatype.org.

Overview

This project covers the Java version of Chronicle Queue. Chronicle Queue is a persisted low-latency messaging framework for high-performance and critical applications. A C++ version of this project is also available and can be evaluated upon request. If you are interested in looking at the C++ version, please contact [email protected].

At first glance Chronicle Queue can be seen as simply another queue implementation. However, it has major design choices that should be emphasised. It uses non-heap storage. Garbage collection may slow down your critical operations non-deterministically at any time. In order to avoid this non-determinism, and escape from garbage-collection delays, off-heap memory solutions are ideal. The main idea is to manage your memory manually so it does not suffer from garbage collection. Chronicle Queue behaves like a management interface over off-heap memory, so you can build your own solutions over it.

Chronicle Queue stores its data in a memory-mapped file. This memory-mapped file is also used for exceptionally fast interprocess communication (IPC) without affecting your system performance. There is no garbage collection, as everything is done off-heap.
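As a minimal sketch of the IPC idea, assuming the `ChronicleQueue.singleBuilder` API used later in this document (the directory name and class names here are illustrative only): two JVMs map the same queue directory, one appending while the other tails, with no sockets involved.

```java
import net.openhft.chronicle.queue.ChronicleQueue;

// Process A (run in one JVM): maps the shared directory and appends a message.
class WriterProcess {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.singleBuilder("shared-queue").build()) {
            queue.acquireAppender().writeText("tick 1");
        }
    }
}

// Process B (run in another JVM): maps the same directory and tails it.
class ReaderProcess {
    public static void main(String[] args) {
        try (ChronicleQueue queue = ChronicleQueue.singleBuilder("shared-queue").build()) {
            String msg = queue.createTailer().readText(); // null when nothing to read
            System.out.println(msg);
        }
    }
}
```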
Chronicle Queue (Enterprise Edition)

Chronicle Queue (Enterprise Edition) is a commercially supported version of our successful open-source Chronicle Queue. The open-source documentation is extended by this document to describe the additional features that are available when you are licensed for the Enterprise Edition. In addition, you will be fully supported by our technical experts. For more information on Chronicle Queue (Enterprise Edition), please contact [email protected].

How does Chronicle Queue work

Terminology
At a high level: an appender is the source of messages; a tailer is a receiver of messages.
By using Chronicle Datagrid, a Java or C# client can publish to a queue to act as a remote appender, and can subscribe to a queue to act as a remote tailer.

Important note

Chronicle Queue does not support operating off any network file system, be it NFS, AFS, SAN-based storage, or anything else. The reason for this is that those file systems do not provide all the primitives required for the memory-mapped files Chronicle Queue uses. If any networking is needed (for example, to make the data accessible to multiple hosts), the only supported way is Chronicle Queue Replication (an Enterprise feature).

Topics and Queue files

Each topic is a directory of queues. There is a file for each roll cycle. If you have a topic called mytopic, the layout might look like this:

```
mytopic/
    20160710.cq4
    20160711.cq4
    20160712.cq4
    20160713.cq4
```

To copy all the data for a single day (or cycle), you can copy the file for that day onto your development machine for replay testing.

File Retention

You can add a StoreFileListener to be notified as queue files are acquired and released, so that files which are no longer needed can be archived or removed; a sketch is given below. The only thing each tailer retains is an index, which is composed from a cycle number (for example, the number of days since epoch) and a sequence number within that cycle.
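A minimal sketch of such a retention hook, assuming the `storeFileListener(...)` configuration point on the builder and the `StoreFileListener` callback interface; the topic name and the action taken on release are illustrative:

```java
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.impl.StoreFileListener;

class RetentionExample {
    public static void main(String[] args) {
        // Called when a roll-cycle file is no longer held open by any appender
        // or tailer; a retention policy could archive or delete the file here.
        StoreFileListener onReleased = (cycle, file) ->
                System.out.println("cycle " + cycle + " released: " + file);

        try (ChronicleQueue queue = ChronicleQueue.singleBuilder("mytopic")
                .storeFileListener(onReleased)
                .build()) {
            queue.acquireAppender().writeText("hello");
        }
    }
}
```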
Appenders and tailers are cheap, as they don't even require a TCP connection; they are just a few Java objects. Rather than partitioning the queue files across servers, we support each server storing as much data as you have disk space. This is much more scalable than being limited to the amount of memory space that you have. You can buy a redundant pair of 6 TB enterprise disks much more cheaply than 6 TB of memory.

Restrictions on topics and messages

Topics are limited to being strings which can be used as directory names. Within a topic, you can have sub-topics, which can be any data type that can be serialized. Messages can be any serializable data.
Every tailer sees every message. An abstraction can be added to filter messages, or to assign messages to just one message processor. However, in general you only need one main tailer for a topic, with possibly some supporting tailers for monitoring etc.

As Chronicle Queue doesn't partition its topics, you get total ordering of all messages within a topic. Across topics, there is no guarantee of ordering; if you want to replay deterministically from a system which consumes from multiple topics, we suggest replaying from that system's output.

Replaying from the output, not the input

It is common practice to replay a state machine from its inputs. To do this, there are two assumptions that you have to make, and these are difficult to implement.
You can see from this that if you want to be able to upgrade your system, then you'll want to replay from the output.
Guarantees

Chronicle Queue provides a number of guarantees.
Replication has three modes of operation.
Use Cases

Chronicle Queue is most often used for producer-centric systems where you need to retain a lot of data for days or years.

What is a producer-centric system?

Most messaging systems are consumer-centric. Flow control is implemented to avoid the consumer ever getting overloaded, even momentarily. A common example is a server supporting multiple GUI users. Those users might be on different machines (OS and hardware), on different qualities of network (latency and bandwidth), and doing a variety of other things at different times. For this reason it makes sense for the client consumer to tell the producer when to back off, delaying any data until the consumer is ready to take more.

Chronicle Queue is a producer-centric solution, and does everything possible to never push back on the producer, or tell it to slow down. This makes it a powerful tool, providing a big buffer between your system and an upstream producer over which you have little, or no, control.

Market Data

Market data publishers don't give you the option to push back on the producer for long, if at all. A few of our users consume data from CME OPRA. This produces peaks of 10 million events per minute, sent as UDP packets without any retry. If you miss or drop a packet, then it is lost. You have to consume and record those packets as fast as they come to you, with very little buffering in the network adapter. For market data in particular, real time means in a few microseconds; it doesn't mean intra-day (during the day).

Chronicle Queue is fast and efficient, and has been used to increase the speed at which data is passed between threads. In addition, it keeps a record of every message passed, allowing you to significantly reduce the amount of logging that you need to do.

Compliance Systems

Compliance systems are required by more and more systems these days. Everyone has to have them, but no one wants to be slowed down by them. By using Chronicle Queue to buffer data between monitored systems and the compliance system, you don't need to worry about the impact of compliance recording on your monitored systems. Again, Chronicle Queue can support millions of events per second, per server, and access data which has been retained for years.

Latency Sensitive Micro-services

Chronicle Queue supports low-latency IPC (Inter-Process Communication) between JVMs on the same machine in the order of magnitude of 1 microsecond, as well as between machines with a typical latency of 10 microseconds for modest throughputs of a few hundred thousand messages per second. Chronicle Queue supports throughputs of millions of events per second, with stable microsecond latencies.

Metrics

Chronicle Queue can be monitored to obtain latency, throughput, and activity metrics in real time (that is, within microseconds of the event triggering it).

Log Replacement

Chronicle Queue can be used to build state machines. All the information about the state of those components can be reproduced externally, without direct access to the components, or to their state. This significantly reduces the need for additional logging. However, any logging you do need can be recorded in great detail, which makes enabling detailed logging in production practical. Chronicle Queue is being used to store 100+ TB of data, which can be replayed from any point in time.

Lambda Stream Processing

Non-batching streaming components are highly performant, deterministic, and reproducible. You can reproduce bugs which only show up after a million events played in a particular order, with accelerated realistic timings.
This makes using stream processing attractive for systems which need a high degree of quality outcomes.

Using Chronicle Queue

Chronicle Queue is designed to be driven from code. You can easily add an interface which suits your needs.
Writing to a Queue

In Chronicle Queue we refer to the act of writing your data to the Chronicle queue as storing an excerpt. This data could be made up from any data type, including text, numbers, or serialised blobs. Ultimately, all your data, regardless of what it is, is stored as a series of bytes.

Just before storing your excerpt, Chronicle Queue reserves a 4-byte header. Chronicle Queue writes the length of your data into this header. This way, when Chronicle Queue comes to read your excerpt, it knows how long each blob of data is. We refer to this 4-byte header, along with your excerpt, as a document. So, strictly speaking, Chronicle Queue can be used to read and write documents.
To write data to a Chronicle Queue, you must first create an appender:

```java
try (ChronicleQueue queue = ChronicleQueue.singleBuilder(path + "/trades").build()) {
    final ExcerptAppender appender = queue.acquireAppender();
}
```

So, Chronicle Queue uses an appender to write to the queue and a tailer to read from the queue. Unlike other Java queuing solutions, messages are not lost when they are read with a tailer. This is covered in more detail in the section below on "Reading from a Queue".

Chronicle Queue uses the following low-level interface to write the data:

```java
try (final DocumentContext dc = appender.writingDocument()) {
    dc.wire().write().text("your text data");
}
```

The close on the try-with-resources is the point when the length of the data is written to the header. You can also use the DocumentContext to find out the index that your data has just been assigned:

```java
try (final DocumentContext dc = appender.writingDocument()) {
    dc.wire().write().text("your text data");
    System.out.println("your data was stored to index=" + dc.index());
}
```

The high-level methods, such as writeText(), are convenience methods on calling appender.writingDocument(); for example, writeText() is roughly equivalent to:

```java
/**
 * @param text to write a message
 */
void writeText(CharSequence text) {
    try (DocumentContext dc = writingDocument()) {
        dc.wire().bytes().append8bit(text);
    }
}
```

So you have a choice from a number of high-level interfaces, down to a low-level API, to raw memory.

This is the highest-level API, which hides the fact that you are writing to messaging at all. The benefit is that you can swap calls to the interface with a real component, or an interface to a different protocol. (A sketch of the RiskMonitor interface and TradeDetails class assumed by this example is given after the examples below.)

```java
// using the method writer interface.
RiskMonitor riskMonitor = appender.methodWriter(RiskMonitor.class);
final LocalDateTime now = LocalDateTime.now(Clock.systemUTC());
riskMonitor.trade(new TradeDetails(now, "GBPUSD", 1.3095, 10e6, Side.Buy, "peter"));
```

You can write a "self-describing message". Such messages can support schema changes. They are also easier to understand when debugging or diagnosing problems.

```java
// writing a self describing message
appender.writeDocument(w -> w.write("trade").marshallable(
        m -> m.write("timestamp").dateTime(now)
                .write("symbol").text("EURUSD")
                .write("price").float64(1.1101)
                .write("quantity").float64(15e6)
                .write("side").object(Side.class, Side.Sell)
                .write("trader").text("peter")));
```

You can write "raw data" which is self-describing. The types will always be correct; position is the only indication as to the meaning of those values.

```java
// writing just data
appender.writeDocument(w -> w
        .getValueOut().int32(0x123456)
        .getValueOut().int64(0x999000999000L)
        .getValueOut().text("Hello World"));
```

You can write "raw data" which is not self-describing. Your reader must know what this data means, and the types that were used.

```java
// writing raw data
appender.writeBytes(b -> b
        .writeByte((byte) 0x12)
        .writeInt(0x345678)
        .writeLong(0x999000999000L)
        .writeUtf8("Hello World"));
```

Below, the lowest-level way to write data is illustrated. You get an address to raw memory, and you can write whatever you want.

```java
// Unsafe low level
appender.writeBytes(b -> {
    long address = b.address(b.writePosition());
    Unsafe unsafe = UnsafeMemory.UNSAFE;
    unsafe.putByte(address, (byte) 0x12);
    address += 1;
    unsafe.putInt(address, 0x345678);
    address += 4;
    unsafe.putLong(address, 0x999000999000L);
    address += 8;
    byte[] bytes = "Hello World".getBytes(StandardCharsets.ISO_8859_1);
    unsafe.copyMemory(bytes, Jvm.arrayByteBaseOffset(), null, address, bytes.length);
    b.writeSkip(1 + 4 + 8 + bytes.length);
});
```
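The methodWriter example above assumes a user-defined RiskMonitor interface and a TradeDetails value class; they are not part of Chronicle Queue itself. A minimal sketch, assuming fields matching the call above and a chronicle-wire version that provides SelfDescribingMarshallable (older versions used AbstractMarshallable), might look like this:

```java
// Hypothetical sketch of the interface and DTO assumed by the methodWriter example.
// Extending SelfDescribingMarshallable lets Chronicle Wire serialise the fields.
import net.openhft.chronicle.wire.SelfDescribingMarshallable;
import java.time.LocalDateTime;

enum Side { Buy, Sell }

interface RiskMonitor {
    void trade(TradeDetails tradeDetails);
}

class TradeDetails extends SelfDescribingMarshallable {
    private LocalDateTime timestamp;
    private String symbol;
    private double price;
    private double quantity;
    private Side side;
    private String trader;

    TradeDetails(LocalDateTime timestamp, String symbol, double price,
                 double quantity, Side side, String trader) {
        this.timestamp = timestamp;
        this.symbol = symbol;
        this.price = price;
        this.quantity = quantity;
        this.side = side;
        this.trader = trader;
    }
}
```

On the reading side, a tailer can replay these recorded calls onto an implementation of the same interface via a MethodReader (a fragment, assuming the queue from the examples above):

```java
// Replay each recorded method call onto a RiskMonitor implementation.
MethodReader reader = queue.createTailer().methodReader(
        (RiskMonitor) details -> System.out.println("replayed: " + details));
while (reader.readOne()) {
    // each successful readOne() consumed one message and invoked trade(...)
}
```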
You can print the contents of the queue. You can see that the first two, and the last two, messages store the same data.

```java
// dump the content of the queue
System.out.println(queue.dump());
```

prints:

```
# position: 262568, header: 0
--- !!data #binary
trade: {
  timestamp: 2016-07-17T15:18:41.141,
  symbol: GBPUSD,
  price: 1.3095,
  quantity: 10000000.0,
  side: Buy,
  trader: peter
}
# position: 262684, header: 1
--- !!data #binary
trade: {
  timestamp: 2016-07-17T15:18:41.141,
  symbol: EURUSD,
  price: 1.1101,
  quantity: 15000000.0,
  side: Sell,
  trader: peter
}
# position: 262800, header: 2
--- !!data #binary
!int 1193046
168843764404224
Hello World
# position: 262830, header: 3
--- !!data #binary
000402b0       12 78 56 34 00 00 90 99 00 90 99 00 00 0B       ·xV4·· ········
000402c0 48 65 6C 6C 6F 20 57 6F 72 6C 64                Hello Wo rld
# position: 262859, header: 4
--- !!data #binary
000402c0                   12                               ·
000402d0 78 56 34 00 00 90 99 00 90 99 00 00 0B 48 65 6C xV4····· ·····Hel
000402e0 6C 6F 20 57 6F 72 6C 64                         lo World
```

Finding the index at the end of a Chronicle Queue

Chronicle Queue appenders are thread-local. In fact, when you ask for:

```java
final ExcerptAppender appender = queue.acquireAppender();
```

the appender is created, or reused, for the current thread. As such, the method call:

```java
long index = appender.lastIndexAppended();
```

will only give you the last index appended by this appender, not the last index appended by any appender. If you wish to find the index of the last record written, then you have to call:

```java
long index = queue.createTailer().toEnd().index() - 1;
```

(Note that toEnd() points to the index just after the last record written, hence the minus one.)

How to determine how many messages are between two indexes

To count the number of messages between two indexes you can use:

```java
((SingleChronicleQueue) queue).countExcerpts(<firstIndex>, <lastIndex>);
```

For more information on this, see net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.countExcerpts.
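For example, a minimal sketch (a fragment assuming the queue and appender from the examples above) that records the index of the first and last of a batch of writes and then counts the excerpts between them:

```java
// Sketch: capture indexes while writing, then count the excerpts between them.
// See SingleChronicleQueue.countExcerpts for the exact boundary semantics.
long firstIndex = -1, lastIndex = -1;
for (int i = 0; i < 5; i++) {
    try (DocumentContext dc = appender.writingDocument()) {
        dc.wire().write("msg").text("message " + i);
        if (firstIndex == -1)
            firstIndex = dc.index();
        lastIndex = dc.index();
    }
}
long count = ((SingleChronicleQueue) queue).countExcerpts(firstIndex, lastIndex);
System.out.println("messages between indexes: " + count);
```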
How to write 10 messages, then move to the 5th message to read it

```java
@Test
public void read5thMessageTest() {
    try (final ChronicleQueue queue = singleBuilder(getTmpDir()).build()) {
        final ExcerptAppender appender = queue.acquireAppender();

        int i = 0;
        for (int j = 0; j < 10; j++) {
            try (DocumentContext dc = appender.writingDocument()) {
                dc.wire().write("hello").text("world " + (i++));
                long indexWritten = dc.index();
            }
        }

        // get the current cycle
        int cycle;
        final ExcerptTailer tailer = queue.createTailer();
        try (DocumentContext documentContext = tailer.readingDocument()) {
            long index = documentContext.index();
            cycle = queue.rollCycle().toCycle(index);
        }

        long index = queue.rollCycle().toIndex(cycle, 5);
        tailer.moveToIndex(index);
        try (DocumentContext dc = tailer.readingDocument()) {
            System.out.println(dc.wire().read("hello").text());
        }
    }
}
```

Dumping a Chronicle Queue .cq4 file as text on the command line

Chronicle Queue stores its data in binary format, with a file extension of cq4:

```
\�@πheader∂SCQStoreÇE��»wireType∂WireTypeÊBINARYÕwritePositionèèèèß��������ƒroll∂SCQSRollÇ*���∆length¶ÄÓ6�∆formatÎyyyyMMdd-HH≈epoch¶ÄÓ6�»indexing∂SCQSIndexingÇN��� indexCount•�ÃindexSpacingÀindex2Indexé����ß��������…lastIndexé����ß��������fllastAcknowledgedIndexReplicatedé�����ߡˇˇˇˇˇˇˇ»recovery∂TimedStoreRecoveryÇ���…timeStampèèèß...
```

This can often be a bit difficult to read, so it is better to dump the .cq4 file as text. The example below shows how to read a .cq4 file from the command line. You have to use the net.openhft.chronicle.queue.DumpQueueMain class from chronicle-queue.jar, with its dependent jars on the class path.

Once you have the dependent jars on the class path (like below):

```
$ ls -ltr
total 9920
-rw-r--r--  1 robaustin  staff   112557 28 Jul 14:52 chronicle-queue-4.5.5.jar
-rw-r--r--  1 robaustin  staff   209268 28 Jul 14:53 chronicle-bytes-1.7.3.jar
-rw-r--r--  1 robaustin  staff   136434 28 Jul 14:56 chronicle-core-1.7.3.jar
-rw-r--r--  1 robaustin  staff    33562 28 Jul 15:03 slf4j-api-1.7.14.jar
-rw-r--r--  1 robaustin  staff   324302 28 Jul 15:04 chronicle-wire-1.7.5.jar
-rw-r--r--  1 robaustin  staff    35112 28 Jul 15:05 chronicle-threads-1.7.2.jar
-rw-r--r--  1 robaustin  staff  4198400 28 Jul 15:05 19700101-02.cq4
```

you can run, from the command line:

```
$ java -cp chronicle-queue-4.5.5.jar net.openhft.chronicle.queue.DumpQueueMain 19700101-02.cq4
```

this will dump the 19700101-02.cq4 file out as text, as shown below:

```
!!meta-data #binary
header: !SCQStore {
  wireType: !WireType BINARY,
  writePosition: 0,
  roll: !SCQSRoll {
    length: !int 3600000,
    format: yyyyMMdd-HH,
```