• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Scala EventStreamElement类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Scala中com.lightbend.lagom.scaladsl.persistence.EventStreamElement的典型用法代码示例。如果您正苦于以下问题:Scala EventStreamElement类的具体用法?Scala EventStreamElement怎么用?Scala EventStreamElement使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了EventStreamElement类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。

示例1: HelloServiceImpl

//设置package包名称以及导入依赖的类
package sample.helloworld.impl

import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import sample.helloworld.api.model.GreetingMessage
import sample.helloworld.api.HelloService


/**
  * Implementation of `HelloService` that forwards each call to the `HelloEntity`
  * identified by the request's `id` and publishes greeting changes to a topic.
  *
  * @param persistentEntityRegistry registry used to look up entity references and
  *                                 to read the tagged event stream
  */
class HelloServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends HelloService {

  /** Asks the `HelloEntity` with the given `id` the `Hello` command and replies with its answer. */
  override def hello(id: String) = ServiceCall { _ =>
    // Look up the Hello entity for the given ID.
    val ref = persistentEntityRegistry.refFor[HelloEntity](id)

    // Ask the entity the Hello command.
    ref.ask(Hello(id, None))
  }

  /** Tells the `HelloEntity` with the given `id` to use the greeting carried by the request. */
  override def useGreeting(id: String) = ServiceCall { request =>
    // Look up the Hello entity for the given ID.
    val ref = persistentEntityRegistry.refFor[HelloEntity](id)

    // Tell the entity to use the greeting message specified.
    ref.ask(UseGreetingMessage(request.message))
  }

  /**
    * Publishes the entity event stream as a topic of [[GreetingMessage]]s.
    *
    * Each message is paired with the offset of the event it was derived from.
    * The previous code paired every message with the stream's starting offset,
    * so the producer could never record progress past that starting point.
    */
  override def greetingsTopic(): Topic[GreetingMessage] = {
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry.eventStream(HelloEventTag.instance, fromOffset)
        .map(ev => (convertEvent(ev), ev.offset))
    }
  }

  /**
    * Converts a persisted event into the public topic message.
    *
    * Only `GreetingMessageChanged` is handled; any other event reaching this
    * match raises a `MatchError`.
    */
  private def convertEvent(helloEvent: EventStreamElement[HelloEvent]): GreetingMessage = {
    helloEvent.event match {
      case GreetingMessageChanged(msg) => GreetingMessage(msg)
    }
  }

}
开发者ID:knoldus,项目名称:lagom-scala-wordcount.g8,代码行数:44,代码来源:HelloServiceImpl.scala


示例2: MessageEventProcessor

//设置package包名称以及导入依赖的类
package sample.helloworldconsumer.impl

import akka.Done
import com.datastax.driver.core.{BoundStatement, PreparedStatement}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.cassandra.{CassandraReadSide, CassandraSession}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}

import scala.concurrent.{ExecutionContext, Future}


/**
  * Cassandra read-side processor that stores the words of every `MessageSaved`
  * event in the `wordcounttest` table.
  *
  * @param cassandraReadSide builder factory for the read-side handler
  * @param cassandraSession  session used to create the table and prepare statements
  */
class MessageEventProcessor(cassandraReadSide: CassandraReadSide, cassandraSession: CassandraSession)
                           (implicit ec: ExecutionContext) extends ReadSideProcessor[MessageEvent] {

  // Assigned by prepareStatements(), which is registered through setPrepare below.
  private var insertWordStmt: PreparedStatement = _

  /** Builds the handler: create the table, prepare the insert, then handle `MessageSaved`. */
  override def buildHandler(): ReadSideHandler[MessageEvent] = {
    cassandraReadSide.builder[MessageEvent]("message_offset")
      .setGlobalPrepare(createTable)
      .setPrepare(_ => prepareStatements())
      .setEventHandler[MessageSaved](insertWord)
      .build()
  }

  override def aggregateTags: Set[AggregateEventTag[MessageEvent]] = Set(MessageEventTag.INSTANCE)

  /** Creates the `wordcounttest` table if it does not exist yet. */
  private def createTable(): Future[Done] = {
    for {
      _ <- cassandraSession.executeCreateTable(
        """        CREATE TABLE IF NOT EXISTS wordcounttest (
                      words text,
                      insertion_time timestamp,
                      PRIMARY KEY (words,insertion_time)
                    )WITH CLUSTERING ORDER BY (insertion_time DESC)
        """)
    } yield Done
  }

  /** Prepares the insert statement and stores it in `insertWordStmt`. */
  private def prepareStatements(): Future[Done] = {
    for {
      insert <- cassandraSession.prepare(
        """insert into wordcounttest(words ,insertion_time) values(? ,toTimestamp(now())) """)
    } yield {
      insertWordStmt = insert
      Done
    }
  }

  /**
    * Splits the saved message into words and binds one insert per word.
    *
    * Every run of characters that is neither a letter nor a decimal digit is
    * replaced by a single space before splitting. Splitting can produce empty
    * strings (an empty message, or a message that starts with punctuation);
    * those are dropped because `words` is the partition key of the table and
    * an empty value must not be bound to it.
    *
    * @param started the stream element carrying the `MessageSaved` event
    * @return one bound insert statement per non-empty word
    */
  private def insertWord(started: EventStreamElement[MessageSaved]): Future[List[BoundStatement]] = {
    Future.successful {
      val words = started.event.msg
        .replaceAll("[^\\p{L}\\p{Nd}]+", " ")
        .split(" ")
        .filter(_.nonEmpty)
        .toList
      words.map(word => insertWordStmt.bind(word))
    }
  }

}
开发者ID:knoldus,项目名称:lagom-scala-wordcount.g8,代码行数:59,代码来源:MessageEventProcessor.scala


示例3: ReadSideTestDriver

//设置package包名称以及导入依赖的类
package sample.helloworldconsumer.impl



import akka.Done
import akka.persistence.query.Offset
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, EventStreamElement, ReadSide, ReadSideProcessor}

import scala.concurrent.{ExecutionContext, Future}


