• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala CuratorFramework类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中org.apache.curator.framework.CuratorFramework的典型用法代码示例。如果您正苦于以下问题:Scala CuratorFramework类的具体用法?Scala CuratorFramework怎么用?Scala CuratorFramework使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了CuratorFramework类的10个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: CuratorFactory

//设置package包名称以及导入依赖的类
package com.box.castle.core

import com.box.castle.core.common.{CastleFatalException}
import com.box.castle.core.config.CastleConfig
import org.slf4s.Logging
import org.apache.curator.framework.imps.CuratorFrameworkState
import org.apache.curator.framework.{CuratorFrameworkFactory, CuratorFramework}

import scala.concurrent.duration._

// $COVERAGE-OFF$

/**
 * Builds and starts [[CuratorFramework]] clients from the ZooKeeper settings of a `CastleConfig`.
 *
 * @param castleConfig configuration whose `castleZookeeperConfig` and `namespace` are read by [[create]]
 */
class CuratorFactory(castleConfig: CastleConfig) extends Logging {

  /**
   * Creates a Curator client, starts it and waits for the initial ZooKeeper connection.
   *
   * @return a started client that reported a successful connection within the configured
   *         `initialConnectTimeout`
   * @throws CastleFatalException if the connection is not established within `initialConnectTimeout`;
   *                              the half-initialised client is closed before the exception is thrown
   */
  def create(): CuratorFramework = {
    log.info("Begin creating Curator...")
    val zkConfig = castleConfig.castleZookeeperConfig
    val curator: CuratorFramework = CuratorFrameworkFactory.builder
      .namespace(util.join(zkConfig.root, castleConfig.namespace))
      .connectString(zkConfig.connect)
      .sessionTimeoutMs(zkConfig.sessionTimeout.toMillis.toInt)
      .connectionTimeoutMs(zkConfig.connectionTimeout.toMillis.toInt)
      .retryPolicy(zkConfig.retryPolicy)
      .build

    curator.start()
    // Per the Curator API, getState reports the client lifecycle (LATENT/STARTED/STOPPED), which is
    // already STARTED after start(); only the boolean returned by blockUntilConnected tells whether
    // the connection was actually established before the timeout elapsed.
    val connected =
      curator.blockUntilConnected(zkConfig.initialConnectTimeout.toMillis.toInt, MILLISECONDS)
    if (!connected || curator.getState != CuratorFrameworkState.STARTED) {
      // Do not leak the started client when we give up on it.
      curator.close()
      throw new CastleFatalException(s"Could not connect to ZK within the " +
        s"configured timeout of ${zkConfig.initialConnectTimeout}")
    }

    log.info("Successfully created a CuratorFramework instance")
    curator
  }
}

// $COVERAGE-ON$ 
开发者ID:Box-Castle,项目名称:core,代码行数:38,代码来源:CuratorFactory.scala


示例2: CuratorTests

//设置package包名称以及导入依赖的类
package com.bwsw.cloudstack.common.curator

import com.bwsw.cloudstack.imp.dao.zookeeper.ZookeeperTestServer
import com.google.common.io.Files
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}


/**
 * Test fixture that starts a `ZookeeperTestServer` and a Curator client (namespace "tests")
 * before the suite runs and shuts both down afterwards.
 *
 * The client is exposed as an implicit `curator` member.
 */
class CuratorTests extends FlatSpec with Matchers with BeforeAndAfterAll {
  // NOTE(review): this only reads the property and discards the result, so it has no effect;
  // possibly System.setProperty was intended — confirm before changing.
  System.getProperty("java.io.tmpdir", "./target/")
  val ZOOKEEPER_PORT = 21810
  var zk: ZookeeperTestServer = null
  implicit var curator: CuratorFramework = null

  /** Starts the test server on [[ZOOKEEPER_PORT]] and then starts a client connected to it. */
  override def beforeAll() = {
    zk = new ZookeeperTestServer(ZOOKEEPER_PORT, Files.createTempDir().toString)
    curator = CuratorFrameworkFactory.builder()
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("tests")
      .connectString(s"127.0.0.1:$ZOOKEEPER_PORT").build()
    curator.start()
  }

  /**
   * Closes the client and stops the test server.
   *
   * Both fields start out as null, so each is guarded in case setup did not get far enough to
   * assign it; the server is stopped in a `finally` so a failing client close cannot skip it.
   */
  override def afterAll() = {
    try {
      if (curator != null) curator.close()
    } finally {
      if (zk != null) zk.stop
    }
  }

}
开发者ID:bwsw,项目名称:cs-imp,代码行数:31,代码来源:CuratorTests.scala


