• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala Executors类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中java.util.concurrent.Executors的典型用法代码示例。如果您正苦于以下问题:Scala Executors类的具体用法?Scala Executors怎么用?Scala Executors使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Executors类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Server

//设置package包名称以及导入依赖的类
package sh.webserver

import java.net.ServerSocket
import java.util.concurrent.{ExecutorService, Executors}

import sh.webserver.request.Request

import scala.annotation.tailrec

class Server(port: Int) {
  def start() {
    val server = new ServerSocket(port)
    val pool = Executors.newFixedThreadPool(8)
    listen(server, pool)
  }

  @tailrec
  private def listen(server : ServerSocket,pool : ExecutorService) {
    val socket = server.accept()
    pool.execute(new RequestHandler(socket))
    listen(server, pool)
  }
} 
开发者ID:stefan-hering,项目名称:scalaserver,代码行数:24,代码来源:Server.scala


示例2: KafkaClient

//设置package包名称以及导入依赖的类
package services.Kafka

import java.util.Properties
import java.util.concurrent.Executors

import akka.actor.{Props, DeadLetter, ActorSystem}
import kafka.consumer.{Consumer, ConsumerConfig}
import scala.concurrent.{ExecutionContext, Future}

object KafkaClient {

  val config = {
    val properties = new Properties()
    properties.put("bootstrap.servers", "localhost:9092")
    properties.put("group.id", "pogo_consumer")
    properties.put("auto.offset.reset", "largest")
    properties.put("zookeeper.connect", "localhost:2181")
    properties.put("zookeeper.session.timeout.ms", "400")
    properties.put("zookeeper.sync.time.ms", "200")
    properties.put("auto.commit.interval.ms", "500")
    new ConsumerConfig(properties)
  }

  // Our actor system managing our actors
  val system = ActorSystem("es-sharpshooter")

  // Taking care of dead letters
  system.eventStream.subscribe(system.actorOf(Props[IndexService], "dead-letters"), classOf[DeadLetter])

  // Dedicated Kafka Execution context
  implicit val KafkaContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))

  def start = {
    // Starting our consumer
    val consumer = Consumer.create(config)

    val topics = Map(
      "pokemons" -> 1,
      "spawnpoints" -> 1
    )

    val streams = consumer.createMessageStreams(topics)

    // Start the consumer asynchronously
    Future {
      streams.get("pokemons").get.foreach(PokemonService.cycle(system))
    } onFailure { case ec => println(ec) }
    Future {
      streams.get("spawnpoints").get.foreach(SpawnService.cycle(system))
    } onFailure { case ec => println(ec) }
  }
} 
开发者ID:fiahil,项目名称:Talks,代码行数:53,代码来源:KafkaClient.scala


示例3: createThreadPool

//设置package包名称以及导入依赖的类
package com.bwsw.sj.engine.core.engine

import java.util.concurrent.{ExecutorService, ExecutorCompletionService, Executors}

import com.bwsw.sj.common.utils.EngineLiterals
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.slf4j.LoggerFactory


trait TaskRunner {

  private val logger = LoggerFactory.getLogger(this.getClass)
  protected val threadName: String
  private val threadPool: ExecutorService = createThreadPool(threadName)
  protected val executorService = new ExecutorCompletionService[Unit](threadPool)
  protected val blockingQueue: PersistentBlockingQueue = new PersistentBlockingQueue(EngineLiterals.persistentBlockingQueue)

  private def createThreadPool(factoryName: String) = {
    val countOfThreads = 3
    val threadFactory = createThreadFactory(factoryName)

    Executors.newFixedThreadPool(countOfThreads, threadFactory)
  }

  private def createThreadFactory(name: String) = {
    new ThreadFactoryBuilder()
      .setNameFormat(name)
      .build()
  }

  def handleException(exception: Throwable) = {
    logger.error("Runtime exception", exception)
    exception.printStackTrace()
    threadPool.shutdownNow()
    System.exit(-1)
  }
} 
开发者ID:bwsw,项目名称:sj-platform,代码行数:38,代码来源:TaskRunner.scala


示例4: Server

//设置package包名称以及导入依赖的类
package com.scala.examples.datagurn.lesson08