/**
  * In-memory `ReadSide` for tests: processors are registered per event class and
  * events are pushed through their handlers explicitly with `feed`.
  */
class ReadSideTestDriver(implicit val materializer: Materializer, ec: ExecutionContext) extends ReadSide {
  // Prepared handlers (with the offset returned by `prepare`), keyed by event class.
  private var processors = Map.empty[Class[_], Seq[Future[(ReadSideHandler[_], Offset)]]]

  /**
    * Builds the processor's handler, runs its global and per-tag preparation
    * (using the first aggregate tag only) and records the pending result under
    * that tag's event class.
    */
  override def register[Event <: AggregateEvent[Event]](processorFactory: => ReadSideProcessor[Event]): Unit = {
    val processor = processorFactory
    val eventTags = processor.aggregateTags
    val handler = processor.buildHandler()
    val prepared = handler.globalPrepare().flatMap { _ =>
      handler.prepare(eventTags.head).map(startOffset => handler -> startOffset)
    }
    synchronized {
      val eventType = eventTags.head.eventType
      val registered = processors.getOrElse(eventType, Nil)
      processors += (eventType -> (registered :+ prepared))
    }
  }

  /**
    * Runs a single event through every handler registered for its event class.
    *
    * @param entityId id of the entity the event belongs to
    * @param event    the event to deliver
    * @param offset   offset attached to the generated stream element
    * @return a future completed with `Done` once all handlers have consumed the event;
    *         a runtime error is thrown synchronously when no processor is registered
    */
  def feed[Event <: AggregateEvent[Event]](entityId: String, event: Event, offset: Offset): Future[Done] = {
    val eventType = event.aggregateTag.eventType
    val registered = processors.getOrElse(
      eventType,
      sys.error(s"No processor registered for Event ${eventType.getCanonicalName}")
    )
    Future.sequence(registered).flatMap { prepared =>
      val runs = prepared.map {
        case (handler: ReadSideHandler[Event], _) =>
          Source.single(new EventStreamElement(entityId, event, offset))
            .via(handler.handle())
            .runWith(Sink.ignore)
      }
      Future.sequence(runs).map(_ => Done)
    }
  }
}
开发者ID:knoldus,项目名称:lagom-scala-wordcount.g8,代码行数:52,代码来源:ReadSideTestDriver.scala


示例4: BasketServiceImpl

//设置package包名称以及导入依赖的类
package demo.impl.basket

import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity.InvalidCommandException
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import demo.api.basket.{Basket, BasketService, ExtraTransportExceptions, Item}

import scala.collection.immutable
import scala.concurrent.ExecutionContext

/**
  * `BasketService` implementation backed by one `BasketEntity` per basket id.
  * Commands rejected with `InvalidCommandException` are reported as `BadRequest`.
  */
class BasketServiceImpl(persistentEntities: PersistentEntityRegistry)(implicit ec: ExecutionContext)
  extends BasketService with ExtraTransportExceptions {

  /** Reference to the entity that owns the given basket. */
  private def basketEntity(basketId: String) = persistentEntities.refFor[BasketEntity](basketId)

  /** Returns the current basket. */
  override def getBasket(basketId: String): ServiceCall[NotUsed, Basket] = ServiceCall { _ =>
    basketEntity(basketId).ask(GetBasket)
  }

  /** Adds the posted item to the basket. */
  override def addItem(basketId: String): ServiceCall[Item, NotUsed] = ServiceCall { item =>
    basketEntity(basketId)
      .ask(AddItem(item))
      .map(_ => NotUsed)
      .recover {
        case e: InvalidCommandException => throw BadRequest(e.message)
      }
  }

  /** Returns the basket total. */
  override def getTotal(basketId: String): ServiceCall[NotUsed, Int] = ServiceCall { _ =>
    basketEntity(basketId).ask(GetTotal)
  }

  /** Removes every item from the basket. */
  override def clearAll(basketId: String): ServiceCall[NotUsed, NotUsed] = ServiceCall { _ =>
    basketEntity(basketId)
      .ask(ClearAll)
      .map(_ => NotUsed)
      .recover {
        case e: InvalidCommandException => throw BadRequest(e.message)
      }
  }

  /** Places an order for the basket. */
  override def placeOrder(basketId: String): ServiceCall[NotUsed, NotUsed] = ServiceCall { _ =>
    basketEntity(basketId)
      .ask(PlaceOrder)
      .map(_ => NotUsed)
      .recover {
        case e: InvalidCommandException => throw BadRequest(e.message)
      }
  }

  /** Publishes `OrderPlaced` events, each paired with its own offset; other events are skipped. */
  override def placedOrders: Topic[demo.api.basket.OrderPlaced] =
    TopicProducer.taggedStreamWithOffset(BasketEntityEvent.Tag.allTags.to[immutable.Seq]) { (tag, fromOffset) =>
      persistentEntities.eventStream(tag, fromOffset).collect {
        case EventStreamElement(_, OrderPlaced(id, basket), eventOffset) =>
          demo.api.basket.OrderPlaced(id, basket) -> eventOffset
      }
    }
}
开发者ID:tommpy,项目名称:demo-lagom-checkout,代码行数:57,代码来源:BasketServiceImpl.scala


示例5: BasketServiceImpl

//设置package包名称以及导入依赖的类
package demo.impl.basket

import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity.InvalidCommandException
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import demo.api.basket.{Basket, BasketService, ExtraTransportExceptions, Item}

import scala.collection.immutable
import scala.concurrent.ExecutionContext

/**
  * `BasketService` implementation backed by one `BasketEntity` per basket id.
  *
  * Every mutating call (`addItem`, `clearAll`, `placeOrder`) translates an
  * `InvalidCommandException` raised by the entity into a `BadRequest`.
  * `placeOrder` previously let that exception escape untranslated, unlike the
  * other two mutating calls.
  */
