本文整理汇总了 Scala 中 com.lightbend.lagom.scaladsl.persistence.EventStreamElement 类的典型用法代码示例。如果您正苦于以下问题:Scala EventStreamElement 类的具体用法?Scala EventStreamElement 怎么用?Scala EventStreamElement 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了EventStreamElement类的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Scala代码示例。
示例1: HelloServiceImpl
//设置package包名称以及导入依赖的类
package sample.helloworld.impl
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import sample.helloworld.api.model.GreetingMessage
import sample.helloworld.api.HelloService
/**
 * Implementation of [[HelloService]] that forwards calls to `HelloEntity`
 * instances and publishes their events as a topic.
 *
 * @param persistentEntityRegistry registry used to look up entity references
 *                                 and to read the tagged event stream
 */
class HelloServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends HelloService {

  /** Asks the entity identified by `id` the `Hello` command and returns its reply. */
  override def hello(id: String) = ServiceCall { _ =>
    // Look up the Hello entity for the given ID.
    val ref = persistentEntityRegistry.refFor[HelloEntity](id)
    // Ask the entity the Hello command.
    ref.ask(Hello(id, None))
  }

  /** Tells the entity identified by `id` to use the greeting message carried by the request. */
  override def useGreeting(id: String) = ServiceCall { request =>
    // Look up the Hello entity for the given ID.
    val ref = persistentEntityRegistry.refFor[HelloEntity](id)
    // Tell the entity to use the greeting message specified.
    ref.ask(UseGreetingMessage(request.message))
  }

  /**
   * Publishes the entity events tagged with `HelloEventTag.instance` as greeting messages.
   *
   * Each message is paired with the offset of the event it was derived from
   * (`ev.offset`). Pairing it with the offset the stream was started from would
   * report the same position for every message, so the recorded progress would
   * never advance.
   */
  override def greetingsTopic(): Topic[GreetingMessage] = {
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(HelloEventTag.instance, fromOffset)
        .map(ev => (convertEvent(ev), ev.offset))
    }
  }

  /** Converts a persisted `HelloEvent` into the message published on the topic. */
  private def convertEvent(helloEvent: EventStreamElement[HelloEvent]): GreetingMessage = {
    helloEvent.event match {
      case GreetingMessageChanged(msg) => GreetingMessage(msg)
    }
  }
}
开发者ID:knoldus,项目名称:lagom-scala-wordcount.g8,代码行数:44,代码来源:HelloServiceImpl.scala
示例2: MessageEventProcessor
//设置package包名称以及导入依赖的类
package sample.helloworldconsumer.impl
import akka.Done
import com.datastax.driver.core.{BoundStatement, PreparedStatement}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.cassandra.{CassandraReadSide, CassandraSession}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}
import scala.concurrent.{ExecutionContext, Future}
/**
 * Read-side processor that stores every word of a saved message in the
 * `wordcounttest` Cassandra table.
 *
 * @param cassandraReadSide builder factory for the Cassandra read-side handler
 * @param cassandraSession  session used to create the table and prepare statements
 * @param ec                execution context for the asynchronous preparation steps
 */
class MessageEventProcessor(cassandraReadSide: CassandraReadSide, cassandraSession: CassandraSession)
                           (implicit ec: ExecutionContext) extends ReadSideProcessor[MessageEvent] {

  // Assigned by prepareStatements(), which the handler runs before events are handled.
  private var insertWordStmt: PreparedStatement = _

  /** Builds the handler: global table creation, statement preparation, then the event handler. */
  override def buildHandler(): ReadSideHandler[MessageEvent] = {
    cassandraReadSide.builder[MessageEvent]("message_offset")
      .setGlobalPrepare(createTable)
      .setPrepare(_ => prepareStatements())
      .setEventHandler[MessageSaved](insertWord)
      .build()
  }

  override def aggregateTags: Set[AggregateEventTag[MessageEvent]] = Set(MessageEventTag.INSTANCE)

  /** Creates the `wordcounttest` table when it does not exist yet. */
  private def createTable(): Future[Done] = {
    cassandraSession.executeCreateTable(
      """ CREATE TABLE IF NOT EXISTS wordcounttest (
words text,
insertion_time timestamp,
PRIMARY KEY (words,insertion_time)
)WITH CLUSTERING ORDER BY (insertion_time DESC)
""").map(_ => Done)
  }

  /** Prepares the insert statement and stores it for use by `insertWord`. */
  private def prepareStatements(): Future[Done] = {
    cassandraSession.prepare(
      """insert into wordcounttest(words ,insertion_time) values(? ,toTimestamp(now())) """)
      .map { insert =>
        insertWordStmt = insert
        Done
      }
  }

  /**
   * Produces one insert statement per word of the saved message.
   *
   * Words are maximal runs of letters and decimal digits. An empty message, or one
   * that starts with a separator, would otherwise yield an empty-string "word";
   * those are dropped so they are never bound as the partition key `words`.
   */
  private def insertWord(started: EventStreamElement[MessageSaved]): Future[List[BoundStatement]] = {
    val words = started.event.msg
      .split("[^\\p{L}\\p{Nd}]+")
      .filter(_.nonEmpty)
      .toList
    Future.successful(words.map(word => insertWordStmt.bind(word)))
  }
}
开发者ID:knoldus,项目名称:lagom-scala-wordcount.g8,代码行数:59,代码来源:MessageEventProcessor.scala
示例3: ReadSideTestDriver
//设置package包名称以及导入依赖的类
package sample.helloworldconsumer.impl
import akka.Done
import akka.persistence.query.Offset
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, EventStreamElement, ReadSide, ReadSideProcessor}
import scala.concurrent.{ExecutionContext, Future}
/**
 * Test double for [[ReadSide]] that keeps registered handlers in memory and lets
 * a test push individual events through them with `feed`.
 */
class ReadSideTestDriver(implicit val materializer: Materializer, ec: ExecutionContext) extends ReadSide {

  // Prepared handlers, keyed by the event type of the first tag of their processor.
  private var processors = Map.empty[Class[_], Seq[Future[(ReadSideHandler[_], Offset)]]]

  /**
   * Builds the processor's handler, starts its global and per-tag preparation and
   * stores the resulting future under the event type of the processor's first tag.
   */
  override def register[Event <: AggregateEvent[Event]](processorFactory: => ReadSideProcessor[Event]): Unit = {
    val processor = processorFactory
    val tags = processor.aggregateTags
    val handler = processor.buildHandler()
    val prepared = handler.globalPrepare().flatMap { _ =>
      handler.prepare(tags.head).map(offset => handler -> offset)
    }
    synchronized {
      val eventType = tags.head.eventType
      val registered = processors.getOrElse(eventType, Nil)
      processors += (eventType -> (registered :+ prepared))
    }
  }

  /**
   * Runs a single event through every handler registered for its event type.
   *
   * @return a future completed with `Done` once all handlers have processed the event
   */
  def feed[Event <: AggregateEvent[Event]](entityId: String, event: Event, offset: Offset): Future[Done] = {
    val eventType = event.aggregateTag.eventType
    val handlerFutures = processors.getOrElse(
      eventType,
      sys.error(s"No processor registered for Event ${eventType.getCanonicalName}")
    )
    Future.sequence(handlerFutures).flatMap { handlers =>
      val runs = handlers.map {
        case (handler: ReadSideHandler[Event], _) =>
          Source.single(new EventStreamElement(entityId, event, offset))
            .via(handler.handle())
            .runWith(Sink.ignore)
      }
      Future.sequence(runs).map(_ => Done)
    }
  }
}
开发者ID:knoldus,项目名称:lagom-scala-wordcount.g8,代码行数:52,代码来源:ReadSideTestDriver.scala
示例4: BasketServiceImpl
//设置package包名称以及导入依赖的类
package demo.impl.basket
import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity.InvalidCommandException
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import demo.api.basket.{Basket, BasketService, ExtraTransportExceptions, Item}
import scala.collection.immutable
import scala.concurrent.ExecutionContext
/**
 * Basket service that delegates every call to the `BasketEntity` identified by
 * the basket id and publishes placed orders as a topic.
 */
class BasketServiceImpl(persistentEntities: PersistentEntityRegistry)(implicit ec: ExecutionContext)
  extends BasketService with ExtraTransportExceptions {

  override def getBasket(basketId: String): ServiceCall[NotUsed, Basket] =
    ServiceCall(_ => basketRef(basketId).ask(GetBasket))

  override def addItem(basketId: String): ServiceCall[Item, NotUsed] =
    ServiceCall(item => acknowledged(basketRef(basketId).ask(AddItem(item))))

  override def getTotal(basketId: String): ServiceCall[NotUsed, Int] =
    ServiceCall(_ => basketRef(basketId).ask(GetTotal))

  override def clearAll(basketId: String): ServiceCall[NotUsed, NotUsed] =
    ServiceCall(_ => acknowledged(basketRef(basketId).ask(ClearAll)))

  override def placeOrder(basketId: String): ServiceCall[NotUsed, NotUsed] =
    ServiceCall(_ => acknowledged(basketRef(basketId).ask(PlaceOrder)))

  /** Streams an API `OrderPlaced` message, with the event's offset, for every persisted `OrderPlaced` event. */
  override def placedOrders: Topic[demo.api.basket.OrderPlaced] =
    TopicProducer.taggedStreamWithOffset(BasketEntityEvent.Tag.allTags.to[immutable.Seq]) { (tag, fromOffset) =>
      persistentEntities.eventStream(tag, fromOffset).collect {
        case EventStreamElement(_, OrderPlaced(id, basket), eventOffset) =>
          (demo.api.basket.OrderPlaced(id, basket), eventOffset)
      }
    }

  /** Reference to the basket entity with the given id. */
  private def basketRef(basketId: String) = persistentEntities.refFor[BasketEntity](basketId)

  /**
   * Discards the entity's reply in favour of `NotUsed`; a rejected command
   * (`InvalidCommandException`) is reported as `BadRequest` with the same message.
   */
  private def acknowledged(reply: scala.concurrent.Future[_]): scala.concurrent.Future[NotUsed] =
    reply
      .map(_ => NotUsed)
      .recoverWith {
        case invalid: InvalidCommandException =>
          scala.concurrent.Future.failed(BadRequest(invalid.message))
      }
}
开发者ID:tommpy,项目名称:demo-lagom-checkout,代码行数:57,代码来源:BasketServiceImpl.scala
示例5: BasketServiceImpl
//设置package包名称以及导入依赖的类
package demo.impl.basket
import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.PersistentEntity.InvalidCommandException
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import demo.api.basket.{Basket, BasketService, ExtraTransportExceptions, Item}
import scala.collection.immutable
import scala.concurrent.ExecutionContext
/**
 * Basket service that delegates every call to the `BasketEntity` identified by
 * the basket id and publishes placed orders as a topic.
 */