import com.scala.examples.datagurn.lesson07.Teacher
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import scala.concurrent.impl.Future
import java.util.concurrent.FutureTask
import scala.concurrent.impl.Future
import com.scala.examples.datagurn.lesson07.Teacher

object Server {
  
  // ????????????
  val executorService: ExecutorService = Executors.newFixedThreadPool(10);
  
  val teachers: List[Teacher] = List(
    new Teacher("t1", 20),
    new Teacher("t2", 30),
    new Teacher("t1", 25),
    new Teacher("t1", 21)
  );
  
  def concurrentFilterByName(name: String): FutureTask[List[Teacher]] = {
    val futures = new FutureTask[List[Teacher]](new Callable[List[Teacher]]() {
      def call(): List[Teacher] = {
        return filterByName(name);
      }
    })
    executorService.execute(futures);
    return futures;  // ?? Future
  }
  
  def filterByName(name: String) : List[Teacher] = {
    teachers.synchronized {
      for {
        item <- teachers
        if item.name == name
      } yield item
    }
  }
  
  def main(args: Array[String]): Unit = {
    val teachers : List[Teacher] = concurrentFilterByName("t1").get
    println(teachers)
    executorService.shutdown();
  }
  
  
} 
开发者ID:walle-liao,项目名称:scala-examples,代码行数:51,代码来源:Server.scala


示例5: RemoteConnection

//设置package包名称以及导入依赖的类
package akka.remote.testconductor

import org.jboss.netty.channel.{ Channel, ChannelPipeline, ChannelPipelineFactory, ChannelUpstreamHandler, SimpleChannelUpstreamHandler, DefaultChannelPipeline }
import org.jboss.netty.channel.socket.nio.{ NioClientSocketChannelFactory, NioServerSocketChannelFactory }
import org.jboss.netty.bootstrap.{ ClientBootstrap, ServerBootstrap }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import java.net.InetSocketAddress
import java.util.concurrent.Executors
import akka.event.Logging
import akka.util.Helpers


private[akka] object RemoteConnection {
  def apply(role: Role, sockaddr: InetSocketAddress, poolSize: Int, handler: ChannelUpstreamHandler): Channel = {
    role match {
      case Client ?
        val socketfactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool,
          poolSize)
        val bootstrap = new ClientBootstrap(socketfactory)
        bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
        bootstrap.setOption("tcpNoDelay", true)
        bootstrap.connect(sockaddr).getChannel
      case Server ?
        val socketfactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool,
          poolSize)
        val bootstrap = new ServerBootstrap(socketfactory)
        bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
        bootstrap.setOption("reuseAddress", !Helpers.isWindows)
        bootstrap.setOption("child.tcpNoDelay", true)
        bootstrap.bind(sockaddr)
    }
  }

  def getAddrString(channel: Channel) = channel.getRemoteAddress match {
    case i: InetSocketAddress ? i.toString
    case _                    ? "[unknown]"
  }

  def shutdown(channel: Channel) =
    try channel.close() finally try channel.getFactory.shutdown() finally channel.getFactory.releaseExternalResources()
} 
开发者ID:love1314sea,项目名称:akka-2.3.16,代码行数:45,代码来源:RemoteConnection.scala


示例6: UserCache

//设置package包名称以及导入依赖的类
package com.init6.db

import java.util.concurrent.{Executors, TimeUnit}

import com.init6.Config
import com.init6.utils.CaseInsensitiveHashMap

import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.util.Try