示例3: ZKDrlManager

//设置package包名称以及导入依赖的类
package com.muziyuchen.drule.zk

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
import org.apache.curator.retry.ExponentialBackoffRetry

import scala.collection.mutable
import scala.collection.JavaConversions._


/**
 * Reads rule texts stored as children of a ZooKeeper node and notifies about changes to them.
 *
 * [[start]] must be called before [[getDrls]], [[drlChanged]] or [[close]], because the client
 * and the cache are only created there.
 *
 * @param connectString ZooKeeper connection string handed to the Curator client
 * @param path          parent node whose children are cached
 */
private[zk] class ZKDrlManager(connectString: String, path: String) {

  private var client: CuratorFramework = _

  private var cache: PathChildrenCache = _

  /** Creates and starts the client, waits until it is connected, then builds the initial cache. */
  def start: Unit = {
    client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3))
    client.start
    client.blockUntilConnected

    cache = new PathChildrenCache(client, path, true)
    cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE)
  }

  /**
   * Returns the data of every currently cached child node, decoded as UTF-8.
   *
   * @return a new list with one string per cached child
   */
  def getDrls: mutable.MutableList[String] = {
    val drls = new mutable.MutableList[String]()

    for (childData <- cache.getCurrentData) {
      drls += new String(childData.getData(), "UTF-8")
    }

    drls
  }

  /**
   * Registers a callback that is run for every child event reported by the cache.
   *
   * @param callback function to invoke on each event; it receives no details about the event
   */
  def drlChanged(callback: () => Unit): Unit = {
    cache.getListenable.addListener(new PathChildrenCacheListener {
      override def childEvent(client: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
        // The function value must be applied; a bare `callback` only references it and does nothing.
        callback()
      }
    })
  }

  /** Closes the cache first and then the client. */
  def close:Unit = {
    cache.close
    client.close
  }

}
开发者ID:gspandy,项目名称:d-rule,代码行数:51,代码来源:ZKDrlManager.scala


示例4: ZookeeperServerStartup

//设置package包名称以及导入依赖的类
import java.util
import java.util.concurrent.TimeUnit

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingServer
import org.apache.zookeeper.CreateMode
import org.apache.zookeeper.ZooDefs.Ids
import org.apache.zookeeper.data.ACL

/**
 * Starts a Curator `TestingServer` on a fixed local port and creates the ZooKeeper nodes named by
 * `BookieServerStartup.zkLedgersRootPath` and `BookieServerStartup.zkBookiesAvailablePath`.
 */
object ZookeeperServerStartup {
  val port = 2181
  val endpoints = s"127.0.0.1:$port"

  /** Starts the server, creates the nodes and registers a shutdown hook that closes the server. */
  def main(args: Array[String]): Unit = {
    val zkServer = new TestingServer(port, true)

    createNodes()

    addShutdownHook(zkServer)
  }

  /**
   * Connects a short-lived client to [[endpoints]] and creates the two persistent nodes.
   *
   * The client is always closed, including when connecting or node creation fails.
   *
   * @throws IllegalStateException if the client is not connected within 5000 ms
   */
  private def createNodes(): Unit = {
    val zkClient: CuratorFramework = CuratorFrameworkFactory.builder()
      .connectString(endpoints)
      .sessionTimeoutMs(2000)
      .connectionTimeoutMs(5000)
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()

    try {
      zkClient.start()
      // blockUntilConnected returns false on timeout; fail here with a clear message instead of
      // letting the create calls below run against a client that never connected.
      if (!zkClient.blockUntilConnected(5000, TimeUnit.MILLISECONDS)) {
        throw new IllegalStateException(s"Could not connect to ZooKeeper at $endpoints")
      }

      zkClient.create()
        .creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .withACL(new util.ArrayList[ACL](Ids.OPEN_ACL_UNSAFE))
        .forPath(BookieServerStartup.zkLedgersRootPath)

      // No creatingParentsIfNeeded here, as in the original call.
      zkClient.create()
        .withMode(CreateMode.PERSISTENT)
        .withACL(new util.ArrayList[ACL](Ids.OPEN_ACL_UNSAFE))
        .forPath(BookieServerStartup.zkBookiesAvailablePath)
    } finally {
      zkClient.close()
    }
  }

