Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
985 views
in Technique[技术] by (71.8m points)

scala - Is it possible to install a callback after request processing is finished in Spray?

I'm trying to serve large temporary files from Spray. I need to delete those files once HTTP request is complete. I could not find a way to do this so far...

I'm using code similar to this or this:

          respondWithMediaType(`text/csv`) {
            path("somepath" / CsvObjectIdSegment) {
              id =>
                CsvExporter.export(id) { // loan pattern to provide temp file for this request
                  file =>
                    encodeResponse(Gzip) {
                      getFromFile(file)
                    }
                }
            }
          }

So essentially it calls getFromFile which completes the route in a Future. The problem is that even if that Future is complete the web request is not complete yet. I tried to write a function similar to getFromFile and I would call file.delete() in onComplete of that Future but it has the same problem - Future completes before the client finished downloading the file if the file is large enough.

Here is getFromFile from Spray for reference:

/**
* Completes GET requests with the content of the given file. The actual I/O operation is
* running detached in a `Future`, so it doesn't block the current thread (but potentially
* some other thread !). If the file cannot be found or read the request is rejected.
*/
def getFromFile(file: File)(implicit settings: RoutingSettings,
                            resolver: ContentTypeResolver,
                            refFactory: ActorRefFactory): Route =
  getFromFile(file, resolver(file.getName))

I can't use file.deleteOnExit() because JVM might not be restarted for a while and temp files will be kept laying around wasting disk space.

On the other hand it's a more general question - is there a way to install a callback in Spray so that when request processing is complete resources can be released or statistics/logs can be updated, etc.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Thanks to @VladimirPetrosyan for the pointer. Here is how I implemented it:

The route has this:

trait MyService extends HttpService ... with CustomMarshallers {

   override def routeSettings = implicitly[RoutingSettings]

   ...

        get {
          respondWithMediaType(`text/csv`) {
            path("somepath" / CsvObjectIdSegment) {
              filterInstanceId => // just an ObjectId
                val tempResultsFile = CsvExporter.saveCsvResultsToTempFile(filterInstanceId)
                respondWithLastModifiedHeader(tempResultsFile.lastModified) {
                  encodeResponse(Gzip) {
                    complete(tempResultsFile)
                  }
                }
            }
          }

and the trait that I mix in that does the unmarshalling producing chunked response:

import akka.actor._
import spray.httpx.marshalling.{MarshallingContext, Marshaller}
import spray.http.{MessageChunk, ChunkedMessageEnd, HttpEntity, ContentType}
import spray.can.Http
import spray.http.MediaTypes._
import scala.Some
import java.io.{RandomAccessFile, File}
import spray.routing.directives.FileAndResourceDirectives
import spray.routing.RoutingSettings
import math._

trait CustomMarshallers extends FileAndResourceDirectives {

  implicit def actorRefFactory: ActorRefFactory
  implicit def routeSettings: RoutingSettings

  implicit val CsvMarshaller =
    Marshaller.of[File](`text/csv`) {
      (file: File, contentType: ContentType, ctx: MarshallingContext) =>

        actorRefFactory.actorOf {
          Props {
            new Actor with ActorLogging {
              val defaultChunkSize = min(routeSettings.fileChunkingChunkSize, routeSettings.fileChunkingThresholdSize).toInt

              private def getNumberOfChunks(file: File): Int = {
                val randomAccessFile = new RandomAccessFile(file, "r")
                try {
                  ceil(randomAccessFile.length.toDouble / defaultChunkSize).toInt
                } finally {
                  randomAccessFile.close
                }
              }

              private def readChunk(file: File, chunkIndex: Int): String = {
                val randomAccessFile = new RandomAccessFile(file, "r")
                val byteBuffer = new Array[Byte](defaultChunkSize)
                try {
                  val seek = chunkIndex * defaultChunkSize
                  randomAccessFile.seek(seek)
                  val nread = randomAccessFile.read(byteBuffer)
                  if(nread == -1) ""
                  else if(nread < byteBuffer.size) new String(byteBuffer.take(nread))
                  else new String(byteBuffer)
                } finally {
                  randomAccessFile.close
                }
              }

              val chunkNum = getNumberOfChunks(file)

              val responder: ActorRef = ctx.startChunkedMessage(HttpEntity(contentType, ""), Some(Ok(0)))(self)

              sealed case class Ok(seq: Int)

              def stop() = {
                log.debug("Stopped CSV download handler actor.")
                responder ! ChunkedMessageEnd
                file.delete()
                context.stop(self)
              }

              def sendCSV(seq: Int) =
                if (seq < chunkNum)
                  responder ! MessageChunk(readChunk(file, seq)).withAck(Ok(seq + 1))
                else
                  stop()

              def receive = {
                case Ok(seq) =>
                  sendCSV(seq)
                case ev: Http.ConnectionClosed =>
                  log.debug("Stopping response streaming due to {}", ev)
              }
            }
          }
        }
    }
}

The temp file is created and then actor starts streaming chunks. It sends a chunk whenever response from client is received. Whenever client disconnects temp file is deleted and actor is shut down.

This requires you to run your app in spray-can and I think will not work if you run it in container.

Some useful links: example1, example2, docs


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...