private[db] class UserCache(dbUsers: List[DbUser]) {

  private val cache = CaseInsensitiveHashMap[DbUser]()
  private val inserted = mutable.HashSet[String]()
  private val updated = mutable.HashSet[String]()

  private val executorService = Executors.newSingleThreadScheduledExecutor()
  private val updateInterval = Config().Database.batchUpdateInterval

  private val dbUpdateThread = new Runnable {
    override def run() = {
      Try {
        DAO.saveInserted(cache.filterKeys(inserted.contains).values.toSet)
        inserted.clear()
      }
      Try {
        DAO.saveUpdated(cache.filterKeys(updated.contains).values.toSet)
        updated.clear()
      }
    }
  }

  cache ++= dbUsers.map(dbUser => dbUser.username -> dbUser)
  executorService.scheduleWithFixedDelay(dbUpdateThread, updateInterval, updateInterval, TimeUnit.SECONDS)

  def close() = {
    executorService.shutdown()
    dbUpdateThread.run()
  }

  def get(username: String) = cache.get(username)

  def insert(username: String, password_hash: Array[Byte]) = {
    val now = System.currentTimeMillis
    val newUser = username.toLowerCase
    cache += newUser -> DbUser(username = newUser, password_hash = password_hash,
      created = now, last_logged_in = now)
    inserted += username.toLowerCase
  }

  def update(username: String, dbUser: DbUser) = {
    get(username).foreach(originalDbUser => {
      if (originalDbUser != dbUser) {
        cache += username -> dbUser
        updated += username.toLowerCase
      }
    })
  }
} 
开发者ID:fjaros,项目名称:init6,代码行数:62,代码来源:UserCache.scala


示例7: NettyBench

//设置package包名称以及导入依赖的类
package com.example

import java.time.Duration
import java.time.Instant
import java.util.concurrent.Executors

import com.naoh.beef.Client
import com.naoh.beef.Server
import com.naoh.beef.proto.echo.EchoGrpc
import com.naoh.beef.proto.echo.EchoGrpc.EchoBlockingStub
import com.naoh.beef.proto.echo.EchoReq
import io.grpc.CallOptions
import io.grpc.ManagedChannelBuilder
import io.grpc.ServerBuilder
import io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.NettyServerBuilder

import scala.concurrent.ExecutionContext
import scala.util.Try


class NettyBench {

  val serverCtx = ExecutionContext.fromExecutorService(Executors.newScheduledThreadPool(8))
  val clientCtx = ExecutionContext.fromExecutorService(Executors.newScheduledThreadPool(8))
  val server = NettyServerBuilder.forPort(8899).addService(EchoGrpc.bindService(EchoImpl, serverCtx)).build().start()
  val ch = NettyChannelBuilder.forAddress("localhost", 8899).usePlaintext(true).build()
  val client = new EchoBlockingStub(ch, CallOptions.DEFAULT.withExecutor(clientCtx))
  Thread.sleep(1000)

  val base = Instant.now
  Iterator.range(0, 3000).toSeq.toParArray.foreach { _ => Try(client.retEcho(EchoReq("12"))); print(".") }
  val record = Duration.between(base, Instant.now())

  println(s"\n\nDuration $record \n")

  Thread.sleep(2000)
  clientCtx.shutdown()
  serverCtx.shutdown()
  server.shutdown()
  ch.shutdown()
} 
开发者ID:naoh87,项目名称:grpc-scala-experiment,代码行数:43,代码来源:NettyBench.scala


示例8: BeefTest

//设置package包名称以及导入依赖的类
package com.example

import java.time.Duration
import java.time.Instant
import java.util.concurrent.Executors

import akka.actor.ActorSystem
import akka.cluster.Cluster
import com.naoh.beef.Auth
import com.naoh.beef.Beef
import com.naoh.beef.Client
import com.naoh.beef.Region
import com.naoh.beef.Server
import com.naoh.beef.proto.echo.EchoGrpc
import com.naoh.beef.proto.echo.EchoReq
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.util.Try


class BeefTest {

  val serverSystem = ActorSystem("MyActorSystem", ConfigFactory.parseResources("server.conf").resolve())
  val clientSystem = ActorSystem("MyActorSystem", ConfigFactory.parseResources("client.conf").resolve())
  Cluster(serverSystem).join(Cluster(serverSystem).selfAddress)
  Cluster(clientSystem).join(Cluster(serverSystem).selfAddress)
  val serverCtx = ExecutionContext.fromExecutorService(Executors.newScheduledThreadPool(8))
  val clientCtx = ExecutionContext.fromExecutorService(Executors.newScheduledThreadPool(8))

  val region = Region("rg")
  val auth = Auth("au")

  Thread.sleep(1000)

  Beef(serverSystem)(
    Server(region)
      << EchoGrpc.bindService(EchoImpl, serverCtx))
  Thread.sleep(1000)