  /** Registers a JVM shutdown hook that closes the given server. */
  private def addShutdownHook(zkServer: TestingServer): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        zkServer.close()
      }
    })
  }
}
开发者ID:bwsw,项目名称:bookkeeper-tutorial-scala,代码行数:59,代码来源:ZookeeperServerStartup.scala


示例5: LeaderSelector

//设置package包名称以及导入依赖的类
package client

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter

/**
 * Leader-election member backed by Curator's `LeaderSelector` recipe.
 *
 * The underlying selector is created with this instance as its listener, set to auto-requeue and
 * started while this object is being constructed.
 *
 * @param client       Curator client handed to the underlying selector
 * @param electionPath election path handed to the underlying selector
 */
class LeaderSelector(client: CuratorFramework,
                     electionPath: String)
  extends LeaderSelectorListenerAdapter
    with LeaderElectionMember {

  private val leaderSelector = {
    val selector = new org.apache.curator.framework.recipes.leader.LeaderSelector(client, electionPath, this)
    selector.autoRequeue()
    selector.start()
    selector
  }

  /** Delegates to the underlying selector. */
  override def hasLeadership: Boolean = leaderSelector.hasLeadership

  /**
   * Invoked when this member obtains leadership: prints a message and then waits on this
   * object's monitor in an endless loop. An `InterruptedException` ends the loop, after which
   * the thread's interrupt flag is set again and the method returns.
   */
  @throws[Exception]
  override def takeLeadership(client: CuratorFramework): Unit = this.synchronized {
    println("Becoming leader")
    try {
      while (true) {
        this.wait()
      }
    } catch {
      case _: InterruptedException => Thread.currentThread.interrupt()
    }
  }

  /** Closes the underlying selector. */
  final def close(): Unit = leaderSelector.close()
}
开发者ID:bwsw,项目名称:bookkeeper-tutorial-scala,代码行数:42,代码来源:LeaderSelector.scala


示例6: ZookeeperOffsetKeeper

//设置package包名称以及导入依赖的类
package com.bwsw.imp.message.kafka.offsets

import java.nio.ByteBuffer

import org.apache.curator.framework.CuratorFramework


/**
 * [[OffsetKeeper]] that stores one offset per topic partition under `/offsets/<topic>/<partition>`,
 * each encoded as an 8-byte `Long` written through a `ByteBuffer`.
 *
 * @param curatorFramework implicit Curator client used for all ZooKeeper access
 */
class ZookeeperOffsetKeeper(implicit curatorFramework: CuratorFramework) extends OffsetKeeper {
  private def createContainers(topic: String) = curatorFramework.createContainers(s"/offsets/$topic")
  private def path(topic: String, partition: Int) = s"/offsets/$topic/$partition"
  // Long.SIZE / Byte.SIZE is the number of bytes in a Long.
  private def allocateBuffer() = ByteBuffer.allocate(java.lang.Long.SIZE / java.lang.Byte.SIZE)


  /**
   * Writes every given offset, creating the node or overwriting its data if it already exists.
   *
   * @param topic   topic the offsets belong to
   * @param offsets partition number mapped to the offset to store
   */
  def store(topic: String, offsets: Map[Int, Long]): Unit = {
    createContainers(topic)
    offsets.foreach { entry =>
      val (partition, offset) = entry
      val payload = allocateBuffer().putLong(offset).array()
      curatorFramework.create().orSetData().forPath(path(topic, partition), payload)
    }
  }

  /**
   * Reads the stored offset of every requested partition.
   *
   * @param topic      topic the offsets belong to
   * @param partitions partitions to look up
   * @return partition number mapped to its stored offset, or to `0L` when no node exists for it;
   *         an entry for partition `-1` is dropped from the result
   */
  def load(topic: String, partitions: Set[Int]): Map[Int, Long] = {
    createContainers(topic)
    val loaded = partitions.map { partition =>
      val znode = path(topic, partition)
      val offset =
        if (curatorFramework.checkExists().forPath(znode) == null) 0L
        else {
          val buffer = allocateBuffer()
          buffer.put(curatorFramework.getData.forPath(znode))
          buffer.rewind()
          buffer.getLong()
        }
      partition -> offset
    }
    // Filtering happens after the lookups, so partition -1 is still read before being discarded.
    loaded.filter(_._1 != -1).toMap
  }

}
开发者ID:bwsw,项目名称:imp,代码行数:42,代码来源:ZookeeperOffsetKeeper.scala