class BasketServiceImpl(persistentEntities: PersistentEntityRegistry)(implicit ec: ExecutionContext)
  extends BasketService with ExtraTransportExceptions{

  /** Returns the current basket. */
  override def getBasket(basketId: String): ServiceCall[NotUsed, Basket] = ServiceCall { _ =>
    persistentEntities.refFor[BasketEntity](basketId).ask(GetBasket)
  }

  /** Adds the posted item to the basket. */
  override def addItem(basketId: String): ServiceCall[Item, NotUsed] = ServiceCall { item =>
    persistentEntities.refFor[BasketEntity](basketId).ask(AddItem(item))
      .map(_ => NotUsed)
      .recover {
        // Throwing inside `recover` fails the resulting future with BadRequest.
        case e: InvalidCommandException => throw BadRequest(e.message)
      }
  }

  /** Returns the basket total. */
  override def getTotal(basketId: String): ServiceCall[NotUsed, Int] = ServiceCall { _ =>
    persistentEntities.refFor[BasketEntity](basketId).ask(GetTotal)
  }

  /** Removes every item from the basket. */
  override def clearAll(basketId: String): ServiceCall[NotUsed, NotUsed] = ServiceCall { _ =>
    persistentEntities.refFor[BasketEntity](basketId).ask(ClearAll)
      .map(_ => NotUsed)
      .recover {
        case e: InvalidCommandException => throw BadRequest(e.message)
      }
  }

  /** Places an order for the basket. */
  override def placeOrder(basketId: String): ServiceCall[NotUsed, NotUsed] = ServiceCall { _ =>
    persistentEntities.refFor[BasketEntity](basketId).ask(PlaceOrder)
      .map(_ => NotUsed)
      .recover {
        case e: InvalidCommandException => throw BadRequest(e.message)
      }
  }

  /** Publishes `OrderPlaced` events, each paired with its own offset; other events are skipped. */
  override def placedOrders: Topic[demo.api.basket.OrderPlaced] =
    TopicProducer.taggedStreamWithOffset(BasketEntityEvent.Tag.allTags.to[immutable.Seq]) { (tag, offset) =>
      persistentEntities.eventStream(tag, offset).collect {
        case EventStreamElement(_, OrderPlaced(id, basket), eventOffset) =>
          demo.api.basket.OrderPlaced(id, basket) -> eventOffset
      }
    }
}
开发者ID:tommpy,项目名称:demo-lagom-checkout,代码行数:53,代码来源:BasketServiceImpl.scala


示例6: BasketServiceImpl

//设置package包名称以及导入依赖的类
package demo.impl.basket

import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import demo.api.basket.{Basket, BasketService, Item}

import scala.collection.immutable
import scala.concurrent.ExecutionContext

/** `BasketService` implementation that forwards every call to the basket's `BasketEntity`. */
class BasketServiceImpl(persistentEntities: PersistentEntityRegistry)(implicit ec: ExecutionContext)
  extends BasketService {

  /** Reference to the entity that owns the given basket. */
  private def basketEntity(basketId: String) = persistentEntities.refFor[BasketEntity](basketId)

  /** Returns the current basket. */
  override def getBasket(basketId: String): ServiceCall[NotUsed, Basket] = ServiceCall { _ =>
    basketEntity(basketId).ask(GetBasket)
  }

  /** Adds the posted item to the basket. */
  override def addItem(basketId: String): ServiceCall[Item, NotUsed] = ServiceCall { item =>
    basketEntity(basketId).ask(AddItem(item)).map(_ => NotUsed)
  }

  /** Returns the basket total. */
  override def getTotal(basketId: String): ServiceCall[NotUsed, Int] = ServiceCall { _ =>
    basketEntity(basketId).ask(GetTotal)
  }

  /** Removes every item from the basket. */
  override def clearAll(basketId: String): ServiceCall[NotUsed, NotUsed] = ServiceCall { _ =>
    basketEntity(basketId).ask(ClearAll).map(_ => NotUsed)
  }

  /** Places an order for the basket. */
  override def placeOrder(basketId: String): ServiceCall[NotUsed, NotUsed] = ServiceCall { _ =>
    basketEntity(basketId).ask(PlaceOrder).map(_ => NotUsed)
  }

  /** Publishes `OrderPlaced` events, each paired with its own offset; other events are skipped. */
  override def placedOrders: Topic[demo.api.basket.OrderPlaced] =
    TopicProducer.taggedStreamWithOffset(BasketEntityEvent.Tag.allTags.to[immutable.Seq]) { (tag, fromOffset) =>
      persistentEntities.eventStream(tag, fromOffset).collect {
        case EventStreamElement(_, OrderPlaced(id, basket), eventOffset) =>
          demo.api.basket.OrderPlaced(id, basket) -> eventOffset
      }
    }
}
开发者ID:tommpy,项目名称:demo-lagom-checkout,代码行数:44,代码来源:BasketServiceImpl.scala


示例7: ReadSideTestDriver

//设置package包名称以及导入依赖的类
package com.example.auction.item.impl

import akka.Done
import akka.persistence.query.Offset
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, EventStreamElement, ReadSide, ReadSideProcessor}

import scala.concurrent.{ExecutionContext, Future}


/**
  * Test double for `ReadSide`: keeps registered handlers in memory and lets a
  * test deliver events to them one at a time through `feed`.
  */
class ReadSideTestDriver(implicit val materializer: Materializer, ec: ExecutionContext) extends ReadSide {
  // Event class -> handlers being prepared, each paired with the offset `prepare` returned.
  private var processors = Map.empty[Class[_], Seq[Future[(ReadSideHandler[_], Offset)]]]

  /**
    * Instantiates the processor, prepares its handler against the first
    * aggregate tag and stores the pending handler under the tag's event class.
    */
  override def register[Event <: AggregateEvent[Event]](processorFactory: => ReadSideProcessor[Event]): Unit = {
    val processor = processorFactory
    val eventTags = processor.aggregateTags
    val handler = processor.buildHandler()
    val pendingHandler = handler.globalPrepare().flatMap { _ =>
      handler.prepare(eventTags.head).map(preparedOffset => handler -> preparedOffset)
    }
    synchronized {
      val eventClass = eventTags.head.eventType
      val known = processors.getOrElse(eventClass, Nil)
      processors += (eventClass -> (known :+ pendingHandler))
    }
  }

