I am trying to make parallel queues for video processing.
However, I've faced problem in doing so. Below is the diagram of what i am trying to achieve.
Flow is:
- User sends
GET
request to the /process
endpoint (actually, GET is only for testing, it rather uses @MessagePattern
to receive data from other service)
- This request contains
ModelDTO
as well as sequence
which is used for internal tracking
- Controller imports
private readonly _queueService: QueueService
via constructor
- It then calls
this._queueService.process({ model, sequence })
QueueService
imports @InjectQueue('video_processor') private readonly _processorQueue: Queue
via constructor
QueueService
simply calls this._processorQueue.add('process', data);
VideoProcessor
imports private readonly _videoService: VideoService
via constructor
- Inside
VideoProcessor
there is a method with @Process('process')
decorator
- Inside this method I am awaiting for the result from the service with
await this._videoService.configure(job.data).process()
And here is the problem:
- Whenever I am trying to run 1 job at a time (sending single request and actually waiting for job to complete) everything works just fine
- If I am queueing two jobs at the same time, for some reason, the
console.log(this._videoData.id)
inside VideoService
will now return the ID of the second model rather than actual ID.
So far I have tried adding scope: Scope.TRANSIENT
to almost all services with no luck. Seems like i just can't figure out where this scope should be added.
I am expecting for 10 jobs to be able to run in parallel, however, if I add more than 1 job to the queue, they start mixing in data from the other jobs.
question from:
https://stackoverflow.com/questions/65903284/create-separate-instance-of-service-for-nestjs-bull 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…