  val builder = Client(region, auth, clientCtx) connect Beef(clientSystem)
  val client = builder.build(new EchoGrpc.EchoBlockingStub(_, _))
  Thread.sleep(1000)

  val base = Instant.now
  Iterator.range(0, 3000).toSeq.toParArray.foreach{_ => Try(client.retEcho(EchoReq("12"))); print(".")}
  val record = Duration.between(base, Instant.now())

  println(s"\n\nDuration $record \n")

  Thread.sleep(2000)
  clientCtx.shutdown()
  serverCtx.shutdown()
  clientSystem.shutdown()
  serverSystem.shutdown()
} 
开发者ID:naoh87,项目名称:grpc-scala-experiment,代码行数:58,代码来源:BeefTest.scala


示例9: Server

//设置package包名称以及导入依赖的类
package pubsub

import java.net.ServerSocket
import java.net.Socket
import java.io.BufferedReader
import java.io.InputStreamReader
import java.net.URL

import java.util.concurrent.Executors
import scala.concurrent.JavaConversions._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import pubsub.collection._
import pubsub.command._
import pubsub.network.TCPReader

object Server extends App {
  val port = 7676
  val maxWorkers = 12
  val bufferSize = 20
  val socket = new ServerSocket(port)
  try {
    val whatismyip = new URL("http://checkip.amazonaws.com")
    val in = new BufferedReader(new InputStreamReader(whatismyip.openStream()));
    val serverIP = in.readLine()
    println(s"Connect to $serverIP (or `localhost`), port $port with `telnet` to join this server")
  } catch  {
    case e: Exception =>
      println("There is a problem with your internet connection, you can only access it via localhost")
  }

  val buffer = new BoundedBuffer[Command](20)
  val commandHandlers = for{
    i <- 0 until maxWorkers
  } yield {
    Future {
      new CommandHandler(buffer).handle()
    }
  }
  val threadPool = Executors.newFixedThreadPool(maxWorkers)

  var clientId = 0
  while(true) {
    val client = socket.accept();
    val cid = clientId
    clientId += 1
    Future{
      new TCPReader(clientId, client, buffer).read()
    }(threadPool)
  }
} 
开发者ID:vincenzobaz,项目名称:Parallelism-and-Concurrency-Assignments,代码行数:53,代码来源:Server.scala


示例10: HttpSecondariesExecutor

//设置package包名称以及导入依赖的类
package com.uglylabs.httpduplicator

import com.typesafe.scalalogging.Logger
import Utils._
import org.apache.http.util.EntityUtils

class HttpSecondariesExecutor(poolSize: Int, config: ServerConfig) {
	private val log = Logger[HttpSecondariesExecutor]
	
	private val client = createHttpClient(config)
	
	import java.util.concurrent.Executors
	import scala.concurrent._

	implicit private val ec = new ExecutionContext {
		val threadPool = Executors.newFixedThreadPool(poolSize)

		def execute(runnable: Runnable) =
			threadPool.submit(runnable)
		
		def reportFailure(t: Throwable) = 
			log.error("Thread pool error", t)
	}
	
	def enqueue(target: HttpTarget, request: HttpRequest) {
		Future {
			log.debug("Send request to secondary target: {}\n{}", target, formatRequest(request))
			val response = timing(s"Processing time for secondary $target : %dms", log.debug(_)) {
				client.execute(target.toHttpHost, request)
			}
				
			log.debug("Receive response from secondary target: {}\n{}", target, formatResponse(response))
				
			// release connection resources allocated to receive entity content
			EntityUtils.consume(response.getEntity())
		}.onFailure {
			case e => log.error(s"Error serve request to target: $target", e)
		}
	}
} 
开发者ID:lis0x90,项目名称:httpduplicator,代码行数:41,代码来源:HttpSecondariesExecutor.scala


示例11: CSVWriteBufOwnParallelTest

//设置package包名称以及导入依赖的类
import java.io.{BufferedWriter, FileWriter, PrintWriter}
import java.util.concurrent.Executors

import org.scalatest._

import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Success, Failure}

