本文整理汇总了Java中io.aeron.Publication类的典型用法代码示例。如果您正苦于以下问题:Java Publication类的具体用法?Java Publication怎么用?Java Publication使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Publication类属于io.aeron包,在下文中一共展示了Publication类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: awaitConnection
import io.aeron.Publication; //导入依赖的package包/类
private static void awaitConnection(final Publication publication, final long timeout, final TimeUnit unit) {
if (publication == null) {
throw new IllegalStateException("not started");
}
final long millis = unit.toMillis(timeout);
long wait = millis;
while (!publication.isConnected() && wait > 0) {
try {
Thread.sleep(Math.min(100, wait));
wait -= 100;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
if (!publication.isConnected()) {
throw new RuntimeException("not connected after " + timeout + " " + unit);
}
}
开发者ID:terzerm,项目名称:fx-highway,代码行数:20,代码来源:AeronPublisher.java
示例2: ControlSession
import io.aeron.Publication; //导入依赖的package包/类
ControlSession(
final long controlSessionId,
final long correlationId,
final ControlSessionDemuxer demuxer,
final Publication controlPublication,
final ArchiveConductor conductor,
final EpochClock epochClock,
final ControlResponseProxy controlResponseProxy)
{
this.controlSessionId = controlSessionId;
this.correlationId = correlationId;
this.demuxer = demuxer;
this.controlPublication = controlPublication;
this.conductor = conductor;
this.epochClock = epochClock;
this.controlResponseProxy = controlResponseProxy;
}
开发者ID:real-logic,项目名称:aeron,代码行数:18,代码来源:ControlSession.java
示例3: send
import io.aeron.Publication; //导入依赖的package包/类
private void send(final int length)
{
final int fullLength = MessageHeaderEncoder.ENCODED_LENGTH + length;
while (true)
{
// TODO: Under back pressure it should drop sends and then do an update on timeout to avoid tail loss.
final long result = recordingEventsPublication.offer(outboundBuffer, 0, fullLength);
if (result > 0 || result == Publication.NOT_CONNECTED)
{
idleStrategy.reset();
break;
}
if (result == Publication.CLOSED || result == Publication.MAX_POSITION_EXCEEDED)
{
throw new IllegalStateException();
}
idleStrategy.idle();
}
}
开发者ID:real-logic,项目名称:aeron,代码行数:22,代码来源:RecordingEventsProxy.java
示例4: onFragment
import io.aeron.Publication; //导入依赖的package包/类
public boolean onFragment(final UnsafeBuffer buffer, final int offset, final int length)
{
if (state != State.REPLAY)
{
return false;
}
final int frameOffset = offset - DataHeaderFlyweight.HEADER_LENGTH;
final int frameType = frameType(buffer, frameOffset);
final long result = frameType == FrameDescriptor.PADDING_FRAME_TYPE ?
replayPublication.appendPadding(length) :
replayFrame(buffer, offset, length, frameOffset);
if (result > 0)
{
return true;
}
else if (result == Publication.CLOSED || result == Publication.NOT_CONNECTED)
{
closeOnError(null, "replay stream has been shutdown mid-replay");
}
return false;
}
开发者ID:real-logic,项目名称:aeron,代码行数:26,代码来源:ReplaySession.java
示例5: sendResponse
import io.aeron.Publication; //导入依赖的package包/类
boolean sendResponse(
final long controlSessionId,
final long correlationId,
final long relevantId,
final ControlResponseCode code,
final String errorMessage,
final Publication controlPublication)
{
responseEncoder
.wrapAndApplyHeader(buffer, 0, messageHeaderEncoder)
.controlSessionId(controlSessionId)
.correlationId(correlationId)
.relevantId(relevantId)
.code(code)
.errorMessage(null == errorMessage ? "" : errorMessage);
return send(controlPublication, buffer, HEADER_LENGTH + responseEncoder.encodedLength());
}
开发者ID:real-logic,项目名称:aeron,代码行数:19,代码来源:ControlResponseProxy.java
示例6: attemptErrorResponse
import io.aeron.Publication; //导入依赖的package包/类
void attemptErrorResponse(
final long controlSessionId,
final long correlationId,
final String errorMessage,
final Publication controlPublication)
{
responseEncoder
.wrapAndApplyHeader(buffer, 0, messageHeaderEncoder)
.controlSessionId(controlSessionId)
.correlationId(correlationId)
.relevantId(0)
.code(ControlResponseCode.ERROR)
.errorMessage(null == errorMessage ? "" : errorMessage);
final int length = HEADER_LENGTH + responseEncoder.encodedLength();
for (int i = 0; i < 3; i++)
{
final long result = controlPublication.offer(buffer, 0, length);
if (result > 0)
{
break;
}
}
}
开发者ID:real-logic,项目名称:aeron,代码行数:26,代码来源:ControlResponseProxy.java
示例7: publishDataToRecorded
import io.aeron.Publication; //导入依赖的package包/类
private void publishDataToRecorded(final Publication publication, final int messageCount)
{
startPosition = publication.position();
buffer.setMemory(0, 1024, (byte)'z');
for (int i = 0; i < messageCount; i++)
{
final int messageLength = 64 + (rnd.nextInt((MAX_FRAGMENT_SIZE - 64) / 4) * 4);
totalPayloadLength += messageLength;
buffer.putInt(0, i, LITTLE_ENDIAN);
buffer.putInt(messageLength - 4, i, LITTLE_ENDIAN);
offer(publication, buffer, messageLength);
}
expectedRecordingLength = publication.position() - startPosition;
}
开发者ID:real-logic,项目名称:aeron,代码行数:19,代码来源:ArchiveReplayLoadTest.java
示例8: offer
import io.aeron.Publication; //导入依赖的package包/类
public static void offer(final Publication publication, final UnsafeBuffer buffer, final int length)
{
await(
() ->
{
final long result = publication.offer(buffer, 0, length);
if (result > 0)
{
return true;
}
else if (result == Publication.ADMIN_ACTION || result == Publication.BACK_PRESSURED)
{
return false;
}
throw new IllegalStateException("Unexpected return code: " + result);
});
}
开发者ID:real-logic,项目名称:aeron,代码行数:19,代码来源:TestUtil.java
示例9: startPong
import io.aeron.Publication; //导入依赖的package包/类
private static Thread startPong(final String embeddedDirName)
{
return new Thread(() ->
{
System.out.println("Subscribing Ping at " + PING_CHANNEL + " on stream Id " + PING_STREAM_ID);
System.out.println("Publishing Pong at " + PONG_CHANNEL + " on stream Id " + PONG_STREAM_ID);
final Aeron.Context ctx = new Aeron.Context().aeronDirectoryName(embeddedDirName);
try (Aeron aeron = Aeron.connect(ctx);
Publication pongPublication = aeron.addPublication(PONG_CHANNEL, PONG_STREAM_ID);
Subscription pingSubscription = aeron.addSubscription(PING_CHANNEL, PING_STREAM_ID))
{
final FragmentAssembler dataHandler = new FragmentAssembler(
(buffer, offset, length, header) -> pingHandler(pongPublication, buffer, offset, length));
while (RUNNING.get())
{
PING_HANDLER_IDLE_STRATEGY.idle(pingSubscription.poll(dataHandler, FRAME_COUNT_LIMIT));
}
System.out.println("Shutting down...");
}
});
}
开发者ID:real-logic,项目名称:aeron,代码行数:26,代码来源:EmbeddedPingPong.java
示例10: main
import io.aeron.Publication; //导入依赖的package包/类
public static void main(final String... args) throws Exception {
final String aeronDirectoryName = args[0];
final String channel = args[1];
final int streamId = Integer.parseInt(args[2]);
final long messageCount = Long.parseLong(args[3]);
final long messagesPerSecond = Long.parseLong(args[4]);
final int marketDataDepth = Integer.parseInt(args[5]);
System.out.println("Started " + AeronPublisher.class.getSimpleName() + ":");
System.out.println("\tmessageCount : " + messageCount);
System.out.println("\tchannel : " + channel);
System.out.println("\tstreamId : " + streamId);
System.out.println("\tmessagesPerSecond : " + messagesPerSecond);
System.out.println("\tmarketDataDepth : " + marketDataDepth);
System.out.println("\tmessageSize : " + encode(new UnsafeBuffer(new byte[1024]), givenMarketDataSnapshot(new ImmutableMarketDataSnapshot.Builder(), marketDataDepth, marketDataDepth)) + " bytes");
System.out.println();
final Aeron aeron = aeron(aeronDirectoryName);
final Publication publication = aeron.addPublication(channel, streamId);
try {
awaitConnection(publication, 5, TimeUnit.SECONDS);
run(publication, messageCount, messagesPerSecond, marketDataDepth);
} finally {
publication.close();
aeron.close();
System.out.println("Shutdown " + AeronPublisher.class.getSimpleName() + "...");
}
}
开发者ID:terzerm,项目名称:fx-highway,代码行数:29,代码来源:AeronPublisher.java
示例11: run
import io.aeron.Publication; //导入依赖的package包/类
private static void run(final Publication publication, final long messageCount, final long messagesPerSecond, final int marketDataDepth) throws InterruptedException {
final NanoClock clock = new SystemNanoClock();
final MutableMarketDataSnapshot snapshot = new MutableMarketDataSnapshot();
final UnsafeBuffer unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(4096));
final long periodNs = 1000000000/messagesPerSecond;
Thread.sleep(2000);//make sure the subscriber is ready
long cntAdmin = 0;
long cntBackp = 0;
long cnt = 0;
final long t0 = clock.nanoTime();
while (cnt < messageCount) {
long tCur = clock.nanoTime();
while (tCur - t0 < cnt * periodNs) {
tCur = clock.nanoTime();
}
final MarketDataSnapshot newSnapshot = SerializerHelper.givenMarketDataSnapshot(snapshot.builder(), marketDataDepth, marketDataDepth);
final int len = SerializerHelper.encode(unsafeBuffer, newSnapshot);
long pubres;
do {
pubres = publication.offer(unsafeBuffer, 0, len);
if (pubres < 0) {
if (pubres == Publication.BACK_PRESSURED) {
cntBackp++;
} else if (pubres == Publication.ADMIN_ACTION) {
cntAdmin++;
} else {
throw new RuntimeException("publication failed with pubres=" + pubres);
}
}
} while (pubres < 0);
cnt++;
}
final long t1 = clock.nanoTime();
System.out.println((t1 - t0) / 1000.0 + " us total publishing time (backp=" + cntBackp + ", admin=" + cntAdmin + ", cnt=" + cnt + ")");
}
开发者ID:terzerm,项目名称:fx-highway,代码行数:36,代码来源:AeronPublisher.java
示例12: ArchiveProxy
import io.aeron.Publication; //导入依赖的package包/类
/**
* Create a proxy with a {@link Publication} for sending control message requests.
* <p>
* This provides a default {@link IdleStrategy} of a {@link YieldingIdleStrategy} when offers are back pressured
* with a defaults of {@link AeronArchive.Configuration#MESSAGE_TIMEOUT_DEFAULT_NS} and
* {@link #DEFAULT_RETRY_ATTEMPTS}.
*
* @param publication publication for sending control messages to an archive.
*/
public ArchiveProxy(final Publication publication)
{
this(
publication,
new YieldingIdleStrategy(),
new SystemNanoClock(),
MESSAGE_TIMEOUT_DEFAULT_NS,
DEFAULT_RETRY_ATTEMPTS);
}
开发者ID:real-logic,项目名称:aeron,代码行数:19,代码来源:ArchiveProxy.java
示例13: offer
import io.aeron.Publication; //导入依赖的package包/类
private boolean offer(final int length)
{
retryIdleStrategy.reset();
int attempts = retryAttempts;
while (true)
{
final long result;
if ((result = publication.offer(buffer, 0, MessageHeaderEncoder.ENCODED_LENGTH + length)) > 0)
{
return true;
}
if (result == Publication.CLOSED)
{
throw new IllegalStateException("Connection to the archive has been closed");
}
if (result == Publication.NOT_CONNECTED)
{
throw new IllegalStateException("Connection to the archive is no longer available");
}
if (result == Publication.MAX_POSITION_EXCEEDED)
{
throw new IllegalStateException("Publication failed due to max position being reached");
}
if (--attempts <= 0)
{
return false;
}
retryIdleStrategy.idle();
}
}
开发者ID:real-logic,项目名称:aeron,代码行数:37,代码来源:ArchiveProxy.java
示例14: offerWithTimeout
import io.aeron.Publication; //导入依赖的package包/类
private boolean offerWithTimeout(final int length, final AgentInvoker aeronClientInvoker)
{
retryIdleStrategy.reset();
final long deadlineNs = nanoClock.nanoTime() + connectTimeoutNs;
while (true)
{
final long result;
if ((result = publication.offer(buffer, 0, MessageHeaderEncoder.ENCODED_LENGTH + length)) > 0)
{
return true;
}
if (null != aeronClientInvoker)
{
aeronClientInvoker.invoke();
}
if (result == Publication.CLOSED)
{
throw new IllegalStateException("Connection to the archive has been closed");
}
if (result == Publication.MAX_POSITION_EXCEEDED)
{
throw new IllegalStateException("Publication failed due to max position being reached");
}
if (nanoClock.nanoTime() > deadlineNs)
{
return false;
}
retryIdleStrategy.idle();
}
}
开发者ID:real-logic,项目名称:aeron,代码行数:37,代码来源:ArchiveProxy.java
示例15: sendDescriptor
import io.aeron.Publication; //导入依赖的package包/类
int sendDescriptor(
final long controlSessionId,
final long correlationId,
final UnsafeBuffer descriptorBuffer,
final Publication controlPublication)
{
final int length = Catalog.descriptorLength(descriptorBuffer);
for (int i = 0; i < 3; i++)
{
final long result = controlPublication.tryClaim(length, bufferClaim);
if (result > 0)
{
final MutableDirectBuffer buffer = bufferClaim.buffer();
final int bufferOffset = bufferClaim.offset();
final int contentOffset = bufferOffset + HEADER_LENGTH + recordingIdEncodingOffset();
final int contentLength = length - recordingIdEncodingOffset() - HEADER_LENGTH;
recordingDescriptorEncoder
.wrapAndApplyHeader(buffer, bufferOffset, messageHeaderEncoder)
.controlSessionId(controlSessionId)
.correlationId(correlationId);
buffer.putBytes(contentOffset, descriptorBuffer, DESCRIPTOR_CONTENT_OFFSET, contentLength);
bufferClaim.commit();
return length;
}
checkResult(controlPublication, result);
}
return 0;
}
开发者ID:real-logic,项目名称:aeron,代码行数:36,代码来源:ControlResponseProxy.java
示例16: send
import io.aeron.Publication; //导入依赖的package包/类
private boolean send(final Publication controlPublication, final DirectBuffer buffer, final int length)
{
for (int i = 0; i < 3; i++)
{
final long result = controlPublication.offer(buffer, 0, length);
if (result > 0)
{
return true;
}
checkResult(controlPublication, result);
}
return false;
}
开发者ID:real-logic,项目名称:aeron,代码行数:16,代码来源:ControlResponseProxy.java
示例17: checkResult
import io.aeron.Publication; //导入依赖的package包/类
private static void checkResult(final Publication controlPublication, final long result)
{
if (result == Publication.NOT_CONNECTED ||
result == Publication.CLOSED ||
result == Publication.MAX_POSITION_EXCEEDED)
{
throw new IllegalStateException("Response channel is down: " + controlPublication.channel());
}
}
开发者ID:real-logic,项目名称:aeron,代码行数:10,代码来源:ControlResponseProxy.java
示例18: replay
import io.aeron.Publication; //导入依赖的package包/类
@Test(timeout = TEST_DURATION_SEC * 2000)
public void replay() throws InterruptedException
{
final String channel = archive.context().recordingEventsChannel();
final int streamId = archive.context().recordingEventsStreamId();
try (Publication publication = aeron.addPublication(PUBLISH_URI, PUBLISH_STREAM_ID);
Subscription recordingEvents = aeron.addSubscription(channel, streamId))
{
await(recordingEvents::isConnected);
aeronArchive.startRecording(PUBLISH_URI, PUBLISH_STREAM_ID, SourceLocation.LOCAL);
await(publication::isConnected);
final CountDownLatch recordingStopped = prepAndSendMessages(recordingEvents, publication);
assertNull(trackerError);
recordingStopped.await();
aeronArchive.stopRecording(PUBLISH_URI, PUBLISH_STREAM_ID);
assertNull(trackerError);
assertNotEquals(-1L, recordingId);
assertEquals(expectedRecordingLength, recordedLength);
}
final long deadlineMs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(TEST_DURATION_SEC);
int i = 0;
while (System.currentTimeMillis() < deadlineMs)
{
final long start = System.currentTimeMillis();
replay(i);
printScore(++i, System.currentTimeMillis() - start);
Thread.sleep(100);
}
}
开发者ID:real-logic,项目名称:aeron,代码行数:39,代码来源:ArchiveReplayLoadTest.java
示例19: prepAndSendMessages
import io.aeron.Publication; //导入依赖的package包/类
private CountDownLatch prepAndSendMessages(final Subscription recordingEvents, final Publication publication)
{
System.out.printf("Sending %,d messages%n", MESSAGE_COUNT);
final CountDownLatch recordingStopped = new CountDownLatch(1);
trackRecordingProgress(recordingEvents, recordingStopped);
publishDataToRecorded(publication, MESSAGE_COUNT);
return recordingStopped;
}
开发者ID:real-logic,项目名称:aeron,代码行数:12,代码来源:ArchiveReplayLoadTest.java
示例20: checkResult
import io.aeron.Publication; //导入依赖的package包/类
private static void checkResult(final long result)
{
if (result == Publication.CLOSED || result == Publication.MAX_POSITION_EXCEEDED)
{
throw new IllegalStateException("Unexpected publication state: " + result);
}
}
开发者ID:real-logic,项目名称:aeron,代码行数:8,代码来源:MemberStatusPublisher.java
注:本文中的io.aeron.Publication类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论