Basically, what you need first is to rechunk the input into bigger chunks of 1024 * 1024 bytes.
First, let's have an Iteratee that will consume up to 1 MB of bytes (it's OK for the last chunk to be smaller):
val consumeAMB =
Traversable.takeUpTo[Array[Byte]](1024*1024) &>> Iteratee.consume()
Using that, we can construct an Enumeratee (adapter) that will regroup chunks, using an API called grouped:
val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
Enumeratee.grouped(consumeAMB)
Here grouped uses an Iteratee to determine how much to put in each chunk. It uses our consumeAMB for that, which means the result is an Enumeratee that rechunks the input into Array[Byte] chunks of 1 MB (only the last one may be smaller).
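To make the effect of the regrouping concrete, here is a minimal, library-free sketch of the same idea on an in-memory list of chunks. It is not how Enumeratee.grouped is implemented: the name rechunk is hypothetical, and unlike the Enumeratee it buffers everything in memory instead of streaming. It only illustrates what comes out on the other side.

```scala
object RechunkSketch {
  // Hypothetical helper: regroup arbitrarily sized chunks into chunks of
  // exactly `size` bytes; only the last chunk may be smaller.
  // Assumption: the whole input fits in memory (the real adapter streams).
  def rechunk(chunks: List[Array[Byte]], size: Int): List[Array[Byte]] =
    chunks.flatMap(_.toSeq).grouped(size).map(_.toArray).toList

  def main(args: Array[String]): Unit = {
    val in = List(Array[Byte](1, 2, 3), Array[Byte](4), Array[Byte](5, 6, 7, 8, 9))
    // 9 bytes regrouped by 4 give chunks of 4, 4 and 1 bytes
    println(rechunk(in, 4).map(_.length)) // prints List(4, 4, 1)
  }
}
```

With size set to 1024 * 1024 this matches the shape of what rechunkAdapter hands to the next Iteratee.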
Now we need to write the BodyParser, which will use the Iteratee.foldM method to send each chunk of bytes:
val writeToStore: Iteratee[Array[Byte],_] =
Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) =>
// write bytes and return the next handle, probably in a Future
}
foldM passes a state along and uses it in its passed function (S, Array[Byte]) => Future[S] to return a new Future of the state. foldM will not call the function again until the Future is completed and there is an available chunk of input.
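The sequencing described above can be sketched without the iteratee library, using only the standard scala.concurrent API. This is an illustration under assumptions, not Play's implementation: foldSequentially is a hypothetical stand-in for foldM over an in-memory list, and the state is just a running byte count where a real store would return its next handle.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FoldMSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for foldM: `step` is applied to the next element
  // only after the Future returned for the previous element has completed.
  def foldSequentially[S, E](init: S, input: List[E])(step: (S, E) => Future[S]): Future[S] =
    input.foldLeft(Future.successful(init)) { (acc, e) =>
      acc.flatMap(s => step(s, e))
    }

  def main(args: Array[String]): Unit = {
    val chunks = List(Array[Byte](1, 2), Array[Byte](3), Array[Byte](4, 5, 6))
    // Assumed state: number of bytes written so far.
    val total = foldSequentially(0L, chunks) { (written, bytes) =>
      Future(written + bytes.length)
    }
    println(Await.result(total, 5.seconds)) // prints 6
  }
}
```

Because each step is chained with flatMap on the previous result, writes never overlap, which is the property you want when the store hands back a new handle after every write.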
And the body parser will be rechunking input and pushing it into the store:
BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))
Returning a Right indicates that you are returning a body by the end of the body parsing (which happens to be the handler here).