Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
650 views
in Technique[技术] by (71.8m points)

mysql - How are reactive streams used in Slick for inserting data

In Slick's documentation examples for using Reactive Streams are presented just for reading data as a means of a DatabasePublisher. But what happens when you want to use your database as a Sink and backpreasure based on your insertion rate?

I've looked for equivalent DatabaseSubscriber but it doesn't exist. So the question is, if I have a Source, say:

val source = Source(0 to 100)

how can I crete a Sink with Slick that writes those values into a table with schema:

create table NumberTable (value INT)

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Serial Inserts

The easiest way would be to do inserts within a Sink.foreach.

Assuming you've used the schema code generation and further assuming your table is named "NumberTable"

//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow} 

val numberTableDB = Database forConfig "NumberTableConfig"

We can write a function that does the insertion

def insertIntoDb(num : Int) = 
  numberTableDB run (Numbertable += NumbertableRow(num))

And that function can be placed in the Sink

val insertSink = Sink[Int] foreach insertIntoDb

Source(0 to 100) runWith insertSink

Batched Inserts

You could further extend the Sink methodology by batching N inserts at a time:

def batchInsertIntoDb(nums : Seq[Int]) = 
  numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))

val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb

This batched Sink can be fed by a Flow which does the batch grouping:

val batchSize = 10

Source(0 to 100).via(Flow[Int].grouped(batchSize))
                .runWith(batchInsertSink)

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...