The problem is that no major platform (as of mid-2013) will let you create anywhere near this number of threads. There are a wide variety of different limitations you could run into, and without knowing your platform, its configuration, and the exact error you got, it's impossible to know which one you ran into. But here are two examples:
- On 32-bit Windows, the default thread stack is 1MB, and all of your thread stacks have to fit into the same 2GB of virtual memory space as everything else in your program, so you will run out long before 60000.
- On 64-bit linux, you will likely exhaust one of your session's soft
ulimit
values before you get anywhere near running out of page space. (Linux has a variety of different limits beyond the ones required by POSIX.)
So, how can i control number to threads to get created or any other way to make it work like timeout or whatever?
Using as many threads as possible is very unlikely to be what you actually want to do. Running 800 threads on an 8-core machine means that you're spending a whole lot of time context-switching between the threads, and the cache keeps getting flushed before it ever gets primed, and so on.
Most likely, what you really want is one of the following:
- One thread per CPU, serving a pool of 60000 tasks.
- Maybe processes instead of threads (if the primary work is in Python, or in C code that doesn't explicitly release the GIL).
- Maybe a fixed number of threads (e.g., a web browsers may do, say, 12 concurrent requests at a time, whether you have 1 core or 64).
- Maybe a pool of, say, 600 batches of 100 tasks apiece, instead of 60000 single tasks.
- 60000 cooperatively-scheduled fibers/greenlets/microthreads all sharing one real thread.
- Maybe explicit coroutines instead of a scheduler.
- Or "magic" cooperative greenlets via, e.g.
gevent
.
- Maybe one thread per CPU, each running 1/Nth of the fibers.
But it's certainly possible.
Once you've hit whichever limit you're hitting, it's very likely that trying again will fail until a thread has finished its job and been joined, and it's pretty likely that trying again will succeed after that happens. So, given that you're apparently getting an exception, you could handle this the same way as anything else in Python: with a try
/except
block. For example, something like this:
threads = []
for n in range(0, 60000):
while True:
t = threading.Thread(target=function,args=(x, n))
try:
t.start()
threads.append(t)
except WhateverTheExceptionIs as e:
if threads:
threads[0].join()
del threads[0]
else:
raise
else:
break
for t in threads:
t.join()
Of course this assumes that the first task?launched is likely to be the one of the first tasks finished. If this is not true, you'll need some way to explicitly signal doneness (condition, semaphore, queue, etc.), or you'll need to use some lower-level (platform-specific) library that gives you a way to wait on a whole list until at least one thread is finished.
Also, note that on some platforms (e.g., Windows XP), you can get bizarre behavior just getting near the limits.
On top of being a lot better, doing the right thing will probably be a lot simpler as well. For example, here's a process-per-CPU pool:
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
concurrent.futures.wait(fs)
… and a fixed-thread-count pool:
with concurrent.futures.ThreadPoolExecutor(12) as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
concurrent.futures.wait(fs)
…?and a balancing-CPU-parallelism-with-numpy-vectorization batching pool:
with concurrent.futures.ThreadPoolExecutor() as executor:
batchsize = 60000 // os.cpu_count()
fs = [executor.submit(np.vector_function, x,
np.arange(n, min(n+batchsize, 60000)))
for n in range(0, 60000, batchsize)]
concurrent.futures.wait(fs)
In the examples above, I used a list comprehension to submit all of the jobs and gather their futures, because we're not doing anything else inside the loop. But from your comments, it sounds like you do have other stuff you want to do inside the loop. So, let's convert it back into an explicit for
statement:
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = []
for n in range(60000):
fs.append(executor.submit(function, x, n))
concurrent.futures.wait(fs)
And now, whatever you want to add inside that loop, you can.
However, I don't think you actually want to add anything inside that loop. The loop just submits all the jobs as fast as possible; it's the wait
function that sits around waiting for them all to finish, and it's probably there that you want to exit early.
To do this, you can use wait
with the FIRST_COMPLETED
flag, but it's much simpler to use as_completed
.
Also, I'm assuming error
is some kind of value that gets set by the tasks. In that case, you will need to put a Lock
around it, as with any other mutable value shared between threads. (This is one place where there's slightly more than a one-line difference between a ProcessPoolExecutor
and a ThreadPoolExecutor
—if you use processes, you need multiprocessing.Lock
instead of threading.Lock
.)
So:
error_lock = threading.Lock
error = []
def function(x, n):
# blah blah
try:
# blah blah
except Exception as e:
with error_lock:
error.append(e)
# blah blah
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
for f in concurrent.futures.as_completed(fs):
do_something_with(f.result())
with error_lock:
if len(error) > 1: exit()
However, you might want to consider a different design. In general, if you can avoid sharing between threads, your life gets a lot easier. And futures are designed to make that easy, by letting you return a value or raise an exception, just like a regular function call. That f.result()
will give you the returned value or raise the raised exception. So, you can rewrite that code as:
def function(x, n):
# blah blah
# don't bother to catch exceptions here, let them propagate out
with concurrent.futures.ProcessPoolExecutor() as executor:
fs = [executor.submit(function, x, n) for n in range(60000)]
error = []
for f in concurrent.futures.as_completed(fs):
try:
result = f.result()
except Exception as e:
error.append(e)
if len(error) > 1: exit()
else:
do_something_with(result)
Notice how similar this looks to the ThreadPoolExecutor Example in the docs. This simple pattern is enough to handle almost anything without locks, as long as the tasks don't need to interact with each other.