class BasketServiceImpl(persistentEntities: PersistentEntityRegistry)(implicit ec: ExecutionContext)
  extends BasketService with ExtraTransportExceptions {

  override def getBasket(basketId: String): ServiceCall[NotUsed, Basket] =
    ServiceCall(_ => basketRef(basketId).ask(GetBasket))

  override def addItem(basketId: String): ServiceCall[Item, NotUsed] =
    ServiceCall(item => acknowledged(basketRef(basketId).ask(AddItem(item))))

  override def getTotal(basketId: String): ServiceCall[NotUsed, Int] =
    ServiceCall(_ => basketRef(basketId).ask(GetTotal))

  override def clearAll(basketId: String): ServiceCall[NotUsed, NotUsed] =
    ServiceCall(_ => acknowledged(basketRef(basketId).ask(ClearAll)))

  // Unlike addItem and clearAll, a rejected PlaceOrder command is not translated to BadRequest here.
  override def placeOrder(basketId: String): ServiceCall[NotUsed, NotUsed] =
    ServiceCall(_ => basketRef(basketId).ask(PlaceOrder).map(_ => NotUsed))

  /** Streams an API `OrderPlaced` message, with the event's offset, for every persisted `OrderPlaced` event. */
  override def placedOrders: Topic[demo.api.basket.OrderPlaced] =
    TopicProducer.taggedStreamWithOffset(BasketEntityEvent.Tag.allTags.to[immutable.Seq]) { (tag, fromOffset) =>
      persistentEntities.eventStream(tag, fromOffset).collect {
        case EventStreamElement(_, OrderPlaced(id, basket), eventOffset) =>
          (demo.api.basket.OrderPlaced(id, basket), eventOffset)
      }
    }

  /** Reference to the basket entity with the given id. */
  private def basketRef(basketId: String) = persistentEntities.refFor[BasketEntity](basketId)

  /**
   * Discards the entity's reply in favour of `NotUsed`; a rejected command
   * (`InvalidCommandException`) is reported as `BadRequest` with the same message.
   */
  private def acknowledged(reply: scala.concurrent.Future[_]): scala.concurrent.Future[NotUsed] =
    reply
      .map(_ => NotUsed)
      .recoverWith {
        case invalid: InvalidCommandException =>
          scala.concurrent.Future.failed(BadRequest(invalid.message))
      }
}
开发者ID:tommpy,项目名称:demo-lagom-checkout,代码行数:53,代码来源:BasketServiceImpl.scala
示例6: BasketServiceImpl
//设置package包名称以及导入依赖的类
package demo.impl.basket
import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import demo.api.basket.{Basket, BasketService, Item}
import scala.collection.immutable
import scala.concurrent.ExecutionContext
/**
 * Basket service that delegates every call to the `BasketEntity` identified by
 * the basket id and publishes placed orders as a topic.
 */
class BasketServiceImpl(persistentEntities: PersistentEntityRegistry)(implicit ec: ExecutionContext)
  extends BasketService {

  override def getBasket(basketId: String): ServiceCall[NotUsed, Basket] =
    ServiceCall(_ => basketRef(basketId).ask(GetBasket))

  override def addItem(basketId: String): ServiceCall[Item, NotUsed] =
    ServiceCall(item => basketRef(basketId).ask(AddItem(item)).map(_ => NotUsed))

  override def getTotal(basketId: String): ServiceCall[NotUsed, Int] =
    ServiceCall(_ => basketRef(basketId).ask(GetTotal))

  override def clearAll(basketId: String): ServiceCall[NotUsed, NotUsed] =
    ServiceCall(_ => basketRef(basketId).ask(ClearAll).map(_ => NotUsed))

  override def placeOrder(basketId: String): ServiceCall[NotUsed, NotUsed] =
    ServiceCall(_ => basketRef(basketId).ask(PlaceOrder).map(_ => NotUsed))

  /** Streams an API `OrderPlaced` message, with the event's offset, for every persisted `OrderPlaced` event. */
  override def placedOrders: Topic[demo.api.basket.OrderPlaced] =
    TopicProducer.taggedStreamWithOffset(BasketEntityEvent.Tag.allTags.to[immutable.Seq]) { (tag, fromOffset) =>
      persistentEntities.eventStream(tag, fromOffset).collect {
        case EventStreamElement(_, OrderPlaced(id, basket), eventOffset) =>
          (demo.api.basket.OrderPlaced(id, basket), eventOffset)
      }
    }

  /** Reference to the basket entity with the given id. */
  private def basketRef(basketId: String) = persistentEntities.refFor[BasketEntity](basketId)
}
开发者ID:tommpy,项目名称:demo-lagom-checkout,代码行数:44,代码来源:BasketServiceImpl.scala
示例7: ReadSideTestDriver
//设置package包名称以及导入依赖的类
package com.example.auction.item.impl
import akka.Done
import akka.persistence.query.Offset
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, EventStreamElement, ReadSide, ReadSideProcessor}
import scala.concurrent.{ExecutionContext, Future}
/**
 * Test double for [[ReadSide]] that keeps registered handlers in memory and lets
 * a test push individual events through them with `feed`.
 */
class ReadSideTestDriver(implicit val materializer: Materializer, ec: ExecutionContext) extends ReadSide {

  // Prepared handlers, keyed by the event type of the first tag of their processor.
  private var processors = Map.empty[Class[_], Seq[Future[(ReadSideHandler[_], Offset)]]]

  /**
   * Builds the processor's handler, starts its global and per-tag preparation and
   * stores the resulting future under the event type of the processor's first tag.
   */
  override def register[Event <: AggregateEvent[Event]](processorFactory: => ReadSideProcessor[Event]): Unit = {
    val processor = processorFactory
    val tags = processor.aggregateTags
    val handler = processor.buildHandler()
    val prepared = handler.globalPrepare().flatMap { _ =>
      handler.prepare(tags.head).map(offset => handler -> offset)
    }
    synchronized {
      val eventType = tags.head.eventType
      val registered = processors.getOrElse(eventType, Nil)
      processors += (eventType -> (registered :+ prepared))
    }
  }

  /**
   * Runs a single event through every handler registered for its event type.
   *
   * @return a future completed with `Done` once all handlers have processed the event
   */
  def feed[Event <: AggregateEvent[Event]](entityId: String, event: Event, offset: Offset): Future[Done] = {
    val eventType = event.aggregateTag.eventType
    val handlerFutures = processors.getOrElse(
      eventType,
      sys.error(s"No processor registered for Event ${eventType.getCanonicalName}")
    )
    Future.sequence(handlerFutures).flatMap { handlers =>
      val runs = handlers.map {
        case (handler: ReadSideHandler[Event], _) =>
          Source.single(new EventStreamElement(entityId, event, offset))
            .via(handler.handle())
            .runWith(Sink.ignore)
      }
      Future.sequence(runs).map(_ => Done)
    }
  }
}
开发者ID:lagom,项目名称:online-auction-scala,代码行数:50,代码来源:ReadSideTestDriver.scala
示例8: MonitorServiceImpl
//设置package包名称以及导入依赖的类
package org.wex.cmsfs.monitor.impl
import java.util.UUID
import akka.persistence.query.Offset
import com.datastax.driver.core.utils.UUIDs
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import org.wex.cmsfs.monitor.api
import org.wex.cmsfs.monitor.api._
import scala.concurrent.{ExecutionContext, Future}
/**
 * Monitor service backed by `MonitorEntity` persistent entities.
 *
 * @param registry registry used to look up entity references and to read the tagged event stream
 * @param ec       execution context for mapping entity replies
 */
class MonitorServiceImpl(registry: PersistentEntityRegistry)(implicit ec: ExecutionContext) extends MonitorService {

  /**
   * Assigns a time-based id to the submitted monitor, marks it as created, sends
   * it to its entity and returns the stored monitor.
   */
  override def createMonitor: ServiceCall[Monitor, Monitor] = ServiceCall { m =>
    val id = UUIDs.timeBased()
    val monitor = Monitor(Some(id), m.cron, m.mode, m.monitorId, m.connectorId, MonitorStatus.Created)
    val command = CreateMonitor(monitor)
    entityRef(id).ask(command).map(_ => monitor)
  }

  /**
   * Publishes an API `MonitorCreated` message for every persisted `MonitorCreated` event.
   *
   * Messages are built from the persisted event (the same mapping `convertEvent`
   * uses) rather than from fabricated placeholder values, and each one carries the
   * offset of its own event rather than the offset the stream was started from.
   * Events of any other type are skipped.
   */
  override def monitorCollectEvents =
    TopicProducer.taggedStreamWithOffset(MonitorEvent.Tag.allTags.toList) { (tag, fromOffset) =>
      registry.eventStream(tag, fromOffset).collect {
        case EventStreamElement(_, MonitorCreated(m), eventOffset) =>
          // createMonitor always stores Some(id); NOTE(review): confirm no other path persists a monitor without an id.
          (api.MonitorCreated(m.id.get, m.monitorId, m.connectorId), eventOffset)
      }
    }

  /** Converts a persisted `MonitorCreated` element into the API event paired with its offset. */
  private def convertEvent(eventStreamElement: EventStreamElement[MonitorEvent]): Future[(api.MonitorEvent, Offset)] = {
    eventStreamElement match {
      case EventStreamElement(_, MonitorCreated(m), offset) =>
        Future((api.MonitorCreated(m.id.get, m.monitorId, m.connectorId), offset))
    }
  }

  private def entityRef(id: UUID) = entityRefString(id.toString)

  private def entityRefString(id: String) = registry.refFor[MonitorEntity](id)
}
开发者ID:shinhwazx160,项目名称:test,代码行数:43,代码来源:MonitorServiceImpl.scala
示例9: MessageRepository
//设置package包名称以及导入依赖的类
package com.example.lagompractice.impl
import java.sql.Connection
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, ReadSideProcessor}
import com.lightbend.lagom.scaladsl.persistence.jdbc.{JdbcReadSide, JdbcSession}
import scala.concurrent.ExecutionContext
// Placeholder repository: it receives a JDBC session and an execution context but defines no members yet.
class MessageRepository(session: JdbcSession)(implicit ec: ExecutionContext) {
}
/**
 * JDBC read-side processor that creates the `messages` table and inserts one row
 * per `MessageCreated` event.
 */
class MessageEventProcessor(readSide: JdbcReadSide)(implicit ec: ExecutionContext) extends ReadSideProcessor[MessageEvent] {
  import JdbcSession.tryWith

  override def aggregateTags = MessageEvent.Tag.allTags

  /** Wires the global table creation and the `MessageCreated` handler into a handler. */
  override def buildHandler() =
    readSide
      .builder[MessageEvent]("messageEventOffset")
      .setGlobalPrepare(createTables)
      .setEventHandler[MessageCreated](processMessageChanged)
      .build()

  /** Creates the `messages` table when it does not exist yet. */
  private def createTables(connection: Connection): Unit = {
    tryWith(connection.prepareStatement("""
CREATE TABLE IF NOT EXISTS messages (
messageId varchar(36),
message TEXT,
PRIMARY KEY (messageId)
)
""")) { statement =>
      statement.execute()
    }
  }

  /** Inserts the id and text of the created message as a new row. */
  private def processMessageChanged(connection: Connection, eventElement: EventStreamElement[MessageCreated]): Unit = {
    tryWith(connection.prepareStatement("INSERT INTO messages (messageId, message) VALUES (?, ?)")) { insert =>
      val created = eventElement.event.message
      insert.setString(1, created.id.toString)
      insert.setString(2, created.message)
      insert.execute()
    }
  }
}
开发者ID:Saneyan,项目名称:lagom-practice,代码行数:53,代码来源:MessageRepository.scala
示例10: AddServiceImpl
//设置package包名称以及导入依赖的类
package com.orion.add.impl
import akka.Done
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import com.orion.add.api
import com.orion.add.api.AddService
/**
 * Add service that sends every command to the single `NumberEntity` with id
 * "constant" and publishes its events as a topic.
 */
class AddServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends AddService {

  /** Asks the number entity to add the requested number. */
  override def add(): ServiceCall[api.AddNumber, String] = ServiceCall { request =>
    numberEntity.ask(AddNumber(request.number))
  }

  /** Asks the number entity to curry with the requested number. */
  override def curry(): ServiceCall[api.CurryWith, Done] = ServiceCall { request =>
    numberEntity.ask(CurryWith(request.number))
  }

  /** Publishes every persisted number event, paired with the event's own offset. */
  override def additionTopic(): Topic[api.CurriedWith] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(NumberEvent.Tags, fromOffset)
        .map(element => (convertEvent(element), element.offset))
    }

  /** Reference to the one entity all commands are sent to. */
  private def numberEntity = persistentEntityRegistry.refFor[NumberEntity]("constant")

  /** Maps the persisted event onto its API counterpart. */
  private def convertEvent(numberEvent: EventStreamElement[NumberEvent]): api.CurriedWith =
    numberEvent.event match {
      case CurriedWith(num) => api.CurriedWith(num)
    }
}
开发者ID:utsavgupta,项目名称:add-lagom,代码行数:38,代码来源:AddServiceImpl.scala
示例11: HelloServiceImpl
//设置package包名称以及导入依赖的类
package com.example.hello.impl
import com.example.hello.api
import com.example.hello.api.{HelloService}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
/**
 * Hello service that forwards calls to the `HelloEntity` with the given id and
 * publishes greeting changes as a topic.
 */
class HelloServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends HelloService {

  /** Asks the entity with the given id the `Hello` command. */
  override def hello(id: String) = ServiceCall { _ =>
    helloEntity(id).ask(Hello(id))
  }

  /** Tells the entity with the given id to use the greeting message from the request. */
  override def useGreeting(id: String) = ServiceCall { request =>
    helloEntity(id).ask(UseGreetingMessage(request.message))
  }

  /** Publishes every persisted hello event, paired with the event's own offset. */
  override def greetingsTopic(): Topic[api.GreetingMessageChanged] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(HelloEvent.Tag, fromOffset)
        .map(element => (convertEvent(element), element.offset))
    }

  /** Reference to the Hello entity with the given id. */
  private def helloEntity(id: String) = persistentEntityRegistry.refFor[HelloEntity](id)

  /** Maps the persisted event onto the API event, tagging it with the entity id. */
  private def convertEvent(helloEvent: EventStreamElement[HelloEvent]): api.GreetingMessageChanged =
    helloEvent.event match {
      case GreetingMessageChanged(msg) => api.GreetingMessageChanged(helloEvent.entityId, msg)
    }
}
开发者ID:namelos,项目名称:lagom-spike,代码行数:43,代码来源:HelloServiceImpl.scala
示例12: IdentityEventProcessor
//设置package包名称以及导入依赖的类
package io.digitalcat.publictransportation.services.identity.impl
import java.util.UUID
import akka.Done
import com.datastax.driver.core.PreparedStatement
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.cassandra.{CassandraReadSide, CassandraSession}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, EventStreamElement, ReadSideProcessor}
import scala.concurrent.{ExecutionContext, Future}
/**
 * Cassandra read-side processor that, for every `UserCreated` event, inserts the
 * user and records the user id against the reserved username and email.
 */
class IdentityEventProcessor(session: CassandraSession, readSide: CassandraReadSide)(implicit ec: ExecutionContext) extends ReadSideProcessor[IdentityEvent] {

  // All three statements are assigned by prepareStatements(), which runs as the handler's prepare step.
  private var insertUserStatement: PreparedStatement = _
  private var reportIdToReservedUsernamesStatement: PreparedStatement = _
  private var reportIdToReservedEmailsStatement: PreparedStatement = _

  /** Builds the handler: statement preparation followed by the `UserCreated` handler. */
  override def buildHandler(): ReadSideHandler[IdentityEvent] =
    readSide
      .builder[IdentityEvent]("identityEventOffset")
      .setPrepare(_ => prepareStatements())
      .setEventHandler[UserCreated](insertUser)
      .build()

  override def aggregateTags: Set[AggregateEventTag[IdentityEvent]] = IdentityEvent.Tag.allTags

  /** Prepares the three statements one after another and stores them in the fields above. */
  private def prepareStatements(): Future[Done] =
    session.prepare("INSERT INTO users(id, client_id, username, email, first_name, last_name, hashed_password) VALUES (?, ?, ?, ?, ?, ?, ?)").flatMap { insertUser =>
      session.prepare("UPDATE reserved_usernames SET user_id = ? WHERE username = ?").flatMap { reportIdToReservedUsernames =>
        session.prepare("UPDATE reserved_emails SET user_id = ? WHERE email = ?").map { reportIdToReservedEmails =>
          insertUserStatement = insertUser
          reportIdToReservedUsernamesStatement = reportIdToReservedUsernames
          reportIdToReservedEmailsStatement = reportIdToReservedEmails
          Done
        }
      }
    }

  /** Binds the insert and the two reservation updates for the created user. */
  private def insertUser(user: EventStreamElement[UserCreated]) = {
    val created = user.event
    val statements = List(
      insertUserStatement.bind(
        created.userId,
        UUID.fromString(user.entityId),
        created.username,
        created.email,
        created.firstName,
        created.lastName,
        created.hashedPassword
      ),
      reportIdToReservedUsernamesStatement.bind(created.userId, created.username),
      reportIdToReservedEmailsStatement.bind(created.userId, created.email)
    )
    Future.successful(statements)
  }
}
开发者ID:dpalinic,项目名称:lagom-jwt-authentication,代码行数:61,代码来源:IdentityEventProcessor.scala
示例13: HelloServiceImpl
//设置package包名称以及导入依赖的类
package sample.helloworld.impl
import akka.{Done, NotUsed}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import sample.helloworld.api.model.GreetingMessage
import sample.helloworld.api.HelloService
/**
 * Implementation of [[HelloService]] that forwards calls to `HelloEntity`
 * instances and publishes their events as a topic.
 *
 * @param persistentEntityRegistry registry used to look up entity references
 *                                 and to read the tagged event stream
 */
class HelloServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends HelloService {

  /** Asks the entity identified by `id` the `Hello` command and returns its reply. */
  override def hello(id: String): ServiceCall[NotUsed, _root_.sample.helloworld.impl.Hello#ReplyType] = ServiceCall { _ =>
    // Look up the Hello entity for the given ID.
    val ref = persistentEntityRegistry.refFor[HelloEntity](id)
    // Ask the entity the Hello command.
    ref.ask(Hello(id, None))
  }

  /** Tells the entity identified by `id` to use the greeting message carried by the request. */
  override def useGreeting(id: String): ServiceCall[GreetingMessage, Done] = ServiceCall { request =>
    // Look up the Hello entity for the given ID.
    val ref = persistentEntityRegistry.refFor[HelloEntity](id)
    // Tell the entity to use the greeting message specified.
    ref.ask(UseGreetingMessage(request.message))
  }

  /**
   * Publishes the entity events tagged with `HelloEventTag.instance` as greeting messages.
   *
   * Each message is paired with the offset of the event it was derived from
   * (`ev.offset`). Pairing it with the offset the stream was started from would
   * report the same position for every message, so the recorded progress would
   * never advance.
   */
  override def greetingsTopic(): Topic[GreetingMessage] = {
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(HelloEventTag.instance, fromOffset)
        .map(ev => (convertEvent(ev), ev.offset))
    }
  }

  /** Converts a persisted `HelloEvent` into the message published on the topic. */
  private def convertEvent(helloEvent: EventStreamElement[HelloEvent]): GreetingMessage = {
    helloEvent.event match {
      case GreetingMessageChanged(msg) => GreetingMessage(msg)
    }
  }
}
开发者ID:knoldus,项目名称:lagom-play-spike.g8,代码行数:45,代码来源:HelloServiceImpl.scala
示例14: GithubServiceImpl
//设置package包名称以及导入依赖的类
package com.onedrop.github.impl
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
import com.onedrop.github
import com.onedrop.github.api.GithubService
/**
 * Implementation of [[GithubService]] that forwards calls to the `GithubEntity`
 * keyed by the organisation name.
 *
 * NOTE(review): the commands and events used here (`Hello`, `UseGreetingMessage`,
 * `GreetingMessageChanged`) still carry greeting-style names; confirm they are
 * the intended ones for this service.
 *
 * @param persistentEntityRegistry registry used to look up entity references
 *                                 and to read the tagged event stream
 */
class GithubServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends GithubService {

  /** Asks the entity for `org` the `Hello` command. */
  override def getOrg(org: String) = ServiceCall { _ =>
    // The entity is keyed by the organisation name; the method has no `id` in scope.
    val ref = persistentEntityRegistry.refFor[GithubEntity](org)
    ref.ask(Hello(org))
  }

  /** Sends `UseGreetingMessage` with the request's message to the entity for `org`. */
  override def getMembers(org: String) = ServiceCall { request =>
    // The entity is keyed by the organisation name; the method has no `id` in scope.
    val ref = persistentEntityRegistry.refFor[GithubEntity](org)
    ref.ask(UseGreetingMessage(request.message))
  }

  /**
   * Publishes every persisted github event, paired with the event's own offset.
   * The `org` argument is not used to filter the stream.
   */
  override def getRepos(org: String): Topic[github.api.GreetingMessageChanged] =
    TopicProducer.singleStreamWithOffset {
      fromOffset =>
        persistentEntityRegistry.eventStream(GithubEvent.Tag, fromOffset)
          .map(ev => (convertEvent(ev), ev.offset))
    }

  /** Maps the persisted event onto the API event, tagging it with the entity id. */
  private def convertEvent(helloEvent: EventStreamElement[GithubEvent]): github.api.GreetingMessageChanged = {
    helloEvent.event match {
      case GreetingMessageChanged(msg) => github.api.GreetingMessageChanged(helloEvent.entityId, msg)
    }
  }
}
开发者ID:jewelsjacobs,项目名称:one-drop-test,代码行数:43,代码来源:GithubServiceImpl.scala
示例15: LagomhelloServiceImpl
//设置package包名称以及导入依赖的类
package de.breitbandig.lagomhello.impl
import de.breitbandig.lagomhello.api
import de.breitbandig.lagomhello.api.{LagomhelloService}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
/**
 * Hello service that forwards calls to the `LagomhelloEntity` with the given id
 * and publishes greeting changes as a topic.
 */
class LagomhelloServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends LagomhelloService {

  /** Asks the entity with the given id the `Hello` command. */
  override def hello(id: String) = ServiceCall { _ =>
    helloEntity(id).ask(Hello(id))
  }

  /** Tells the entity with the given id to use the greeting message from the request. */
  override def useGreeting(id: String) = ServiceCall { request =>
    helloEntity(id).ask(UseGreetingMessage(request.message))
  }

  /** Publishes every persisted event, paired with the event's own offset. */
  override def greetingsTopic(): Topic[api.GreetingMessageChanged] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(LagomhelloEvent.Tag, fromOffset)
        .map(element => (convertEvent(element), element.offset))
    }

  /** Reference to the lagom-hello entity with the given id. */
  private def helloEntity(id: String) = persistentEntityRegistry.refFor[LagomhelloEntity](id)

  /** Maps the persisted event onto the API event, tagging it with the entity id. */
  private def convertEvent(helloEvent: EventStreamElement[LagomhelloEvent]): api.GreetingMessageChanged =
    helloEvent.event match {
      case GreetingMessageChanged(msg) => api.GreetingMessageChanged(helloEvent.entityId, msg)
    }
}
开发者ID:aanno,项目名称:lagom-hello,代码行数:43,代码来源:LagomhelloServiceImpl.scala
示例16: newPosts
//设置package包名称以及导入依赖的类
package docs.home.scaladsl.persistence
import com.lightbend.lagom.scaladsl.persistence.ReadSide
import com.lightbend.lagom.scaladsl.persistence.PersistentEntityRegistry
import akka.NotUsed
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.Service
import akka.stream.scaladsl.Source
import akka.persistence.query.NoOffset
import com.lightbend.lagom.scaladsl.persistence.EventStreamElement
import scala.concurrent.Future
// Container for documentation snippets; the `//#...` marker pairs below delimit the
// regions extracted into the docs, so their placement must be kept as is.
trait BlogServiceImpl3 {
// Service API used by the snippets: a single call returning a stream of post summaries.
trait BlogService extends Service {
def newPosts(): ServiceCall[NotUsed, Source[PostSummary, _]]
override def descriptor = ???
}
// The implementation registers a BlogEventProcessor with the read side when it is constructed.
//#register-event-processor
class BlogServiceImpl(
persistentEntityRegistry: PersistentEntityRegistry,
readSide: ReadSide,
myDatabase: MyDatabase) extends BlogService {
readSide.register[BlogEvent](new BlogEventProcessor(myDatabase))
//#register-event-processor
// newPosts reads the event stream of the tag BlogEvent.Tag.forEntityId("") from NoOffset
// and emits a PostSummary for every PostAdded event; other events are skipped by collect.
//#event-stream
override def newPosts(): ServiceCall[NotUsed, Source[PostSummary, _]] =
ServiceCall { request =>
val response: Source[PostSummary, NotUsed] =
persistentEntityRegistry.eventStream(BlogEvent.Tag.forEntityId(""), NoOffset)
.collect {
case EventStreamElement(entityId, event: PostAdded, offset) =>
PostSummary(event.postId, event.content.title)
}
Future.successful(response)
}
//#event-stream
}
}
开发者ID:lagom,项目名称:lagom,代码行数:44,代码来源:BlogServiceImpl3.scala
示例17: CassandraReadSideImpl
//设置package包名称以及导入依赖的类
package com.lightbend.lagom.internal.scaladsl.persistence.cassandra
import akka.Done
import akka.actor.ActorSystem
import com.datastax.driver.core.BoundStatement
import com.lightbend.lagom.internal.persistence.cassandra.CassandraOffsetStore
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraReadSide.ReadSideHandlerBuilder
import com.lightbend.lagom.scaladsl.persistence.cassandra.{ CassandraReadSide, CassandraSession }
import com.lightbend.lagom.scaladsl.persistence.{ AggregateEvent, AggregateEventTag, EventStreamElement }
import scala.collection.immutable
import scala.concurrent.Future
import scala.reflect.ClassTag
/**
 * Internal implementation of [[CassandraReadSide]]: creates builders that collect
 * prepare callbacks and per-event-class handlers and turn them into a
 * `CassandraAutoReadSideHandler`.
 */
private[lagom] final class CassandraReadSideImpl(
system: ActorSystem, session: CassandraSession, offsetStore: CassandraOffsetStore
) extends CassandraReadSide {
// Name of the dispatcher, read from configuration; it is also passed on to the handler in build().
private val dispatcher = system.settings.config.getString("lagom.persistence.read-side.use-dispatcher")
implicit val ec = system.dispatchers.lookup(dispatcher)
/**
 * Returns a new builder whose callbacks default to already-completed futures and
 * whose handler map starts empty; each setter stores its argument and returns the builder.
 */
override def builder[Event <: AggregateEvent[Event]](eventProcessorId: String): ReadSideHandlerBuilder[Event] = {
new ReadSideHandlerBuilder[Event] {
import CassandraAutoReadSideHandler.Handler
private var prepareCallback: AggregateEventTag[Event] => Future[Done] = tag => Future.successful(Done)
private var globalPrepareCallback: () => Future[Done] = () => Future.successful(Done)
private var handlers = Map.empty[Class[_ <: Event], Handler[Event]]
override def setGlobalPrepare(callback: () => Future[Done]): ReadSideHandlerBuilder[Event] = {
globalPrepareCallback = callback
this
}
override def setPrepare(callback: (AggregateEventTag[Event]) => Future[Done]): ReadSideHandlerBuilder[Event] = {
prepareCallback = callback
this
}
// Registers the handler under the runtime class of E; setting a handler for the same class again replaces it.
override def setEventHandler[E <: Event: ClassTag](handler: EventStreamElement[E] => Future[immutable.Seq[BoundStatement]]): ReadSideHandlerBuilder[Event] = {
// Both casts are unchecked: the map is keyed by the runtime class and stores handlers typed for the base Event.
val eventClass = implicitly[ClassTag[E]].runtimeClass.asInstanceOf[Class[Event]]
handlers += (eventClass -> handler.asInstanceOf[Handler[Event]])
this
}
// Creates the handler from the callbacks and handlers collected so far.
override def build(): ReadSideHandler[Event] = {
new CassandraAutoReadSideHandler[Event](session, offsetStore, handlers, globalPrepareCallback, prepareCallback, eventProcessorId, dispatcher)
}
}
}
}
开发者ID:lagom,项目名称:lagom,代码行数:53,代码来源:CassandraReadSideImpl.scala
示例18: ReferentialwebcrawlerServiceImpl
//设置package包名称以及导入依赖的类
package com.andycot.referentialwebcrawler.impl
import com.andycot.referentialwebcrawler.api
import com.andycot.referentialwebcrawler.api.{ReferentialwebcrawlerService}
import com.lightbend.lagom.scaladsl.api.ServiceCall
import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer
import com.lightbend.lagom.scaladsl.persistence.{EventStreamElement, PersistentEntityRegistry}
/**
 * Service that forwards calls to the `ReferentialwebcrawlerEntity` with the given
 * id and publishes greeting changes as a topic.
 */
class ReferentialwebcrawlerServiceImpl(persistentEntityRegistry: PersistentEntityRegistry) extends ReferentialwebcrawlerService {

  /** Asks the entity with the given id the `Hello` command. */
  override def hello(id: String) = ServiceCall { _ =>
    crawlerEntity(id).ask(Hello(id))
  }

  /** Tells the entity with the given id to use the greeting message from the request. */
  override def useGreeting(id: String) = ServiceCall { request =>
    crawlerEntity(id).ask(UseGreetingMessage(request.message))
  }

  /** Publishes every persisted event, paired with the event's own offset. */
  override def greetingsTopic(): Topic[api.GreetingMessageChanged] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(ReferentialwebcrawlerEvent.Tag, fromOffset)
        .map(element => (convertEvent(element), element.offset))
    }

  /** Reference to the referential-webcrawler entity with the given id. */
  private def crawlerEntity(id: String) = persistentEntityRegistry.refFor[ReferentialwebcrawlerEntity](id)

  /** Maps the persisted event onto the API event, tagging it with the entity id. */
  private def convertEvent(helloEvent: EventStreamElement[ReferentialwebcrawlerEvent]): api.GreetingMessageChanged =
    helloEvent.event match {
      case GreetingMessageChanged(msg) => api.GreetingMessageChanged(helloEvent.entityId, msg)
    }
}
开发者ID:frferrari,项目名称:referential-webcrawler,代码行数:43,代码来源:ReferentialwebcrawlerServiceImpl.scala
示例19: ReadSideTestDriver
//设置package包名称以及导入依赖的类
package optrak.lagomtest.utils
import akka.Done
import akka.persistence.query.Offset
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, EventStreamElement, ReadSide, ReadSideProcessor}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import scala.concurrent.{ExecutionContext, Future}
/**
 * Test double for [[ReadSide]] that keeps registered handlers in memory and lets
 * a test push individual events through them with `feed`.
 */
class ReadSideTestDriver(implicit val materializer: Materializer, ec: ExecutionContext) extends ReadSide {

  // Prepared handlers, keyed by the event type of the first tag of their processor.
  private var processors = Map.empty[Class[_], Seq[Future[(ReadSideHandler[_], Offset)]]]

  /**
   * Builds the processor's handler, starts its global and per-tag preparation and
   * stores the resulting future under the event type of the processor's first tag.
   */
  override def register[Event <: AggregateEvent[Event]](processorFactory: => ReadSideProcessor[Event]): Unit = {
    val processor = processorFactory
    val tags = processor.aggregateTags
    val handler = processor.buildHandler()
    val prepared = handler.globalPrepare().flatMap { _ =>
      handler.prepare(tags.head).map(offset => handler -> offset)
    }
    synchronized {
      val eventType = tags.head.eventType
      val registered = processors.getOrElse(eventType, Nil)
      processors += (eventType -> (registered :+ prepared))
    }
  }

  /**
   * Runs a single event through every handler registered for its event type.
   *
   * @return a future completed with `Done` once all handlers have processed the event
   */
  def feed[Event <: AggregateEvent[Event]](entityId: String, event: Event, offset: Offset): Future[Done] = {
    val eventType = event.aggregateTag.eventType
    val handlerFutures = processors.getOrElse(
      eventType,
      sys.error(s"No processor registered for Event ${eventType.getCanonicalName}")
    )
    Future.sequence(handlerFutures).flatMap { handlers =>
      val runs = handlers.map {
        case (handler: ReadSideHandler[Event], _) =>
          Source.single(new EventStreamElement(entityId, event, offset))
            .via(handler.handle())
            .runWith(Sink.ignore)
      }
      Future.sequence(runs).map(_ => Done)
    }
  }
}
开发者ID:TimPigden,项目名称:lagom-multitenant,代码行数:50,代码来源:ReadSideTestDriver.scala
注:本文中的 com.lightbend.lagom.scaladsl.persistence.EventStreamElement 类示例整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论