本文整理汇总了Scala中akka.persistence.query.PersistenceQuery类的典型用法代码示例。如果您正苦于以下问题:Scala PersistenceQuery类的具体用法?Scala PersistenceQuery怎么用?Scala PersistenceQuery使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了PersistenceQuery类的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: PlayerServiceImpl
//设置package包名称以及导入依赖的类
package com.chriswk.gameranker.player.impl
import java.util.UUID
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.chriswk.gameranker.player.api
import com.chriswk.gameranker.player.api.PlayerService
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.transport.NotFound
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
import scala.concurrent.ExecutionContext
class PlayerServiceImpl(registry: PersistentEntityRegistry, system: ActorSystem)(implicit ec: ExecutionContext, mat: Materializer) extends PlayerService {
private val currentIdsQuery = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
override def createPlayer = ServiceCall { createPlayer =>
val playerId = UUID.randomUUID()
refFor(playerId).ask(CreatePlayer(createPlayer.name)).map { _ =>
api.Player(playerId, createPlayer.name)
}
}
override def getPlayer(playerId: UUID) = ServiceCall { _ =>
refFor(playerId).ask(GetPlayer).map {
case Some(player) => api.Player(playerId, player.name)
case None => throw NotFound(s"Player with id $playerId")
}
}
private def refFor(playerId: UUID) = registry.refFor[PlayerEntity](playerId.toString)
override def getPlayers = ServiceCall { _ =>
currentIdsQuery.currentPersistenceIds()
.filter(_.startsWith("PlayerEntity|"))
.mapAsync(4) { id =>
val entityId = id.split("\\|", 2).last
registry.refFor[PlayerEntity](entityId)
.ask(GetPlayer)
.map(_.map(player => api.Player(UUID.fromString(entityId), player.name)))
}
.collect {
case Some(p) => p
}
.runWith(Sink.seq)
}
}
开发者ID:chriswk,项目名称:gameranker,代码行数:53,代码来源:PlayerServiceImpl.scala
示例2: Reporter
//设置package包名称以及导入依赖的类
package com.udemy.akka.persistence.fsm
import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
object Reporter extends App{
private val system: ActorSystem = ActorSystem("persistent-query")
val queries: LeveldbReadJournal = PersistenceQuery(system).readJournalFor(
LeveldbReadJournal.Identifier
)
val source:Source[EventEnvelope,NotUsed]= queries.eventsByPersistenceId("Account",0L,Long.MaxValue)
implicit val mat=ActorMaterializer()(system)
source.runForeach{
evt=>println(s"Event : $evt")
}
Thread.sleep(2000)
system.terminate()
}
开发者ID:akshay-harale,项目名称:udemy-akka,代码行数:29,代码来源:Reporter.scala
示例3: Step4PersistenceQuery
//设置package包名称以及导入依赖的类
package scaladays.akka.stream
import akka.NotUsed
import akka.actor.{Actor, ActorSystem, Props}
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
import scaladays.akka.actors.PersistGreetingActor
import scaladays.akka.actors.PersistGreetingActor.WhoToGreet
import scaladays.akka.http.MyJsonProtocol
import scaladays.akka.support.MakingUpData
object Step4PersistenceQuery extends App
with MakingUpData with MyJsonProtocol {
implicit val system = ActorSystem()
implicit val ec = system.dispatcher
implicit val mat = ActorMaterializer()
val greetPersister = system.actorOf(PersistGreetingActor.props("greetPersister"), "greetPersister")
// TODO: sending data here ----------------------------------------------------------------------
system.actorOf(Props(new Actor {
context.system.scheduler.schedule(1.second, 10.seconds, self, "PING")
override def receive: Receive = {
case _ =>
greetPersister ! WhoToGreet("Johan") // who had an accident last week :'-(
greetPersister ! WhoToGreet("Patrik")
greetPersister ! WhoToGreet("Endre")
greetPersister ! WhoToGreet("Martynas")
greetPersister ! WhoToGreet("ktoso")
}
}), "sender")
// TODO: Query side starts here -----------------------------------------------------------------
private val pq = PersistenceQuery(system)
val journal = pq.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
val ids: Source[String, NotUsed] = journal.allPersistenceIds()
ids.runForeach { id =>
journal.currentEventsByPersistenceId(id).runForeach { e =>
println(s"ID($id), event = $e")
}
}
}
开发者ID:ktoso,项目名称:scaladays-berlin-akka-streams,代码行数:53,代码来源:Step4PersistenceQuery.scala
示例4: UserServiceImpl
//设置package包名称以及导入依赖的类
package com.example.auction.user.impl
import java.util.UUID
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.example.auction.user.api
import com.example.auction.user.api.UserService
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.transport.NotFound
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
import scala.concurrent.ExecutionContext
class UserServiceImpl(registry: PersistentEntityRegistry, system: ActorSystem)(implicit ec: ExecutionContext, mat: Materializer) extends UserService {
private val currentIdsQuery = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
override def createUser = ServiceCall { createUser =>
val userId = UUID.randomUUID()
refFor(userId).ask(CreateUser(createUser.name)).map { _ =>
api.User(userId, createUser.name)
}
}
override def getUser(userId: UUID) = ServiceCall { _ =>
refFor(userId).ask(GetUser).map {
case Some(user) =>
api.User(userId, user.name)
case None =>
throw NotFound(s"User with id $userId")
}
}
override def getUsers = ServiceCall { _ =>
// Note this should never make production....
currentIdsQuery.currentPersistenceIds()
.filter(_.startsWith("UserEntity|"))
.mapAsync(4) { id =>
val entityId = id.split("\\|", 2).last
registry.refFor[UserEntity](entityId)
.ask(GetUser)
.map(_.map(user => api.User(UUID.fromString(entityId), user.name)))
}.collect {
case Some(user) => user
}
.runWith(Sink.seq)
}
private def refFor(userId: UUID) = registry.refFor[UserEntity](userId.toString)
}
开发者ID:lagom,项目名称:online-auction-scala,代码行数:55,代码来源:UserServiceImpl.scala
示例5: HmdaQuery
//设置package包名称以及导入依赖的类
package hmda.persistence.processing
import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl._
import akka.stream.scaladsl.Source
import hmda.persistence.PersistenceConfig._
import hmda.persistence.messages.CommonMessages._
object HmdaQuery {
type RJ = ReadJournal with AllPersistenceIdsQuery with CurrentPersistenceIdsQuery with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery with EventsByTagQuery2 with CurrentEventsByTagQuery2
case class EventWithSeqNr(seqNr: Long, event: Event)
val journalId = configuration.getString("akka.persistence.query.journal.id")
def readJournal(system: ActorSystem) = {
PersistenceQuery(system).readJournalFor[RJ](journalId)
}
def events(persistenceId: String)(implicit system: ActorSystem): Source[Event, NotUsed] = {
readJournal(system).currentEventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
.map(_.event.asInstanceOf[Event])
}
def liveEvents(persistenceId: String)(implicit system: ActorSystem): Source[Event, NotUsed] = {
readJournal(system).eventsByPersistenceId(persistenceId, 0L, Long.MaxValue)
.map(_.event.asInstanceOf[Event])
}
def eventsWithSequenceNumber(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long)(implicit system: ActorSystem): Source[EventWithSeqNr, NotUsed] = {
readJournal(system)
.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
.map(x => EventWithSeqNr(x.sequenceNr, x.event.asInstanceOf[Event]))
}
}
开发者ID:cfpb,项目名称:hmda-platform,代码行数:40,代码来源:HmdaQuery.scala
示例6: JdbcJournalReader
//设置package包名称以及导入依赖的类
package homeworkzen.domain.query
import java.time.Instant
import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.scaladsl.Source
import homeworkzen.domain.command.message.Event
class JdbcJournalReader(implicit actorSystem: ActorSystem) extends JournalReader {
override def currentEventsByTag(tag: String): Source[EventEnvelope, NotUsed] =
journal.currentEventsByTag(tag, 0)
override def eventsByTag(tag: String): Source[EventEnvelope, NotUsed] =
journal.eventsByTag(tag, 0)
private def tryGetEventTime(event: Any): Option[Instant] = event match {
case e: Event => Some(e.timestamp)
case _ => None
}
override def newEventsByTag(tag: String): Source[EventEnvelope, NotUsed] = {
val now = Instant.now()
val isNotNew = (envelope: EventEnvelope) => !tryGetEventTime(envelope.event).exists(now.compareTo(_) < 0)
eventsByTag(tag).dropWhile(isNotNew)
}
private def journal(implicit actorSystem: ActorSystem) =
PersistenceQuery(actorSystem).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:33,代码来源:JdbcJournalReader.scala
示例7: FriendJournalReader
//设置package包名称以及导入依赖的类
package com.packt.chapter6
import akka.actor.ActorSystem
import akka.persistence.Recovery
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import scala.concurrent.duration._
object FriendJournalReader extends App {
implicit val system = ActorSystem()
import system.dispatcher
implicit val mat = ActorMaterializer()(system)
val queries = PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
val laura = system.actorOf(FriendActor.props("Laura", Recovery()))
val maria = system.actorOf(FriendActor.props("Maria", Recovery()))
laura ! AddFriend(Friend("Hector"))
laura ! AddFriend(Friend("Nancy"))
maria ! AddFriend(Friend("Oliver"))
maria ! AddFriend(Friend("Steve"))
system.scheduler.scheduleOnce(5 second, maria, AddFriend(Friend("Steve")))
system.scheduler.scheduleOnce(10 second, maria, RemoveFriend(Friend("Oliver")))
Thread.sleep(2000)
queries.allPersistenceIds().map(id => system.log.info(s"Id received [$id]")).to(Sink.ignore).run()
queries.eventsByPersistenceId("Laura").map(e => log(e.persistenceId, e.event)).to(Sink.ignore).run()
queries.eventsByPersistenceId("Maria").map(e => log(e.persistenceId, e.event)).to(Sink.ignore).run()
def log(id: String, evt: Any) = system.log.info(s"Id [$id] Event [$evt]")
}
开发者ID:PacktPublishing,项目名称:Akka-Cookbook,代码行数:33,代码来源:FriendJournalReader.scala
示例8: Guardian
//设置package包名称以及导入依赖的类
package com.todos
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.routing.RoundRobinPool
import com.todos.repository.{TodoRepositoryProcessor, TodoRepositoryView}
class Guardian() extends Actor with ActorLogging {
log.info("TodoService up and running...")
val todoRepositoryProcessor: ActorRef = context.actorOf(
TodoRepositoryProcessor.props(),
name = TodoRepositoryProcessor.name
)
val readJournal: CassandraReadJournal = PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val todoRepositoryView: ActorRef = context.actorOf(
TodoRepositoryView.props(readJournal).withRouter(RoundRobinPool(5)),
name = TodoRepositoryView.name
)
context.actorOf(
Api.props(
todoRepositoryViewPath = todoRepositoryView.path,
todoRepositoryProcessorPath = todoRepositoryProcessor.path
),
name = Api.name
)
def receive: Receive = Actor.emptyBehavior
}
object Guardian {
val name: String = "guardian"
def props(): Props = {
Props(
classOf[Guardian]
)
}
}
开发者ID:benniekrijger,项目名称:todo-service,代码行数:46,代码来源:Guardian.scala
示例9: JdbcPersistentEntityRegistry
//设置package包名称以及导入依赖的类
package com.lightbend.lagom.internal.scaladsl.persistence.jdbc
import akka.actor.ActorSystem
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.scaladsl.EventsByTagQuery
import akka.persistence.query.{ NoOffset, Offset, PersistenceQuery, Sequence }
import com.lightbend.lagom.internal.persistence.jdbc.SlickProvider
import com.lightbend.lagom.internal.scaladsl.persistence.AbstractPersistentEntityRegistry
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity
private[lagom] final class JdbcPersistentEntityRegistry(system: ActorSystem, slickProvider: SlickProvider)
extends AbstractPersistentEntityRegistry(system) {
private lazy val ensureTablesCreated = slickProvider.ensureTablesCreated()
override def register(entityFactory: => PersistentEntity): Unit = {
ensureTablesCreated
super.register(entityFactory)
}
override protected val journalId: String = JdbcReadJournal.Identifier
private val jdbcReadJournal = PersistenceQuery(system).readJournalFor[JdbcReadJournal](journalId)
override protected val eventsByTagQuery: Option[EventsByTagQuery] = Some(jdbcReadJournal)
override protected def mapStartingOffset(storedOffset: Offset): Offset = storedOffset match {
case NoOffset => NoOffset
case Sequence(value) => Sequence(value + 1)
case other =>
throw new IllegalArgumentException(s"JDBC does not support ${other.getClass.getSimpleName} offsets")
}
}
开发者ID:lagom,项目名称:lagom,代码行数:34,代码来源:JdbcPersistentEntityRegistry.scala
示例10: Module
//设置package包名称以及导入依赖的类
package com.github.dnvriend
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.scaladsl.ReadJournal
import com.google.inject.{ AbstractModule, Inject, Provider }
class Module extends AbstractModule {
protected def configure(): Unit = {
bind(classOf[ReadJournal])
.toProvider(classOf[CassandraReadJournalProvider])
}
}
class CassandraReadJournalProvider @Inject() (system: ActorSystem) extends Provider[ReadJournal] {
override def get(): ReadJournal =
PersistenceQuery.apply(system).readJournalFor(CassandraReadJournal.Identifier)
}
开发者ID:dnvriend,项目名称:akka-persistence-cassandra-test,代码行数:20,代码来源:Module.scala
示例11: PaymentHistory
//设置package包名称以及导入依赖的类
package aia.persistence
import akka.actor._
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
object PaymentHistory {
def props(shopperId: Long) = Props(new PaymentHistory(shopperId))
def name(shopperId: Long) = s"payment_history_${shopperId}"
case object GetHistory
case class History(items: List[Item] = Nil) {
def paid(paidItems: List[Item]) = {
History(paidItems ++ items)
}
}
}
class PaymentHistory(shopperId: Long) extends Actor
with ActorLogging {
import PaymentHistory._
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
LeveldbReadJournal.Identifier)
implicit val materializer = ActorMaterializer()
queries.eventsByPersistenceId(Wallet.name(shopperId)).runWith(Sink.actorRef(self, None))
var history = History()
def receive = {
case Wallet.Paid(items, _) => history = history.paid(items)
case GetHistory => sender() ! history
}
}
开发者ID:gilbutITbook,项目名称:006877,代码行数:40,代码来源:PaymentHistory.scala
示例12: CalculatorHistory
//设置package包名称以及导入依赖的类
package aia.persistence.calculator
import akka.actor._
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
object CalculatorHistory {
def props = Props(new CalculatorHistory)
def name = "calculator-history"
case object GetHistory
case class History(added: Int = 0, subtracted: Int = 0, divided: Int = 0, multiplied: Int = 0) {
def incrementAdded = copy(added = added + 1)
def incrementSubtracted= copy(subtracted = subtracted + 1)
def incrementDivided = copy(divided = divided + 1)
def incrementMultiplied = copy(multiplied = multiplied + 1)
}
}
class CalculatorHistory extends Actor {
import Calculator._
import CalculatorHistory._
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
LeveldbReadJournal.Identifier)
implicit val materializer = ActorMaterializer()
queries.eventsByPersistenceId(Calculator.name).runWith(Sink.actorRef(self, None))
var history = History()
def receive = {
case _ : Added => history = history.incrementAdded
case _ : Subtracted => history = history.incrementSubtracted
case _ : Divided => history = history.incrementDivided
case _ : Multiplied => history = history.incrementMultiplied
case GetHistory => sender() ! history
}
}
开发者ID:gilbutITbook,项目名称:006877,代码行数:42,代码来源:CalculatorHistory.scala
示例13: LeagueProjection
//设置package包名称以及导入依赖的类
package eu.reactivesystems.league.impl
import akka.actor.{Actor, ActorLogging, Props, Status}
import akka.pattern.pipe
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{EventEnvelope2, PersistenceQuery}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import com.lightbend.lagom.scaladsl.persistence.jdbc.JdbcSession
class LeagueProjection(jdbcSession: JdbcSession)
extends Actor
with ActorLogging {
import DBOperations._
override def receive: Receive = {
case Status.Failure(ex) =>
log.error(ex, "read side generation terminated")
context.stop(self)
}
override def preStart(): Unit = {
val materializer = ActorMaterializer.create(context.system)
val readJournal = PersistenceQuery
.get(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
import context.dispatcher
val result = getOffset(jdbcSession)
.flatMap(
offset =>
readJournal
.eventsByTag(LeagueEvent.Tag.tag, offset)
.mapAsync(1)(e => projectEvent(e))
.runWith(Sink.ignore)(materializer))
result pipeTo self
()
}
private def projectEvent(event: EventEnvelope2) =
event.event match {
case ClubRegistered(club) => addClub(jdbcSession, event.offset, club)
case GamePlayed(game) => addGame(jdbcSession, event.offset, game)
case ResultRevoked(game) => revokeResult(jdbcSession, event.offset, game)
}
}
object LeagueProjection {
val readSideId = "leagueProjection"
def props(jdbcSession: JdbcSession) =
Props(new LeagueProjection(jdbcSession))
}
开发者ID:reactivesystems-eu,项目名称:eventsourcing-intro,代码行数:56,代码来源:LeagueProjection.scala
示例14: QueryByMailActor
//设置package包名称以及导入依赖的类
package com.giampaolotrapasso
import akka.actor.Actor
import akka.persistence.inmemory.query.journal.scaladsl.InMemoryReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.ActorMaterializer
import com.giampaolotrapasso.QueryByMailActor.{Response, UsersByMail}
import com.giampaolotrapasso.UserActor.{UserAdded, User}
import com.typesafe.scalalogging.LazyLogging
class QueryByMailActor extends Actor with LazyLogging {
val readJournal =
PersistenceQuery(context.system).readJournalFor[InMemoryReadJournal](InMemoryReadJournal.Identifier)
var usersByDomain: Map[String, List[String]] = Map.empty
val min = 0
val max = 1000
implicit val materializer = ActorMaterializer()
readJournal.allPersistenceIds().runForeach { persistenceId =>
readJournal.eventsByPersistenceId(persistenceId, min, max).runForeach { eventEnvelope =>
eventEnvelope.event match {
case event @ UserAdded(user, mail, age) =>
logger.info(s"received $event for $persistenceId")
val domain = mail.split("@").last
logger.info("Current domain " + domain)
val list: List[String] = usersByDomain.getOrElse(domain, List.empty[String])
logger.info("Current list " + list)
val newList: List[String] = list :+ mail
usersByDomain = usersByDomain + (domain -> newList)
}
}
}
override def receive = {
case UsersByMail(domain) => {
sender() ! Response(domain, usersByDomain(domain))
}
}
}
object QueryByMailActor {
case class UsersByMail(mail: String)
case class Response(domain: String, mails: List[String])
}
开发者ID:giampaolotrapasso,项目名称:persistence-query-example,代码行数:52,代码来源:QueryByMailActor.scala
示例15: UserServiceImpl
//设置package包名称以及导入依赖的类
package com.example.auction.user.impl
import java.util.UUID
import akka.actor.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.PersistenceQuery
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import com.example.auction.user.api
import com.example.auction.user.api.UserService
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.transport.NotFound
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
import scala.concurrent.ExecutionContext
class UserServiceImpl(registry: PersistentEntityRegistry, system: ActorSystem)(implicit ec: ExecutionContext, mat: Materializer) extends UserService {
private val currentIdsQuery = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
override def createUser = ServiceCall { createUser =>
val userId = UUID.randomUUID()
refFor(userId).ask(CreateUser(createUser.name, createUser.hash)).map { _ =>
api.User(userId, createUser.name)
}
}
override def getUser(userId: UUID) = ServiceCall { _ =>
refFor(userId).ask(GetUser).map {
case Some(user) =>
api.User(userId, user.name)
case None =>
throw NotFound(s"User with id $userId")
}
}
override def getUsers = ServiceCall { _ =>
// Note this should never make production....
currentIdsQuery.currentPersistenceIds()
.filter(_.startsWith("UserEntity|"))
.mapAsync(4) { id =>
val entityId = id.split("\\|", 2).last
registry.refFor[UserEntity](entityId)
.ask(GetUser)
.map(_.map(user => api.User(UUID.fromString(entityId), user.name)))
}.collect {
case Some(user) => user
}
.runWith(Sink.seq)
}
private def refFor(userId: UUID) = registry.refFor[UserEntity](userId.toString)
}
开发者ID:kdevrou,项目名称:overflow,代码行数:55,代码来源:UserServiceImpl.scala
示例16: EventPublisher
//设置package包名称以及导入依赖的类
package backend
import akka.NotUsed
import akka.actor.{ ActorContext, ActorSystem }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import io.funcqrs.Tag
import io.funcqrs.projections.PublisherFactory
import model.write.{ Order, OrderEvent }
import org.reactivestreams.Publisher
object EventPublisher {
def source(offset: Long, tag: Tag)(implicit actorSys: ActorSystem): Source[EventEnvelope, NotUsed] = {
val readJournal =
PersistenceQuery(actorSys)
.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
readJournal.eventsByTag(tag.value, offset)
}
def orderEvents(implicit actorSys: ActorSystem) = {
new PublisherFactory[Long, OrderEvent] {
implicit val mat = ActorMaterializer()
override def from(offset: Option[Long]): Publisher[(Long, OrderEvent)] = {
val offsetNum = offset.getOrElse(0L)
EventPublisher
.source(offsetNum, Order.tag)
.collect {
case EventEnvelope(eventOffset, _, _, event: OrderEvent) =>
(eventOffset, event)
}
.runWith(Sink.asPublisher(false))
}
}
}
}
开发者ID:strongtyped,项目名称:fun-cqrs-workshop-lite,代码行数:49,代码来源:EventPublisher.scala
注:本文中的akka.persistence.query.PersistenceQuery类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论