It seems to me you're complicating your life unnecessarily with the generator.
This is how I'd implement your input pipeline:
import numpy as np
import tensorflow as tf

def parse_file_tf(filename):
    return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])
# version with map
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.map(parse_file_tf, num_parallel_calls=N)
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)  # shuffle individual examples, then batch
it = dataset.make_one_shot_iterator()
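For intuition, here is what the flat_map step does to a single parsed file (made-up values, assuming the imports above):

# One file's parse_file output: a pair of aligned, variable-length arrays.
xs = np.asarray([1., 2., 3.], dtype=np.float32)
ys = np.asarray([10., 20., 30.], dtype=np.float32)

# from_tensor_slices slices along the first dimension, turning the pair of
# arrays into three individual (x, y) examples: (1., 10.), (2., 20.), (3., 30.)
per_example = tf.data.Dataset.from_tensor_slices((xs, ys))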
To test it, I define a dummy parse_file as:
i = 0
def parse_file(f):
    global i
    i += 1
    # mimics variable-length examples_x, examples_y
    return np.asarray([i]*i, dtype=np.float32), np.asarray([i]*i, dtype=np.float32)
which I feed into a basic loop that shows what the iterator returns:
sess = tf.Session()
x, y = it.get_next()  # build the ops once, outside the loop
try:
    while True:
        vx, vy = sess.run([x, y])
        print(vx)
        print(vy)
except tf.errors.OutOfRangeError:
    pass
sess.close()
Running the code above prints:
[2. 3. 2. 1. 3. 3.]
[2. 3. 2. 1. 3. 3.]
Explanation of the pipeline
Essentially, I leave the parallelization issue to map, where I can pass the number of parallel calls it should run via num_parallel_calls. No need for generators iterating over ranges and those extra complications.
I chose map over parallel_interleave because the latter requires you to generate a Dataset instance for each item it returns, which in your case doesn't really make sense because you have already loaded all the values in memory by the time parse_file returns.
parallel_interleave makes sense if you generate the values slowly (e.g., by applying tf.data.TFRecordDataset to a list of filenames), but if your dataset fits in memory, go for map.
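For reference, a minimal sketch of that TFRecord case (tfrecord_filenames and N are placeholders):

# Each filename becomes a lazily-read TFRecordDataset; parallel_interleave
# pulls records from N files concurrently instead of one file at a time.
files = tf.data.Dataset.from_tensor_slices(tfrecord_filenames)
dataset = files.apply(tf.contrib.data.parallel_interleave(
    tf.data.TFRecordDataset, cycle_length=N))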
About the tf.py_func limitations: they do not affect your trained network, only the input pipeline. Ideally, you'll have one pipeline for training and a different one for the final use of the network. You only need to take care of the limitations in the latter, while for training (unless you do something very specific with distributed training and/or moving the training across machines) you're reasonably safe.
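As a sketch of what such a separate pipeline could look like (build_network is a hypothetical stand-in for your model function): feed already-parsed examples through a placeholder at inference time, so no tf.py_func op ends up in the exported graph:

# Hypothetical serving-time input: parsing happens in plain Python outside
# the graph, so the graph stays free of py_func ops and can be serialized.
examples_x = tf.placeholder(tf.float32, shape=[None], name='examples_x')
predictions = build_network(examples_x)  # build_network: your model function

# vx, _ = parse_file('some_file.json')   # parse in Python, then feed
# sess.run(predictions, feed_dict={examples_x: vx})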
Version with Generator
If your JSON files are very big and their content won't fit in memory, you can use a generator, but slightly different from the approach you began with.
The idea is that the generator goes through the JSON file and yields one record at a time. Then, the generator has to be your parse_file function. As an example, let's assume you have the following parse_file generator:
i = 3
def parse_file(filename):
    global i
    i += 1
    ctr = 0
    # yields i records of value i, mimicking a variable-length file
    while ctr < i:
        ctr += 1
        yield float(i), float(i)
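You can sanity-check one generator instance in plain Python before wiring it in (the filename is a placeholder the dummy ignores):

print(list(parse_file('dummy.json')))
# the first call bumps i to 4, so this prints [(4.0, 4.0), (4.0, 4.0), (4.0, 4.0), (4.0, 4.0)]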
In this case, the pipeline would look as follows:
def wrap_generator(filename):
    # from_generator expects a callable that returns a generator, hence the lambda;
    # dtypes are tf.float32 to match what parse_file yields
    return tf.data.Dataset.from_generator(lambda: parse_file(filename), [tf.float32, tf.float32])
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
# no flat_map here: each generator already yields one record at a time, and
# parallel_interleave interleaves those records directly
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()
Note that here we need to use parallel_interleave because we turn the generators into Dataset instances from which we extract values. The rest stays the same.
Feeding this to the same sample loop as above prints:
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]