本文整理汇总了Java中javax.ws.rs.Encoded类的典型用法代码示例。如果您正苦于以下问题:Java Encoded类的具体用法?Java Encoded怎么用?Java Encoded使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Encoded类属于javax.ws.rs包,在下文中一共展示了Encoded类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: getStats
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("/{property}/{cluster}/{namespace}/{destination}/stats")
@ApiOperation(value = "Get the stats for the topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public ProxyTopicStat getStats(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination) {
destination = decode(destination);
DestinationName dn = DestinationName.get("persistent", property, cluster, namespace, destination);
validateUserAccess(dn);
ProxyTopicStat stats = getStat(dn.toString());
if (stats == null) {
throw new RestException(Status.NOT_FOUND, "Topic does not exist");
}
return stats;
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:17,代码来源:WebSocketProxyStats.java
示例2: unloadTopic
import javax.ws.rs.Encoded; //导入依赖的package包/类
@PUT
@Path("/{property}/{cluster}/{namespace}/{destination}/unload")
@ApiOperation(value = "Unload a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public void unloadTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
log.info("[{}] Unloading topic {}/{}/{}/{}", clientAppId(), property, cluster, namespace, destination);
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
}
unloadTopic(dn, authoritative);
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:17,代码来源:NonPersistentTopics.java
示例3: getStats
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("{property}/{cluster}/{namespace}/{destination}/stats")
@ApiOperation(value = "Get the stats for the topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PersistentTopicStats getStats(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateAdminAndClientPermission(dn);
if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
}
validateDestinationOwnership(dn, authoritative);
Topic topic = getTopicReference(dn);
return topic.getStats();
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:19,代码来源:PersistentTopics.java
示例4: getInternalStats
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("{property}/{cluster}/{namespace}/{destination}/internalStats")
@ApiOperation(value = "Get the internal stats for the topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateAdminAndClientPermission(dn);
if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
}
validateDestinationOwnership(dn, authoritative);
Topic topic = getTopicReference(dn);
return topic.getInternalStats();
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:20,代码来源:PersistentTopics.java
示例5: deleteOneSegment
import javax.ws.rs.Encoded; //导入依赖的package包/类
@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Path("/segments/{tableName}/{segmentName}")
@ApiOperation(value = "Deletes a segment", notes = "Deletes a segment")
public SuccessResponse deleteOneSegment(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "realtime|offline", required = true) @QueryParam("type") String tableTypeStr) {
CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type must not be null", Response.Status.BAD_REQUEST);
}
try {
segmentName = URLDecoder.decode(segmentName, "UTF-8");
} catch (UnsupportedEncodingException e) {
String errStr = "Could not decode segment name '" + segmentName + "'";
throw new ControllerApplicationException(LOGGER, errStr, Response.Status.BAD_REQUEST);
}
PinotSegmentRestletResource.toggleStateInternal(tableName, StateType.DROP, tableType, segmentName,
_pinotHelixResourceManager);
return new SuccessResponse("Segment deleted");
}
开发者ID:linkedin,项目名称:pinot,代码行数:24,代码来源:PinotSegmentUploadRestletResource.java
示例6: getUserByAccountName
import javax.ws.rs.Encoded; //导入依赖的package包/类
/**************************************************************************
* Gets a user by accountName.
*
* @param accountName
* The accountName of the user.
* @throws IllegalArgumentException
* Bad input.
* @throws NdexException
* Failed to change the password in the database.
* @throws SQLException
* @throws IOException
* @throws JsonMappingException
* @throws JsonParseException
**************************************************************************/
@GET
@PermitAll
@Path("/account/{username}")
@Produces("application/json")
@ApiDoc("Return the user corresponding to the given user account name. Error if this account is not found.")
public User getUserByAccountName(@PathParam("username") @Encoded final String accountName)
throws IllegalArgumentException, NdexException, SQLException, JsonParseException, JsonMappingException, IOException {
logger.info("[start: Getting user by account name {}]", accountName);
try (UserDAO dao = new UserDAO()){
final User user = dao.getUserByAccountName(accountName.toLowerCase(),true,false);
logger.info("[end: User object returned for user account {}]", accountName);
return user;
}
}
开发者ID:ndexbio,项目名称:ndex-rest,代码行数:32,代码来源:UserService.java
示例7: getUserByUUID
import javax.ws.rs.Encoded; //导入依赖的package包/类
/**************************************************************************
* Gets a user by UUID
*
* @param userId
* The UUID of the user.
* @throws IllegalArgumentException
* Bad input.
* @throws NdexException
* Failed to change the password in the database.
* @throws IOException
* @throws SQLException
* @throws JsonMappingException
* @throws JsonParseException
**************************************************************************/
@SuppressWarnings("static-method")
@GET
@PermitAll
@Path("/uuid/{userid}")
@Produces("application/json")
@ApiDoc("Return the user corresponding to user's UUID. Error if no such user is found.")
public User getUserByUUID(@PathParam("userid") @Encoded final String userId)
throws IllegalArgumentException, NdexException, JsonParseException, JsonMappingException, SQLException, IOException {
logger.info("[start: Getting user from UUID {}]", userId);
try (UserDAO dao = new UserDAO() ){
final User user = dao.getUserById(UUID.fromString(userId),true,false);
logger.info("[end: User object returned for user uuid {}]", userId);
return user;
}
}
开发者ID:ndexbio,项目名称:ndex-rest,代码行数:33,代码来源:UserService.java
示例8: doGet
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("/{source}")
public Response doGet(
@Context final HttpServletRequest httpServletRequest,
@PathParam(SOURCE) final String source,
@Encoded @QueryParam(Command.DATA) final String data,
@QueryParam(Command.HELP) final String help,
@QueryParam(Command.NEW) final String nnew,
@QueryParam(Command.DIFF) final String diff,
@QueryParam(Command.REDIRECT) final String redirect,
@HeaderParam(HttpHeaders.CONTENT_TYPE) final String contentType,
@CookieParam("crowd.token_key") final String crowdTokenKey) {
final Request request = new Request.RequestBuilder()
.setData(decode(data, getCharset(contentType)))
.setNew(nnew)
.setHelp(help)
.setRedirect(redirect)
.setDiff(diff)
.setRemoteAddress(httpServletRequest.getRemoteAddr())
.setSource(source)
.setSsoToken(crowdTokenKey)
.build();
return doSyncUpdate(httpServletRequest, request, getCharset(contentType));
}
开发者ID:RIPE-NCC,项目名称:whois,代码行数:25,代码来源:SyncUpdatesService.java
示例9: getRowResource
import javax.ws.rs.Encoded; //导入依赖的package包/类
@Path("{rowspec: [^*]+}")
public RowResource getRowResource(
// We need the @Encoded decorator so Jersey won't urldecode before
// the RowSpec constructor has a chance to parse
final @PathParam("rowspec") @Encoded String rowspec,
final @QueryParam("v") String versions,
final @QueryParam("check") String check) throws IOException {
return new RowResource(this, rowspec, versions, check);
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:10,代码来源:TableResource.java
示例10: getRowResourceWithSuffixGlobbing
import javax.ws.rs.Encoded; //导入依赖的package包/类
@Path("{suffixglobbingspec: .*\\*/.+}")
public RowResource getRowResourceWithSuffixGlobbing(
// We need the @Encoded decorator so Jersey won't urldecode before
// the RowSpec constructor has a chance to parse
final @PathParam("suffixglobbingspec") @Encoded String suffixglobbingspec,
final @QueryParam("v") String versions,
final @QueryParam("check") String check) throws IOException {
return new RowResource(this, suffixglobbingspec, versions, check);
}
开发者ID:fengchen8086,项目名称:ditb,代码行数:10,代码来源:TableResource.java
示例11: getPartitionedMetadata
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
@ApiOperation(value = "Get partitioned topic metadata.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
return getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:12,代码来源:NonPersistentTopics.java
示例12: getStats
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("{property}/{cluster}/{namespace}/{destination}/stats")
@ApiOperation(value = "Get the stats for the topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public NonPersistentTopicStats getStats(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateAdminOperationOnDestination(dn, authoritative);
Topic topic = getTopicReference(dn);
return ((NonPersistentTopic)topic).getStats();
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:15,代码来源:NonPersistentTopics.java
示例13: getInternalStats
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("{property}/{cluster}/{namespace}/{destination}/internalStats")
@ApiOperation(value = "Get the internal stats for the topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateAdminOperationOnDestination(dn, authoritative);
Topic topic = getTopicReference(dn);
return topic.getInternalStats();
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:16,代码来源:NonPersistentTopics.java
示例14: updatePartitionedTopic
import javax.ws.rs.Encoded; //导入依赖的package包/类
/**
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
* number of partitions requires deletion of topic which is not supported.
*
* Already created partitioned producers and consumers can't see newly created partitions and it requires to
* recreate them at application so, newly created producers and consumers can connect to newly added partitions as
* well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
*
* @param property
* @param cluster
* @param namespace
* @param destination
* @param numPartitions
*/
@POST
@Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
@ApiOperation(value = "Increment partitons of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist") })
public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
int numPartitions) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateAdminAccessOnProperty(dn.getProperty());
if (dn.isGlobal()) {
log.error("[{}] Update partitioned-topic is forbidden on global namespace {}", clientAppId(), dn);
throw new RestException(Status.FORBIDDEN, "Update forbidden on global namespace");
}
if (numPartitions <= 1) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 1");
}
try {
updatePartitionedTopic(dn, numPartitions).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), dn, e.getCause());
throw new RestException(e.getCause());
}
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:44,代码来源:PersistentTopics.java
示例15: getPartitionedMetadata
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("/{property}/{cluster}/{namespace}/{destination}/partitions")
@ApiOperation(value = "Get partitioned topic metadata.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(property, cluster, namespace, destination, authoritative);
if (metadata.partitions > 1) {
validateClientVersion();
}
return metadata;
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:16,代码来源:PersistentTopics.java
示例16: deleteTopic
import javax.ws.rs.Encoded; //导入依赖的package包/类
@DELETE
@Path("/{property}/{cluster}/{namespace}/{destination}")
@ApiOperation(value = "Delete a topic.", notes = "The topic cannot be deleted if there's any active subscription or producer connected to the it.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Topic has active producers/subscriptions") })
public void deleteTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
validateAdminOperationOnDestination(dn, authoritative);
Topic topic = getTopicReference(dn);
if (dn.isGlobal()) {
// Delete is disallowed on global topic
log.error("[{}] Delete topic is forbidden on global namespace {}", clientAppId(), dn);
throw new RestException(Status.FORBIDDEN, "Delete forbidden on global namespace");
}
try {
topic.delete().get();
log.info("[{}] Successfully removed topic {}", clientAppId(), dn);
} catch (Exception e) {
Throwable t = e.getCause();
log.error("[{}] Failed to get delete topic {}", clientAppId(), dn, t);
if (t instanceof TopicBusyException) {
throw new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
} else {
throw new RestException(t);
}
}
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:32,代码来源:PersistentTopics.java
示例17: getPartitionedStats
import javax.ws.rs.Encoded; //导入依赖的package包/类
@GET
@Path("{property}/{cluster}/{namespace}/{destination}/partitioned-stats")
@ApiOperation(value = "Get the stats for the partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
public PartitionedTopicStats getPartitionedStats(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("destination") @Encoded String destination,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace,
destination, authoritative);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
}
if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
}
PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
try {
for (int i = 0; i < partitionMetadata.partitions; i++) {
PersistentTopicStats partitionStats = pulsar().getAdminClient().persistentTopics()
.getStats(dn.getPartition(i).toString());
stats.add(partitionStats);
stats.partitions.put(dn.getPartition(i).toString(), partitionStats);
}
} catch (Exception e) {
throw new RestException(e);
}
return stats;
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:33,代码来源:PersistentTopics.java
示例18: skipMessages
import javax.ws.rs.Encoded; //导入依赖的package包/类
@POST
@Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/skip/{numMessages}")
@ApiOperation(value = "Skip messages on a topic subscription.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void skipMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
@PathParam("subName") String subName, @PathParam("numMessages") int numMessages,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace,
destination, authoritative);
if (partitionMetadata.partitions > 0) {
throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
}
validateAdminOperationOnDestination(dn, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(dn);
try {
if (subName.startsWith(topic.replicatorPrefix)) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
checkNotNull(repl);
repl.skipMessages(numMessages).get();
} else {
PersistentSubscription sub = topic.getSubscription(subName);
checkNotNull(sub);
sub.skipMessages(numMessages).get();
}
log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, dn, subName);
} catch (NullPointerException npe) {
throw new RestException(Status.NOT_FOUND, "Subscription not found");
} catch (Exception exception) {
log.error("[{}] Failed to skip {} messages {} {}", clientAppId(), numMessages, dn, subName, exception);
throw new RestException(exception);
}
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:41,代码来源:PersistentTopics.java
示例19: expireTopicMessages
import javax.ws.rs.Encoded; //导入依赖的package包/类
@POST
@Path("/{property}/{cluster}/{namespace}/{destination}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
@ApiOperation(value = "Expire messages on a topic subscription.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void expireTopicMessages(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("destination") @Encoded String destination,
@PathParam("subName") String subName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
destination = decode(destination);
expireMessages(property, cluster, namespace, destination, subName, expireTimeInSeconds, authoritative);
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:13,代码来源:PersistentTopics.java
示例20: expireMessagesForAllSubscriptions
import javax.ws.rs.Encoded; //导入依赖的package包/类
@POST
@Path("/{property}/{cluster}/{namespace}/{destination}/all_subscription/expireMessages/{expireTimeInSeconds}")
@ApiOperation(value = "Expire messages on all subscriptions of topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist") })
public void expireMessagesForAllSubscriptions(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("destination") @Encoded String destinationName, @PathParam("expireTimeInSeconds") int expireTimeInSeconds,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
final String destination = decode(destinationName);
DestinationName dn = DestinationName.get(domain(), property, cluster, namespace, destination);
if (cluster.equals(Namespaces.GLOBAL_CLUSTER)) {
validateGlobalNamespaceOwnership(NamespaceName.get(property, cluster, namespace));
}
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(property, cluster, namespace,
destination, authoritative);
if (partitionMetadata.partitions > 0) {
try {
// expire messages for each partition destination
for (int i = 0; i < partitionMetadata.partitions; i++) {
pulsar().getAdminClient().persistentTopics()
.expireMessagesForAllSubscriptions(dn.getPartition(i).toString(), expireTimeInSeconds);
}
} catch (Exception e) {
log.error("[{}] Failed to expire messages up to {} on {} {}", clientAppId(), expireTimeInSeconds, dn,
e);
throw new RestException(e);
}
} else {
// validate ownership and redirect if current broker is not owner
validateAdminOperationOnDestination(dn, authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(dn);
topic.getReplicators().forEach((subName, replicator) -> {
expireMessages(property, cluster, namespace, destination, subName, expireTimeInSeconds, authoritative);
});
topic.getSubscriptions().forEach((subName, subscriber) -> {
expireMessages(property, cluster, namespace, destination, subName, expireTimeInSeconds, authoritative);
});
}
}
开发者ID:apache,项目名称:incubator-pulsar,代码行数:41,代码来源:PersistentTopics.java
注:本文中的javax.ws.rs.Encoded类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论