示例7: CuratorTests

//设置package包名称以及导入依赖的类
package com.bwsw.imp.curator

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingServer
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}



/**
 * Test fixture that runs a Curator `TestingServer` and a Curator client (namespace "tests")
 * around the suite. The client is exposed as an implicit `curator` member.
 */
class CuratorTests extends FlatSpec with Matchers with BeforeAndAfterAll {
  val ZOOKEEPER_PORT = 21810
  var testingServer = new TestingServer(ZOOKEEPER_PORT)
  implicit var curator: CuratorFramework = _

  /** Starts the testing server and then a client connected to it. */
  override def beforeAll() = {
    // NOTE(review): the TestingServer(port) constructor may already start the server — confirm
    // whether this explicit start() is still needed.
    testingServer.start()
    curator = CuratorFrameworkFactory.builder()
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .namespace("tests")
      .connectString(s"127.0.0.1:$ZOOKEEPER_PORT").build()
    curator.start()
  }

  it must "do nothing" in {}

  /**
   * Closes the client and stops the testing server.
   *
   * `curator` starts out as null, so it is guarded in case setup did not assign it; the server is
   * stopped in a `finally` so a failing client close cannot skip it.
   */
  override def afterAll() = {
    try {
      if (curator != null) curator.close()
    } finally {
      testingServer.stop()
    }
  }

}
开发者ID:bwsw,项目名称:imp,代码行数:32,代码来源:CuratorTests.scala


示例8: StreamDatabaseZK

//设置package包名称以及导入依赖的类
package com.bwsw.tstreamstransactionserver.netty.server.db.zk


import java.util.concurrent.ConcurrentHashMap

import com.bwsw.tstreamstransactionserver.netty.server.streamService.{StreamCRUD}
import com.bwsw.tstreamstransactionserver.netty.server.streamService
import org.apache.curator.framework.CuratorFramework

/**
 * [[StreamCRUD]] implementation that keeps stream records under two ZooKeeper sub-paths
 * (`<path>/names` and `<path>/ids`) and caches values by key in memory.
 *
 * @param client Curator client passed to the name and id path helpers
 * @param path   base path under which the `names` and `ids` sub-paths live
 */
final class StreamDatabaseZK(client: CuratorFramework, path: String)
  extends StreamCRUD
{
  private val streamCache =
    new ConcurrentHashMap[streamService.StreamKey, streamService.StreamValue]()

  private val streamNamePath = new StreamNamePath(client, s"$path/names")
  private val streamIDPath   = new StreamIDPath(client, s"$path/ids")

  /**
   * Stores a new stream unless its name is already registered.
   *
   * @param streamValue stream to store
   * @return the key of the stored record, or `StreamKey(-1)` when the name already exists
   */
  override def putStream(streamValue: streamService.StreamValue): streamService.StreamKey = {
    val nameTaken = streamNamePath.checkExists(streamValue.name)
    if (nameTaken) streamService.StreamKey(-1)
    else {
      val created = streamIDPath.put(streamValue)
      streamNamePath.put(created)
      streamCache.put(created.key, created.stream)
      created.key
    }
  }

  /** Reports whether the name path contains the given stream name. */
  override def checkStreamExists(name: String): Boolean =
    streamNamePath.checkExists(name)


  /** Deletes the stream from the name path; the in-memory cache is not touched here. */
  override def delStream(name: String): Boolean =
    streamNamePath.delete(name)


  /** Looks the stream up by name through the name path. */
  override def getStream(name: String): Option[streamService.StreamRecord] =
    streamNamePath.get(name)


  /**
   * Looks the stream up by key, consulting the cache first and the id path second.
   * A record found through the id path is added to the cache before being returned.
   */
  override def getStream(streamKey: streamService.StreamKey): Option[streamService.StreamRecord] = {
    // ConcurrentHashMap.get yields null for a missing key, hence the Option wrapper.
    Option(streamCache.get(streamKey)) match {
      case Some(cachedValue) =>
        Some(streamService.StreamRecord(streamKey, cachedValue))
      case None =>
        val fetched = streamIDPath.get(streamKey)
        fetched.foreach(record => streamCache.put(record.key, record.stream))
        fetched
    }
  }
}
开发者ID:bwsw,项目名称:tstreams-transaction-server,代码行数:52,代码来源:StreamDatabaseZK.scala


示例9: SubscriberUtils

