• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala EventEnvelope类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中akka.persistence.query.EventEnvelope的典型用法代码示例。如果您正苦于以下问题:Scala EventEnvelope类的具体用法?Scala EventEnvelope怎么用?Scala EventEnvelope使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了EventEnvelope类的8个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: Reporter

//设置package包名称以及导入依赖的类
package com.udemy.akka.persistence.fsm

import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

/**
  * Command-line demo that prints every [[EventEnvelope]] read for the persistence id
  * "Account" through the LevelDB read journal.
  *
  * The body runs as the `App` entry point: it starts an actor system, runs the query
  * stream, waits 2000 ms and then terminates the actor system.
  */
object Reporter extends App {

  private val system: ActorSystem = ActorSystem("persistent-query")

  // Read journal looked up through the PersistenceQuery extension of `system`.
  val queries: LeveldbReadJournal =
    PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)

  // Events of "Account" with sequence numbers in the range 0 to Long.MaxValue.
  val source: Source[EventEnvelope, NotUsed] =
    queries.eventsByPersistenceId("Account", 0L, Long.MaxValue)

  // Materializer bound explicitly to `system`; needed to run the stream.
  implicit val mat: ActorMaterializer = ActorMaterializer()(system)

  source.runForeach(envelope => println(s"Event : $envelope"))

  // NOTE(review): the result of runForeach is not awaited; the fixed pause is the only
  // thing keeping the system alive before it is terminated.
  Thread.sleep(2000)
  system.terminate()

}
开发者ID:akshay-harale,项目名称:udemy-akka,代码行数:29,代码来源:Reporter.scala


示例2: JdbcJournalReader

//设置package包名称以及导入依赖的类
package homeworkzen.domain.query

import java.time.Instant

import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.scaladsl.Source
import homeworkzen.domain.command.message.Event

/**
  * [[JournalReader]] implementation that serves tagged events from the
  * `JdbcReadJournal` query plugin.
  *
  * @param actorSystem actor system used to look up the read journal
  */
class JdbcJournalReader(implicit actorSystem: ActorSystem) extends JournalReader {

  /** Delegates to the journal's `currentEventsByTag`, starting from offset 0. */
  override def currentEventsByTag(tag: String): Source[EventEnvelope, NotUsed] =
    journal.currentEventsByTag(tag, 0)

  /** Delegates to the journal's `eventsByTag`, starting from offset 0. */
  override def eventsByTag(tag: String): Source[EventEnvelope, NotUsed] =
    journal.eventsByTag(tag, 0)

  /** Timestamp of the payload when it is an `Event`, otherwise `None`. */
  private def tryGetEventTime(event: Any): Option[Instant] =
    Some(event).collect { case stamped: Event => stamped.timestamp }

  /**
    * Stream of `eventsByTag(tag)` with the leading "old" envelopes dropped.
    *
    * An envelope counts as old when its payload has no timestamp, or when that timestamp
    * is not strictly after the instant captured when this method is called. Dropping
    * stops at the first envelope that is not old; everything after it is passed through.
    */
  override def newEventsByTag(tag: String): Source[EventEnvelope, NotUsed] = {
    val startedAt = Instant.now()
    eventsByTag(tag).dropWhile { envelope =>
      tryGetEventTime(envelope.event).forall(timestamp => startedAt.compareTo(timestamp) >= 0)
    }
  }

  /** Looks up the JDBC read journal through the PersistenceQuery extension. */
  private def journal(implicit actorSystem: ActorSystem): JdbcReadJournal =
    PersistenceQuery(actorSystem).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
}
开发者ID:anopse,项目名称:HomeworkZen,代码行数:33,代码来源:JdbcJournalReader.scala


示例3: LiveEventsByPersistenceIdPublisher

//设置package包名称以及导入依赖的类
package akka.persistence.pg.journal.query

import akka.persistence.JournalProtocol.{ReplayMessages, ReplayedMessage}
import akka.persistence.pg.journal.PgAsyncWriteJournal._
import akka.persistence.query.EventEnvelope

import scala.concurrent.duration.FiniteDuration

/**
  * Events publisher restricted to a single persistence id.
  *
  * All constructor arguments except `persistenceId` are forwarded unchanged to
  * [[BaseEventsPublisher]]. `journal`, `buf`, `currOffset` and `deliverBuf()` are not
  * defined here; presumably they are inherited from that base class.
  *
  * @param persistenceId id whose events are subscribed to and replayed
  */
