本文整理汇总了Scala中akka.persistence.query.EventEnvelope类的典型用法代码示例。如果您正苦于以下问题:Scala EventEnvelope类的具体用法?Scala EventEnvelope怎么用?Scala EventEnvelope使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了EventEnvelope类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: Reporter
//设置package包名称以及导入依赖的类
package com.udemy.akka.persistence.fsm
import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
object Reporter extends App{
private val system: ActorSystem = ActorSystem("persistent-query")
val queries: LeveldbReadJournal = PersistenceQuery(system).readJournalFor(
LeveldbReadJournal.Identifier
)
val source:Source[EventEnvelope,NotUsed]= queries.eventsByPersistenceId("Account",0L,Long.MaxValue)
implicit val mat=ActorMaterializer()(system)
source.runForeach{
evt=>println(s"Event : $evt")
}
Thread.sleep(2000)
system.terminate()
}
开发者ID:akshay-harale,项目名称:udemy-akka,代码行数:29,代码来源:Reporter.scala
示例2: JdbcJournalReader
//设置package包名称以及导入依赖的类
package homeworkzen.domain.query
import java.time.Instant
import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.scaladsl.Source
import homeworkzen.domain.command.message.Event
class JdbcJournalReader(implicit actorSystem: ActorSystem) extends JournalReader {
override def currentEventsByTag(tag: String): Source[EventEnvelope, NotUsed] =
journal.currentEventsByTag(tag, 0)
override def eventsByTag(tag: String): Source[EventEnvelope, NotUsed] =
journal.eventsByTag(tag, 0)
private def tryGetEventTime(event: Any): Option[Instant] = event match {
case e: Event => Some(e.timestamp)
case _ => None
}
override def newEventsByTag(tag: String): Source[EventEnvelope, NotUsed] = {
val now = Instant.now()
val isNotNew = (envelope: EventEnvelope) => !tryGetEventTime(envelope.event).exists(now.compareTo(_) < 0)
eventsByTag(tag).dropWhile(isNotNew)
}
private def journal(implicit actorSystem: ActorSystem) =
PersistenceQuery(actorSystem).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:33,代码来源:JdbcJournalReader.scala
示例3: LiveEventsByPersistenceIdPublisher
//设置package包名称以及导入依赖的类
package akka.persistence.pg.journal.query
import akka.persistence.JournalProtocol.{ReplayMessages, ReplayedMessage}
import akka.persistence.pg.journal.PgAsyncWriteJournal._
import akka.persistence.query.EventEnvelope
import scala.concurrent.duration.FiniteDuration
class LiveEventsByPersistenceIdPublisher(persistenceId: String,
fromOffset: Long,
toOffset: Long,
refreshInterval: FiniteDuration,
maxBufSize: Int,
writeJournalPluginId: String)
extends BaseEventsPublisher(fromOffset, toOffset, refreshInterval, maxBufSize, writeJournalPluginId) {
override def subscribe(): Unit = {
journal ! SubscribePersistenceId(persistenceId)
}
def requestReplayFromJournal(limit: Int): Unit = {
journal ! ReplayMessages(currOffset, toOffset, limit, persistenceId, self)
}
override def replaying: Receive = super.replaying orElse {
case ReplayedMessage(p) =>
buf :+= EventEnvelope(
offset = p.sequenceNr,
persistenceId = persistenceId,
sequenceNr = p.sequenceNr,
event = p.payload)
currOffset = p.sequenceNr + 1
deliverBuf()
}
}
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:40,代码来源:LiveEventsByPersistenceIdPublisher.scala
示例4: LiveEventsByTagsPublisher
//设置package包名称以及导入依赖的类
package akka.persistence.pg.journal.query
import akka.persistence.pg.EventTag
import akka.persistence.pg.journal.PgAsyncWriteJournal._
import akka.persistence.query.EventEnvelope
import scala.concurrent.duration.FiniteDuration
class LiveEventsByTagsPublisher(tags: Set[EventTag],
fromOffset: Long,
toOffset: Long,
refreshInterval: FiniteDuration,
maxBufSize: Int,
writeJournalPluginId: String)
extends BaseEventsPublisher(fromOffset, toOffset, refreshInterval, maxBufSize, writeJournalPluginId) {
override def subscribe(): Unit = {
journal ! SubscribeTags(tags)
}
def requestReplayFromJournal(limit: Int): Unit = {
journal ! ReplayTaggedMessages(currOffset, toOffset, limit, tags, self)
}
override def replaying: Receive = super.replaying orElse {
case ReplayedTaggedMessage(persistentRepr, _, offset) =>
log.debug(s"Received replayed message: ${persistentRepr.persistenceId}")
buf :+= EventEnvelope(
offset = offset,
persistenceId = persistentRepr.persistenceId,
sequenceNr = persistentRepr.sequenceNr,
event = persistentRepr.payload
)
currOffset = offset + 1
deliverBuf()
}
}
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:42,代码来源:LiveEventsByTagsPublisher.scala
示例5: LiveEventsPublisher
//设置package包名称以及导入依赖的类
package akka.persistence.pg.journal.query
import akka.persistence.pg.journal.PgAsyncWriteJournal._
import akka.persistence.query.EventEnvelope
import scala.concurrent.duration.FiniteDuration
class LiveEventsPublisher(fromOffset: Long,
toOffset: Long,
refreshInterval: FiniteDuration,
maxBufSize: Int,
writeJournalPluginId: String)
extends BaseEventsPublisher(fromOffset, toOffset, refreshInterval, maxBufSize, writeJournalPluginId) {
override def subscribe(): Unit = {
journal ! SubscribeAllEvents
}
override def requestReplayFromJournal(limit: Int): Unit = {
journal ! ReplayMessages(currOffset, toOffset, limit, self)
}
override def replaying: Receive = super.replaying orElse {
case ReplayedEventMessage(persistentRepr, offset) =>
log.debug(s"Received replayed message: ${persistentRepr.persistenceId}")
buf :+= EventEnvelope(
offset = offset,
persistenceId = persistentRepr.persistenceId,
sequenceNr = persistentRepr.sequenceNr,
event = persistentRepr.payload
)
currOffset = offset + 1
deliverBuf()
}
}
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:40,代码来源:LiveEventsPublisher.scala
示例6: StatisticsService
//设置package包名称以及导入依赖的类
package service
import actors.LunchActor.LunchCreated
import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.EventEnvelope
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import application.Application.EventReadJournal
import model.UserId
import util.{Formatting, Logging}
import scala.concurrent.{ExecutionContext, Future}
class StatisticsService(actorSystem: ActorSystem,
eventReadJournal: EventReadJournal)
extends Logging
with Formatting {
implicit val mat: ActorMaterializer = ActorMaterializer()(actorSystem)
def getLunchmasterStatistics(implicit executionContext: ExecutionContext,
actorSystem: ActorSystem): Future[Map[UserId, Int]] = {
val src: Source[EventEnvelope, NotUsed] = eventReadJournal.currentEventsByPersistenceId("LunchActor", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
events.runFold(Map.empty[UserId, Int]) {
case (acc, LunchCreated(lunchmaster, _)) =>
acc + (lunchmaster -> (acc.getOrElse(lunchmaster, 0) + 1))
case (acc, _) =>
acc
}
}
}
开发者ID:mturlo,项目名称:lunchbot,代码行数:39,代码来源:StatisticsService.scala
示例7: indexRoot
//设置package包名称以及导入依赖的类
package com.packt.masteringakka.bookstore.user
import java.util.Date
import com.packt.masteringakka.bookstore.common.ReadModelObject
import com.packt.masteringakka.bookstore.common.ViewBuilder
import akka.actor.Props
import com.packt.masteringakka.bookstore.common.BookstoreActor
import com.packt.masteringakka.bookstore.common.ElasticsearchSupport
import akka.persistence.query.EventEnvelope
trait BookstoreUserReadModel{
def indexRoot = "user"
def entityType = BookstoreUser.EntityType
}
object BookstoreUserViewBuilder{
val Name = "user-view-builder"
case class BookstoreUserRM(email:String, firstName:String, lastName:String,
createTs:Date, deleted:Boolean = false) extends ReadModelObject {
def id = email
}
def props = Props[BookstoreUserViewBuilder]
}
class BookstoreUserViewBuilder extends ViewBuilder[BookstoreUserViewBuilder.BookstoreUserRM] with BookstoreUserReadModel{
import BookstoreUser.Event._
import ViewBuilder._
import BookstoreUserViewBuilder._
def projectionId = Name
def actionFor(id:String, env:EventEnvelope) = env.event match {
case UserCreated(user) =>
val rm = BookstoreUserRM(user.email, user.firstName, user.lastName, user.createTs, user.deleted)
InsertAction(id, rm)
case PersonalInfoUpdated(first, last) =>
UpdateAction(id, List("firstName = fn", "lastName = ln"), Map("fn" -> first, "ln" -> last))
case UserDeleted(email) =>
UpdateAction(id, "deleted = true", Map.empty[String,Any])
}
}
object BookstoreUserView{
val Name = "bookstore-user-view"
case class FindUsersByName(name:String)
def props = Props[BookstoreUserView]
}
class BookstoreUserView extends BookstoreActor with ElasticsearchSupport with BookstoreUserReadModel{
import BookstoreUserView._
import context.dispatcher
def receive = {
case FindUsersByName(name) =>
val results = queryElasticsearch(s"firstName:$name OR lastName:$name")
pipeResponse(results)
}
}
开发者ID:PacktPublishing,项目名称:Mastering-Akka,代码行数:62,代码来源:BookstoreUserReadModel.scala
示例8: EventPublisher
//设置package包名称以及导入依赖的类
package backend
import akka.NotUsed
import akka.actor.{ ActorContext, ActorSystem }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import io.funcqrs.Tag
import io.funcqrs.projections.PublisherFactory
import model.write.{ Order, OrderEvent }
import org.reactivestreams.Publisher
object EventPublisher {
def source(offset: Long, tag: Tag)(implicit actorSys: ActorSystem): Source[EventEnvelope, NotUsed] = {
val readJournal =
PersistenceQuery(actorSys)
.readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
readJournal.eventsByTag(tag.value, offset)
}
def orderEvents(implicit actorSys: ActorSystem) = {
new PublisherFactory[Long, OrderEvent] {
implicit val mat = ActorMaterializer()
override def from(offset: Option[Long]): Publisher[(Long, OrderEvent)] = {
val offsetNum = offset.getOrElse(0L)
EventPublisher
.source(offsetNum, Order.tag)
.collect {
case EventEnvelope(eventOffset, _, _, event: OrderEvent) =>
(eventOffset, event)
}
.runWith(Sink.asPublisher(false))
}
}
}
}
开发者ID:strongtyped,项目名称:fun-cqrs-workshop-lite,代码行数:49,代码来源:EventPublisher.scala
注:本文中的akka.persistence.query.EventEnvelope类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论