class CSVWriteBufOwnParallelTest extends FlatSpec with Matchers {
  "Hello" should "have tests" in {


    lazy val doit: Int => Unit = i => {
      val bw = new PrintWriter(new BufferedWriter(new FileWriter(s"""/tmp/csv${i}.txt"""), 8192 * 256 * 4))
      (1 to 10000000).foreach(i => {
        bw.print(List(1, 2, 3, 4, 5, 6).mkString(","))
        bw.print("\r\n")
      })
      bw.close()
    }

    def timeSpentDoing(f: => Unit) = {
      val start = System.currentTimeMillis
      println(start)
      f
      System.currentTimeMillis - start
    }


    implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))

    val futures = (9 to 12).map(n => Future{ timeSpentDoing( doit(n) ) })
    val combined = Future.sequence(futures)
    Await.ready(combined, Duration.Inf)
    combined onComplete {
      case Success(duration) => println(duration)
      case Failure(ex) => println(ex)
    }




  }
} 
开发者ID:ralreiroe,项目名称:embarcadero,代码行数:46,代码来源:CSVWriteBufOwnParallelTest.scala


示例12: BankAccountEventHandler

//设置package包名称以及导入依赖的类
package org.styx.mongo

import java.util.concurrent.Executors

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.styx.bank.example.events._
import org.styx.bank.example.state.BankAccount
import org.styx.handler.{EventFetcher, EventHandler}
import org.styx.model.Event

import scala.concurrent.ExecutionContext


object BankAccountEventHandler {
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(30))

  val mapper = new ObjectMapper
  mapper.registerModule(DefaultScalaModule)

  def converter: MongoDBEvent => Event[BankAccount] = { mongoDBEvent =>
    val event = mongoDBEvent match {
      case MongoDBEvent("BankAccountCreated", eventDate, version, _, _) => BankAccountCreated(version, eventDate)
      case MongoDBEvent("BankAccountClosed", eventDate, version, _, _) => BankAccountClosed(version, eventDate)
      case MongoDBEvent("OwnerChanged", eventDate, version, _, _) => OwnerChanged(version, eventDate)
      case MongoDBEvent("DepositPerformed", eventDate, version, _, _) => DepositPerformed(version, eventDate)
      case MongoDBEvent("WithdrawalPerformed", eventDate, version, _, _) => WithdrawalPerformed(version, eventDate)
    }

    event.data = mapper.readValue(mongoDBEvent.data.toJson(), classOf[Map[String, Any]])
    event
  }

  implicit val eventHandler: EventHandler[BankAccount] with EventFetcher[BankAccount] = MongoDBEventHandlerFetcher(MongoD.collection, mapper, converter)
} 
开发者ID:gabfssilva,项目名称:styx,代码行数:36,代码来源:BankAccountEventHandler.scala


示例13: MessageRelayManager

//设置package包名称以及导入依赖的类
package io.grhodes.mcm.server.gcm

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{DelayQueue, Executors}

import com.gilt.gfc.logging.Loggable

import scala.util.control.NonFatal


class MessageRelayManager(val gcmMessageStore: GCMMessageStore) extends Loggable {

  private val executorService = Executors.newFixedThreadPool(50)
  private val queue = new DelayQueue[ScheduledJob]()
  private val keepRunning = new AtomicBoolean(true)
  private val schedulerFuture = executorService.submit(new JobScheduler(this))

  def shutdown(): Boolean = {
    this.keepRunning.set(false)
    gcmMessageStore.shutdown()
    queue.clear()
    schedulerFuture.cancel(true)
    true
  }

  def addJob(job: ScheduledJob): Boolean = queue.add(job)

  class JobScheduler(relayManager: MessageRelayManager) extends Runnable {
    override def run(): Unit = {
      while (keepRunning.get()) {
        try {
          val scheduledJob = queue.take()
          executorService.execute(new Runnable() {
            override def run(): Unit = scheduledJob.execute(relayManager)
          });
        } catch {
          case NonFatal(e) =>
            error("JobScheduler.run() got exception:", e)
        }
      }
    }
  }

} 
开发者ID:grahamar,项目名称:mcm-server,代码行数:45,代码来源:MessageRelayManager.scala


示例14: ExchangeSpec