  /**
    * Delivers one event to all handlers registered for its event class.
    *
    * @param entityId id of the entity the event belongs to
    * @param event    the event to deliver
    * @param offset   offset attached to the generated stream element
    * @return `Done` once every handler stream has completed; throws a runtime
    *         error immediately when nothing is registered for the event class
    */
  def feed[Event <: AggregateEvent[Event]](entityId: String, event: Event, offset: Offset): Future[Done] = {
    val eventClass = event.aggregateTag.eventType
    val pendingHandlers = processors.getOrElse(
      eventClass,
      sys.error(s"No processor registered for Event ${eventClass.getCanonicalName}")
    )
    Future.sequence(pendingHandlers).flatMap { ready =>
      val deliveries = ready.map {
        case (handler: ReadSideHandler[Event], _) =>
          Source.single(new EventStreamElement(entityId, event, offset))
            .via(handler.handle())
            .runWith(Sink.ignore)
      }
      Future.sequence(deliveries).map(_ => Done)
    }
  }
}
开发者ID:lagom,项目名称:online-auction-scala,代码行数:50,代码来源:ReadSideTestDriver.scala


示例8: MonitorServiceImpl

//设置package包名称以及导入依赖的类
package org.wex.cmsfs.monitor.impl

import java.util.UUID

import akka.persistence.query.Offset
import com.datastax.driver.core.utils.UUIDs
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import org.wex.cmsfs.monitor.api
import org.wex.cmsfs.monitor.api._

import scala.concurrent.{ExecutionContext, Future}

/**
  * `MonitorService` implementation backed by `MonitorEntity` persistent entities.
  *
  * @param registry registry used to look up entities and read their event stream
  */
class MonitorServiceImpl(registry: PersistentEntityRegistry)(implicit ec: ExecutionContext) extends MonitorService {

  /**
    * Creates a monitor with a freshly generated time-based id and status
    * `Created`, sends it to its entity and replies with the stored monitor.
    */
  override def createMonitor: ServiceCall[Monitor, Monitor] = ServiceCall { m =>
    val id = UUIDs.timeBased()
    val monitor = Monitor(Some(id), m.cron, m.mode, m.monitorId, m.connectorId, MonitorStatus.Created)
    val command = CreateMonitor(monitor)
    entityRef(id).ask(command).map(_ => monitor)
  }

  /**
    * Publishes one message per persisted monitor event.
    *
    * Each message is paired with the offset of the element it was produced from.
    * The previous code emitted the stream's starting offset for every element,
    * so the producer could never record progress, and printed debug output.
    *
    * NOTE(review): the payload is still the original placeholder
    * (`MonitorCreated(random id, 1, 1)`) and ignores the event contents.
    * `convertEvent` below looks like the intended mapping, but wiring it in
    * changes the inferred topic message type - confirm against `MonitorService`.
    */
  override def monitorCollectEvents = TopicProducer.taggedStreamWithOffset(MonitorEvent.Tag.allTags.toList) { (tag, offset) =>
    registry.eventStream(tag, offset).map { element =>
      (api.MonitorCreated(UUIDs.random(), 1, 1), element.offset)
    }
  }

  /**
    * Maps a persisted `MonitorCreated` element to its API event and offset.
    *
    * Not referenced anywhere in this class. Raises `MatchError` for any other
    * event and `NoSuchElementException` when the monitor has no id.
    */
  private def convertEvent(eventStreamElement: EventStreamElement[MonitorEvent]): Future[(api.MonitorEvent, Offset)] = {
    eventStreamElement match {
      case EventStreamElement(_, MonitorCreated(m), offset) =>
        // The value is already computed, so no task needs to be scheduled on the execution context.
        Future.successful((api.MonitorCreated(m.id.get, m.monitorId, m.connectorId), offset))
    }
  }

  private def entityRef(id: UUID) = entityRefString(id.toString)

  private def entityRefString(id: String) = registry.refFor[MonitorEntity](id)
}
开发者ID:shinhwazx160,项目名称:test,代码行数:43,代码来源:MonitorServiceImpl.scala


示例9: MessageRepository

//设置package包名称以及导入依赖的类
package com.example.lagompractice.impl

import java.sql.Connection

import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, ReadSideProcessor}
import com.lightbend.lagom.scaladsl.persistence.jdbc.{JdbcReadSide, JdbcSession}

import scala.concurrent.ExecutionContext

// Placeholder repository: takes a JdbcSession and an implicit ExecutionContext
// but defines no members yet.
class MessageRepository(session: JdbcSession)(implicit ec: ExecutionContext) {

}

/**
  * JDBC read-side processor that copies every `MessageCreated` event into the
  * `messages` table.
  */
class MessageEventProcessor(readSide: JdbcReadSide)(implicit ec: ExecutionContext) extends ReadSideProcessor[MessageEvent] {

  import JdbcSession.tryWith

  override def aggregateTags = MessageEvent.Tag.allTags

  /** Handler that creates the table once and then inserts one row per `MessageCreated`. */
  override def buildHandler() = {
    val builder = readSide.builder[MessageEvent]("messageEventOffset")
    builder
      .setGlobalPrepare(createTables)
      .setEventHandler[MessageCreated](processMessageChanged)
      .build()
  }

  /** Creates the `messages` table when it is missing. */
  private def createTables(connection: Connection): Unit = {
    tryWith(connection.prepareStatement("""
      CREATE TABLE IF NOT EXISTS messages (
        messageId varchar(36),
        message TEXT,
        PRIMARY KEY (messageId)
      )
    """)) { statement =>
      statement.execute()
    }
  }

  /** Inserts the created message's id and text as a new row. */
  private def processMessageChanged(connection: Connection, eventElement: EventStreamElement[MessageCreated]): Unit = {
    val insert = connection.prepareStatement("INSERT INTO messages (messageId, message) VALUES (?, ?)")
    tryWith(insert) { statement =>
      statement.setString(1, eventElement.event.message.id.toString)
      statement.setString(2, eventElement.event.message.message)
      statement.execute()
    }
  }

  /*
  private def createTables(connection: Connection): Unit = {
    tryWith(connection.prepareStatement("INSERT INTO messages (message) VALUES (?)")) { ps =>
      ps.execute()
    }
  }
  */
}
开发者ID:Saneyan,项目名称:lagom-practice,代码行数:53,代码来源:MessageRepository.scala


示例10: AddServiceImpl

//设置package包名称以及导入依赖的类
package com.orion.add.impl

import akka.Done
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import com.orion.add.api
import com.orion.add.api.AddService


/**
  * `AddService` implementation that routes every command to the single
  * `NumberEntity` with id `"constant"`.
  */
class AddServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends AddService {

  /** Reference to the one shared number entity. */
  private def numberEntity = persistentEntityRegistry.refFor[NumberEntity]("constant")

  /** Sends `AddNumber` with the requested number and replies with the entity's answer. */
  override def add(): ServiceCall[api.AddNumber, String] = ServiceCall { request =>
    numberEntity.ask(AddNumber(request.number))
  }

  /** Sends `CurryWith` with the requested number. */
  override def curry(): ServiceCall[api.CurryWith, Done] = ServiceCall { request =>
    numberEntity.ask(CurryWith(request.number))
  }

  /** Publishes number events as `api.CurriedWith` messages, each paired with its own offset. */
  override def additionTopic(): Topic[api.CurriedWith] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(NumberEvent.Tags, fromOffset)
        .map(element => (convertEvent(element), element.offset))
    }

  /** Maps a persisted `CurriedWith` event to its API counterpart; other events raise `MatchError`. */
  private def convertEvent(numberEvent: EventStreamElement[NumberEvent]): api.CurriedWith =
    numberEvent.event match {
      case CurriedWith(num) => api.CurriedWith(num)
    }
}
开发者ID:utsavgupta,项目名称:add-lagom,代码行数:38,代码来源:AddServiceImpl.scala


示例11: HelloServiceImpl

//设置package包名称以及导入依赖的类
package com.example.hello.impl

import com.example.hello.api
import com.example.hello.api.{HelloService}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}


/** `HelloService` implementation that delegates to the `HelloEntity` with the requested id. */
class HelloServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends HelloService {

  /** Reference to the Hello entity for the given id. */
  private def entityFor(id: String) = persistentEntityRegistry.refFor[HelloEntity](id)

  /** Asks the entity the `Hello` command. */
  override def hello(id: String) = ServiceCall { _ =>
    entityFor(id).ask(Hello(id))
  }

  /** Tells the entity to use the greeting message carried by the request. */
  override def useGreeting(id: String) = ServiceCall { request =>
    entityFor(id).ask(UseGreetingMessage(request.message))
  }

  /** Publishes greeting changes, each paired with the offset of its source event. */
  override def greetingsTopic(): Topic[api.GreetingMessageChanged] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(HelloEvent.Tag, fromOffset)
        .map(element => (convertEvent(element), element.offset))
    }

  /** Maps a persisted greeting change to the API message; other events raise `MatchError`. */
  private def convertEvent(helloEvent: EventStreamElement[HelloEvent]): api.GreetingMessageChanged =
    helloEvent.event match {
      case GreetingMessageChanged(msg) => api.GreetingMessageChanged(helloEvent.entityId, msg)
    }
}
开发者ID:namelos,项目名称:lagom-spike,代码行数:43,代码来源:HelloServiceImpl.scala


示例12: IdentityEventProcessor

//设置package包名称以及导入依赖的类
package io.digitalcat.publictransportation.services.identity.impl

import java.util.UUID

import akka.Done
import com.datastax.driver.core.PreparedStatement
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.cassandra.{CassandraReadSide, CassandraSession}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}

import scala.concurrent.{ExecutionContext, Future}

/**
  * Cassandra read-side processor for identity events: on `UserCreated` it
  * inserts the user row and records the user id against the reserved username
  * and e-mail.
  */
class IdentityEventProcessor(session: CassandraSession, readSide: CassandraReadSide)(implicit ec: ExecutionContext) extends ReadSideProcessor[IdentityEvent] {
  // All three statements are assigned by prepareStatements().
  private var insertUserStatement: PreparedStatement = _
  private var reportIdToReservedUsernamesStatement: PreparedStatement = _
  private var reportIdToReservedEmailsStatement: PreparedStatement = _

  /** Handler that prepares the statements per tag and handles `UserCreated`. */
  override def buildHandler(): ReadSideHandler[IdentityEvent] = {
    readSide.builder[IdentityEvent]("identityEventOffset")
      .setPrepare(_ => prepareStatements())
      .setEventHandler[UserCreated](insertUser)
      .build()
  }

  override def aggregateTags: Set[AggregateEventTag[IdentityEvent]] = {
    IdentityEvent.Tag.allTags
  }

  /** Prepares the three statements one after another and stores them in the fields above. */
  private def prepareStatements(): Future[Done] = {
    for {
      userInsert <- session.prepare("INSERT INTO users(id, client_id, username, email, first_name, last_name, hashed_password) VALUES (?, ?, ?, ?, ?, ?, ?)")
      usernameUpdate <- session.prepare("UPDATE reserved_usernames SET user_id = ? WHERE username = ?")
      emailUpdate <- session.prepare("UPDATE reserved_emails SET user_id = ? WHERE email = ?")
    } yield {
      insertUserStatement = userInsert
      reportIdToReservedUsernamesStatement = usernameUpdate
      reportIdToReservedEmailsStatement = emailUpdate
      Done
    }
  }

  /** Binds the user insert plus the two reservation updates for a `UserCreated` event. */
  private def insertUser(user: EventStreamElement[UserCreated]) = {
    val created = user.event
    val userRow = insertUserStatement.bind(
      created.userId,
      UUID.fromString(user.entityId),
      created.username,
      created.email,
      created.firstName,
      created.lastName,
      created.hashedPassword
    )
    val usernameReservation = reportIdToReservedUsernamesStatement.bind(created.userId, created.username)
    val emailReservation = reportIdToReservedEmailsStatement.bind(created.userId, created.email)
    Future.successful(List(userRow, usernameReservation, emailReservation))
  }
}
开发者ID:dpalinic,项目名称:lagom-jwt-authentication,代码行数:61,代码来源:IdentityEventProcessor.scala


示例13: HelloServiceImpl

//设置package包名称以及导入依赖的类
package sample.helloworld.impl

import akka.{Done, NotUsed}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import sample.helloworld.api.model.GreetingMessage
import sample.helloworld.api.HelloService


/**
  * Implementation of `HelloService` that forwards each call to the `HelloEntity`
  * identified by the request's `id` and publishes greeting changes to a topic.
  *
  * @param persistentEntityRegistry registry used to look up entity references and
  *                                 to read the tagged event stream
  */
class HelloServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends HelloService {

  /** Asks the `HelloEntity` with the given `id` the `Hello` command and replies with its answer. */
  override def hello(id: String): ServiceCall[NotUsed, _root_.sample.helloworld.impl.Hello#ReplyType] = ServiceCall { _ =>
    // Look up the Hello entity for the given ID.
    val ref = persistentEntityRegistry.refFor[HelloEntity](id)

    // Ask the entity the Hello command.
    ref.ask(Hello(id, None))
  }

  /** Tells the `HelloEntity` with the given `id` to use the greeting carried by the request. */
  override def useGreeting(id: String): ServiceCall[GreetingMessage, Done] = ServiceCall { request =>
    // Look up the Hello entity for the given ID.
    val ref = persistentEntityRegistry.refFor[HelloEntity](id)

    // Tell the entity to use the greeting message specified.
    ref.ask(UseGreetingMessage(request.message))
  }

  /**
    * Publishes the entity event stream as a topic of [[GreetingMessage]]s.
    *
    * Each message is paired with the offset of the event it was derived from.
    * The previous code paired every message with the stream's starting offset,
    * so the producer could never record progress past that starting point.
    */
  override def greetingsTopic(): Topic[GreetingMessage] = {
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry.eventStream(HelloEventTag.instance, fromOffset)
        .map(ev => (convertEvent(ev), ev.offset))
    }
  }

  /**
    * Converts a persisted event into the public topic message.
    *
    * Only `GreetingMessageChanged` is handled; any other event reaching this
    * match raises a `MatchError`.
    */
  private def convertEvent(helloEvent: EventStreamElement[HelloEvent]): GreetingMessage = {
    helloEvent.event match {
      case GreetingMessageChanged(msg) => GreetingMessage(msg)
    }
  }

}
开发者ID:knoldus,项目名称:lagom-play-spike.g8,代码行数:45,代码来源:HelloServiceImpl.scala


示例14: GithubServiceImpl

//设置package包名称以及导入依赖的类
package com.onedrop.github.impl

import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import com.onedrop.github
import com.onedrop.github.api.GithubService


/**
  * `GithubService` implementation backed by one `GithubEntity` per organisation.
  *
  * The entity is looked up by the `org` path parameter. The previous code
  * referred to an `id` identifier that is not defined in this class.
  *
  * NOTE(review): the commands and messages (`Hello`, `UseGreetingMessage`,
  * `GreetingMessageChanged`) still carry greeting-style names - confirm they
  * are the intended ones for this service.
  */
class GithubServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends GithubService {

  /** Asks the entity for `org` the `Hello` command. */
  override def getOrg(org: String) = ServiceCall { _ =>
    // Look up the github entity for the given organisation.
    val ref = persistentEntityRegistry.refFor[GithubEntity](org)

    // Ask the entity the Hello command.
    ref.ask(Hello(org))
  }

  /** Sends `UseGreetingMessage` with the request's message to the entity for `org`. */
  override def getMembers(org: String) = ServiceCall { request =>
    // Look up the github entity for the given organisation.
    val ref = persistentEntityRegistry.refFor[GithubEntity](org)

    // Tell the entity to use the greeting message specified.
    ref.ask(UseGreetingMessage(request.message))
  }


  /** Publishes entity events as API messages, each paired with the offset of its source event. */
  override def getRepos(org: String): Topic[github.api.GreetingMessageChanged] =
    TopicProducer.singleStreamWithOffset {
      fromOffset =>
        persistentEntityRegistry.eventStream(GithubEvent.Tag, fromOffset)
          .map(ev => (convertEvent(ev), ev.offset))
    }

  /** Maps a persisted `GreetingMessageChanged` to the API message; other events raise `MatchError`. */
  private def convertEvent(helloEvent: EventStreamElement[GithubEvent]): github.api.GreetingMessageChanged = {
    helloEvent.event match {
      case GreetingMessageChanged(msg) => github.api.GreetingMessageChanged(helloEvent.entityId, msg)
    }
  }
}
开发者ID:jewelsjacobs,项目名称:one-drop-test,代码行数:43,代码来源:GithubServiceImpl.scala


示例15: LagomhelloServiceImpl

//设置package包名称以及导入依赖的类
package de.breitbandig.lagomhello.impl

import de.breitbandig.lagomhello.api
import de.breitbandig.lagomhello.api.{LagomhelloService}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}


/** `LagomhelloService` implementation that delegates to the `LagomhelloEntity` with the requested id. */
class LagomhelloServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends LagomhelloService {

  /** Reference to the lagom-hello entity for the given id. */
  private def entityFor(id: String) = persistentEntityRegistry.refFor[LagomhelloEntity](id)

  /** Asks the entity the `Hello` command. */
  override def hello(id: String) = ServiceCall { _ =>
    entityFor(id).ask(Hello(id))
  }

  /** Tells the entity to use the greeting message carried by the request. */
  override def useGreeting(id: String) = ServiceCall { request =>
    entityFor(id).ask(UseGreetingMessage(request.message))
  }

  /** Publishes greeting changes, each paired with the offset of its source event. */
  override def greetingsTopic(): Topic[api.GreetingMessageChanged] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(LagomhelloEvent.Tag, fromOffset)
        .map(element => (convertEvent(element), element.offset))
    }

  /** Maps a persisted greeting change to the API message; other events raise `MatchError`. */
  private def convertEvent(helloEvent: EventStreamElement[LagomhelloEvent]): api.GreetingMessageChanged =
    helloEvent.event match {
      case GreetingMessageChanged(msg) => api.GreetingMessageChanged(helloEvent.entityId, msg)
    }
}
开发者ID:aanno,项目名称:lagom-hello,代码行数:43,代码来源:LagomhelloServiceImpl.scala


示例16: newPosts

//设置package包名称以及导入依赖的类
package docs.home.scaladsl.persistence

import com.lightbend.lagom.scaladsl.persistence.ReadSide

import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.Service
import akka.stream.scaladsl.Source
import akka.persistence.query.NoOffset
import com.lightbend.lagom.scaladsl.persistence.EventStreamElement
import scala.concurrent.Future

// Wrapper trait holding a minimal BlogService and its implementation.
// NOTE(review): the `//#name` comment pairs below look like snippet markers
// consumed by documentation tooling - keep them and the lines between them intact.
trait BlogServiceImpl3 {
  // Minimal service with a single streaming call.
  trait BlogService extends Service {
    def newPosts(): ServiceCall[NotUsed, Source[PostSummary, _]]

    // Left unimplemented: `???` throws NotImplementedError if the descriptor is ever evaluated.
    override def descriptor = ???
  }

  // The class body registers a BlogEventProcessor with the read side when the service is constructed.
  //#register-event-processor
  class BlogServiceImpl(
    persistentEntityRegistry: PersistentEntityRegistry,
    readSide: ReadSide,
    myDatabase: MyDatabase) extends BlogService {

    readSide.register[BlogEvent](new BlogEventProcessor(myDatabase))
    //#register-event-processor

    // newPosts reads the event stream for the tag chosen by `forEntityId("")`, starting
    // from NoOffset, keeps only PostAdded events and maps each to a PostSummary of its
    // post id and title. The resulting Source is returned in an already-completed Future.
    //#event-stream
    override def newPosts(): ServiceCall[NotUsed, Source[PostSummary, _]] =
      ServiceCall { request =>
        val response: Source[PostSummary, NotUsed] =
          persistentEntityRegistry.eventStream(BlogEvent.Tag.forEntityId(""), NoOffset)
            .collect {
              case EventStreamElement(entityId, event: PostAdded, offset) =>
                PostSummary(event.postId, event.content.title)
            }
        Future.successful(response)
      }
    //#event-stream
  }
} 
开发者ID:lagom,项目名称:lagom,代码行数:44,代码来源:BlogServiceImpl3.scala


示例17: CassandraReadSideImpl

//设置package包名称以及导入依赖的类
package com.lightbend.lagom.internal.scaladsl.persistence.cassandra

import akka.Done
import akka.actor.ActorSystem
import com.datastax.driver.core.BoundStatement
import com.lightbend.lagom.internal.persistence.cassandra.CassandraOffsetStore
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraReadSide.ReadSideHandlerBuilder
import com.lightbend.lagom.scaladsl.persistence.cassandra.{ CassandraReadSide, CassandraSession }
import com.lightbend.lagom.scaladsl.persistence.{ AggregateEvent, AggregateEventTag, EventStreamElement }

import scala.collection.immutable
import scala.concurrent.Future
import scala.reflect.ClassTag


/**
 * [[CassandraReadSide]] implementation that hands out builders for
 * [[CassandraAutoReadSideHandler]] instances.
 *
 * @param system      actor system used to read the dispatcher setting and look the dispatcher up
 * @param session     Cassandra session passed on to every handler that is built
 * @param offsetStore offset store passed on to every handler that is built
 */
private[lagom] final class CassandraReadSideImpl(
  system: ActorSystem, session: CassandraSession, offsetStore: CassandraOffsetStore
) extends CassandraReadSide {

  // Name of the dispatcher, read from configuration; also forwarded to the built handlers.
  private val dispatcher = system.settings.config.getString("lagom.persistence.read-side.use-dispatcher")

  // Implicit members should carry an explicit type: inference on implicits is fragile and
  // newer compilers warn about (or reject) the unannotated form. The fully-qualified name is
  // used so that no additional import is required.
  implicit val ec: akka.dispatch.MessageDispatcher = system.dispatchers.lookup(dispatcher)

  /**
   * Creates a mutable builder that collects the prepare callbacks and per-event-class handlers
   * and finally produces a [[ReadSideHandler]].
   *
   * @param eventProcessorId identifier forwarded to the handler created by `build()`
   * @tparam Event the aggregate event type handled by the resulting read-side handler
   * @return a fresh builder; every setter returns the same builder to allow chaining
   */
  override def builder[Event <: AggregateEvent[Event]](eventProcessorId: String): ReadSideHandlerBuilder[Event] = {
    new ReadSideHandlerBuilder[Event] {
      import CassandraAutoReadSideHandler.Handler

      // Both callbacks default to an already-completed no-op.
      private var prepareCallback: AggregateEventTag[Event] => Future[Done] = _ => Future.successful(Done)
      private var globalPrepareCallback: () => Future[Done] = () => Future.successful(Done)
      private var handlers = Map.empty[Class[_ <: Event], Handler[Event]]

      override def setGlobalPrepare(callback: () => Future[Done]): ReadSideHandlerBuilder[Event] = {
        globalPrepareCallback = callback
        this
      }

      override def setPrepare(callback: (AggregateEventTag[Event]) => Future[Done]): ReadSideHandlerBuilder[Event] = {
        prepareCallback = callback
        this
      }

      override def setEventHandler[E <: Event: ClassTag](handler: EventStreamElement[E] => Future[immutable.Seq[BoundStatement]]): ReadSideHandlerBuilder[Event] = {
        // The runtime class is that of E; both casts are unchecked and only widen the static
        // types so that handlers for different event subclasses can share one map.
        val eventClass = implicitly[ClassTag[E]].runtimeClass.asInstanceOf[Class[Event]]
        // Registering a second handler for the same class replaces the earlier one.
        handlers += (eventClass -> handler.asInstanceOf[Handler[Event]])
        this
      }

      override def build(): ReadSideHandler[Event] = {
        new CassandraAutoReadSideHandler[Event](session, offsetStore, handlers, globalPrepareCallback, prepareCallback, eventProcessorId, dispatcher)
      }
    }
  }
}
开发者ID:lagom,项目名称:lagom,代码行数:53,代码来源:CassandraReadSideImpl.scala


