本文整理汇总了Java中akka.stream.Materializer类的典型用法代码示例。如果您正苦于以下问题:Java Materializer类的具体用法?Java Materializer怎么用?Java Materializer使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Materializer类属于akka.stream包,在下文中一共展示了Materializer类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import akka.stream.Materializer; //导入依赖的package包/类
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("KafkaProducerSystem");
final Materializer materializer = ActorMaterializer.create(system);
final ProducerSettings<byte[], String> producerSettings =
ProducerSettings
.create(system, new ByteArraySerializer(), new StringSerializer())
.withBootstrapServers("localhost:9092");
CompletionStage<Done> done =
Source.range(1, 100)
.map(n -> n.toString())
.map(elem ->
new ProducerRecord<byte[], String>(
"topic1-ts",
0,
Instant.now().getEpochSecond(),
null,
elem))
.runWith(Producer.plainSink(producerSettings), materializer);
done.whenComplete((d, ex) -> System.out.println("sent"));
}
开发者ID:jeqo,项目名称:talk-kafka-messaging-logs,代码行数:25,代码来源:KafkaProducer.java
示例2: testReactiveStream
import akka.stream.Materializer; //导入依赖的package包/类
@Test
public void testReactiveStream() throws InterruptedException, TimeoutException {
final ActorSystem system =
ActorSystem.create("MySystem");
final Materializer mat =
ActorMaterializer.create(system);
Source<Integer, NotUsed> source = Source.range(1, 5);
Flow<Integer, Integer, NotUsed> flow = Flow
.fromFunction(x -> x + 1);
Source<Integer, NotUsed> source2 = source.via(flow);
Sink<Integer, CompletionStage<Integer>> fold =
Sink.<Integer, Integer>fold(0, (next, total) -> total + next);
CompletionStage<Integer> integerCompletionStage = source2.runWith(fold, mat);
integerCompletionStage.thenAccept(System.out::println);
Thread.sleep(3000);
Await.ready(system.terminate(), Duration.apply(10, TimeUnit.SECONDS));
}
开发者ID:dhinojosa,项目名称:intro_to_reactive,代码行数:24,代码来源:ReactiveStreamsTest.java
示例3: testReactiveStreamRefined
import akka.stream.Materializer; //导入依赖的package包/类
@Test
public void testReactiveStreamRefined() throws InterruptedException, TimeoutException {
final ActorSystem system =
ActorSystem.create("MySystem");
final Materializer mat =
ActorMaterializer.create(system);
Source<Integer, NotUsed> source = Source.range(1, 5).map(x -> x + 1)
.fold(0, (next, total) -> total + next);
CompletionStage<Integer> integerCompletionStage =
source.runWith(Sink.head(), mat);
integerCompletionStage.thenAccept(System.out::println);
Thread.sleep(3000);
Await.ready(system.terminate(), Duration.apply(10, TimeUnit.SECONDS));
}
开发者ID:dhinojosa,项目名称:intro_to_reactive,代码行数:19,代码来源:ReactiveStreamsTest.java
示例4: startAll
import akka.stream.Materializer; //导入依赖的package包/类
/**
* Starts a DataCenterForwarder for each of the known data centers in the {@link DataCenterRepository}.
* @param system Actor system to create the DataCenterForwarder actors in
* @param dataRepo Repository that knows about all data centers
* @param materializer Akka streams materializer to use
* @param visibilityRepo Repository that stores the current visiblity of aggregates
* @param eventRepo Classifier that determines which additional datacenters an event should trigger replication for
* @param eventsByTagQuery Query to use to find a continuous stream of all events
* @param tag Tag to pass to {@link EventsByTagQuery} (all events must be tagged by this)
* @param currentEventsByPersistenceIdQuery Query to find all current events for a specific persistenceId
*/
public static <E> void startAll(ActorSystem system, Materializer materializer, DataCenterRepository dataRepo, VisibilityRepository visibilityRepo, Class<E> eventType,
EventsByTagQuery eventsByTagQuery, CurrentEventsByPersistenceIdQuery currentEventsByPersistenceIdQuery) {
String tag = Replication.get(system).getEventTag(eventType);
for (DataCenter dataCenter: dataRepo.getRemotes().values()) {
system.actorOf(ClusterSingletonManager.props(
BackoffSupervisor.props(
Backoff.onFailure(
Props.create(DataCenterForwarder.class, () -> new DataCenterForwarder<>(materializer, dataCenter, visibilityRepo, eventType,
eventsByTagQuery, currentEventsByPersistenceIdQuery)),
"f",
Duration.create(1, TimeUnit.SECONDS),
Duration.create(1, TimeUnit.SECONDS), // TODO make these 3 configurable
0.2)
),
Done.getInstance(),
ClusterSingletonManagerSettings.create(system).withSingletonName("s")), "forwarder_" + dataCenter.getName() + "_" + tag);
}
}
开发者ID:Tradeshift,项目名称:ts-reaktive,代码行数:32,代码来源:DataCenterForwarder.java
示例5: DataCenterForwarder
import akka.stream.Materializer; //导入依赖的package包/类
/**
* Creates a new DataCenterForwarder and starts to forward events to a data center.
* @param materializer Akka streams materializer to use
* @param dataCenter Target data center to forward events to.
* @param visibilityRepo Repository that stores the current visiblity of aggregates
* @param eventRepo Classifier that determines which additional datacenters an event should trigger replication for
* @param eventsByTagQuery Query to use to find a continuous stream of all events
* @param tag Tag to pass to {@link EventsByTagQuery} (all events must be tagged by this)
* @param currentEventsByPersistenceIdQuery Query to find all current events for a specific persistenceId
*/
public DataCenterForwarder(Materializer materializer, DataCenter dataCenter, VisibilityRepository visibilityRepo, Class<E> eventType,
EventsByTagQuery eventsByTagQuery, CurrentEventsByPersistenceIdQuery currentEventsByPersistenceIdQuery) {
final Replication replication = Replication.get(context().system());
this.eventsByTagQuery = eventsByTagQuery;
this.materializer = materializer;
this.visibilityRepo = visibilityRepo;
this.classifier = replication.getEventClassifier(eventType);
this.dataCenter = dataCenter;
this.tag = replication.getEventTag(eventType);
this.localDataCenterName = replication.getLocalDataCenterName();
this.currentEventsByPersistenceIdQuery = currentEventsByPersistenceIdQuery;
this.parallelism = context().system().settings().config().getInt("ts-reaktive.replication.parallellism");
pipe(visibilityRepo.getLastEventOffset(dataCenter, tag).thenApply(LastEventOffsetKnown::new), context().dispatcher()).to(self());
log.debug("Started");
}
开发者ID:Tradeshift,项目名称:ts-reaktive,代码行数:29,代码来源:DataCenterForwarder.java
示例6: importAll
import akka.stream.Materializer; //导入依赖的package包/类
public CompletableFuture<Done> importAll() {
AtomicLong batch = new AtomicLong(0);
long start = System.currentTimeMillis();
final Materializer materializer = ActorMaterializer.create(ActorSystem.create("actor-system", ConfigFactory.load()));
Source<Page, NotUsed> channelSource = Source.fromIterator(() -> reader);
Flow<Page, List<Page>, NotUsed> processing = Flow.of(Page.class)
.filter(x -> x != null && x.getNamespace() == 0)
.buffer(cores * 100, OverflowStrategy.backpressure())
.mapAsyncUnordered(cores, x -> cleaner.clean(x))
.groupedWithin(BATCH_SIZE,
FiniteDuration.apply(1, TimeUnit.SECONDS))
.mapAsyncUnordered(cores, x -> datastore.save(x));
final CompletionStage<Done> promise = channelSource.via(processing).runForeach(x -> {
System.out.printf("Inserted: %d, Time: %d sec\n",
batch.incrementAndGet() * BATCH_SIZE,
(System.currentTimeMillis() - start) / 1000);
}, materializer);
return promise.toCompletableFuture();
}
开发者ID:crtomirmajer,项目名称:wiki2mongo,代码行数:27,代码来源:Wiki2MongoImporter.java
示例7: HomeController
import akka.stream.Materializer; //导入依赖的package包/类
@Inject
public HomeController(ActorSystem actorSystem,
Materializer mat,
WebJarAssets webJarAssets) {
org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
LoggingAdapter logging = Logging.getLogger(actorSystem.eventStream(), logger.getName());
//noinspection unchecked
Source<String, Sink<String, NotUsed>> source = MergeHub.of(String.class)
.log("source", logging)
.recoverWithRetries(-1, new PFBuilder().match(Throwable.class, e -> Source.empty()).build());
Sink<String, Source<String, NotUsed>> sink = BroadcastHub.of(String.class);
Pair<Sink<String, NotUsed>, Source<String, NotUsed>> sinkSourcePair = source.toMat(sink, Keep.both()).run(mat);
Sink<String, NotUsed> chatSink = sinkSourcePair.first();
Source<String, NotUsed> chatSource = sinkSourcePair.second();
this.userFlow = Flow.fromSinkAndSource(chatSink, chatSource).log("userFlow", logging);
this.webJarAssets = webJarAssets;
}
开发者ID:play2-maven-plugin,项目名称:play2-maven-test-projects,代码行数:21,代码来源:HomeController.java
示例8: HomeController
import akka.stream.Materializer; //导入依赖的package包/类
@Inject
public HomeController(ActorSystem actorSystem,
Materializer mat) {
org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
LoggingAdapter logging = Logging.getLogger(actorSystem.eventStream(), logger.getName());
//noinspection unchecked
Source<String, Sink<String, NotUsed>> source = MergeHub.of(String.class)
.log("source", logging)
.recoverWithRetries(-1, new PFBuilder().match(Throwable.class, e -> Source.empty()).build());
Sink<String, Source<String, NotUsed>> sink = BroadcastHub.of(String.class);
Pair<Sink<String, NotUsed>, Source<String, NotUsed>> sinkSourcePair = source.toMat(sink, Keep.both()).run(mat);
Sink<String, NotUsed> chatSink = sinkSourcePair.first();
Source<String, NotUsed> chatSource = sinkSourcePair.second();
this.userFlow = Flow.fromSinkAndSource(chatSink, chatSource).log("userFlow", logging);
}
开发者ID:play2-maven-plugin,项目名称:play2-maven-test-projects,代码行数:18,代码来源:HomeController.java
示例9: HomeController
import akka.stream.Materializer; //导入依赖的package包/类
@Inject
public HomeController(ActorSystem actorSystem,
Materializer mat,
WebJarsUtil webJarsUtil) {
org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
LoggingAdapter logging = Logging.getLogger(actorSystem.eventStream(), logger.getName());
//noinspection unchecked
Source<String, Sink<String, NotUsed>> source = MergeHub.of(String.class)
.log("source", logging)
.recoverWithRetries(-1, new PFBuilder().match(Throwable.class, e -> Source.empty()).build());
Sink<String, Source<String, NotUsed>> sink = BroadcastHub.of(String.class);
Pair<Sink<String, NotUsed>, Source<String, NotUsed>> sinkSourcePair = source.toMat(sink, Keep.both()).run(mat);
Sink<String, NotUsed> chatSink = sinkSourcePair.first();
Source<String, NotUsed> chatSource = sinkSourcePair.second();
this.userFlow = Flow.fromSinkAndSource(chatSink, chatSource).log("userFlow", logging);
this.webJarsUtil = webJarsUtil;
}
开发者ID:play2-maven-plugin,项目名称:play2-maven-test-projects,代码行数:21,代码来源:HomeController.java
示例10: UserActor
import akka.stream.Materializer; //导入依赖的package包/类
@Inject
public UserActor(@Assisted String id,
@Named("stocksActor") ActorRef stocksActor,
Materializer mat) {
this.id = id;
this.stocksActor = stocksActor;
this.mat = mat;
Pair<Sink<JsonNode, NotUsed>, Source<JsonNode, NotUsed>> sinkSourcePair =
MergeHub.of(JsonNode.class, 16)
.toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both())
.run(mat);
this.hubSink = sinkSourcePair.first();
Source<JsonNode, NotUsed> hubSource = sinkSourcePair.second();
Sink<JsonNode, CompletionStage<Done>> jsonSink = Sink.foreach((JsonNode json) -> {
// When the user types in a stock in the upper right corner, this is triggered,
String symbol = json.findPath("symbol").asText();
addStocks(Collections.singleton(symbol));
});
// Put the source and sink together to make a flow of hub source as output (aggregating all
// stocks as JSON to the browser) and the actor as the sink (receiving any JSON messages
// from the browse), using a coupled sink and source.
this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource)
//.log("actorWebsocketFlow", logger)
.watchTermination((n, stage) -> {
// When the flow shuts down, make sure this actor also stops.
stage.thenAccept(f -> context().stop(self()));
return NotUsed.getInstance();
});
}
开发者ID:play2-maven-plugin,项目名称:play2-maven-test-projects,代码行数:34,代码来源:UserActor.java
示例11: AuthorizationFilter
import akka.stream.Materializer; //导入依赖的package包/类
@Inject
public AuthorizationFilter(
Materializer mat,
IClientAuthConfig config,
IJwtValidation jwtValidation) {
super(mat);
this.authRequired = config.isAuthRequired();
this.jwtValidation = jwtValidation;
}
开发者ID:Azure,项目名称:device-telemetry-java,代码行数:10,代码来源:AuthorizationFilter.java
示例12: run1
import akka.stream.Materializer; //导入依赖的package包/类
private void run1() {
final ActorSystem actorSystem = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(actorSystem);
final Source<Integer, NotUsed> source = Source.range(0, 10);
final Flow<Integer, String, NotUsed> flow = Flow.fromFunction((Integer i) -> i.toString());
final Sink<String, CompletionStage<Done>> sink = Sink.foreach(s -> System.out.println("Example1 - Number: " + s));
final RunnableGraph runnable = source.via(flow).to(sink);
runnable.run(materializer);
actorSystem.terminate();
}
开发者ID:henrikengstrom,项目名称:ujug2017,代码行数:11,代码来源:RSExample1.java
示例13: run2
import akka.stream.Materializer; //导入依赖的package包/类
private void run2() {
final ActorSystem actorSystem = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(actorSystem);
Source.range(0, 10)
.map(Object::toString)
.runForeach(s -> System.out.println("Example 2 - Number: " + s), materializer);
actorSystem.terminate();
}
开发者ID:henrikengstrom,项目名称:ujug2017,代码行数:9,代码来源:RSExample1.java
示例14: Routes
import akka.stream.Materializer; //导入依赖的package包/类
@Inject
public Routes(Application application,
Assets assets, WebJarAssets webJars, Materializer materializer) {
this.application = application;
this.assets = assets;
this.webJars = webJars;
this.materializer = materializer;
this.router = buildRouter();
}
开发者ID:lagom,项目名称:lagom-java-chirper-example,代码行数:11,代码来源:Routes.java
示例15: LoadTestServiceImpl
import akka.stream.Materializer; //导入依赖的package包/类
@Inject
public LoadTestServiceImpl(FriendService friendService, ActivityStreamService activityService,
ChirpService chirpService, Materializer materializer) {
this.friendService = friendService;
this.activityService = activityService;
this.chirpService = chirpService;
this.materializer = materializer;
}
开发者ID:lagom,项目名称:lagom-java-chirper-example,代码行数:9,代码来源:LoadTestServiceImpl.java
示例16: launchActors
import akka.stream.Materializer; //导入依赖的package包/类
private void launchActors(final Injector injector) {
LOGGER.info().setMessage("Launching actors").log();
// Retrieve the actor system
final ActorSystem actorSystem = injector.getInstance(ActorSystem.class);
// Create the status actor
actorSystem.actorOf(Props.create(Status.class), "status");
// Create the telemetry connection actor
actorSystem.actorOf(Props.create(Telemetry.class, injector.getInstance(MetricsFactory.class)), "telemetry");
// Load supplemental routes
final ImmutableList.Builder<SupplementalRoutes> supplementalHttpRoutes = ImmutableList.builder();
_configuration.getSupplementalHttpRoutesClass().ifPresent(clazz -> {
supplementalHttpRoutes.add(injector.getInstance(clazz));
});
// Create and bind Http server
final Materializer materializer = ActorMaterializer.create(actorSystem);
final Routes routes = new Routes(
actorSystem,
injector.getInstance(PeriodicMetrics.class),
_configuration.getHttpHealthCheckPath(),
_configuration.getHttpStatusPath(),
supplementalHttpRoutes.build());
final Http http = Http.get(actorSystem);
final akka.stream.javadsl.Source<IncomingConnection, CompletionStage<ServerBinding>> binding = http.bind(
ConnectHttp.toHost(
_configuration.getHttpHost(),
_configuration.getHttpPort()),
materializer);
binding.to(
akka.stream.javadsl.Sink.foreach(
connection -> connection.handleWith(routes.flow(), materializer)))
.run(materializer);
}
开发者ID:ArpNetworking,项目名称:metrics-aggregator-daemon,代码行数:38,代码来源:Main.java
示例17: AkkaHttpJavaClient
import akka.stream.Materializer; //导入依赖的package包/类
public AkkaHttpJavaClient(ActorSystem sys, Materializer mat) {
system = sys;
materializer = mat;
settings = Settings.SettingsProvider.get(system);
connectionFlow = Http.get(system).outgoingConnection(ConnectHttp.toHost(settings.HOST, settings.PORT));
poolClientFlow = Http.get(system).cachedHostConnectionPool(ConnectHttp.toHost(settings.HOST, settings.PORT), materializer);
}
开发者ID:ferhtaydn,项目名称:akka-http-java-client,代码行数:8,代码来源:AkkaHttpJavaClient.java
示例18: EventRoute
import akka.stream.Materializer; //导入依赖的package包/类
/**
* Creates a new EventRoute
* @param journal The cassandra journal to read from
* @param tagName The tag name of the events that this route should query
*/
public EventRoute(Materializer materializer, EventsByTagQuery journal, EventEnvelopeSerializer serializer, String tagName) {
this.materializer = materializer;
this.journal = journal;
this.tagName = tagName;
this.serializer = serializer;
}
开发者ID:Tradeshift,项目名称:ts-reaktive,代码行数:12,代码来源:EventRoute.java
示例19: S3
import akka.stream.Materializer; //导入依赖的package包/类
public S3(ActorSystem system, Materializer materializer, EventEnvelopeSerializer serializer, String bucket, String prefix) {
this.materializer = materializer;
this.serializer = serializer;
this.bucket = bucket;
this.bucketKeyPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
this.client = S3Client.create(system, materializer);
}
开发者ID:Tradeshift,项目名称:ts-reaktive,代码行数:8,代码来源:S3.java
示例20: transform
import akka.stream.Materializer; //导入依赖的package包/类
public static CompletableFuture<InternalResponse> transform(HttpResponse response, Materializer mat) {
CompletableFuture<ByteString> body =
response.entity()
.getDataBytes()
.runWith(Sink.<ByteString>head(), mat)
.toCompletableFuture();
return body.thenApply((responseBody) -> {
int statusCode = response.status().intValue();
Map<String, String> headers = extractResponseHeaders(response.getHeaders());
return new InternalResponse(statusCode, responseBody.utf8String(), headers);
});
}
开发者ID:owainlewis,项目名称:akka-http-clj,代码行数:14,代码来源:ResponseTransformer.java
注:本文中的akka.stream.Materializer类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论