//设置package包名称以及导入依赖的类
package util

import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode

/** Helpers for adding and removing subscriber nodes under `<path>/subscribers/<partition>/`. */
object SubscriberUtils {

  /** Builds the node path of one subscriber of one partition. */
  private def subscriberNode(path: String, partition: Int, subscriber: String): String =
    s"$path/subscribers/$partition/$subscriber"

  /**
   * Creates a persistent, empty node for the subscriber, creating parent nodes if needed.
   *
   * @param client     Curator client used for the create call
   * @param path       base path of the stream
   * @param partition  partition the subscriber belongs to
   * @param subscriber subscriber name, used as the last path segment
   */
  def putSubscriberInStream(client: CuratorFramework,
                            path: String,
                            partition: Int,
                            subscriber: String
                           ): Unit = {
    val node = subscriberNode(path, partition, subscriber)
    client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.PERSISTENT)
      .forPath(node, Array.emptyByteArray)
  }

  /**
   * Deletes the subscriber's node.
   *
   * @param client     Curator client used for the delete call
   * @param path       base path of the stream
   * @param partition  partition the subscriber belongs to
   * @param subscriber subscriber name, used as the last path segment
   */
  def deleteSubscriberInStream(client: CuratorFramework,
                               path: String,
                               partition: Int,
                               subscriber: String
                              ): Unit = {
    val node = subscriberNode(path, partition, subscriber)
    client.delete().forPath(node)
  }
}
开发者ID:bwsw,项目名称:tstreams-transaction-server,代码行数:33,代码来源:SubscriberUtils.scala


示例10: DLock

//设置package包名称以及导入依赖的类
package yamrcraft.etlite.utils

import java.util.concurrent.TimeUnit

import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory


/**
 * Single-use distributed lock built on Curator's `InterProcessSemaphoreMutex`.
 *
 * Each instance creates its own ZooKeeper client in [[tryLock]] and closes it in [[release]]
 * (or immediately, when the lock could not be obtained).
 *
 * @param zkConnect          ZooKeeper connection string
 * @param lockFile           ZooKeeper path used for the mutex
 * @param waitForLockSeconds maximum time to wait for the lock, in seconds
 */
class DLock(zkConnect: String, lockFile: String, waitForLockSeconds: Int) {

  val logger = LoggerFactory.getLogger(this.getClass)

  private var zkClient: Option[CuratorFramework] = None
  private var lock: Option[InterProcessSemaphoreMutex] = None

  /**
   * Tries to acquire the lock, waiting up to `waitForLockSeconds`.
   *
   * @return `true` if the lock was acquired, `false` if the wait timed out
   * @throws IllegalArgumentException if a mutex was already created by an earlier call
   */
  def tryLock(): Boolean = {
    require(lock.isEmpty, "lock can't be reused")
    logger.info("acquiring lock...")
    val client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))
    zkClient = Some(client)
    var acquired = false
    try {
      client.start()
      val mutex = new InterProcessSemaphoreMutex(client, lockFile)
      lock = Some(mutex)
      acquired = mutex.acquire(waitForLockSeconds, TimeUnit.SECONDS)
      acquired
    } finally {
      // Covers both a timed-out acquire and an exception: without the lock nobody is expected
      // to call release(), so the client would otherwise never be closed.
      if (!acquired) closeClient()
    }
  }

  /**
   * Releases the lock and closes the ZooKeeper client.
   *
   * @throws IllegalArgumentException if [[tryLock]] has not created a mutex yet
   */
  def release(): Unit = {
    require(lock.nonEmpty, "lock wasn't acquired")
    logger.info("releasing lock")
    // Close the client even when releasing the mutex throws.
    try lock.foreach(_.release())
    finally closeClient()
  }

  /** Closes the client if one is open and forgets it, so it is closed at most once. */
  private def closeClient(): Unit = {
    zkClient.foreach(_.close())
    zkClient = None
  }

}

/**
 * [[DLock]] stand-in that never talks to ZooKeeper: [[tryLock]] always reports success and
 * [[release]] does nothing.
 */
class FakeLock extends DLock("", "", 0) {
  override def tryLock(): Boolean = true

  override def release(): Unit = ()
}
开发者ID:yamrcraft,项目名称:etl-light,代码行数:41,代码来源:DLock.scala



注:本文中的org.apache.curator.framework.CuratorFramework类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala RandomForest类代码示例发布时间:2022-05-23
下一篇:
Scala Text类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap