I am currently implementing a simplistic pub/sub system in ruby using only the standard lib, where a client can subscribe to a topic for messages or publish messages to a topic, which will be sent to all subscribers. I'm using just standard TCPServer and assuming all messages are on a single line read using gets. I'm using ruby Queues to communicate between threads and avoid accessing shared memory resources unless they are thread safe. I have reached a point where I can't see any other way to continue other than share my TCPSocket client between threads as one thread needs to block in a loop while awaiting new data on the socket, while the other needs to await new messages published and write them to the client. Are TCPSockets in ruby thread safe? If not would a simple dup or clone call suffice and have each thread with it's own reference to the socket?
For code reference I have the following basic implementation
require 'socket'
socket = TCPServer.new(4242)
processing_queue = Queue.new
Thread.start({}) do |subscriptions|
while event = processing_queue.pop
command, *args = event
case command
when 'publish'
topic, message = args
subscriptions[topic].each do |subscription_queue|
subscription_queue.push(message)
end
when 'subscribe'
topic, subscription_queue = args
subscriptions[topic] = [] if subscriptions[topic].nil?
subscriptions[topic] << subscription_queue
end
end
end
loop do
Thread.start(socket.accept, Queue.new) do |client, queue|
writer_queue = Queue.new
Thread.start do
while response = writer_queue.pop
client.puts(response)
end
end
while request = client.gets
command, *args = request.split(' ')
case command
when 'subscribe'
topic = args[0]
Thread.start(Queue.new) do |subscription_queue|
processing_queue << ['subscribe', topic, subscription_queue]
while message = subscription_queue.pop
writer_queue << message
end
end
writer_queue << 'OK'
when 'publish'
topic = args.shift
message = args.join(' ')
processing_queue << ['publish', topic, message]
writer_queue << 'OK'
end
end
client.close
end
end
socket.close
So to avoid the whole headache of worrying about accessing shared resources with multiple threads, I have created a single threaded approach based on what this great simple example.
This solution can be improved and should really use non blocking read and writes to the sockets, instead of using
putsandgets, but I am just implementing a something basic for demonstration purposes.I would still however like to hear more about the multithreading approach as I feel there has to be a solution out there that people have used which is more complete. If anyone has suggestions please answer.