//设置package包名称以及导入依赖的类
package ru.tolsi.matcher.naive

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import ru.tolsi.matcher.{AbstractExchangeSpec, ClientInfo, UnitSpec}

class ExchangeSpec extends UnitSpec
  with AbstractExchangeSpec[ThreadUnsafeClientRepository, SingleThreadOrderBook, SingleThreadOrderExecutor] {
  override def ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
  override def buildClientRepo(clients: Seq[ClientInfo]): ThreadUnsafeClientRepository = ThreadUnsafeClientRepository(
    clients.map(ThreadUnsafeClient.fromClientInfo))
  override def orderBook: SingleThreadOrderBook = new SingleThreadOrderBook
  override def ordersExecutor: SingleThreadOrderExecutor = new SingleThreadOrderExecutor

  override def extraCheck(
      repo: ThreadUnsafeClientRepository,
      orderBook: SingleThreadOrderBook,
      executor: SingleThreadOrderExecutor): Unit = {
    forAll(orderBook.instrumentsOrderBook.values.flatMap(_.values.map(_.isEmpty))) { isEmpty =>
      isEmpty should be(true)
    }
  }
} 
开发者ID:Tolsi,项目名称:matcher,代码行数:24,代码来源:ExchangeSpec.scala


示例15: SwaveIdentityProcessorVerification

//设置package包名称以及导入依赖的类
package swave.core.tck

import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import org.reactivestreams.Publisher
import org.reactivestreams.tck.{IdentityProcessorVerification, TestEnvironment}
import org.scalatest.testng.TestNGSuiteLike
import org.testng.SkipException
import org.testng.annotations.AfterClass
import swave.core._

abstract class SwaveIdentityProcessorVerification[T](val testEnv: TestEnvironment, publisherShutdownTimeout: Long)
    extends IdentityProcessorVerification[T](testEnv, publisherShutdownTimeout) with TestNGSuiteLike
    with StreamEnvShutdown {

  def this(printlnDebug: Boolean) =
    this(
      new TestEnvironment(Timeouts.defaultTimeout.toMillis, printlnDebug),
      Timeouts.publisherShutdownTimeout.toMillis)

  def this() = this(false)

  override def createFailedPublisher(): Publisher[T] =
    Spout.failing[T](new Exception("Nope")).drainTo(Drain.toPublisher()).get

  // Publishers created by swave don't support fanout by default
  override def maxSupportedSubscribers: Long = 1L

  override def required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber(): Unit =
    throw new SkipException("Not relevant for publisher w/o fanout support")

  override lazy val publisherExecutorService: ExecutorService =
    Executors.newFixedThreadPool(3)

  @AfterClass
  def shutdownPublisherExecutorService(): Unit = {
    publisherExecutorService.shutdown()
    publisherExecutorService.awaitTermination(3, TimeUnit.SECONDS)
  }
} 
开发者ID:sirthias,项目名称:swave,代码行数:40,代码来源:SwaveIdentityProcessorVerification.scala


示例16: PooledContexts

//设置package包名称以及导入依赖的类
package support

import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

import scala.concurrent.ExecutionContext


object PooledContexts extends LoggerSupport {
  private def newService(threadCount:Int):ExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(threadCount))
  private def ctx(service:ExecutorService) = ExecutionContext.fromExecutor(service)

  private val db = newService(10)
  implicit val dbContext:ExecutionContext = ctx(db)

  private val app = newService(5)
  implicit val appContext:ExecutionContext = ctx(app)

  def shutdown(): Unit = {
    Seq(db, app).foreach {svc =>
      try {
        svc.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
      } catch {
        case ex:Exception => Logger.warn("termination failed", ex)
      } finally {
        svc.shutdownNow()
      }
    }
  }

} 
开发者ID:tockri,项目名称:hello-next,代码行数:32,代码来源:PooledContexts.scala


示例17: ExecutionContextSchedulerTest

//设置package包名称以及导入依赖的类
package reactor.core.scala.scheduler

import java.util.concurrent.{Executors, ThreadFactory}

import org.scalatest.{FreeSpec, Matchers}
import reactor.core.scala.publisher.Mono
import reactor.test.StepVerifier

import scala.concurrent.ExecutionContext