class LiveEventsByPersistenceIdPublisher(
    persistenceId: String,
    fromOffset: Long,
    toOffset: Long,
    refreshInterval: FiniteDuration,
    maxBufSize: Int,
    writeJournalPluginId: String
) extends BaseEventsPublisher(fromOffset, toOffset, refreshInterval, maxBufSize, writeJournalPluginId) {

  /** Sends a `SubscribePersistenceId` message for this persistence id to the journal. */
  override def subscribe(): Unit =
    journal ! SubscribePersistenceId(persistenceId)

  /** Asks the journal to replay up to `limit` messages from `currOffset` to `toOffset`. */
  def requestReplayFromJournal(limit: Int): Unit =
    journal ! ReplayMessages(currOffset, toOffset, limit, persistenceId, self)

  /** Extends the inherited `replaying` behaviour with handling of `ReplayedMessage`. */
  override def replaying: Receive = super.replaying.orElse {
    case ReplayedMessage(replayed) =>
      // The sequence number doubles as the envelope offset for this publisher.
      val envelope = EventEnvelope(
        offset = replayed.sequenceNr,
        persistenceId = persistenceId,
        sequenceNr = replayed.sequenceNr,
        event = replayed.payload
      )
      buf :+= envelope
      // The next replay request starts right after the message just buffered.
      currOffset = replayed.sequenceNr + 1
      deliverBuf()
  }

}
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:40,代码来源:LiveEventsByPersistenceIdPublisher.scala


示例4: LiveEventsByTagsPublisher

//设置package包名称以及导入依赖的类
package akka.persistence.pg.journal.query

import akka.persistence.pg.EventTag
import akka.persistence.pg.journal.PgAsyncWriteJournal._
import akka.persistence.query.EventEnvelope

import scala.concurrent.duration.FiniteDuration

/**
  * Events publisher restricted to a set of event tags.
  *
  * All constructor arguments except `tags` are forwarded unchanged to
  * [[BaseEventsPublisher]]. `journal`, `buf`, `currOffset`, `log` and `deliverBuf()` are
  * not defined here; presumably they are inherited from that base class.
  *
  * @param tags tags whose events are subscribed to and replayed
  */
class LiveEventsByTagsPublisher(
    tags: Set[EventTag],
    fromOffset: Long,
    toOffset: Long,
    refreshInterval: FiniteDuration,
    maxBufSize: Int,
    writeJournalPluginId: String
) extends BaseEventsPublisher(fromOffset, toOffset, refreshInterval, maxBufSize, writeJournalPluginId) {

  /** Sends a `SubscribeTags` message for the configured tags to the journal. */
  override def subscribe(): Unit =
    journal ! SubscribeTags(tags)

  /** Asks the journal to replay up to `limit` tagged messages from `currOffset` to `toOffset`. */
  def requestReplayFromJournal(limit: Int): Unit =
    journal ! ReplayTaggedMessages(currOffset, toOffset, limit, tags, self)

  /** Extends the inherited `replaying` behaviour with handling of `ReplayedTaggedMessage`. */
  override def replaying: Receive = super.replaying.orElse {
    case ReplayedTaggedMessage(repr, _, offset) =>
      log.debug(s"Received replayed message: ${repr.persistenceId}")
      val envelope = EventEnvelope(
        offset = offset,
        persistenceId = repr.persistenceId,
        sequenceNr = repr.sequenceNr,
        event = repr.payload
      )
      buf :+= envelope
      // The next replay request starts right after the offset just buffered.
      currOffset = offset + 1
      deliverBuf()
  }

}
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:42,代码来源:LiveEventsByTagsPublisher.scala


示例5: LiveEventsPublisher

//设置package包名称以及导入依赖的类
package akka.persistence.pg.journal.query

import akka.persistence.pg.journal.PgAsyncWriteJournal._
import akka.persistence.query.EventEnvelope

import scala.concurrent.duration.FiniteDuration

/**
  * Events publisher that is not restricted to a persistence id or a tag.
  *
  * Every constructor argument is forwarded unchanged to [[BaseEventsPublisher]].
  * `journal`, `buf`, `currOffset`, `log` and `deliverBuf()` are not defined here;
  * presumably they are inherited from that base class.
  */
class LiveEventsPublisher(
    fromOffset: Long,
    toOffset: Long,
    refreshInterval: FiniteDuration,
    maxBufSize: Int,
    writeJournalPluginId: String
) extends BaseEventsPublisher(fromOffset, toOffset, refreshInterval, maxBufSize, writeJournalPluginId) {

  /** Sends `SubscribeAllEvents` to the journal. */
  override def subscribe(): Unit =
    journal ! SubscribeAllEvents

  /** Asks the journal to replay up to `limit` messages from `currOffset` to `toOffset`. */
  override def requestReplayFromJournal(limit: Int): Unit =
    journal ! ReplayMessages(currOffset, toOffset, limit, self)

  /** Extends the inherited `replaying` behaviour with handling of `ReplayedEventMessage`. */
  override def replaying: Receive = super.replaying.orElse {
    case ReplayedEventMessage(repr, offset) =>
      log.debug(s"Received replayed message: ${repr.persistenceId}")
      val envelope = EventEnvelope(
        offset = offset,
        persistenceId = repr.persistenceId,
        sequenceNr = repr.sequenceNr,
        event = repr.payload
      )
      buf :+= envelope
      // The next replay request starts right after the offset just buffered.
      currOffset = offset + 1
      deliverBuf()
  }

}
开发者ID:WegenenVerkeer,项目名称:akka-persistence-postgresql,代码行数:40,代码来源:LiveEventsPublisher.scala


示例6: StatisticsService

//设置package包名称以及导入依赖的类
package service

import actors.LunchActor.LunchCreated
import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.query.EventEnvelope
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import application.Application.EventReadJournal
import model.UserId
import util.{Formatting, Logging}

import scala.concurrent.{ExecutionContext, Future}

/**
  * Computes statistics from the events stored under the "LunchActor" persistence id.
  *
  * @param actorSystem      actor system used to create the stream materializer
  * @param eventReadJournal read journal the events are queried from
  */
class StatisticsService(actorSystem: ActorSystem, eventReadJournal: EventReadJournal)
  extends Logging
    with Formatting {

  implicit val mat: ActorMaterializer = ActorMaterializer()(actorSystem)

  /**
    * Counts, per lunchmaster, how many `LunchCreated` events are currently stored
    * for "LunchActor". Events of any other type are ignored.
    *
    * NOTE(review): neither implicit parameter is referenced in the method body; the
    * `actorSystem` parameter shadows the constructor argument of the same name.
    *
    * @return a future map from lunchmaster to the number of lunches they created
    */
  def getLunchmasterStatistics(implicit executionContext: ExecutionContext,
                               actorSystem: ActorSystem): Future[Map[UserId, Int]] = {
    val envelopes: Source[EventEnvelope, NotUsed] =
      eventReadJournal.currentEventsByPersistenceId("LunchActor", 0L, Long.MaxValue)

    val payloads: Source[Any, NotUsed] = envelopes.map(_.event)

    payloads.runFold(Map.empty[UserId, Int]) { (counts, payload) =>
      payload match {
        case LunchCreated(lunchmaster, _) =>
          counts.updated(lunchmaster, counts.getOrElse(lunchmaster, 0) + 1)
        case _ =>
          counts
      }
    }
  }

}
开发者ID:mturlo,项目名称:lunchbot,代码行数:39,代码来源:StatisticsService.scala


示例7: BookstoreUserReadModel

//设置package包名称以及导入依赖的类
package com.packt.masteringakka.bookstore.user

import java.util.Date
import com.packt.masteringakka.bookstore.common.ReadModelObject
import com.packt.masteringakka.bookstore.common.ViewBuilder
import akka.actor.Props
import com.packt.masteringakka.bookstore.common.BookstoreActor
import com.packt.masteringakka.bookstore.common.ElasticsearchSupport
import akka.persistence.query.EventEnvelope


/** Settings shared by the bookstore user view builder and view. */
trait BookstoreUserReadModel {

  /** Always the literal "user". */
  def indexRoot: String = "user"

  /** The entity type declared on `BookstoreUser`. */
  def entityType = BookstoreUser.EntityType
}

/** Companion holding the name, read-model type and `Props` of the user view builder. */
object BookstoreUserViewBuilder {

  val Name = "user-view-builder"

  /** Read-model representation of a bookstore user; its id is the email address. */
  case class BookstoreUserRM(
      email: String,
      firstName: String,
      lastName: String,
      createTs: Date,
      deleted: Boolean = false
  ) extends ReadModelObject {
    def id = email
  }

  /** `Props` for creating a [[BookstoreUserViewBuilder]] actor. */
  def props = Props[BookstoreUserViewBuilder]
}

/**
  * View builder that turns bookstore user events into read-model actions.
  *
  * NOTE(review): `actionFor` matches three event types only and has no fallback case,
  * so any other payload raises a `MatchError` - confirm that is intended.
  */
class BookstoreUserViewBuilder
  extends ViewBuilder[BookstoreUserViewBuilder.BookstoreUserRM]
    with BookstoreUserReadModel {

  import BookstoreUser.Event._
  import ViewBuilder._
  import BookstoreUserViewBuilder._

  def projectionId = Name

  /**
    * Maps the event carried by `env` to the action to apply for `id`.
    *
    * @param id  identifier passed through to the resulting action
    * @param env envelope whose `event` decides the action
    */
  def actionFor(id: String, env: EventEnvelope) = env.event match {
    // A new user becomes an insert of the full read-model object.
    case UserCreated(created) =>
      InsertAction(
        id,
        BookstoreUserRM(created.email, created.firstName, created.lastName, created.createTs, created.deleted)
      )

    // A personal-info change updates both name fields via the fn/ln parameters.
    case PersonalInfoUpdated(newFirst, newLast) =>
      UpdateAction(id, List("firstName = fn", "lastName = ln"), Map("fn" -> newFirst, "ln" -> newLast))

    // A deletion only sets the deleted flag; the event's email is not used.
    case UserDeleted(_) =>
      UpdateAction(id, "deleted = true", Map.empty[String, Any])
  }
}

/** Companion holding the name, query message and `Props` of the user view. */
object BookstoreUserView {

  val Name = "bookstore-user-view"

  /** Request to find users whose first or last name matches `name`. */
  case class FindUsersByName(name: String)

  /** `Props` for creating a [[BookstoreUserView]] actor. */
  def props = Props[BookstoreUserView]
}

/** Actor that answers user queries by querying Elasticsearch. */
class BookstoreUserView
  extends BookstoreActor
    with ElasticsearchSupport
    with BookstoreUserReadModel {

  import BookstoreUserView._
  import context.dispatcher

  /** Handles `FindUsersByName` by querying on first or last name and piping the result. */
  def receive = {
    case FindUsersByName(name) =>
      // NOTE(review): `name` is interpolated into the query string as-is - confirm the
      // caller sanitises it if it can come from untrusted input.
      pipeResponse(queryElasticsearch(s"firstName:$name OR lastName:$name"))
  }
}
开发者ID:PacktPublishing,项目名称:Mastering-Akka,代码行数:62,代码来源:BookstoreUserReadModel.scala


示例8: EventPublisher

//设置package包名称以及导入依赖的类
package backend

import akka.NotUsed
import akka.actor.{ ActorContext, ActorSystem }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import io.funcqrs.Tag
import io.funcqrs.projections.PublisherFactory
import model.write.{ Order, OrderEvent }
import org.reactivestreams.Publisher

/** Helpers exposing tagged journal events as streams and reactive-streams publishers. */
object EventPublisher {

  /**
    * Stream of envelopes for `tag`, read from the LevelDB read journal starting at `offset`.
    *
    * @param offset   offset passed to the journal's `eventsByTag`
    * @param tag      tag whose `value` selects the events
    * @param actorSys actor system used to look up the read journal
    */
  def source(offset: Long, tag: Tag)(implicit actorSys: ActorSystem): Source[EventEnvelope, NotUsed] =
    PersistenceQuery(actorSys)
      .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      .eventsByTag(tag.value, offset)

  /**
    * Publisher factory for order events tagged with `Order.tag`.
    *
    * Each factory instance creates its own materializer from the implicit actor system.
    */
  def orderEvents(implicit actorSys: ActorSystem) = {

    new PublisherFactory[Long, OrderEvent] {

      implicit val mat = ActorMaterializer()

      /** Publisher of `(offset, event)` pairs, starting at `offset` or at 0 when absent. */
      override def from(offset: Option[Long]): Publisher[(Long, OrderEvent)] = {
        val startOffset = offset.getOrElse(0L)

        // Keep only envelopes whose payload is an OrderEvent, paired with their offset.
        val orderEventsOnly = EventPublisher.source(startOffset, Order.tag).collect {
          case EventEnvelope(eventOffset, _, _, event: OrderEvent) => (eventOffset, event)
        }

        orderEventsOnly.runWith(Sink.asPublisher(fanout = false))
      }

    }
  }

}
开发者ID:strongtyped,项目名称:fun-cqrs-workshop-lite,代码行数:49,代码来源:EventPublisher.scala



注:本文中的akka.persistence.query.EventEnvelope类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Lock类代码示例发布时间:2022-05-23
下一篇:
Scala GetObjectRequest类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap