CloudFx QueueListener doesn't seem to work for me. Why?

381 Views Asked by At

I'm trying to use the CloudFx library "released" by Microsoft with a set of samples. There is vanishingly little documentation of the library and how it should be used (except with service bus), but I'm trying to make do with the CHM references and the samples. I'm trying to replicate what they did in the simple Producer / Consumer worker roles that communicate over a storage queue, but I can't seem to get the CloudQueueListenerExtension class to work the way I was hoping. First, I wrote a simple wrapper class around the listener:

public class QueueListener<T>
{
    private readonly CloudQueueLocation _queueLocation;
    private readonly CloudQueueListenerExtension<T> _queueListenerExtension;
    private readonly IObserver<T> _observer;

    public static QueueListener<T> StartNew(IExtensibleComponent owner, string storageAccount, Action<T> action)
    {
        var location = new CloudQueueLocation()
                            {
                                StorageAccount = storageAccount,
                                QueueName = typeof(T).Name.ToLowerInvariant()
                            };
        return new QueueListener<T>(owner, location, action).Start();
    }

    protected QueueListener(IExtensibleComponent owner, CloudQueueLocation queueLocation, Action<T> action)
    {
        _queueLocation = queueLocation;
        _queueListenerExtension = new CloudQueueListenerExtension<T>(queueLocation, owner);
        _observer = Observer.Create(action);

        _queueListenerExtension.Subscribe(_observer);            
    }

    protected QueueListener<T> Start()
    {
        _queueListenerExtension.StartListener();
        return this;
    }
}

Then I set it up in the main worker role like this:

QueueListener<MyMessageType>.StartNew(this, storageAccountString,
         newMsg => _log.InfoFormat("Got {0}", newMsg));

I have a web app that posts MyMessageType messages to the queue, but the action is never executed. I see some traces in the diagnostic logs that indicate the listener is pointed to the correct storage account and the correct queue, and I even see a call to ReliableCloudQueueStorage.Get. I haven't been able to build the samples, but I think I'm using the listener extension in the exact same way as the sample.

Any ideas as to what might be going on?

1

There are 1 best solutions below

1
On

Thanks for the great explanation and code snippet. It was very useful for troubleshooting. :-)

We have recently migrated CloudFx to Reactive Extensions 2.0 and while we were undertaking the migration, we failed to appreciate a couple of specifics of how we handle observable sequences internally in the queue listener component. This has now been addressed and I just pushed a new version of the NuGet package (1.3.0.1) that is intended to solve the challenge you were facing.

Let us know how you will be getting on.

Valery