Wait until value appears in the hash


Recently, I was given the task of building a REST API request that sends a message to Kafka's inbound channel and then waits for the output on an outbound one. All went well until I had to wait for that particular message.

It's worth pointing out that, once a message arrives, it is written to a global message holder, which is just a Ruby hash under the hood. Below is the function that polls the hash until it holds a value for the given key.

def monitor_payment_hash(key)
  while @uuid.payment_create.get_message(key).nil?
    next
  end
  @uuid.payment_create.get_message(key)
end

Is it even appropriate to implement it that way? What should I try instead?
Note: the Kafka consumer runs in a separate thread.

Update

I've just headed over to the Ruby docs and stumbled upon some interesting sections on channels. As far as I'm aware, channels are the best choice for communication between rubytines (just a fancy name for goroutines, but in the Ruby ecosystem :) )
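
If the goal is to hand a message from the consumer thread to the waiting request without spinning, one option in Ruby's standard library is a Mutex paired with a ConditionVariable. The following is only a minimal sketch: the MessageHolder class and its put and wait_for methods are hypothetical names, not part of the original code, and the 60-second default timeout is an assumption.

```ruby
# Hypothetical thread-safe message holder; class and method names are
# illustrative and not taken from the original code.
class MessageHolder
  def initialize
    @messages = {}
    @mutex = Mutex.new
    @arrived = ConditionVariable.new
  end

  # Called from the consumer thread when a message arrives.
  def put(key, message)
    @mutex.synchronize do
      @messages[key] = message
      @arrived.broadcast # wake every waiter so each can re-check its own key
    end
  end

  # Blocks the caller until a message for key arrives or timeout seconds
  # pass. Returns the message, or nil on timeout.
  def wait_for(key, timeout: 60)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      until @messages.key?(key)
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @arrived.wait(@mutex, remaining) # releases the mutex while sleeping
      end
      @messages.delete(key) # remove it so the hash does not grow forever
    end
  end
end

# Usage: a thread stands in for the Kafka consumer.
holder = MessageHolder.new
consumer = Thread.new { sleep 0.1; holder.put("abc", "payment created") }
result = holder.wait_for("abc", timeout: 2)
consumer.join
```

Unlike the busy loop, the waiting thread sleeps until the consumer signals it, so it does not burn CPU while nothing has arrived.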

Lam Phan On BEST ANSWER

I think you need a timeout and a way to force the polling process to stop. You may also want an abstraction that you can extend in the future.

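class Poller
  # Polls from_source for key until a message arrives or the timeout elapses.
  # Yields the message to the block (if given) and returns it; returns nil on timeout.
  def self.poll(key:, from_source:, options: {})
    timeout  = options.fetch(:timeout, 60)    # seconds; default avoids comparing against nil
    interval = options.fetch(:interval, 0.05) # assumed pause between checks, avoids a busy loop
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    catch(:stop_polling) do
      loop do
        message = from_source.get_message(key)
        if message.nil?
          throw :stop_polling if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
          sleep interval
        else
          yield(message) if block_given?
          throw :stop_polling, message
        end
      end
    end
  end
end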

def monitor_payment_hash(key)
  Poller.poll key: key, from_source: @uuid.payment_create, options: {timeout: 60} do |message|
    # write to the global message holders
    # or handle message by block
    yield(message) if block_given?
  end
end

You may need to add more logic, such as retrying on timeout, polling a list of keys, or logging. I recommend studying how long polling is built in this source: https://github.com/aws/aws-sdk-ruby/blob/version-3/gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb
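
As a rough illustration of the retry-on-timeout idea, here is a minimal sketch. The poll_with_retry helper, its parameters and defaults, and the FakeSource stand-in are all hypothetical. The only assumption about the real source is that it responds to get_message(key) and returns nil when nothing has arrived.

```ruby
# Hypothetical helper: polls source for key, retrying up to `attempts` times.
# Each attempt lasts at most `timeout` seconds. Returns the message or nil.
def poll_with_retry(source, key, attempts: 3, timeout: 1, interval: 0.05)
  attempts.times do |attempt|
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
      message = source.get_message(key)
      return message unless message.nil?
      sleep interval # pause between checks instead of spinning
    end
    # Log each timed-out attempt to stderr before retrying.
    warn "attempt #{attempt + 1}/#{attempts} for #{key.inspect} timed out"
  end
  nil
end

# Stand-in for the real message holder, for demonstration only.
class FakeSource
  def initialize
    @data = {}
  end

  def put(key, value)
    @data[key] = value
  end

  def get_message(key)
    @data[key]
  end
end

source = FakeSource.new
producer = Thread.new { sleep 0.2; source.put("abc", "ok") }
result = poll_with_retry(source, "abc", attempts: 5, timeout: 0.1)
producer.join
```

Polling a list of keys could follow the same shape by looping over the keys inside each attempt.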