class ExecutionContextSchedulerTest extends FreeSpec with Matchers {
  "ExecutionContextScheduler" - {
    "should create a Scheduler using provided ExecutionContext" in {
      val executionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1, new ThreadFactory {
        override def newThread(r: Runnable): Thread = new Thread(r, "THREAD-NAME")
      }))
      val mono = Mono.just(1)
        .subscribeOn(ExecutionContextScheduler(executionContext))
        .doOnNext(i => Thread.currentThread().getName shouldBe "THREAD-NAME")
      StepVerifier.create(mono)
        .expectNext(1)
        .verifyComplete()
    }
  }
} 
开发者ID:sinwe,项目名称:reactor-core-scala,代码行数:27,代码来源:ExecutionContextSchedulerTest.scala


示例18: TaskManager

//设置package包名称以及导入依赖的类
package lert.core

import java.util.concurrent.{Executors, TimeUnit}

import lert.core.config.ConfigProvider
import lert.core.rule.RuleLoader
import com.google.inject.Inject
import com.typesafe.scalalogging.LazyLogging

class TaskManager(period: Long, task: Task) extends LazyLogging {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    scheduler.scheduleWithFixedDelay(task, 0, period, TimeUnit.MILLISECONDS)
    logger.info("Task manager has been started")
  }

  def stop(): Unit = scheduler.shutdownNow()
}

class Task @Inject()(configProvider: ConfigProvider,
                     ruleLoader: RuleLoader) extends Runnable with LazyLogging {
  override def run(): Unit = {
    try {
      val config = configProvider.config
      require(config != null, "Config is not specified")

      logger.debug("Task is being run")
      config
        .rules
        .foreach(ruleLoader.process)
    } catch {
      case ex: Any =>
        logger.error(ex.getLocalizedMessage)
        logger.debug(ex.getLocalizedMessage, ex)
    }
  }

} 
开发者ID:l3rt,项目名称:l3rt,代码行数:40,代码来源:TaskManager.scala


示例19: Futures

//设置package包名称以及导入依赖的类
package org.zalando.benchmarks

import java.util.concurrent.Executors

import akka.actor.ActorSystem

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

class Futures(system: ActorSystem) {
  import ComputationFollowedByAsyncPublishing._

  def benchmark(coreFactor: Int): Unit = {
    import system.dispatcher

    // execution context only for the (cpu-bound) computation
    val ec = ExecutionContext fromExecutorService Executors.newFixedThreadPool(numWorkers(coreFactor))

    try {
      // `traverse` will distribute the tasks to the thread pool, the rest happens fully async
      printResult(Await.result(Future.traverse(1 to numTasks map Job) { job =>
        Future(Computer compute job)(ec) flatMap (Publisher.publish(_, system))
      }, 1 hour))

    } finally {
      // the execution context has to be shut down explicitly
      ec.shutdown()
    }
  }
} 
开发者ID:narayana-glassbeam,项目名称:scala-concurrency-playground,代码行数:31,代码来源:Futures.scala


示例20: Blocking

//设置package包名称以及导入依赖的类
package org.zalando.benchmarks

import java.util.concurrent.{Callable, Executors}

import akka.actor.ActorSystem

import scala.concurrent.Await
import scala.concurrent.duration._

class Blocking(system: ActorSystem) {
  import ComputationFollowedByAsyncPublishing._

  def benchmark(coreFactor: Int): Unit = {
    // let's do this Ye Olde Schoole Way
    val exec = Executors newFixedThreadPool numWorkers(coreFactor)

    try {
      val futures = 1 to numTasks map Job map { job =>
        exec.submit(new Callable[PublishResult] {
          // explicitly turn async publishing operation into a blocking operation
          override def call(): PublishResult = Await.result(Publisher publish (Computer compute job, system), 1 hour)
        })
      }
      printResult(futures map (_.get))

    } finally {
      // never forget
      exec.shutdown()
    }
  }
} 
开发者ID:narayana-glassbeam,项目名称:scala-concurrency-playground,代码行数:32,代码来源:Blocking.scala



注:本文中的java.util.concurrent.Executors类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala URI类代码示例发布时间:2022-05-23
下一篇:
Scala current类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap