Ruby TCPServer sockets shared between threads

263 Views Asked by At

I am currently implementing a simplistic pub/sub system in ruby using only the standard lib, where a client can subscribe to a topic for messages or publish messages to a topic, which will be sent to all subscribers. I'm using just standard TCPServer and assuming all messages are on a single line read using gets. I'm using ruby Queues to communicate between threads and avoid accessing shared memory resources unless they are thread safe. I have reached a point where I can't see any other way to continue other than share my TCPSocket client between threads as one thread needs to block in a loop while awaiting new data on the socket, while the other needs to await new messages published and write them to the client. Are TCPSockets in ruby thread safe? If not would a simple dup or clone call suffice and have each thread with it's own reference to the socket?

For code reference I have the following basic implementation

require 'socket'

socket = TCPServer.new(4242)

processing_queue = Queue.new
Thread.start({}) do |subscriptions|
  while event = processing_queue.pop
    command, *args = event
    case command
    when 'publish'
      topic, message = args
      subscriptions[topic].each do |subscription_queue|
        subscription_queue.push(message)
      end
    when 'subscribe'
      topic, subscription_queue = args
      subscriptions[topic] = [] if subscriptions[topic].nil?
      subscriptions[topic] << subscription_queue
    end
  end
end

loop do
  Thread.start(socket.accept, Queue.new) do |client, queue|
    writer_queue = Queue.new
    Thread.start do
      while response = writer_queue.pop
        client.puts(response)
      end
    end

    while request = client.gets
      command, *args = request.split(' ')
      case command
      when 'subscribe'
        topic = args[0]
        Thread.start(Queue.new) do |subscription_queue|
          processing_queue << ['subscribe', topic, subscription_queue]
          while message = subscription_queue.pop
            writer_queue << message
          end
        end
        writer_queue << 'OK'
      when 'publish'
        topic = args.shift
        message = args.join(' ')
        processing_queue << ['publish', topic, message]
        writer_queue << 'OK'
      end
    end
    client.close
  end
end

socket.close
1

There are 1 best solutions below

0
Nick Hyland On

So to avoid the whole headache of worrying about accessing shared resources with multiple threads, I have created a single threaded approach based on what this great simple example.

This solution can be improved and should really use non blocking read and writes to the sockets, instead of using puts and gets, but I am just implementing a something basic for demonstration purposes.

I would still however like to hear more about the multithreading approach as I feel there has to be a solution out there that people have used which is more complete. If anyone has suggestions please answer.

require 'socket'

server = TCPServer.open("0.0.0.0", 4242)
read_fds = [server]
subscriptions = Hash.new([])
while true
  puts 'loop'
  if ios = select(read_fds, [], [])
    selected_reads = ios.first
    p selected_reads
    selected_reads.each do |client|
      if client == server
        puts 'Someone connected to server. Adding socket to read_fds.'
        client, sockaddr = server.accept
        read_fds << client
      elsif client.eof?
        puts "Client disconnected"
        read_fds.delete(client)
        client.close
      else
        # Perform a blocking-read until new-line is encountered.
        # We know the client is writing, so as long as it adheres to the
        # new-line protocol, we shouldn't block for long
        puts "Reading..."
        request = client.gets
        action, topic, *args = request.split(' ')
        if action == 'subscribe'
          subscriptions[topic] << client
          client.puts('OK')
        elsif action == 'publish'
          message = args.join(' ')
          # Should also be writing to socket in non blocking way using select
          # but again keeping simple assuming all messages to write are small
          subscriptions[topic].each { |client| client.puts(message) }
          client.puts('OK')
        else
          puts "Invalid request #{action}"
          client.puts('ERROR')
        end
      end
    end
  end
end