示例18: ReferentialwebcrawlerServiceImpl

//设置package包名称以及导入依赖的类
package com.andycot.referentialwebcrawler.impl

import com.andycot.referentialwebcrawler.api
import com.andycot.referentialwebcrawler.api.{ReferentialwebcrawlerService}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}


/**
 * Implementation of [[ReferentialwebcrawlerService]] that forwards commands to the
 * referential-webcrawler persistent entity and publishes its events as a topic.
 */
class ReferentialwebcrawlerServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends ReferentialwebcrawlerService {

  /** Sends the `Hello` command to the entity identified by `id` and returns its reply. */
  override def hello(id: String) = ServiceCall { _ =>
    persistentEntityRegistry
      .refFor[ReferentialwebcrawlerEntity](id)
      .ask(Hello(id))
  }

  /** Sends `UseGreetingMessage`, carrying the request's message, to the entity identified by `id`. */
  override def useGreeting(id: String) = ServiceCall { request =>
    persistentEntityRegistry
      .refFor[ReferentialwebcrawlerEntity](id)
      .ask(UseGreetingMessage(request.message))
  }

  /** Publishes each tagged entity event, converted to its API form, paired with the event's offset. */
  override def greetingsTopic(): Topic[api.GreetingMessageChanged] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      val events = persistentEntityRegistry.eventStream(ReferentialwebcrawlerEvent.Tag, fromOffset)
      events.map { element =>
        (convertEvent(element), element.offset)
      }
    }

  /** Maps a persisted event to the API message, using the element's entity id as the first field. */
  private def convertEvent(helloEvent: EventStreamElement[ReferentialwebcrawlerEvent]): api.GreetingMessageChanged = {
    helloEvent.event match {
      case GreetingMessageChanged(message) =>
        api.GreetingMessageChanged(helloEvent.entityId, message)
    }
  }
}
开发者ID:frferrari,项目名称:referential-webcrawler,代码行数:43,代码来源:ReferentialwebcrawlerServiceImpl.scala


示例19: ReadSideTestDriver

//设置package包名称以及导入依赖的类
package optrak.lagomtest.utils

import akka.Done
import akka.persistence.query.Offset
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, EventStreamElement, ReadSide, ReadSideProcessor}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler

import scala.concurrent.{ExecutionContext, Future}


/**
 * [[ReadSide]] implementation that keeps the handlers of registered processors in a local map
 * and lets callers push individual events at them through [[feed]].
 *
 * @param materializer used to run the single-element stream built for each fed event
 * @param ec           execution context used to compose the prepare and handling futures
 */
class ReadSideTestDriver(implicit val materializer: Materializer, ec: ExecutionContext) extends ReadSide {
  // Keyed by the event class of a processor's first aggregate tag.
  // All access goes through `synchronized` on this instance.
  private var processors = Map.empty[Class[_], Seq[Future[(ReadSideHandler[_], Offset)]]]

  /**
   * Builds the processor's handler, starts its global and per-tag preparation, and stores the
   * resulting future under the event class of the processor's first aggregate tag.
   * Only that first tag is prepared; any further tags are ignored.
   *
   * @param processorFactory by-name factory, evaluated exactly once
   * @throws NoSuchElementException if the processor declares no aggregate tags; this is now
   *                                raised before the handler is built or prepared
   */
  override def register[Event <: AggregateEvent[Event]](processorFactory: => ReadSideProcessor[Event]): Unit = {
    val processor = processorFactory
    // Resolve the tag once and fail fast with a descriptive message instead of calling
    // `.head` repeatedly on a possibly empty collection.
    val firstTag = processor.aggregateTags.headOption.getOrElse(
      throw new NoSuchElementException("Cannot register a ReadSideProcessor that declares no aggregate tags")
    )
    val handler = processor.buildHandler()
    val future = for {
      _ <- handler.globalPrepare()
      offset <- handler.prepare(firstTag)
    } yield {
      handler -> offset
    }
    synchronized {
      val handlers = processors.getOrElse(firstTag.eventType, Nil)
      processors += (firstTag.eventType -> (handlers :+ future))
    }
  }

  /**
   * Runs a single event through every handler registered for the event's type.
   *
   * @param entityId entity id placed in the emitted [[EventStreamElement]]
   * @param event    the event to deliver
   * @param offset   offset placed in the emitted [[EventStreamElement]]
   * @return a future completed with `Done` once all handlers have processed the event
   * @throws RuntimeException (synchronously) if no processor is registered for the event type
   */
  def feed[Event <: AggregateEvent[Event]](entityId: String, event: Event, offset: Offset): Future[Done] = {
    val eventType = event.aggregateTag.eventType
    // Read under the same lock that `register` writes under, so a registration made on
    // another thread is visible here.
    val registered = synchronized { processors.get(eventType) }
    registered match {
      case None => sys.error(s"No processor registered for Event ${eventType.getCanonicalName}")
      case Some(handlerFutures) =>
        for {
          handlers <- Future.sequence(handlerFutures)
          _ <- Future.sequence(handlers.map {
            case (handler, _) =>
              // The handler's event type is erased at runtime, so a type pattern could not
              // verify it; the map key ties the handler to this event class, hence the cast.
              Source.single(new EventStreamElement(entityId, event, offset))
                .via(handler.asInstanceOf[ReadSideHandler[Event]].handle())
                .runWith(Sink.ignore)
          })
        } yield {
          Done
        }
    }
  }
}
开发者ID:TimPigden,项目名称:lagom-multitenant,代码行数:50,代码来源:ReadSideTestDriver.scala



注:本文中的com.lightbend.lagom.scaladsl.persistence.EventStreamElement类示例整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Scala Actor类代码示例发布时间:2022-05-23
下一篇:
Scala AuthTokenService类代码示例发布时间:2022-05-23
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap