This article collects typical usage examples of the akka.stream.javadsl.Source class in Java. It is meant for readers who want to see how the Source class is used in real projects.
The Source class belongs to the akka.stream.javadsl package. The 20 code examples shown below are sorted by popularity by default.
Example 1: sendEmail
import akka.stream.javadsl.Source; // import the required package/class
@Override
public CompletableFuture<String> sendEmail(String to, String subject, String content) {
    return ws.url(String.format("https://api.mailgun.net/v3/%s/messages", mailgunDomain))
            .setAuth("api", mailgunKey, WSAuthScheme.BASIC)
            // The message fields are streamed to the request body as a Source of parts.
            .post(Source.from(Arrays.asList(
                    new DataPart("from", mailgunFrom),
                    new DataPart("to", to),
                    new DataPart("subject", subject),
                    new DataPart("html", content))))
            .handleAsync((response, throwable) -> {
                if (throwable != null) {
                    log.error("sendEmail: Exception", throwable);
                } else if (response.getStatus() != 200) {
                    log.error("sendEmail: Non-200 response, status={}, body={}", response.getStatus(), response.getBody());
                } else {
                    // Success is logged at info level, not error.
                    log.info("sendEmail: OK, status={}, body={}", response.getStatus(), response.getBody());
                    return response.asJson().get("id").textValue();
                }
                return null;
            }).toCompletableFuture();
}
Developer: bekce, Project: oauthly, Lines of code: 22, Source file: MailgunService.java
Example 2: main
import akka.stream.javadsl.Source; // import the required package/class
public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("KafkaProducerSystem");
    final Materializer materializer = ActorMaterializer.create(system);
    final ProducerSettings<byte[], String> producerSettings =
            ProducerSettings
                    .create(system, new ByteArraySerializer(), new StringSerializer())
                    .withBootstrapServers("localhost:9092");
    CompletionStage<Done> done =
            Source.range(1, 100)
                    .map(n -> n.toString())
                    .map(elem ->
                            new ProducerRecord<byte[], String>(
                                    "topic1-ts",
                                    0,
                                    Instant.now().getEpochSecond(),
                                    null,
                                    elem))
                    .runWith(Producer.plainSink(producerSettings), materializer);
    done.whenComplete((d, ex) -> {
        // Report a failure instead of printing "sent" unconditionally.
        if (ex != null) {
            System.err.println("send failed: " + ex);
        } else {
            System.out.println("sent");
        }
        // Shut the actor system down so the JVM can exit.
        system.terminate();
    });
}
Developer: jeqo, Project: talk-kafka-messaging-logs, Lines of code: 25, Source file: KafkaProducer.java
Example 3: getLiveChirps
import akka.stream.javadsl.Source; // import the required package/class
@Override
public ServiceCall<LiveChirpsRequest, Source<Chirp, ?>> getLiveChirps() {
    return req -> chirps.getRecentChirps(req.userIds).thenApply(recentChirps -> {
        List<Source<Chirp, ?>> sources = new ArrayList<>();
        for (String userId : req.userIds) {
            sources.add(topic.subscriber(userId));
        }
        HashSet<String> users = new HashSet<>(req.userIds);
        Source<Chirp, ?> publishedChirps = Source.from(sources).flatMapMerge(sources.size(), s -> s)
                .filter(c -> users.contains(c.userId));
        // We currently ignore the fact that it is possible to get duplicate chirps
        // from the recent and the topic. That can be solved with a de-duplication stage.
        return Source.from(recentChirps).concat(publishedChirps);
    });
}
Developer: lagom, Project: lagom-java-chirper-example, Lines of code: 17, Source file: ChirpServiceImpl.java
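The comment in Example 3 mentions that duplicates between the recent chirps and the live topic could be removed with a de-duplication stage. The following is a minimal plain-JDK sketch of the filtering logic such a stage might use; it is not taken from the chirper project. The class name ChirpDeduplication, the helper firstOccurrenceBy, and the choice of key are all assumptions made for illustration, and strings stand in for chirps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical helper, not part of the chirper example.
final class ChirpDeduplication {

    // Returns a stateful predicate that accepts an element only the first time its key is seen.
    // The predicate is not thread-safe, and its memory grows with the number of distinct keys.
    static <T, K> Predicate<T> firstOccurrenceBy(Function<T, K> keyOf) {
        Set<K> seen = new HashSet<>();
        // Set.add returns false when the key was already present.
        return element -> seen.add(keyOf.apply(element));
    }

    public static void main(String[] args) {
        // Strings stand in for chirps; the whole string is used as the de-duplication key.
        List<String> recentThenLive = Arrays.asList("hi 1", "hi 2", "hi 2", "hi 3", "hi 1");
        Predicate<String> unseen = firstOccurrenceBy(Function.identity());
        List<String> result = new ArrayList<>();
        for (String chirp : recentThenLive) {
            if (unseen.test(chirp)) {
                result.add(chirp);
            }
        }
        System.out.println(result); // prints [hi 1, hi 2, hi 3]
    }
}
```

Inside a stream, a predicate like this would need to be created once per materialization so that separate subscribers do not share state, and an unbounded set is only reasonable if the overlap window is short.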
Example 4: shouldIncludeSomeOldChirpsInLiveFeed
import akka.stream.javadsl.Source; // import the required package/class
@Test
public void shouldIncludeSomeOldChirpsInLiveFeed() throws Exception {
    ChirpService chirpService = server.client(ChirpService.class);
    Chirp chirp1 = new Chirp("usr3", "hi 1");
    chirpService.addChirp("usr3").invoke(chirp1).toCompletableFuture().get(3, SECONDS);
    Chirp chirp2 = new Chirp("usr4", "hi 2");
    chirpService.addChirp("usr4").invoke(chirp2).toCompletableFuture().get(3, SECONDS);
    LiveChirpsRequest request = new LiveChirpsRequest(TreePVector.<String>empty().plus("usr3").plus("usr4"));
    eventually(FiniteDuration.create(10, SECONDS), () -> {
        Source<Chirp, ?> chirps = chirpService.getLiveChirps().invoke(request).toCompletableFuture().get(3, SECONDS);
        Probe<Chirp> probe = chirps.runWith(TestSink.probe(server.system()), server.materializer());
        probe.request(10);
        probe.expectNextUnordered(chirp1, chirp2);
        Chirp chirp3 = new Chirp("usr4", "hi 3");
        chirpService.addChirp("usr4").invoke(chirp3).toCompletableFuture().get(3, SECONDS);
        probe.expectNext(chirp3);
        probe.cancel();
    });
}
Developer: lagom, Project: lagom-java-chirper-example, Lines of code: 26, Source file: ChirpServiceTest.java
Example 5: shouldRetrieveOldChirps
import akka.stream.javadsl.Source; // import the required package/class
@Test
public void shouldRetrieveOldChirps() throws Exception {
    ChirpService chirpService = server.client(ChirpService.class);
    Chirp chirp1 = new Chirp("usr5", "msg 1");
    chirpService.addChirp("usr5").invoke(chirp1).toCompletableFuture().get(3, SECONDS);
    Chirp chirp2 = new Chirp("usr6", "msg 2");
    chirpService.addChirp("usr6").invoke(chirp2).toCompletableFuture().get(3, SECONDS);
    HistoricalChirpsRequest request = new HistoricalChirpsRequest(Instant.now().minusSeconds(20),
            TreePVector.<String>empty().plus("usr5").plus("usr6"));
    eventually(FiniteDuration.create(10, SECONDS), () -> {
        Source<Chirp, ?> chirps = chirpService.getHistoricalChirps().invoke(request).toCompletableFuture().get(3, SECONDS);
        Probe<Chirp> probe = chirps.runWith(TestSink.probe(server.system()), server.materializer());
        probe.request(10);
        probe.expectNextUnordered(chirp1, chirp2);
        probe.expectComplete();
    });
}
Developer: lagom, Project: lagom-java-chirper-example, Lines of code: 22, Source file: ChirpServiceTest.java
Example 6: testReactiveStream
import akka.stream.javadsl.Source; // import the required package/class
@Test
public void testReactiveStream() throws InterruptedException, TimeoutException {
    final ActorSystem system =
            ActorSystem.create("MySystem");
    final Materializer mat =
            ActorMaterializer.create(system);
    Source<Integer, NotUsed> source = Source.range(1, 5);
    Flow<Integer, Integer, NotUsed> flow = Flow
            .fromFunction(x -> x + 1);
    Source<Integer, NotUsed> source2 = source.via(flow);
    // The first lambda parameter of fold is the running total, the second is the next element.
    Sink<Integer, CompletionStage<Integer>> fold =
            Sink.<Integer, Integer>fold(0, (total, next) -> total + next);
    CompletionStage<Integer> integerCompletionStage = source2.runWith(fold, mat);
    integerCompletionStage.thenAccept(System.out::println);
    Thread.sleep(3000);
    Await.ready(system.terminate(), Duration.apply(10, TimeUnit.SECONDS));
}
Developer: dhinojosa, Project: intro_to_reactive, Lines of code: 24, Source file: ReactiveStreamsTest.java
Example 7: testReactiveStreamRefined
import akka.stream.javadsl.Source; // import the required package/class
@Test
public void testReactiveStreamRefined() throws InterruptedException, TimeoutException {
    final ActorSystem system =
            ActorSystem.create("MySystem");
    final Materializer mat =
            ActorMaterializer.create(system);
    // The first lambda parameter of fold is the running total, the second is the next element.
    Source<Integer, NotUsed> source = Source.range(1, 5).map(x -> x + 1)
            .fold(0, (total, next) -> total + next);
    CompletionStage<Integer> integerCompletionStage =
            source.runWith(Sink.head(), mat);
    integerCompletionStage.thenAccept(System.out::println);
    Thread.sleep(3000);
    Await.ready(system.terminate(), Duration.apply(10, TimeUnit.SECONDS));
}
Developer: dhinojosa, Project: intro_to_reactive, Lines of code: 19, Source file: ReactiveStreamsTest.java
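Examples 6 and 7 both take the range 1 to 5, add 1 to each element, and fold the results into a sum. As a quick check of the arithmetic, the same computation can be sketched with the JDK's own IntStream, assuming that Source.range(1, 5) includes both endpoints. The class and method names below are hypothetical and exist only for this illustration.

```java
import java.util.stream.IntStream;

// Hypothetical helper mirroring the map-then-fold pipeline with plain JDK streams.
final class FoldArithmetic {

    // Adds 1 to every integer in [startInclusive, endInclusive] and sums the results.
    static int incrementAndSum(int startInclusive, int endInclusive) {
        return IntStream.rangeClosed(startInclusive, endInclusive)
                .map(x -> x + 1)
                // As with fold, the first parameter is the running total.
                .reduce(0, (total, next) -> total + next);
    }

    public static void main(String[] args) {
        // 2 + 3 + 4 + 5 + 6
        System.out.println(incrementAndSum(1, 5)); // prints 20
    }
}
```

Under that assumption, both tests would be expected to print 20.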
Example 8: getHistoricalChirps
import akka.stream.javadsl.Source; // import the required package/class
@Override
public ServiceCall<HistoricalChirpsRequest, Source<Chirp, ?>> getHistoricalChirps() {
    return req -> {
        List<Source<Chirp, ?>> sources = new ArrayList<>();
        for (String uid : req.getUserIds()) {
            Source<Chirp, NotUsed> select = db
                    .select("SELECT * FROM chirp WHERE userId = ? AND timestamp >= ? ORDER BY timestamp ASC", uid,
                            req.getFromTime().toEpochMilli())
                    .map(this::mapChirp);
            sources.add(select);
        }
        // Chirps from one user are ordered by timestamp, but chirps from different
        // users are not ordered. That can be improved by implementing a smarter
        // merge that takes the timestamps into account.
        Source<Chirp, ?> result = Source.from(sources).flatMapMerge(sources.size(), s -> s);
        return CompletableFuture.completedFuture(result);
    };
}
Developer: negokaz, Project: lagom-hands-on-development, Lines of code: 19, Source file: ChirpServiceImpl.java
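The comment in Example 8 suggests a smarter merge that takes timestamps into account, since each per-user query is already ordered but the merged result is not. The following plain-JDK sketch shows the underlying idea, a k-way merge of sorted inputs driven by a priority queue. It works on in-memory lists rather than streams, and the class SortedMerge and its method are hypothetical names introduced here; Long values stand in for chirp timestamps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper illustrating a timestamp-aware merge; not part of the example project.
final class SortedMerge {

    // Pairs the current head element of one input with the iterator it came from.
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    // Merges inputs that are each already sorted by `order` into one sorted list.
    static <T> List<T> mergeSorted(List<List<T>> sortedInputs, Comparator<T> order) {
        Comparator<Head<T>> byValue = (a, b) -> order.compare(a.value, b.value);
        PriorityQueue<Head<T>> heads = new PriorityQueue<>(byValue);
        for (List<T> input : sortedInputs) {
            Iterator<T> it = input.iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            // Always emit the smallest head, then refill from the same input.
            Head<T> smallest = heads.poll();
            merged.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Each list plays the role of one user's chirp timestamps, ascending.
        List<Long> user1 = Arrays.asList(10L, 40L, 70L);
        List<Long> user2 = Arrays.asList(20L, 30L, 90L);
        List<Long> merged = mergeSorted(Arrays.asList(user1, user2), Comparator.<Long>naturalOrder());
        System.out.println(merged); // prints [10, 20, 30, 40, 70, 90]
    }
}
```

Within Akka Streams itself, the mergeSorted operator, which combines two already-sorted sources using a comparator, may be a more direct route than a hand-written merge; whether it fits depends on the Akka version in use and on how many sources need to be combined.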
Example 9: shouldIncludeSomeOldChirpsInLiveFeed
import akka.stream.javadsl.Source; // import the required package/class
@Test
public void shouldIncludeSomeOldChirpsInLiveFeed() throws Exception {
    ChirpService chirpService = server.client(ChirpService.class);
    Chirp chirp1 = Chirp.of("usr3", "hi 1");
    chirpService.addChirp("usr3").invoke(chirp1).toCompletableFuture().get(3, SECONDS);
    Chirp chirp2 = Chirp.of("usr4", "hi 2");
    chirpService.addChirp("usr4").invoke(chirp2).toCompletableFuture().get(3, SECONDS);
    LiveChirpsRequest request = LiveChirpsRequest.of(TreePVector.<String>empty().plus("usr3").plus("usr4"));
    Source<Chirp, ?> chirps = chirpService.getLiveChirps("user3").invoke(request).toCompletableFuture().get(3, SECONDS);
    Probe<Chirp> probe = chirps.runWith(TestSink.probe(server.system()), server.materializer());
    probe.request(10);
    probe.expectNextUnordered(chirp1, chirp2);
    Chirp chirp3 = Chirp.of("usr4", "hi 3");
    chirpService.addChirp("usr4").invoke(chirp3).toCompletableFuture().get(3, SECONDS);
    probe.expectNext(chirp3);
    probe.cancel();
}
Developer: negokaz, Project: lagom-hands-on-development, Lines of code: 23, Source file: ChirpServiceTest.java
Example 10: shouldRetrieveOldChirps
import akka.stream.javadsl.Source; // import the required package/class
@Test
public void shouldRetrieveOldChirps() throws Exception {
    ChirpService chirpService = server.client(ChirpService.class);
    Chirp chirp1 = Chirp.of("usr5", "msg 1");
    chirpService.addChirp("usr5").invoke(chirp1).toCompletableFuture().get(3, SECONDS);
    Chirp chirp2 = Chirp.of("usr6", "msg 2");
    chirpService.addChirp("usr6").invoke(chirp2).toCompletableFuture().get(3, SECONDS);
    HistoricalChirpsRequest request = HistoricalChirpsRequest.of(Instant.now().minusSeconds(20),
            TreePVector.<String>empty().plus("usr5").plus("usr6"));
    Source<Chirp, ?> chirps = chirpService.getHistoricalChirps().invoke(request).toCompletableFuture().get(3, SECONDS);
    Probe<Chirp> probe = chirps.runWith(TestSink.probe(server.system()), server.materializer());
    probe.request(10);
    probe.expectNextUnordered(chirp1, chirp2);
    probe.expectComplete();
}
Developer: negokaz, Project: lagom-hands-on-development, Lines of code: 19, Source file: ChirpServiceTest.java
Example 11: importAll
import akka.stream.javadsl.Source; // import the required package/class
public CompletableFuture<Done> importAll() {
    AtomicLong batch = new AtomicLong(0);
    long start = System.currentTimeMillis();
    final Materializer materializer = ActorMaterializer.create(ActorSystem.create("actor-system", ConfigFactory.load()));
    Source<Page, NotUsed> channelSource = Source.fromIterator(() -> reader);
    Flow<Page, List<Page>, NotUsed> processing = Flow.of(Page.class)
            .filter(x -> x != null && x.getNamespace() == 0)
            .buffer(cores * 100, OverflowStrategy.backpressure())
            .mapAsyncUnordered(cores, x -> cleaner.clean(x))
            .groupedWithin(BATCH_SIZE,
                    FiniteDuration.apply(1, TimeUnit.SECONDS))
            .mapAsyncUnordered(cores, x -> datastore.save(x));
    final CompletionStage<Done> promise = channelSource.via(processing).runForeach(x -> {
        System.out.printf("Inserted: %d, Time: %d sec\n",
                batch.incrementAndGet() * BATCH_SIZE,
                (System.currentTimeMillis() - start) / 1000);
    }, materializer);
    return promise.toCompletableFuture();
}
Developer: crtomirmajer, Project: wiki2mongo, Lines of code: 27, Source file: Wiki2MongoImporter.java
Example 12: main
import akka.stream.javadsl.Source; // import the required package/class
public static void main(String[] args) throws IOException {
    final ActorSystem system = ActorSystem.create("Sys");
    final ActorMaterializer materializer = ActorMaterializer.create(system);
    final String text =
            "Lorem Ipsum is simply dummy text of the printing and typesetting industry. " +
            "Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, " +
            "when an unknown printer took a galley of type and scrambled it to make a type " +
            "specimen book.";
    Source.from(Arrays.asList(text.split("\\s"))).
            // transform
            map(e -> e.toUpperCase()).
            // print to console
            runForeach(System.out::println, materializer).
            handle((done, failure) -> {
                system.terminate();
                return NotUsed.getInstance();
            });
}
Developer: typesafehub, Project: activator-akka-stream-java8, Lines of code: 21, Source file: BasicTransformation.java
Example 13: client
import akka.stream.javadsl.Source; // import the required package/class
public static void client(ActorSystem system, InetSocketAddress serverAddress) {
    final ActorMaterializer materializer = ActorMaterializer.create(system);
    final List<ByteString> testInput = new ArrayList<>();
    for (char c = 'a'; c <= 'z'; c++) {
        testInput.add(ByteString.fromString(String.valueOf(c)));
    }
    Source<ByteString, NotUsed> responseStream =
            Source.from(testInput).via(Tcp.get(system).outgoingConnection(serverAddress.getHostString(), serverAddress.getPort()));
    CompletionStage<ByteString> result = responseStream.runFold(
            ByteString.empty(), (acc, in) -> acc.concat(in), materializer);
    result.handle((success, failure) -> {
        if (failure != null) {
            System.err.println("Failure: " + failure.getMessage());
        } else {
            System.out.println("Result: " + success.utf8String());
        }
        System.out.println("Shutting down client");
        // terminate() is used here for consistency with Example 12 from the same project.
        system.terminate();
        return NotUsed.getInstance();
    });
}
Developer: typesafehub, Project: activator-akka-stream-java8, Lines of code: 26, Source file: TcpEcho.java
Example 14: stream
import akka.stream.javadsl.Source; // import the required package/class
public Result stream() {
    Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
            .mapMaterializedValue(sourceActor -> {
                sourceActor.tell(ByteString.fromString("kiki"), null);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                sourceActor.tell(ByteString.fromString("foo"), null);
                sourceActor.tell(ByteString.fromString("bar"), null);
                sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                new CreateTraceEntry().traceEntryMarker();
                return null;
            });
    return ok().chunked(source);
}
Developer: glowroot, Project: glowroot, Lines of code: 18, Source file: StreamController.java
Example 15: streamForTag
import akka.stream.javadsl.Source; // import the required package/class
private Source<Pair<BasketEvent, Offset>, ?> streamForTag(AggregateEventTag<PEBasketEvent> tag, Offset offset) {
    return registry
            .eventStream(tag, offset)
            .filter(eventOffset ->
                    eventOffset.first() instanceof ItemAdded ||
                    eventOffset.first() instanceof ItemDeleted ||
                    eventOffset.first() instanceof CheckedOut
            )
            .map(this::toTopic);
}
Developer: ignasi35, Project: lagom-java-workshop, Lines of code: 11, Source file: BasketServiceImpl.java
Example 16: run1
import akka.stream.javadsl.Source; // import the required package/class
private void run1() {
    final ActorSystem actorSystem = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(actorSystem);
    final Source<Integer, NotUsed> source = Source.range(0, 10);
    final Flow<Integer, String, NotUsed> flow = Flow.fromFunction((Integer i) -> i.toString());
    final Sink<String, CompletionStage<Done>> sink = Sink.foreach(s -> System.out.println("Example1 - Number: " + s));
    // Keep the sink's CompletionStage so that shutdown can wait for the stream to finish.
    final RunnableGraph<CompletionStage<Done>> runnable = source.via(flow).toMat(sink, Keep.right());
    // Terminating right after run() could stop the system before all elements are printed.
    runnable.run(materializer).whenComplete((done, failure) -> actorSystem.terminate());
}
Developer: henrikengstrom, Project: ujug2017, Lines of code: 11, Source file: RSExample1.java
Example 17: run2
import akka.stream.javadsl.Source; // import the required package/class
private void run2() {
    final ActorSystem actorSystem = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(actorSystem);
    Source.range(0, 10)
            .map(Object::toString)
            .runForeach(s -> System.out.println("Example 2 - Number: " + s), materializer)
            // Terminate only once the stream has completed, so no elements are lost.
            .whenComplete((done, failure) -> actorSystem.terminate());
}
Developer: henrikengstrom, Project: ujug2017, Lines of code: 9, Source file: RSExample1.java
Example 18: helloStream
import akka.stream.javadsl.Source; // import the required package/class
@Test
public void helloStream() throws Exception {
    // Important to concat our source with a maybe, this ensures the connection doesn't get closed once we've
    // finished feeding our elements in, and then also to take 3 from the response stream, this ensures our
    // connection does get closed once we've received the 3 elements.
    Source<String, ?> response = await(streamService.stream().invoke(
            Source.from(Arrays.asList("a", "b", "c"))
                    .concat(Source.maybe())));
    List<String> messages = await(response.take(3).runWith(Sink.seq(), mat));
    assertEquals(Arrays.asList("Hello, a!", "Hello, b!", "Hello, c!"), messages);
}
Developer: MarioAriasC, Project: lagomkotlin, Lines of code: 12, Source file: StreamIT.java
Example 19: hide
import akka.stream.javadsl.Source; // import the required package/class
@Override
@SuppressWarnings("unchecked")
protected Publisher hide(Processor processor) {
    return (Publisher) Source
            .fromPublisher(processor)
            .runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
}
Developer: apptik, Project: RHub, Lines of code: 8, Source file: AkkaProcProxy.java
Example 20: filter
import akka.stream.javadsl.Source; // import the required package/class
@Override
@SuppressWarnings("unchecked")
protected <T> Publisher<T> filter(Processor processor, final Class<T> filterClass) {
    Source src = Source.fromPublisher(processor)
            .filter(o -> filterClass.isAssignableFrom(o.getClass()));
    return (Publisher<T>) src.runWith(Sink.asPublisher(AsPublisher.WITH_FANOUT), mat);
}
Developer: apptik, Project: RHub, Lines of code: 8, Source file: AkkaProcProxy.java
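Note: The akka.stream.javadsl.Source examples in this article were compiled from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.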