This article collects typical usage examples of the Java class org.apache.qpid.proton.amqp.messaging.Source. If you are wondering how the Java Source class is used in practice, the selected code examples below may help.
The Source class belongs to the org.apache.qpid.proton.amqp.messaging package. A total of 20 code examples of the Source class are shown below, sorted by popularity by default.
Example 1: recvMessages
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
public CompletableFuture<List<String>> recvMessages(long numMessages, long attachTimeout, TimeUnit timeUnit) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    CompletableFuture<List<String>> future = new CompletableFuture<>();
    List<String> messages = new ArrayList<>();
    protonClient.connect("localhost", protonServer.actualPort(), event -> {
        ProtonConnection connection = event.result().open();
        // Describe the source terminus: the address to read from, flagged as a topic
        Source source = new Source();
        source.setAddress(address);
        source.setCapabilities(Symbol.getSymbol("topic"));
        connection.createReceiver(address)
                .openHandler(opened -> latch.countDown())
                .setSource(source)
                .handler((delivery, message) -> {
                    messages.add((String) ((AmqpValue) message.getBody()).getValue());
                    // Complete the future once the expected number of messages has arrived
                    if (messages.size() == numMessages) {
                        future.complete(new ArrayList<>(messages));
                    }
                })
                .open();
    });
    // Wait for the receiver link to open; the boolean result of await() is not checked here
    latch.await(attachTimeout, timeUnit);
    return future;
}
Developer: EnMasseProject, Project: enmasse, Lines of code: 24, Source file: TestBroker.java
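The recvMessages method above calls latch.await(attachTimeout, timeUnit) and discards its boolean result, so a caller cannot tell an attached link from a timeout. The following is a minimal, standard-library-only sketch of one way to surface that case. The class name AttachWait and the helper awaitAttach are hypothetical and are not part of enmasse or Proton.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not taken from the original project.
final class AttachWait {
    /**
     * Waits for the latch to reach zero. If the timeout elapses or the thread
     * is interrupted first, the future is completed exceptionally.
     * Returns true only when the latch reached zero in time.
     */
    static boolean awaitAttach(CountDownLatch latch, CompletableFuture<?> future,
                               long timeout, TimeUnit unit) {
        try {
            if (latch.await(timeout, unit)) {
                return true;
            }
            future.completeExceptionally(new TimeoutException("link not attached in time"));
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the failure through the future
            Thread.currentThread().interrupt();
            future.completeExceptionally(e);
        }
        return false;
    }

    public static void main(String[] args) {
        CountDownLatch neverOpened = new CountDownLatch(1);
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean attached = awaitAttach(neverOpened, future, 10, TimeUnit.MILLISECONDS);
        System.out.println("attached=" + attached
                + ", failed=" + future.isCompletedExceptionally());
    }
}
```

Whether a timeout should fail the future or merely be logged depends on the test harness; this sketch picks the stricter option.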
Example 2: subscribe
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
public void subscribe(String address, Async subClosed, ProtonMessageHandler handler) {
    client.connect(new ProtonClientOptions().setConnectTimeout(10000), serverHost, serverPort, connectResult -> {
        if (connectResult.succeeded()) {
            System.out.println("Connected");
            connection = connectResult.result();
            connection.closeHandler(c -> closeLatch.countDown());
            connection.open();
            System.out.println("Creating receiver");
            Source source = new Source();
            source.setAddress(address);
            // Attach a filter set keyed by symbol to the source terminus
            Map<Symbol, Map<String, String>> filter = new LinkedHashMap<>();
            filter.put(Symbol.getSymbol("labels"), Collections.singletonMap("my", "label"));
            filter.put(Symbol.getSymbol("annotations"), Collections.singletonMap("my", "annotation"));
            source.setFilter(filter);
            connection.createReceiver(address)
                    .setSource(source)
                    .closeHandler(c -> {
                        if (subClosed != null) {
                            subClosed.complete();
                        }
                    })
                    .handler(handler)
                    .open();
        } else {
            System.out.println("Connection failed: " + connectResult.cause().getMessage());
        }
    });
}
Developer: EnMasseProject, Project: enmasse, Lines of code: 21, Source file: TestClient.java
Example 3: filters_nonIntegerPartitionFilter
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
/** What happens when the partition filter is not an Integer? */
@Test
public <K, V> void filters_nonIntegerPartitionFilter() throws Exception {
    String topic = "my_topic";
    Vertx vertx = Vertx.vertx();
    AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
    endpoint.open();
    ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
    // Call handle()
    Map<Symbol, Object> filter = new HashMap<>();
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER), "not an integer");
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), 10L);
    ((Source)mockSender.getRemoteSource()).setFilter(filter);
    endpoint.handle(new AmqpEndpoint(mockSender));
    assertDetach(mockSender,
            AmqpBridge.AMQP_ERROR_WRONG_PARTITION_FILTER,
            "Wrong partition filter");
}
Developer: strimzi, Project: amqp-kafka-bridge, Lines of code: 20, Source file: AmqpSinkBridgeEndpointMockTest.java
Example 4: filters_nonLongOffsetFilter
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
/** What happens when the offset filter is not a Long? */
@Test
public <K, V> void filters_nonLongOffsetFilter() throws Exception {
    String topic = "my_topic";
    Vertx vertx = Vertx.vertx();
    AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
    endpoint.open();
    ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
    // Call handle()
    Map<Symbol, Object> filter = new HashMap<>();
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER), 0);
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), "not a long");
    ((Source)mockSender.getRemoteSource()).setFilter(filter);
    endpoint.handle(new AmqpEndpoint(mockSender));
    assertDetach(mockSender,
            // TODO really?
            AmqpBridge.AMQP_ERROR_WRONG_OFFSET_FILTER,
            "Wrong offset filter");
}
Developer: strimzi, Project: amqp-kafka-bridge, Lines of code: 21, Source file: AmqpSinkBridgeEndpointMockTest.java
Example 5: filters_negativeIntegerPartitionFilter
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
/** What happens when the partition filter is < 0? */
@Test
public <K, V> void filters_negativeIntegerPartitionFilter() throws Exception {
    String topic = "my_topic";
    Vertx vertx = Vertx.vertx();
    AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
    endpoint.open();
    ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
    // Call handle()
    Map<Symbol, Object> filter = new HashMap<>();
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER), -1);
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), 10L);
    ((Source)mockSender.getRemoteSource()).setFilter(filter);
    endpoint.handle(new AmqpEndpoint(mockSender));
    ArgumentCaptor<ErrorCondition> errorCap = ArgumentCaptor.forClass(ErrorCondition.class);
    verify(mockSender).setCondition(errorCap.capture());
    verify(mockSender).close();
    assertDetach(mockSender,
            AmqpBridge.AMQP_ERROR_WRONG_FILTER,
            "Wrong filter");
}
Developer: strimzi, Project: amqp-kafka-bridge, Lines of code: 24, Source file: AmqpSinkBridgeEndpointMockTest.java
Example 6: filters_negativeLongOffsetFilter
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
/** What happens when the offset filter is < 0? */
@Test
public <K, V> void filters_negativeLongOffsetFilter() throws Exception {
    String topic = "my_topic";
    Vertx vertx = Vertx.vertx();
    AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
    endpoint.open();
    ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
    // Call handle()
    Map<Symbol, Object> filter = new HashMap<>();
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_PARTITION_FILTER), 0);
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), -10L);
    ((Source)mockSender.getRemoteSource()).setFilter(filter);
    endpoint.handle(new AmqpEndpoint(mockSender));
    assertDetach(mockSender,
            AmqpBridge.AMQP_ERROR_WRONG_FILTER,
            "Wrong filter");
}
Developer: strimzi, Project: amqp-kafka-bridge, Lines of code: 20, Source file: AmqpSinkBridgeEndpointMockTest.java
Example 7: filters_offsetFilterButNoPartitionFilter
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
/** What happens when there's a filter for offset, but not for partition? */
@Test
public <K, V> void filters_offsetFilterButNoPartitionFilter() throws Exception {
    String topic = "my_topic";
    Vertx vertx = Vertx.vertx();
    AmqpSinkBridgeEndpoint<K,V> endpoint = new AmqpSinkBridgeEndpoint<K,V>(vertx, new AmqpBridgeConfigProperties());
    endpoint.open();
    ProtonSender mockSender = mockSender(ProtonQoS.AT_MOST_ONCE, topic+"/group.id/blah");
    // Call handle()
    Map<Symbol, Object> filter = new HashMap<>();
    //filter.put(Symbol.getSymbol(Bridge.AMQP_PARTITION_FILTER), 0);
    filter.put(Symbol.getSymbol(AmqpBridge.AMQP_OFFSET_FILTER), 10L);
    ((Source)mockSender.getRemoteSource()).setFilter(filter);
    endpoint.handle(new AmqpEndpoint(mockSender));
    assertDetach(mockSender,
            AmqpBridge.AMQP_ERROR_NO_PARTITION_FILTER,
            "No partition filter specified");
}
Developer: strimzi, Project: amqp-kafka-bridge, Lines of code: 20, Source file: AmqpSinkBridgeEndpointMockTest.java
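Taken together, the filter tests above imply a set of validation rules for the filter map on a Source: the partition must be a non-negative Integer, the offset must be a non-negative Long, and an offset without a partition is rejected. The sketch below restates those rules using only the standard library. It is not the bridge's actual implementation: the class FilterCheck, the string keys, the error labels, and the order of the checks are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical validator; keys and error labels are invented for this sketch.
final class FilterCheck {
    static final String PARTITION = "partition";
    static final String OFFSET = "offset";

    /** Returns an error label for an invalid filter map, or empty if it is acceptable. */
    static Optional<String> validate(Map<String, Object> filter) {
        Object partition = filter.get(PARTITION);
        Object offset = filter.get(OFFSET);
        // Type checks come first, mirroring the "non-Integer" and "non-Long" tests
        if (partition != null && !(partition instanceof Integer)) {
            return Optional.of("wrong-partition-filter");
        }
        if (offset != null && !(offset instanceof Long)) {
            return Optional.of("wrong-offset-filter");
        }
        // An offset is only meaningful for a specific partition
        if (offset != null && partition == null) {
            return Optional.of("no-partition-filter");
        }
        // Negative values are rejected with a generic label
        if ((partition != null && ((Integer) partition) < 0)
                || (offset != null && ((Long) offset) < 0)) {
            return Optional.of("wrong-filter");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> filter = new HashMap<>();
        filter.put(OFFSET, 10L); // offset without a partition
        System.out.println(validate(filter).orElse("ok"));
    }
}
```

Returning an Optional label keeps the sketch free of AMQP types; a real endpoint would more likely set an ErrorCondition on the link and close it, as the tests verify.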
Example 8: testConsumeWhenOnlyAnycast
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@Test(timeout = 60000)
public void testConsumeWhenOnlyAnycast() throws Exception {
    server.addAddressInfo(new AddressInfo(address, RoutingType.ANYCAST));
    sendMessages(address.toString(), 1);
    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect());
    AmqpSession session = connection.createSession();
    Source jmsSource = createJmsSource(true);
    jmsSource.setAddress(address.toString());
    try {
        session.createReceiver(jmsSource);
        fail("should throw exception");
    } catch (Exception e) {
        //ignore
    }
    connection.close();
}
Developer: apache, Project: activemq-artemis, Lines of code: 21, Source file: BrokerDefinedMulticastConsumerTest.java
Example 9: doTestCreateDynamicReceiver
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@SuppressWarnings("unchecked")
protected void doTestCreateDynamicReceiver(boolean topic) throws Exception {
    Source source = createDynamicSource(topic);
    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect());
    AmqpSession session = connection.createSession();
    AmqpReceiver receiver = session.createReceiver(source);
    assertNotNull(receiver);
    Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
    assertTrue(remoteSource.getDynamic());
    assertTrue(remoteSource.getDurable().equals(TerminusDurability.NONE));
    assertTrue(remoteSource.getExpiryPolicy().equals(TerminusExpiryPolicy.LINK_DETACH));
    // Check the dynamic node lifetime-policy
    Map<Symbol, Object> dynamicNodeProperties = remoteSource.getDynamicNodeProperties();
    assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
    assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
    Queue queueView = getProxyToQueue(remoteSource.getAddress());
    assertNotNull(queueView);
    connection.close();
}
Developer: apache, Project: activemq-artemis, Lines of code: 27, Source file: AmqpTempDestinationTest.java
Example 10: doTestDynamicReceiverLifetimeBoundToLinkQueue
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
protected void doTestDynamicReceiverLifetimeBoundToLinkQueue(boolean topic) throws Exception {
    Source source = createDynamicSource(topic);
    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect());
    AmqpSession session = connection.createSession();
    AmqpReceiver receiver = session.createReceiver(source);
    assertNotNull(receiver);
    Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
    Queue queueView = getProxyToQueue(remoteSource.getAddress());
    assertNotNull(queueView);
    // Closing the link should remove the dynamically created queue
    receiver.close();
    queueView = getProxyToQueue(remoteSource.getAddress());
    assertNull(queueView);
    connection.close();
}
Developer: apache, Project: activemq-artemis, Lines of code: 22, Source file: AmqpTempDestinationTest.java
Example 11: createDynamicSource
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
protected Source createDynamicSource(boolean topic) {
    Source source = new Source();
    source.setDynamic(true);
    source.setDurable(TerminusDurability.NONE);
    source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
    // Set the dynamic node lifetime-policy
    Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
    dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
    source.setDynamicNodeProperties(dynamicNodeProperties);
    // Set the capability to indicate the node type being created
    if (!topic) {
        source.setCapabilities(TEMP_QUEUE_CAPABILITY);
    } else {
        source.setCapabilities(TEMP_TOPIC_CAPABILITY);
    }
    return source;
}
Developer: apache, Project: activemq-artemis, Lines of code: 22, Source file: AmqpTempDestinationTest.java
Example 12: test2ConsumersOnNonSharedDurableAddress
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@Test(timeout = 60000)
public void test2ConsumersOnNonSharedDurableAddress() throws Exception {
    AddressInfo addressInfo = new AddressInfo(address);
    addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
    server.addAddressInfo(addressInfo);
    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect("myClientId"));
    AmqpSession session = connection.createSession();
    Source source = createNonSharedSource(TerminusDurability.CONFIGURATION);
    Source source1 = createSharedSource(TerminusDurability.CONFIGURATION);
    AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
    try {
        session.createMulticastReceiver(source1, "myReceiverID", "mySub|2");
        fail("Exception expected");
    } catch (Exception e) {
        //expected
    } finally {
        receiver.close();
    }
    connection.close();
}
Developer: apache, Project: activemq-artemis, Lines of code: 24, Source file: ClientDefinedMultiConsumerTest.java
Example 13: testAddressDoesntExist
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@Test(timeout = 60000)
public void testAddressDoesntExist() throws Exception {
    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect("myClientId"));
    AmqpSession session = connection.createSession();
    Source source = createNonSharedSource(TerminusDurability.CONFIGURATION);
    Source source1 = createSharedSource(TerminusDurability.CONFIGURATION);
    AmqpReceiver receiver = session.createMulticastReceiver(source, "myReceiverID", "mySub");
    try {
        session.createMulticastReceiver(source1, "myReceiverID", "mySub|2");
        fail("Exception expected");
    } catch (Exception e) {
        //expected
    } finally {
        receiver.close();
    }
    connection.close();
}
Developer: apache, Project: activemq-artemis, Lines of code: 21, Source file: ClientDefinedMultiConsumerTest.java
Example 14: testConsumeWhenOnlyMulticast
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@Test(timeout = 60000)
public void testConsumeWhenOnlyMulticast() throws Exception {
    server.addAddressInfo(new AddressInfo(address, RoutingType.MULTICAST));
    sendMessages(address.toString(), 1);
    AmqpClient client = createAmqpClient();
    AmqpConnection connection = addConnection(client.connect());
    AmqpSession session = connection.createSession();
    Source jmsSource = createJmsSource(false);
    jmsSource.setAddress(address.toString());
    try {
        session.createReceiver(jmsSource);
        fail("should throw exception");
    } catch (Exception e) {
        //ignore
    }
    connection.close();
}
Developer: apache, Project: activemq-artemis, Lines of code: 21, Source file: BrokerDefinedAnycastConsumerTest.java
Example 15: doOpen
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@Override
protected void doOpen() {
    // The link target is a transaction coordinator that supports local transactions
    Coordinator coordinator = new Coordinator();
    coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
    Source source = new Source();
    String coordinatorName = "qpid-jms:coordinator:" + session.getConnection().getConnectionId();
    Sender sender = session.getEndpoint().sender(coordinatorName);
    sender.setSource(source);
    sender.setTarget(coordinator);
    sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
    sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
    setEndpoint(sender);
    super.doOpen();
}
Developer: apache, Project: activemq-artemis, Lines of code: 19, Source file: AmqpTransactionCoordinator.java
Example 16: createReceiver
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
/**
 * Create a receiver instance using the given Source
 *
 * @param source the caller created and configured Source used to create the receiver link.
 * @param receiverId the receiver id to use.
 * @return a newly created receiver that is ready for use.
 * @throws Exception if an error occurs while creating the receiver.
 */
public AmqpReceiver createReceiver(Source source, String receiverId) throws Exception {
    checkClosed();
    final ClientFuture request = new ClientFuture();
    final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
    connection.getScheduler().execute(new Runnable() {
        @Override
        public void run() {
            checkClosed();
            receiver.setStateInspector(getStateInspector());
            receiver.open(request);
            pumpToProtonTransport(request);
        }
    });
    request.sync();
    return receiver;
}
Developer: apache, Project: activemq-artemis, Lines of code: 30, Source file: AmqpSession.java
Example 17: createMulticastReceiver
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
/**
 * Create a receiver instance using the given Source
 *
 * @param source the caller created and configured Source used to create the receiver link.
 * @param receiverId the receiver id to use.
 * @param receiveName the subscription name to set on the receiver.
 * @return a newly created receiver that is ready for use.
 * @throws Exception if an error occurs while creating the receiver.
 */
public AmqpReceiver createMulticastReceiver(Source source, String receiverId, String receiveName) throws Exception {
    checkClosed();
    final ClientFuture request = new ClientFuture();
    final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId);
    receiver.setSubscriptionName(receiveName);
    connection.getScheduler().execute(new Runnable() {
        @Override
        public void run() {
            checkClosed();
            receiver.setStateInspector(getStateInspector());
            receiver.open(request);
            pumpToProtonTransport(request);
        }
    });
    request.sync();
    return receiver;
}
Developer: apache, Project: activemq-artemis, Lines of code: 30, Source file: AmqpSession.java
Example 18: createEndpoint
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@Override
protected Sender createEndpoint(JmsSessionInfo resourceInfo) {
    Coordinator coordinator = new Coordinator();
    coordinator.setCapabilities(TxnCapability.LOCAL_TXN);
    // Advertise the delivery outcomes this source supports
    Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL, Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL };
    Source source = new Source();
    source.setOutcomes(outcomes);
    String coordinatorName = "qpid-jms:coordinator:" + resourceInfo.getId().toString();
    Sender sender = getParent().getSession().getEndpoint().sender(coordinatorName);
    sender.setSource(source);
    sender.setTarget(coordinator);
    sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
    sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
    return sender;
}
Developer: apache, Project: qpid-jms, Lines of code: 21, Source file: AmqpTransactionCoordinatorBuilder.java
Example 19: doOpen
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@Override
protected void doOpen() {
    String sourceAddress = info.getName();
    // NOTE: both branches apply the temp-queue prefix in the source as published;
    // a topic-specific prefix was probably intended for the else branch.
    if (info.isQueue()) {
        sourceAddress = connection.getTempQueuePrefix() + sourceAddress;
    } else {
        sourceAddress = connection.getTempQueuePrefix() + sourceAddress;
    }
    Source source = new Source();
    source.setAddress(sourceAddress);
    // A dynamic target asks the peer to create the temporary node
    Target target = new Target();
    target.setDynamic(true);
    String senderName = sourceAddress;
    endpoint = session.getProtonSession().sender(senderName);
    endpoint.setSource(source);
    endpoint.setTarget(target);
    endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
    endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
    this.connection.addTemporaryDestination(this);
}
Developer: fusesource, Project: hawtjms, Lines of code: 24, Source file: AmqpTemporaryDestination.java
Example 20: doOpen
import org.apache.qpid.proton.amqp.messaging.Source; // import the required package/class
@Override
protected void doOpen() {
    JmsDestination destination = info.getDestination();
    String destinationName = session.getQualifiedName(destination);
    String sourceAddress = getProducerId().toString();
    Source source = new Source();
    source.setAddress(sourceAddress);
    Target target = new Target();
    target.setAddress(destinationName);
    // Parenthesize the conditional: '+' binds tighter than '!=', so without the
    // parentheses the null check is applied to the whole concatenated string.
    String senderName = sourceAddress + ":" + (destinationName != null ? destinationName : "Anonymous");
    endpoint = session.getProtonSession().sender(senderName);
    endpoint.setSource(source);
    endpoint.setTarget(target);
    if (presettle) {
        endpoint.setSenderSettleMode(SenderSettleMode.SETTLED);
    } else {
        endpoint.setSenderSettleMode(SenderSettleMode.UNSETTLED);
    }
    endpoint.setReceiverSettleMode(ReceiverSettleMode.FIRST);
}
Developer: fusesource, Project: hawtjms, Lines of code: 23, Source file: AmqpFixedProducer.java
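Building a sender name from string concatenation and a conditional, as the doOpen method above does, is sensitive to operator precedence: in Java, + binds tighter than !=, which in turn binds tighter than ?:. The standalone sketch below compares the two forms. The class SenderNames and its method names are hypothetical and exist only to show the difference.

```java
// Hypothetical demo class; it does not depend on hawtjms or Proton.
final class SenderNames {
    /** Without parentheses: the whole concatenation is compared to null, which is never null. */
    static String unparenthesized(String sourceAddress, String destinationName) {
        return sourceAddress + ":" + destinationName != null ? destinationName : "Anonymous";
    }

    /** With parentheses: only destinationName is null-checked, then appended to the prefix. */
    static String parenthesized(String sourceAddress, String destinationName) {
        return sourceAddress + ":" + (destinationName != null ? destinationName : "Anonymous");
    }

    public static void main(String[] args) {
        System.out.println(unparenthesized("producer-1", "orders"));
        System.out.println(parenthesized("producer-1", "orders"));
        System.out.println(unparenthesized("producer-1", null));
        System.out.println(parenthesized("producer-1", null));
    }
}
```

With the unparenthesized form the condition is always true, so the result is just destinationName (or null when it is null) and the "Anonymous" fallback is unreachable.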
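Note: The org.apache.qpid.proton.amqp.messaging.Source class examples in this article were compiled from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to each project's license. Do not reproduce without permission.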