Serial Inserts
The easiest way would be to do the inserts within a Sink.foreach.
Assuming you've used the schema code generation, and further assuming your table is named "NumberTable":
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
We can write a function that does the insertion:
def insertIntoDb(num: Int) =
  numberTableDB run (Numbertable += NumbertableRow(num))
And that function can be placed in the Sink:
val insertSink = Sink.foreach[Int](insertIntoDb)
Source(0 to 100) runWith insertSink
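One caveat with Sink.foreach is that it discards the Future returned by each insert, so the stream neither waits for an insert to finish nor fails when one does. A minimal sketch of an alternative using mapAsync with a parallelism of 1, which waits for each Future before starting the next insert, is shown here. It assumes Akka 2.6 or later (where an implicit ActorSystem provides the materializer), and insertIntoDb is a hypothetical stand-in that returns a completed Future instead of calling Slick:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SerialInsertSketch {
  // Hypothetical stand-in for the real insert. A real version would call
  // numberTableDB.run(...) and return the Future[Int] row count from Slick.
  def insertIntoDb(num: Int): Future[Int] = Future.successful(1)

  // Runs the stream and sums the row counts reported by each insert.
  def insertAll(nums: Range)(implicit system: ActorSystem): Future[Int] =
    Source(nums)
      .mapAsync(parallelism = 1)(insertIntoDb) // one insert in flight at a time
      .runWith(Sink.fold[Int, Int](0)(_ + _))

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so the ActorSystem supplies the materializer.
    implicit val system: ActorSystem = ActorSystem("serial-insert-sketch")
    try println(Await.result(insertAll(0 to 100), 10.seconds))
    finally system.terminate()
  }
}
```

With this shape, a failed insert fails the stream, and the materialized Future only completes once every insert has finished.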
Batched Inserts
You could further extend the Sink methodology by batching N inserts at a time:
def batchInsertIntoDb(nums: Seq[Int]) =
  numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink.foreach[Seq[Int]](batchInsertIntoDb)
This batched Sink can be fed by a Flow that does the batch grouping:
val batchSize = 10
Source(0 to 100)
  .via(Flow[Int].grouped(batchSize))
  .runWith(batchInsertSink)
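To see what batches the sink receives, note that 0 to 100 contains 101 elements, and grouped emits a final, smaller group at end of stream rather than dropping the remainder. The sketch below illustrates the same chunking with the standard library's grouped on a plain Range, so it runs without Akka or a database. The object name is made up for illustration:

```scala
object BatchGroupingSketch {
  val batchSize = 10

  // Plain-collections analogue of Flow[Int].grouped(batchSize):
  // fixed-size chunks, with the last chunk holding whatever is left over.
  val batches: List[Seq[Int]] = (0 to 100).grouped(batchSize).toList

  def main(args: Array[String]): Unit = {
    // Show how many rows each batch insert would carry.
    println(batches.map(_.size))
  }
}
```

Here the first ten batches carry ten rows each and the eleventh carries only the value 100, so batchInsertIntoDb should not assume that every batch is full.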