本文整理汇总了Scala中org.apache.curator.framework.CuratorFramework类的典型用法代码示例。如果您正苦于以下问题:Scala CuratorFramework类的具体用法?Scala CuratorFramework怎么用?Scala CuratorFramework使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了CuratorFramework类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: CuratorFactory
//设置package包名称以及导入依赖的类
package com.box.castle.core
import com.box.castle.core.common.{CastleFatalException}
import com.box.castle.core.config.CastleConfig
import org.slf4s.Logging
import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.framework.{CuratorFrameworkFactory, CuratorFramework}
import scala.concurrent.duration._
// $COVERAGE-OFF$
class CuratorFactory(castleConfig: CastleConfig) extends Logging {
def create(): CuratorFramework = {
log.info("Begin creating Curator...")
val curator: CuratorFramework = CuratorFrameworkFactory.builder
.namespace(util.join(castleConfig.castleZookeeperConfig.root, castleConfig.namespace))
.connectString(castleConfig.castleZookeeperConfig.connect)
.sessionTimeoutMs(castleConfig.castleZookeeperConfig.sessionTimeout.toMillis.toInt)
.connectionTimeoutMs(castleConfig.castleZookeeperConfig.connectionTimeout.toMillis.toInt)
.retryPolicy(castleConfig.castleZookeeperConfig.retryPolicy)
.build
curator.start()
curator.blockUntilConnected(castleConfig.castleZookeeperConfig.initialConnectTimeout.toMillis.toInt, MILLISECONDS)
if (curator.getState != CuratorFrameworkState.STARTED) {
throw new CastleFatalException(s"Could not connect to ZK within the " +
s"configured timeout of ${castleConfig.castleZookeeperConfig.initialConnectTimeout}")
}
log.info("Successfully created a CuratorFramework instance")
curator
}
}
// $COVERAGE-ON$
开发者ID:Box-Castle,项目名称:core,代码行数:38,代码来源:CuratorFactory.scala
示例2: CuratorTests
//设置package包名称以及导入依赖的类
package com.bwsw.cloudstack.common.curator
import com.bwsw.cloudstack.imp.dao.zookeeper.ZookeeperTestServer
import com.google.common.io.Files
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
class CuratorTests extends FlatSpec with Matchers with BeforeAndAfterAll {
System.getProperty("java.io.tmpdir", "./target/")
val ZOOKEEPER_PORT = 21810
var zk: ZookeeperTestServer = null
implicit var curator: CuratorFramework = null
override def beforeAll() = {
zk = new ZookeeperTestServer(ZOOKEEPER_PORT, Files.createTempDir().toString)
curator = CuratorFrameworkFactory.builder()
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("tests")
.connectString(s"127.0.0.1:$ZOOKEEPER_PORT").build()
curator.start()
}
override def afterAll() = {
curator.close()
zk.stop
}
}
开发者ID:bwsw,项目名称:cs-imp,代码行数:31,代码来源:CuratorTests.scala
示例3: ZKDrlManager
//设置package包名称以及导入依赖的类
package com.muziyuchen.drule.zk
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
import org.apache.curator.retry.ExponentialBackoffRetry
import scala.collection.mutable
import scala.collection.JavaConversions._
private[zk] class ZKDrlManager(connectString: String, path: String) {
private var client: CuratorFramework = _
private var cache: PathChildrenCache = _
def start: Unit = {
client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3))
client.start
client.blockUntilConnected
cache = new PathChildrenCache(client, path, true)
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE)
}
def getDrls: mutable.MutableList[String] = {
var drls = new mutable.MutableList[String]()
for (childData <- cache.getCurrentData) {
val drl = new String(childData.getData(), "UTF-8")
drls += drl
}
drls
}
def drlChanged(callback: () => Unit): Unit = {
cache.getListenable.addListener(new PathChildrenCacheListener {
override def childEvent(client: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
callback
}
})
}
def close:Unit = {
cache.close
client.close
}
}
开发者ID:gspandy,项目名称:d-rule,代码行数:51,代码来源:ZKDrlManager.scala
示例4: ZookeeperServerStartup
//设置package包名称以及导入依赖的类
import java.util
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingServer
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.ZooDefs.Ids
import org.apache.zookeeper.data.ACL
object ZookeeperServerStartup {
val port = 2181
val endpoints = s"127.0.0.1:$port"
def main(args: Array[String]): Unit = {
val zkServer = new TestingServer(port, true)
createNodes()
addShutdownHook(zkServer)
}
private def createNodes() = {
val zkClient: CuratorFramework = {
val connection = CuratorFrameworkFactory.builder()
.connectString(endpoints)
.sessionTimeoutMs(2000)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build()
connection.start()
connection.blockUntilConnected(5000, TimeUnit.MILLISECONDS)
connection
}
zkClient.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(new util.ArrayList[ACL](Ids.OPEN_ACL_UNSAFE))
.forPath(BookieServerStartup.zkLedgersRootPath)
zkClient.create()
.withMode(CreateMode.PERSISTENT)
.withACL(new util.ArrayList[ACL](Ids.OPEN_ACL_UNSAFE))
.forPath(BookieServerStartup.zkBookiesAvailablePath)
zkClient.close()
}
private def addShutdownHook(zkServer: TestingServer) = {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
zkServer.close()
}
})
}
}
开发者ID:bwsw,项目名称:bookkeeper-tutorial-scala,代码行数:59,代码来源:ZookeeperServerStartup.scala
示例5: LeaderSelector
//设置package包名称以及导入依赖的类
package client
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter
class LeaderSelector(client: CuratorFramework,
electionPath: String)
extends LeaderSelectorListenerAdapter
with LeaderElectionMember {
private val leaderSelector = {
val leader =
new org.apache.curator.framework.recipes.leader.LeaderSelector(
client,
electionPath,
this
)
leader.autoRequeue()
leader.start()
leader
}
override def hasLeadership: Boolean = leaderSelector.hasLeadership
@throws[Exception]
override def takeLeadership(client: CuratorFramework): Unit = {
this.synchronized {
println("Becoming leader")
try {
while (true) this.wait()
}
catch {
case _: InterruptedException =>
Thread.currentThread.interrupt()
}
}
}
final def close(): Unit = leaderSelector.close()
}
开发者ID:bwsw,项目名称:bookkeeper-tutorial-scala,代码行数:42,代码来源:LeaderSelector.scala
示例6: ZookeeperOffsetKeeper
//设置package包名称以及导入依赖的类
package com.bwsw.imp.message.kafka.offsets
import java.nio.ByteBuffer
import org.apache.curator.framework.CuratorFramework
class ZookeeperOffsetKeeper(implicit curatorFramework: CuratorFramework) extends OffsetKeeper {
private def createContainers(topic: String) = curatorFramework.createContainers(s"/offsets/$topic")
private def path(topic: String, partition: Int) = s"/offsets/$topic/$partition"
private def allocateBuffer() = ByteBuffer.allocate(java.lang.Long.SIZE / java.lang.Byte.SIZE)
def store(topic: String, offsets: Map[Int, Long]): Unit = {
createContainers(topic)
offsets foreach {
case (partition, offset) => curatorFramework
.create()
.orSetData()
.forPath(path(topic, partition), allocateBuffer().putLong(offset).array())
}
}
def load(topic: String, partitions: Set[Int]): Map[Int, Long] = {
createContainers(topic)
partitions.map {
partition => Option(curatorFramework.checkExists().forPath(path(topic, partition)))
.fold (partition -> 0L) {
_ => partition -> {
val buffer = allocateBuffer()
buffer.put(curatorFramework.getData.forPath(path(topic, partition)))
buffer.rewind()
buffer.getLong()
}
}
}.filter {
case (k, v) => k != -1
}.toMap
}
}
开发者ID:bwsw,项目名称:imp,代码行数:42,代码来源:ZookeeperOffsetKeeper.scala
示例7: CuratorTests
//设置package包名称以及导入依赖的类
package com.bwsw.imp.curator
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingServer
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
class CuratorTests extends FlatSpec with Matchers with BeforeAndAfterAll {
val ZOOKEEPER_PORT = 21810
var testingServer = new TestingServer(ZOOKEEPER_PORT)
implicit var curator: CuratorFramework = _
override def beforeAll() = {
testingServer.start()
curator = CuratorFrameworkFactory.builder()
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("tests")
.connectString(s"127.0.0.1:$ZOOKEEPER_PORT").build()
curator.start()
}
it must "do nothing" in {}
override def afterAll() = {
curator.close()
testingServer.stop()
}
}
开发者ID:bwsw,项目名称:imp,代码行数:32,代码来源:CuratorTests.scala
示例8: StreamDatabaseZK
//设置package包名称以及导入依赖的类
package com.bwsw.tstreamstransactionserver.netty.server.db.zk
import java.util.concurrent.ConcurrentHashMap
import com.bwsw.tstreamstransactionserver.netty.server.streamService.{StreamCRUD}
import com.bwsw.tstreamstransactionserver.netty.server.streamService
import org.apache.curator.framework.CuratorFramework
final class StreamDatabaseZK(client: CuratorFramework, path: String)
extends StreamCRUD
{
private val streamCache =
new ConcurrentHashMap[streamService.StreamKey, streamService.StreamValue]()
private val streamNamePath = new StreamNamePath(client, s"$path/names")
private val streamIDPath = new StreamIDPath(client, s"$path/ids")
override def putStream(streamValue: streamService.StreamValue): streamService.StreamKey = {
if (!streamNamePath.checkExists(streamValue.name)) {
val streamRecord = streamIDPath.put(streamValue)
streamNamePath.put(streamRecord)
streamCache.put(streamRecord.key, streamRecord.stream)
streamRecord.key
} else streamService.StreamKey(-1)
}
override def checkStreamExists(name: String): Boolean =
streamNamePath.checkExists(name)
override def delStream(name: String): Boolean =
streamNamePath.delete(name)
override def getStream(name: String): Option[streamService.StreamRecord] =
streamNamePath.get(name)
override def getStream(streamKey: streamService.StreamKey): Option[streamService.StreamRecord] = {
Option(streamCache.get(streamKey))
.map(steamValue => streamService.StreamRecord(streamKey, steamValue))
.orElse{
val streamRecordOpt = streamIDPath.get(streamKey)
streamRecordOpt.foreach(streamRecord =>
streamCache.put(streamRecord.key, streamRecord.stream)
)
streamRecordOpt
}
}
}
开发者ID:bwsw,项目名称:tstreams-transaction-server,代码行数:52,代码来源:StreamDatabaseZK.scala
示例9: SubscriberUtils
//设置package包名称以及导入依赖的类
package util
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
object SubscriberUtils {
def putSubscriberInStream(client: CuratorFramework,
path: String,
partition: Int,
subscriber: String
): Unit = {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(
s"$path/subscribers/$partition/$subscriber",
Array.emptyByteArray
)
}
def deleteSubscriberInStream(client: CuratorFramework,
path: String,
partition: Int,
subscriber: String
): Unit = {
client.delete()
.forPath(
s"$path/subscribers/$partition/$subscriber"
)
}
}
开发者ID:bwsw,项目名称:tstreams-transaction-server,代码行数:33,代码来源:SubscriberUtils.scala
示例10: DLock
//设置package包名称以及导入依赖的类
package yamrcraft.etlite.utils
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory
class DLock(zkConnect: String, lockFile: String, waitForLockSeconds: Int) {
val logger = LoggerFactory.getLogger(this.getClass)
private var zkClient: Option[CuratorFramework] = None
private var lock: Option[InterProcessSemaphoreMutex] = None
def tryLock(): Boolean = {
require(lock.isEmpty, "lock can't be reused")
logger.info("acquiring lock...")
zkClient = Some(CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3)))
zkClient.get.start()
lock = Some(new InterProcessSemaphoreMutex(zkClient.get, lockFile))
lock.get.acquire(waitForLockSeconds, TimeUnit.SECONDS)
}
def release() = {
require(lock.nonEmpty, "lock wasn't acquired")
logger.info("releasing lock")
lock.foreach(_.release())
zkClient.foreach(_.close())
}
}
class FakeLock extends DLock("", "", 0) {
override def tryLock() = true
override def release() = {}
}
开发者ID:yamrcraft,项目名称:etl-light,代码行数:41,代码来源:DLock.scala
注:本文中的org.apache.curator.framework.CuratorFramework类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论