Java TopicProcessor Class Code Examples


This article collects typical usage examples of reactor.core.publisher.TopicProcessor in Java. If you are wondering how the TopicProcessor class is used in practice, the selected examples below may help.



The TopicProcessor class belongs to the reactor.core.publisher package. The 20 code examples below are sorted by popularity by default.

Example 1: setupFakeProtocolListener

import reactor.core.publisher.TopicProcessor; // import the required package/class
private void setupFakeProtocolListener() throws Exception {
	broadcaster = TopicProcessor.create();
	final Processor<List<String>, List<String>> processor =
			WorkQueueProcessor.<List<String>>builder().autoCancel(false).build();
	Flux.from(broadcaster)
	    .buffer(5)
	    .subscribe(processor);

	httpServer = HttpServer.create(0)
	                       .newRouter(r -> r.get("/data",
			                       (req, resp) -> resp.options(NettyPipeline.SendOptions::flushOnEach)
			                                          .send(Flux.from(processor)
			                                                    .log("server")
			                                                    .timeout(Duration.ofSeconds(
					                                                    2),
					                                                    Flux.empty())
			                                                    .concatWith(Flux.just(
					                                                    new ArrayList<>()))
			                                                    .map(new DummyListEncoder(
					                                                    resp.alloc()
			                                                    )))))
	                       .block(Duration.ofSeconds(30));
}
 
Developer ID: reactor, Project: reactor-netty, Lines of code: 24, Source: ClientServerHttpTests.java


Example 2: RobotWSService

import reactor.core.publisher.TopicProcessor; // import the required package/class
public RobotWSService(PositionsFlux positions, MovementCommandSubscriber movementSubscriber)
		throws UnknownHostException {
	this.positions = positions;
	this.movements = movementSubscriber;

	try {
		ipAddress = findMyInetAddress();
	} catch (SocketException e1) {
		e1.printStackTrace();
	}

	try {
		setup();
		movementCommands = TopicProcessor.create();
		movementCommands.subscribe(movementSubscriber);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	
}
 
Developer ID: iproduct, Project: course-social-robotics, Lines of code: 21, Source: RobotWSService.java


Example 3: afterPropertiesSet

import reactor.core.publisher.TopicProcessor; // import the required package/class
@Override
public void afterPropertiesSet() throws Exception {
	if (!isShared()) {
		this.dispatcher = TopicProcessor.create(
		  getName(),
		  getBacklog(),
		  (null != waitStrategy ? waitStrategy : WaitStrategy.blocking())
		);
	} else {
		this.dispatcher = TopicProcessor.share(
		  getName(),
		  getBacklog(),
		  (null != waitStrategy ? waitStrategy : WaitStrategy.blocking())
		);
	}
	if (isAutoStartup()) {
		start();
	}
}
 
Developer ID: reactor, Project: reactor-spring, Lines of code: 20, Source: RingBufferAsyncTaskExecutor.java


Example 4: sensorOdd

import reactor.core.publisher.TopicProcessor; // import the required package/class
public Flux<SensorData> sensorOdd() {
	if (sensorOdd == null) {
		// this is the stream we publish odd-numbered events to
		this.sensorOdd = TopicProcessor.<SensorData>builder().name("odd").build();

		// add substream to "master" list
		//allSensors().add(sensorOdd.reduce(this::computeMin).timeout(1000));
	}

	return sensorOdd.log("odd");
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 12, Source: CombinationTests.java


Example 5: sensorEven

import reactor.core.publisher.TopicProcessor; // import the required package/class
public Flux<SensorData> sensorEven() {
	if (sensorEven == null) {
		// this is the stream we publish even-numbered events to
		this.sensorEven = TopicProcessor.<SensorData>builder().name("even").build();

		// add substream to "master" list
		//allSensors().add(sensorEven.reduce(this::computeMin).timeout(1000));
	}
	return sensorEven.log("even");
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 11, Source: CombinationTests.java


Example 6: indexBugTest

import reactor.core.publisher.TopicProcessor; // import the required package/class
@Test
	public void indexBugTest() throws InterruptedException {
		int numOfItems = 20;

		// this line causes a java.lang.ArrayIndexOutOfBoundsException unless there is a breakpoint in
		// ZipAction.createSubscriber()
		TopicProcessor<String> ring = TopicProcessor.<String>builder().name("test").bufferSize(1024).build();

//        EmitterProcessor<String> ring = EmitterProcessor.create();


		Flux<String> stream2 = ring
				.zipWith(Mono.fromCallable(System::currentTimeMillis).repeat(), (t1, t2) ->
				String.format("%s : %s", t1, t2));

		Mono<List<String>> p = stream2
		  .doOnNext(System.out::println)
		  .buffer(numOfItems)
		  .next();

		for (int curr = 0; curr < numOfItems; curr++) {
			if (curr % 5 == 0 && curr % 3 == 0) ring.onNext("FizBuz"+curr);
			else if (curr % 3 == 0) ring.onNext("Fiz"+curr);
			else if (curr % 5 == 0) ring.onNext("Buz"+curr);
			else ring.onNext(String.valueOf(curr));
		}

		Assert.assertTrue("Has not returned list", p.block(Duration.ofSeconds(5)) !=
				null);

	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 32, Source: FizzBuzzTests.java


Example 7: testDiamond

import reactor.core.publisher.TopicProcessor; // import the required package/class
@Test
@Ignore
public void testDiamond() throws InterruptedException, IOException {
	Flux<Point> points = Flux.<Double, Random>generate(Random::new, (r, sub) -> {
		sub.next(r.nextDouble());
		return r;
	}).log("points")
	  .buffer(2)
	  .map(pairs -> new Point(pairs.get(0), pairs.get(1)))
	  .subscribeWith(TopicProcessor.<Point>builder().name("tee").bufferSize(32).build());

	Flux<InnerSample> innerSamples = points.log("inner-1")
	                                          .filter(Point::isInner)
	                                          .map(InnerSample::new)
	                                          .log("inner-2");

	Flux<OuterSample> outerSamples = points.log("outer-1")
	                                          .filter(p -> !p.isInner())
	                                          .map(OuterSample::new)
	                                          .log("outer-2");

	Flux.merge(innerSamples, outerSamples)
	       .publishOn(asyncGroup)
	       .scan(new SimulationState(0L, 0L), SimulationState::withNextSample)
	       .log("result")
	       .map(s -> System.out.printf("After %8d samples π is approximated as %.5f", s.totalSamples, s.pi()))
	       .take(10000)
	       .subscribe();

	System.in.read();
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 32, Source: FluxTests.java


Example 8: multiplexUsingProcessors

import reactor.core.publisher.TopicProcessor; // import the required package/class
@Test(timeout = TIMEOUT)
public void multiplexUsingProcessors() throws Exception {

	final Flux<Integer> forkStream = Flux.just(1, 2, 3)
	                                           .log("begin-computation");
	final Flux<Integer> forkStream2 = Flux.just(1, 2, 3)
	                                            .log("begin-persistence");

	final TopicProcessor<Integer> computationEmitterProcessor = TopicProcessor.<Integer>builder()
			.name("computation")
			.bufferSize(BACKLOG)
			.build();
	final Flux<String> computationStream = computationEmitterProcessor
	                                                 .map(i -> Integer.toString(i));

	final TopicProcessor<Integer> persistenceEmitterProcessor = TopicProcessor.<Integer>builder()
			.name("persistence")
			.bufferSize(BACKLOG)
			.build();
	final Flux<String> persistenceStream = persistenceEmitterProcessor
	                                                 .map(i -> "done " + i);

	forkStream.subscribe(computationEmitterProcessor);
	forkStream2.subscribe(persistenceEmitterProcessor);

	final Semaphore doneSemaphore = new Semaphore(0);

	final Flux<List<String>> joinStream =
			Flux.zip(computationStream.log("log1"), persistenceStream.log("log2"), (a, b) -> Arrays.asList(a,b));

	// Method chaining doesn't compile.
	joinStream.log("log-final")
	          .subscribe(list -> println("Joined: ", list), t -> println("Join failed: ", t.getMessage()), () -> {
		          println("Join complete.");
		          doneSemaphore.release();
	          });

	doneSemaphore.acquire();

}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 41, Source: FluxTests.java


Example 9: createIdentityProcessor

import reactor.core.publisher.TopicProcessor; // import the required package/class
@Override
	public Processor<Long, Long> createIdentityProcessor(int bufferSize) {
		Flux<String> otherStream = Flux.just("test", "test2", "test3");
//		System.out.println("Providing new downstream");
		FluxProcessor<Long, Long> p =
				WorkQueueProcessor.<Long>builder().name("fluxion-raw-fork").bufferSize(bufferSize).build();

		cumulated.set(0);
		cumulatedJoin.set(0);

		BiFunction<Long, String, Long> combinator = (t1, t2) -> t1;
		return FluxProcessor.wrap(p,
				p.groupBy(k -> k % 2 == 0)
				 .flatMap(stream -> stream.scan((prev, next) -> next)
				                          .map(integer -> -integer)
				                          .filter(integer -> integer <= 0)
				                          .map(integer -> -integer)
				                          .bufferTimeout(1024, Duration.ofMillis(50))
				                          .flatMap(Flux::fromIterable)
				                          .doOnNext(array -> cumulated.getAndIncrement())
				                          .flatMap(i -> Flux.zip(Flux.just(i),
						                          otherStream,
						                          combinator)))
				 .doOnNext(array -> cumulatedJoin.getAndIncrement())
				 .subscribeWith(TopicProcessor.<Long>builder().name("fluxion-raw-join").bufferSize(bufferSize).build())
				 .doOnError(Throwable::printStackTrace));
	}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 28, Source: FluxWithProcessorVerification.java


Example 10: GroupedTickSubscriber

import reactor.core.publisher.TopicProcessor; // import the required package/class
public GroupedTickSubscriber(
    TickerManager tickerManager,
    TopicProcessor<IndicatorJson> indicators,
    TopicProcessor<StrategyJson> strategies) {
  this.tickerManager = tickerManager;
  this.indicators = indicators;
  this.strategies = strategies;
  logger.info("created GroupedTickSubscriber:{}", toString());
}
 
Developer ID: the-james-burton, Project: the-turbine, Lines of code: 10, Source: GroupedTickSubscriber.java


Example 11: ReactorTickSubscriber

import reactor.core.publisher.TopicProcessor; // import the required package/class
public ReactorTickSubscriber(
    String name,
    TickerManager tickerManager,
    TopicProcessor<IndicatorJson> indicators,
    TopicProcessor<StrategyJson> strategies) {
  super(name);
  this.tickerManager = tickerManager;
  this.indicators = indicators;
  this.strategies = strategies;
}
 
Developer ID: the-james-burton, Project: the-turbine, Lines of code: 11, Source: ReactorTickSubscriber.java


Example 12: RingBufferApplicationEventPublisher

import reactor.core.publisher.TopicProcessor; // import the required package/class
public RingBufferApplicationEventPublisher(int backlog, boolean autoStartup) {
	this.autoStartup = autoStartup;

	this.processor = TopicProcessor.share("ringBufferAppEventPublisher", backlog);

	if(autoStartup) {
		start();
	}
}
 
Developer ID: reactor, Project: reactor-spring, Lines of code: 10, Source: RingBufferApplicationEventPublisher.java


Example 13: ReactorSubscribableChannel

import reactor.core.publisher.TopicProcessor; // import the required package/class
/**
 * Create a {@literal ReactorSubscribableChannel} with a {@code ProducerType.SINGLE} if {@code
 * singleThreadedProducer} is {@code true}, otherwise use {@code ProducerType.MULTI}.
 *
 * @param singleThreadedProducer whether to create a single-threaded producer or not
 */
public ReactorSubscribableChannel(boolean singleThreadedProducer) {
	this.beanName = String.format("%s@%s", getClass().getSimpleName(), ObjectUtils.getIdentityHexString(this));
	if (singleThreadedProducer) {
		this.processor = TopicProcessor.create();
	} else {
		this.processor = TopicProcessor.share();
	}
}
 
Developer ID: reactor, Project: reactor-spring, Lines of code: 15, Source: ReactorSubscribableChannel.java


Example 14: topicProcessorProxy

import reactor.core.publisher.TopicProcessor; // import the required package/class
public static ReactorProcProxy topicProcessorProxy() {
    return new ReactorProcProxy(TopicProcessor.create(), PASS);
}
 
Developer ID: apptik, Project: RHub, Lines of code: 4, Source: ReactorProxies.java


Example 15: serializedTopicProcessorProxy

import reactor.core.publisher.TopicProcessor; // import the required package/class
public static ReactorProcProxy serializedTopicProcessorProxy() {
    return new ReactorProcProxy(TopicProcessor.create().serialize(), PASS);
}
 
Developer ID: apptik, Project: RHub, Lines of code: 4, Source: ReactorProxies.java


Example 16: safeTopicProcessorProxy

import reactor.core.publisher.TopicProcessor; // import the required package/class
public static ReactorProcProxy safeTopicProcessorProxy() {
    return new ReactorProcProxy(TopicProcessor.create(), WRAP);
}
 
Developer ID: apptik, Project: RHub, Lines of code: 4, Source: ReactorProxies.java


Example 17: safeSerializedTopicProcessorProxy

import reactor.core.publisher.TopicProcessor; // import the required package/class
public static ReactorProcProxy safeSerializedTopicProcessorProxy() {
    return new ReactorProcProxy(TopicProcessor.create().serialize(), WRAP);
}
 
Developer ID: apptik, Project: RHub, Lines of code: 4, Source: ReactorProxies.java


Example 18: setupFakeProtocolListener

import reactor.core.publisher.TopicProcessor; // import the required package/class
private void setupFakeProtocolListener() throws Exception {

		processor = TopicProcessor.<ByteBuf>builder().autoCancel(false).build();
		workProcessor = WorkQueueProcessor.<ByteBuf>builder().autoCancel(false).build();
		Flux<ByteBuf> bufferStream = Flux.from(processor)
		                                 .window(windowBatch)
		                                 .doOnNext(d -> windows.getAndIncrement())
		                                 .flatMap(s -> s.take(Duration.ofSeconds(2))
		                                                .reduceWith(Unpooled::buffer,
				                                                ByteBuf::writeBytes))
		                                 .doOnNext(d -> postReduce.getAndIncrement())
		                                 //.log("log", LogOperator.REQUEST)
		                                 .subscribeWith(workProcessor);

		httpServer = HttpServer.create(opts -> opts.port(port))
		                       .newHandler((request, response) -> {
			                       response.chunkedTransfer(false);

			                       return response.addHeader("Content-type", "text/plain")
			                                      .addHeader("Expires", "0")
			                                      .addHeader("X-GPFDIST-VERSION",
					                                      "Spring XD")
			                                      .addHeader("X-GP-PROTO", "1")
			                                      .addHeader("Cache-Control", "no-cache")
			                                      .addHeader("Connection", "close")
			                                      .options(NettyPipeline.SendOptions::flushOnEach)
			                                      .send(bufferStream.doOnNext(d -> integer.getAndIncrement())
			                                                        .take(takeCount)
			                                                        .doOnNext(d -> integerPostTake.getAndIncrement())
			                                                        .timeout(Duration.ofSeconds(2), Flux.empty())
			                                                        .doOnNext(d -> integerPostTimeout.getAndIncrement())
			                                                        .concatWith(Flux.just(
					                                                        dummy ?
							                                                        Unpooled.copiedBuffer(
									                                                        new byte[0]) :
							                                                        Unpooled.copiedBuffer(
									                                                        "END".getBytes()))
			                                                                        .doOnComplete(
					                                                                        integerPostConcat::decrementAndGet))//END
			                                                        .doOnNext(d -> integerPostConcat.getAndIncrement())
			                                                        .map(dummy ?
					                                                        Function.identity() :
					                                                        new GpdistCodec(
							                                                        response
							                                                               .alloc())));
		                       })
		                       .block(Duration.ofSeconds(30));

	}
 
Developer ID: reactor, Project: reactor-netty, Lines of code: 50, Source: SmokeTests.java


Example 19: konamiCode

import reactor.core.publisher.TopicProcessor; // import the required package/class
@Test
public void konamiCode() throws InterruptedException {
	final TopicProcessor<Integer> keyboardStream = TopicProcessor.create();

	Mono<List<Boolean>> konamis = keyboardStream
	                                     .skipWhile(key -> KeyEvent.VK_UP != key)
	                                     .buffer(10, 1)
	                                     .map(keys -> keys.size() == 10 &&
			                                     keys.get(0) == KeyEvent.VK_UP &&
			                                     keys.get(1) == KeyEvent.VK_UP &&
			                                     keys.get(2) == KeyEvent.VK_DOWN &&
			                                     keys.get(3) == KeyEvent.VK_DOWN &&
			                                     keys.get(4) == KeyEvent.VK_LEFT &&
			                                     keys.get(5) == KeyEvent.VK_RIGHT &&
			                                     keys.get(6) == KeyEvent.VK_LEFT &&
			                                     keys.get(7) == KeyEvent.VK_RIGHT &&
			                                     keys.get(8) == KeyEvent.VK_B &&
			                                     keys.get(9) == KeyEvent.VK_A)
	                                     .collectList();

	keyboardStream.onNext(KeyEvent.VK_UP);
	keyboardStream.onNext(KeyEvent.VK_UP);
	keyboardStream.onNext(KeyEvent.VK_UP);
	keyboardStream.onNext(KeyEvent.VK_DOWN);
	keyboardStream.onNext(KeyEvent.VK_DOWN);
	keyboardStream.onNext(KeyEvent.VK_LEFT);
	keyboardStream.onNext(KeyEvent.VK_RIGHT);
	keyboardStream.onNext(KeyEvent.VK_LEFT);
	keyboardStream.onNext(KeyEvent.VK_RIGHT);
	keyboardStream.onNext(KeyEvent.VK_B);
	keyboardStream.onNext(KeyEvent.VK_A);
	keyboardStream.onNext(KeyEvent.VK_C);
	keyboardStream.onComplete();

	List<Boolean> res = konamis.block();

	Assert.assertEquals(12, res.size());
	Assert.assertFalse(res.get(0));
	Assert.assertTrue(res.get(1));
	Assert.assertFalse(res.get(2));
	Assert.assertFalse(res.get(3));
	Assert.assertFalse(res.get(4));
	Assert.assertFalse(res.get(5));
	Assert.assertFalse(res.get(6));
	Assert.assertFalse(res.get(7));
	Assert.assertFalse(res.get(8));
	Assert.assertFalse(res.get(9));
	Assert.assertFalse(res.get(10));
	Assert.assertFalse(res.get(11));
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 51, Source: FluxTests.java
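
The assertions in the konamiCode test follow from how overlapping buffers slide over the input. After skipWhile, 12 keys remain. With buffer(10, 1) a new buffer opens at every key, and the buffers still open at completion are emitted with fewer than 10 elements. The plain-Java sketch below does not use Reactor. It mimics that windowing under those assumptions, and the class name, helper methods, and string key names are hypothetical stand-ins for the KeyEvent constants.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SlidingBufferSketch {

    // Hypothetical string stand-ins for the KeyEvent.VK_* constants in the test.
    static final List<String> KONAMI = Arrays.asList(
            "UP", "UP", "DOWN", "DOWN", "LEFT", "RIGHT", "LEFT", "RIGHT", "B", "A");

    // Mimics an overlapping buffer with skip = 1: one window opens at every
    // element, and windows near the end of the input are shorter than size.
    static <T> List<List<T>> slidingBuffers(List<T> source, int size) {
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start < source.size(); start++) {
            int end = Math.min(start + size, source.size());
            windows.add(new ArrayList<>(source.subList(start, end)));
        }
        return windows;
    }

    // Maps each window to true only when it equals the full ten-key sequence.
    static List<Boolean> matches(List<String> keys) {
        List<Boolean> result = new ArrayList<>();
        for (List<String> window : slidingBuffers(keys, KONAMI.size())) {
            result.add(window.equals(KONAMI));
        }
        return result;
    }

    public static void main(String[] args) {
        // The same 12 keys the test pushes into the processor.
        List<String> keys = Arrays.asList(
                "UP", "UP", "UP", "DOWN", "DOWN", "LEFT",
                "RIGHT", "LEFT", "RIGHT", "B", "A", "C");
        // Prints 12 booleans; only the window starting at index 1 matches.
        System.out.println(matches(keys));
    }
}
```

The window at index 0 starts with three UP keys, so it fails. The window at index 1 is the exact sequence. Every later window either contains the trailing C key or is too short, which is why the test expects only res.get(1) to be true.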


Example 20: setupPipeline

import reactor.core.publisher.TopicProcessor; // import the required package/class
private void setupPipeline() {
	processor = TopicProcessor.<String>builder().autoCancel(false).build();
	workProcessor = WorkQueueProcessor.<String>builder().autoCancel(false).build();
	processor.subscribe(workProcessor);
}
 
Developer ID: reactor, Project: reactor-core, Lines of code: 6, Source: ConsistentProcessorTests.java



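Note: The reactor.core.publisher.TopicProcessor examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to each project's License. Do not reproduce without permission.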

