Add Keypoint Extraction Post-Processing Layer to TensorFlow Model


I have a PoseNet TensorFlow SavedModel that takes in an image and outputs heatmap and offset tensors.

PoseNet is an already-trained model from Google, and I have very little control over it. The model works fine, but I just want to add a layer to it that performs the post-processing.

Currently, I'm extracting the final keypoints in Python code. How can I add a post-processing layer to the model itself so that it outputs the final keypoints?

There are models, such as MoveNet, that output the final keypoints directly, and I want to do the same thing for PoseNet.

This image illustrates what I'm trying to accomplish:

[image: model outputs (heatmaps, offsets) → post-processing → final keypoints]

I've looked at the following posts about adding a post-processing layer, but I don't know how to apply them to my problem:

  1. How to add post-processing into a Tensorflow Model?
  2. Cannot add layers to saved Keras Model. 'Model' object has no attribute 'add'
  3. How to add another layer on a pre-loaded network?
  4. Add layer between two layers in saved model tensorflow

I fully understand the post-processing algorithm and have implemented it in Python. Now, I want to integrate this functionality directly into the model itself:

"""
heatmap shape [9, 9, 17]
offset shape [9, 9, 34]
"""
def parse_output(heatmap, offset):
    # Get the number of joints - value is 17 for Posenet
    joint_num = heatmap.shape[-1]
    # Initialize an array to store the keypoints
    pose_kps = np.zeros((joint_num, 3), np.uint32)

    # Iterate over each joint
    for i in range(heatmap.shape[-1]):
        # select heatmap for the i-th joint    
        joint_heatmap = heatmap[..., i]

        # Find the maximum probability and its position
        max_prob = np.max(joint_heatmap)
        # get the x, y coordinates of the max_prob position. eg: [4,7]
        max_val_pos = np.squeeze(np.argwhere(joint_heatmap == max_prob))
       
        # scale keypoints to the model input coordinates
        remap = np.array(max_val_pos/8*257, dtype=np.int32)
        
        # Assign the calculated values to the keypoints array
        pose_kps[i, 0] = remap[0] + offset[max_val_pos[0], max_val_pos[1], i]
        pose_kps[i, 1] = remap[1] + offset[max_val_pos[0], max_val_pos[1], i + joint_num]
        pose_kps[i, 2] = max_prob

    return pose_kps
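
For reference, here is a minimal sketch of how the SavedModel outputs could be fed into parse_output on the Python side. The model path, signature lookup, and output key names ("heatmap", "offset") are placeholders for illustration; the real names depend on how the PoseNet model was exported:

import numpy as np
import tensorflow as tf

# Placeholder path and signature; the actual PoseNet export will differ.
model = tf.saved_model.load("posenet_savedmodel")
infer = model.signatures["serving_default"]

image = tf.zeros((1, 257, 257, 3), dtype=tf.float32)  # dummy 257x257 input
outputs = infer(image)

# The output key names below are assumptions, not the model's real names.
heatmap = outputs["heatmap"][0].numpy()    # (9, 9, 17)
offset = outputs["offset"][0].numpy()      # (9, 9, 34)
keypoints = parse_output(heatmap, offset)  # (17, 3): y, x, score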

The above parse_output makes sense to me, and a similar implementation is also used in the following projects:

  1. How to parse the heatmap output for the pose estimation tflite model?
  2. decodeSinglePose

I have created a sample PosetNetDemo project to show my current implementation.

I would appreciate it if you could point me to a resource or help me solve this. Thank you!

There is 1 answer below.

Your decoding function can be vectorized relatively straightforwardly. There's a lot of casting because of TensorFlow's stricter type requirements, to keep the results of your decoding function and my TensorFlow version similar. I also took the liberty of adding a batch dimension to the function, as it will play better if you want to incorporate it into a Keras model.

Vectorized function:

import tensorflow as tf

def parse_output_tf(hm, offset):
    batch, height, width, nkpts = tf.unstack(tf.shape(hm))
    # Flatten and transpose to look for the max value of each joint, since top_k works on the last dimension
    scores = tf.transpose(tf.reshape(hm, (batch, -1, nkpts)), (0, 2, 1))
    max_prob, max_inds = tf.math.top_k(scores, k=1)

    # getting back x,y coordinates
    max_xs = tf.cast(max_inds % width, tf.float32)
    max_ys = tf.cast(max_inds // width, tf.float32)
    # to index, it's easier if the offset array has the same shape as the score, so we can use
    # tf.gather transparently
    offset = tf.transpose(
        tf.reshape(offset, (batch, -1, tf.shape(offset)[-1])), (0, 2, 1)
    )
    offset_ys = tf.gather(offset[:, :nkpts, ...], max_inds, batch_dims=2)
    offset_xs = tf.gather(offset[:, nkpts:, ...], max_inds, batch_dims=2)
    # now we can calculate the final coordinates with the offset
    xs = tf.cast(tf.math.floor(max_xs / 8 * 257) + offset_xs, tf.int32)
    ys = tf.cast(tf.math.floor(max_ys / 8 * 257) + offset_ys, tf.int32)
    max_prob = tf.cast(max_prob, tf.int32)
    return tf.concat([ys, xs, max_prob], axis=-1)

You can check that the outputs of the two functions are similar (due to casting/precision issues there might be slightly different results, so I would encourage testing with a floating-point type instead, i.e. remove the cast/floor operations and do the same in your NumPy implementation):

for _ in range(100):
    hm = np.random.uniform(size=(16, 9, 9, 17)).astype(np.float32)
    offset = np.random.uniform(size=(16, 9, 9, 34)).astype(np.float32)

    out_tf = parse_output_tf(hm, offset)
    out_np = np.stack([parse_output(h, o) for h,o in zip(hm, offset)],axis=0)
    assert np.allclose(out_tf, out_np)

Using it in a Keras model:

You can simply use a Lambda Layer:

from tensorflow.keras.layers import Lambda
from tensorflow.keras.models import Model

model = Model(inputs, [out_heatmap, out_offset])
# model.outputs is a list of two tensors, so unpack it before calling the decoder
decoding_layer = Lambda(lambda outs: parse_output_tf(*outs))
decoded_pred = decoding_layer(model.outputs)
model_with_post = Model(model.inputs, decoded_pred)
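
If the end goal is a SavedModel that outputs the final keypoints directly, the wrapped model can then be exported. A minimal sketch, assuming the model_with_post object from the snippet above and a hypothetical output directory:

import tensorflow as tf

# Exports model_with_post (built above) as a SavedModel whose output is the
# decoded keypoints tensor; "posenet_with_keypoints" is a placeholder path.
tf.saved_model.save(model_with_post, "posenet_with_keypoints")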

If you want to run the model on accelerated hardware, you might run into issues due to the use of tf.shape, which is not always handled well by static shape-folding optimizations. If that's the case, you might want to either create an actual Keras layer where you pre-compute those values during the build phase, or simply hardcode the values in your function (a rough sketch of the hardcoded variant follows below).
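
A minimal sketch of the hardcoded variant, assuming the PoseNet head always produces a 9x9 grid with 17 keypoints; the layer name and the GRID_H/GRID_W/NKPTS constants are illustrative, and the final floor/cast steps are dropped in favour of float outputs as suggested above:

import tensorflow as tf

GRID_H, GRID_W, NKPTS = 9, 9, 17  # assumed fixed PoseNet output grid

class DecodeKeypoints(tf.keras.layers.Layer):
    """Same decoding as parse_output_tf, but with the grid size hardcoded
    so no dynamic tf.shape values end up in the graph."""

    def call(self, inputs):
        hm, offset = inputs
        # (batch, H*W, NKPTS) -> (batch, NKPTS, H*W) so top_k runs per joint
        scores = tf.transpose(tf.reshape(hm, (-1, GRID_H * GRID_W, NKPTS)), (0, 2, 1))
        max_prob, max_inds = tf.math.top_k(scores, k=1)
        max_xs = tf.cast(max_inds % GRID_W, tf.float32)
        max_ys = tf.cast(max_inds // GRID_W, tf.float32)
        offset = tf.transpose(
            tf.reshape(offset, (-1, GRID_H * GRID_W, 2 * NKPTS)), (0, 2, 1)
        )
        offset_ys = tf.gather(offset[:, :NKPTS, :], max_inds, batch_dims=2)
        offset_xs = tf.gather(offset[:, NKPTS:, :], max_inds, batch_dims=2)
        # Scale grid coordinates to the 257x257 input and add the offsets
        ys = max_ys / 8 * 257 + offset_ys
        xs = max_xs / 8 * 257 + offset_xs
        return tf.concat([ys, xs, max_prob], axis=-1)  # (batch, NKPTS, 3)

It can then be attached in place of the Lambda layer, e.g. decoded_pred = DecodeKeypoints()(model.outputs).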


Implementation Details:

I chose to use tf.math.top_k to get the maximum score for each keypoint and tf.gather to do the indexing, but other methods would work as well. Using top_k has the advantage of scaling to multiple detections per joint if needed, as in the short example below.
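
As a purely illustrative example (the heatmap here is random, not a model output), raising k keeps several candidate grid cells per joint instead of just the maximum:

import tensorflow as tf

# Random stand-in for a (batch, 9, 9, 17) heatmap, only for illustration.
hm = tf.random.uniform((1, 9, 9, 17))
scores = tf.transpose(tf.reshape(hm, (1, -1, 17)), (0, 2, 1))  # (1, 17, 81)
top_probs, top_inds = tf.math.top_k(scores, k=3)               # both (1, 17, 3)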