• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Java Publication类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中io.aeron.Publication的典型用法代码示例。如果您正苦于以下问题:Java Publication类的具体用法?Java Publication怎么用?Java Publication使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Publication类属于io.aeron包,在下文中一共展示了Publication类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: awaitConnection

import io.aeron.Publication; //导入依赖的package包/类
private static void awaitConnection(final Publication publication, final long timeout, final TimeUnit unit) {
    if (publication == null) {
        throw new IllegalStateException("not started");
    }
    final long millis = unit.toMillis(timeout);
    long wait = millis;
    while (!publication.isConnected() && wait > 0) {
        try {
            Thread.sleep(Math.min(100, wait));
            wait -= 100;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    if (!publication.isConnected()) {
        throw new RuntimeException("not connected after " + timeout + " " + unit);
    }
}
 
开发者ID:terzerm,项目名称:fx-highway,代码行数:20,代码来源:AeronPublisher.java


示例2: ControlSession

import io.aeron.Publication; //导入依赖的package包/类
ControlSession(
    final long controlSessionId,
    final long correlationId,
    final ControlSessionDemuxer demuxer,
    final Publication controlPublication,
    final ArchiveConductor conductor,
    final EpochClock epochClock,
    final ControlResponseProxy controlResponseProxy)
{
    this.controlSessionId = controlSessionId;
    this.correlationId = correlationId;
    this.demuxer = demuxer;
    this.controlPublication = controlPublication;
    this.conductor = conductor;
    this.epochClock = epochClock;
    this.controlResponseProxy = controlResponseProxy;
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:18,代码来源:ControlSession.java


示例3: send

import io.aeron.Publication; //导入依赖的package包/类
private void send(final int length)
{
    final int fullLength = MessageHeaderEncoder.ENCODED_LENGTH + length;
    while (true)
    {
        // TODO: Under back pressure it should drop sends and then do an update on timeout to avoid tail loss.
        final long result = recordingEventsPublication.offer(outboundBuffer, 0, fullLength);
        if (result > 0 || result == Publication.NOT_CONNECTED)
        {
            idleStrategy.reset();
            break;
        }

        if (result == Publication.CLOSED || result == Publication.MAX_POSITION_EXCEEDED)
        {
            throw new IllegalStateException();
        }

        idleStrategy.idle();
    }
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:22,代码来源:RecordingEventsProxy.java


示例4: onFragment

import io.aeron.Publication; //导入依赖的package包/类
public boolean onFragment(final UnsafeBuffer buffer, final int offset, final int length)
{
    if (state != State.REPLAY)
    {
        return false;
    }

    final int frameOffset = offset - DataHeaderFlyweight.HEADER_LENGTH;
    final int frameType = frameType(buffer, frameOffset);

    final long result = frameType == FrameDescriptor.PADDING_FRAME_TYPE ?
        replayPublication.appendPadding(length) :
        replayFrame(buffer, offset, length, frameOffset);

    if (result > 0)
    {
        return true;
    }
    else if (result == Publication.CLOSED || result == Publication.NOT_CONNECTED)
    {
        closeOnError(null, "replay stream has been shutdown mid-replay");
    }

    return false;
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:26,代码来源:ReplaySession.java


示例5: sendResponse

import io.aeron.Publication; //导入依赖的package包/类
boolean sendResponse(
    final long controlSessionId,
    final long correlationId,
    final long relevantId,
    final ControlResponseCode code,
    final String errorMessage,
    final Publication controlPublication)
{
    responseEncoder
        .wrapAndApplyHeader(buffer, 0, messageHeaderEncoder)
        .controlSessionId(controlSessionId)
        .correlationId(correlationId)
        .relevantId(relevantId)
        .code(code)
        .errorMessage(null == errorMessage ? "" : errorMessage);

    return send(controlPublication, buffer, HEADER_LENGTH + responseEncoder.encodedLength());
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:19,代码来源:ControlResponseProxy.java


示例6: attemptErrorResponse

import io.aeron.Publication; //导入依赖的package包/类
void attemptErrorResponse(
    final long controlSessionId,
    final long correlationId,
    final String errorMessage,
    final Publication controlPublication)
{
    responseEncoder
        .wrapAndApplyHeader(buffer, 0, messageHeaderEncoder)
        .controlSessionId(controlSessionId)
        .correlationId(correlationId)
        .relevantId(0)
        .code(ControlResponseCode.ERROR)
        .errorMessage(null == errorMessage ? "" : errorMessage);

    final int length = HEADER_LENGTH + responseEncoder.encodedLength();

    for (int i = 0; i < 3; i++)
    {
        final long result = controlPublication.offer(buffer, 0, length);
        if (result > 0)
        {
            break;
        }
    }
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:26,代码来源:ControlResponseProxy.java


示例7: publishDataToRecorded

import io.aeron.Publication; //导入依赖的package包/类
private void publishDataToRecorded(final Publication publication, final int messageCount)
{
    startPosition = publication.position();
    buffer.setMemory(0, 1024, (byte)'z');

    for (int i = 0; i < messageCount; i++)
    {
        final int messageLength = 64 + (rnd.nextInt((MAX_FRAGMENT_SIZE - 64) / 4) * 4);

        totalPayloadLength += messageLength;
        buffer.putInt(0, i, LITTLE_ENDIAN);
        buffer.putInt(messageLength - 4, i, LITTLE_ENDIAN);

        offer(publication, buffer, messageLength);
    }

    expectedRecordingLength = publication.position() - startPosition;
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:19,代码来源:ArchiveReplayLoadTest.java


示例8: offer

import io.aeron.Publication; //导入依赖的package包/类
public static void offer(final Publication publication, final UnsafeBuffer buffer, final int length)
{
    await(
        () ->
        {
            final long result = publication.offer(buffer, 0, length);
            if (result > 0)
            {
                return true;
            }
            else if (result == Publication.ADMIN_ACTION || result == Publication.BACK_PRESSURED)
            {
                return false;
            }

            throw new IllegalStateException("Unexpected return code: " + result);
        });
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:19,代码来源:TestUtil.java


示例9: startPong

import io.aeron.Publication; //导入依赖的package包/类
private static Thread startPong(final String embeddedDirName)
{
    return new Thread(() ->
    {
        System.out.println("Subscribing Ping at " + PING_CHANNEL + " on stream Id " + PING_STREAM_ID);
        System.out.println("Publishing Pong at " + PONG_CHANNEL + " on stream Id " + PONG_STREAM_ID);

        final Aeron.Context ctx = new Aeron.Context().aeronDirectoryName(embeddedDirName);

        try (Aeron aeron = Aeron.connect(ctx);
            Publication pongPublication = aeron.addPublication(PONG_CHANNEL, PONG_STREAM_ID);
            Subscription pingSubscription = aeron.addSubscription(PING_CHANNEL, PING_STREAM_ID))
        {
            final FragmentAssembler dataHandler = new FragmentAssembler(
                (buffer, offset, length, header) -> pingHandler(pongPublication, buffer, offset, length));

            while (RUNNING.get())
            {
                PING_HANDLER_IDLE_STRATEGY.idle(pingSubscription.poll(dataHandler, FRAME_COUNT_LIMIT));
            }

            System.out.println("Shutting down...");
        }
    });
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:26,代码来源:EmbeddedPingPong.java


示例10: main

import io.aeron.Publication; //导入依赖的package包/类
public static void main(final String... args) throws Exception {
    final String aeronDirectoryName = args[0];
    final String channel = args[1];
    final int streamId = Integer.parseInt(args[2]);
    final long messageCount = Long.parseLong(args[3]);
    final long messagesPerSecond = Long.parseLong(args[4]);
    final int marketDataDepth = Integer.parseInt(args[5]);

    System.out.println("Started " + AeronPublisher.class.getSimpleName() + ":");
    System.out.println("\tmessageCount      : " + messageCount);
    System.out.println("\tchannel           : " + channel);
    System.out.println("\tstreamId          : " + streamId);
    System.out.println("\tmessagesPerSecond : " + messagesPerSecond);
    System.out.println("\tmarketDataDepth   : " + marketDataDepth);
    System.out.println("\tmessageSize       : " + encode(new UnsafeBuffer(new byte[1024]), givenMarketDataSnapshot(new ImmutableMarketDataSnapshot.Builder(), marketDataDepth, marketDataDepth)) + " bytes");
    System.out.println();

    final Aeron aeron = aeron(aeronDirectoryName);
    final Publication publication = aeron.addPublication(channel, streamId);
    try {
        awaitConnection(publication, 5, TimeUnit.SECONDS);
        run(publication, messageCount, messagesPerSecond, marketDataDepth);
    } finally {
        publication.close();
        aeron.close();
        System.out.println("Shutdown " + AeronPublisher.class.getSimpleName() + "...");
    }
}
 
开发者ID:terzerm,项目名称:fx-highway,代码行数:29,代码来源:AeronPublisher.java


示例11: run

import io.aeron.Publication; //导入依赖的package包/类
private static void run(final Publication publication, final long messageCount, final long messagesPerSecond, final int marketDataDepth) throws InterruptedException {
    final NanoClock clock = new SystemNanoClock();
    final MutableMarketDataSnapshot snapshot = new MutableMarketDataSnapshot();
    final UnsafeBuffer unsafeBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(4096));
    final long periodNs = 1000000000/messagesPerSecond;
    Thread.sleep(2000);//make sure the subscriber is ready
    long cntAdmin = 0;
    long cntBackp = 0;
    long cnt = 0;
    final long t0 = clock.nanoTime();
    while (cnt < messageCount) {
        long tCur = clock.nanoTime();
        while (tCur - t0 < cnt * periodNs) {
            tCur = clock.nanoTime();
        }
        final MarketDataSnapshot newSnapshot = SerializerHelper.givenMarketDataSnapshot(snapshot.builder(), marketDataDepth, marketDataDepth);
        final int len = SerializerHelper.encode(unsafeBuffer, newSnapshot);
        long pubres;
        do {
            pubres = publication.offer(unsafeBuffer, 0, len);
            if (pubres < 0) {
                if (pubres == Publication.BACK_PRESSURED) {
                    cntBackp++;
                } else if (pubres == Publication.ADMIN_ACTION) {
                    cntAdmin++;
                } else {
                    throw new RuntimeException("publication failed with pubres=" + pubres);
                }
            }
        } while (pubres < 0);
        cnt++;
    }
    final long t1 = clock.nanoTime();
    System.out.println((t1 - t0) / 1000.0 + " us total publishing time (backp=" + cntBackp + ", admin=" + cntAdmin + ", cnt=" + cnt + ")");
}
 
开发者ID:terzerm,项目名称:fx-highway,代码行数:36,代码来源:AeronPublisher.java


示例12: ArchiveProxy

import io.aeron.Publication; //导入依赖的package包/类
/**
 * Create a proxy with a {@link Publication} for sending control message requests.
 * <p>
 * This provides a default {@link IdleStrategy} of a {@link YieldingIdleStrategy} when offers are back pressured
 * with a defaults of {@link AeronArchive.Configuration#MESSAGE_TIMEOUT_DEFAULT_NS} and
 * {@link #DEFAULT_RETRY_ATTEMPTS}.
 *
 * @param publication publication for sending control messages to an archive.
 */
public ArchiveProxy(final Publication publication)
{
    this(
        publication,
        new YieldingIdleStrategy(),
        new SystemNanoClock(),
        MESSAGE_TIMEOUT_DEFAULT_NS,
        DEFAULT_RETRY_ATTEMPTS);
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:19,代码来源:ArchiveProxy.java


示例13: offer

import io.aeron.Publication; //导入依赖的package包/类
private boolean offer(final int length)
{
    retryIdleStrategy.reset();

    int attempts = retryAttempts;
    while (true)
    {
        final long result;
        if ((result = publication.offer(buffer, 0, MessageHeaderEncoder.ENCODED_LENGTH + length)) > 0)
        {
            return true;
        }

        if (result == Publication.CLOSED)
        {
            throw new IllegalStateException("Connection to the archive has been closed");
        }

        if (result == Publication.NOT_CONNECTED)
        {
            throw new IllegalStateException("Connection to the archive is no longer available");
        }

        if (result == Publication.MAX_POSITION_EXCEEDED)
        {
            throw new IllegalStateException("Publication failed due to max position being reached");
        }

        if (--attempts <= 0)
        {
            return false;
        }

        retryIdleStrategy.idle();
    }
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:37,代码来源:ArchiveProxy.java


示例14: offerWithTimeout

import io.aeron.Publication; //导入依赖的package包/类
private boolean offerWithTimeout(final int length, final AgentInvoker aeronClientInvoker)
{
    retryIdleStrategy.reset();

    final long deadlineNs = nanoClock.nanoTime() + connectTimeoutNs;
    while (true)
    {
        final long result;
        if ((result = publication.offer(buffer, 0, MessageHeaderEncoder.ENCODED_LENGTH + length)) > 0)
        {
            return true;
        }

        if (null != aeronClientInvoker)
        {
            aeronClientInvoker.invoke();
        }

        if (result == Publication.CLOSED)
        {
            throw new IllegalStateException("Connection to the archive has been closed");
        }

        if (result == Publication.MAX_POSITION_EXCEEDED)
        {
            throw new IllegalStateException("Publication failed due to max position being reached");
        }

        if (nanoClock.nanoTime() > deadlineNs)
        {
            return false;
        }

        retryIdleStrategy.idle();
    }
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:37,代码来源:ArchiveProxy.java


示例15: sendDescriptor

import io.aeron.Publication; //导入依赖的package包/类
int sendDescriptor(
    final long controlSessionId,
    final long correlationId,
    final UnsafeBuffer descriptorBuffer,
    final Publication controlPublication)
{
    final int length = Catalog.descriptorLength(descriptorBuffer);

    for (int i = 0; i < 3; i++)
    {
        final long result = controlPublication.tryClaim(length, bufferClaim);
        if (result > 0)
        {
            final MutableDirectBuffer buffer = bufferClaim.buffer();
            final int bufferOffset = bufferClaim.offset();
            final int contentOffset = bufferOffset + HEADER_LENGTH + recordingIdEncodingOffset();
            final int contentLength = length - recordingIdEncodingOffset() - HEADER_LENGTH;

            recordingDescriptorEncoder
                .wrapAndApplyHeader(buffer, bufferOffset, messageHeaderEncoder)
                .controlSessionId(controlSessionId)
                .correlationId(correlationId);

            buffer.putBytes(contentOffset, descriptorBuffer, DESCRIPTOR_CONTENT_OFFSET, contentLength);

            bufferClaim.commit();

            return length;
        }

        checkResult(controlPublication, result);
    }

    return 0;
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:36,代码来源:ControlResponseProxy.java


示例16: send

import io.aeron.Publication; //导入依赖的package包/类
private boolean send(final Publication controlPublication, final DirectBuffer buffer, final int length)
{
    for (int i = 0; i < 3; i++)
    {
        final long result = controlPublication.offer(buffer, 0, length);
        if (result > 0)
        {
            return true;
        }

        checkResult(controlPublication, result);
    }

    return false;
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:16,代码来源:ControlResponseProxy.java


示例17: checkResult

import io.aeron.Publication; //导入依赖的package包/类
private static void checkResult(final Publication controlPublication, final long result)
{
    if (result == Publication.NOT_CONNECTED ||
        result == Publication.CLOSED ||
        result == Publication.MAX_POSITION_EXCEEDED)
    {
        throw new IllegalStateException("Response channel is down: " + controlPublication.channel());
    }
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:10,代码来源:ControlResponseProxy.java


示例18: replay

import io.aeron.Publication; //导入依赖的package包/类
@Test(timeout = TEST_DURATION_SEC * 2000)
public void replay() throws InterruptedException
{
    final String channel = archive.context().recordingEventsChannel();
    final int streamId = archive.context().recordingEventsStreamId();

    try (Publication publication = aeron.addPublication(PUBLISH_URI, PUBLISH_STREAM_ID);
        Subscription recordingEvents = aeron.addSubscription(channel, streamId))
    {
        await(recordingEvents::isConnected);
        aeronArchive.startRecording(PUBLISH_URI, PUBLISH_STREAM_ID, SourceLocation.LOCAL);

        await(publication::isConnected);

        final CountDownLatch recordingStopped = prepAndSendMessages(recordingEvents, publication);

        assertNull(trackerError);

        recordingStopped.await();
        aeronArchive.stopRecording(PUBLISH_URI, PUBLISH_STREAM_ID);

        assertNull(trackerError);
        assertNotEquals(-1L, recordingId);
        assertEquals(expectedRecordingLength, recordedLength);
    }

    final long deadlineMs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(TEST_DURATION_SEC);
    int i = 0;

    while (System.currentTimeMillis() < deadlineMs)
    {
        final long start = System.currentTimeMillis();
        replay(i);

        printScore(++i, System.currentTimeMillis() - start);
        Thread.sleep(100);
    }
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:39,代码来源:ArchiveReplayLoadTest.java


示例19: prepAndSendMessages

import io.aeron.Publication; //导入依赖的package包/类
private CountDownLatch prepAndSendMessages(final Subscription recordingEvents, final Publication publication)
{
    System.out.printf("Sending %,d messages%n", MESSAGE_COUNT);

    final CountDownLatch recordingStopped = new CountDownLatch(1);

    trackRecordingProgress(recordingEvents, recordingStopped);
    publishDataToRecorded(publication, MESSAGE_COUNT);

    return recordingStopped;
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:12,代码来源:ArchiveReplayLoadTest.java


示例20: checkResult

import io.aeron.Publication; //导入依赖的package包/类
private static void checkResult(final long result)
{
    if (result == Publication.CLOSED || result == Publication.MAX_POSITION_EXCEEDED)
    {
        throw new IllegalStateException("Unexpected publication state: " + result);
    }
}
 
开发者ID:real-logic,项目名称:aeron,代码行数:8,代码来源:MemberStatusPublisher.java



注:本文中的io.aeron.Publication类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java MessageDecoder类代码示例发布时间:2022-05-22
下一篇:
Java TypeHierarchy类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap