How do I stream market data via SignalR that a user subscribes to

171 Views Asked by At

I want to stream market data similar to https://min-api.cryptocompare.com/documentation/websockets?key=Channels&cat=Trade&api_key=c1f6792f289d2672ac209b6c2ec2b552b0c1082b3ed27ad4bef19edde0684d07 for a TradingView chart.

There are several streams that we can subscribe to:

  • Trade: 0~{exchange}~{base}~{quote}
  • Ticker: 2~{exchange}~{base}~{quote}
  • 24~{exchange or CCCAGG}~{base}~{quote}~{period}
  • etc.

How do I achieve the same using SignalR? It is currently doing kinda the same (I mean it works), however I don't feel like I'm doing it correctly.

public class MarketData : Hub
{
    public async Task SendMessage(object message)
    {
        // Here you handle the received message
        var receivedData = JsonConvert.DeserializeObject<dynamic>(message.ToString());

        if (receivedData.action == "SubAdd")
        {
            // Start streaming data.
            // This is a simple example, replace with your own logic.
            while (true)
            {
                var randomData = GenerateRandomData(); // This method generates random data
                await Clients.Caller.SendAsync("ReceiveMessage", randomData);
                await Task.Delay(1000); // for example, send data every second
            }
        }
    }

    private string GenerateRandomData()
    {
        // Replace this with whatever logic you have to generate or fetch the data
        return "Random Data: " + new Random().Next(1000).ToString();
    }
}

I made the following snippet which works fine for testing purposes.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SignalR Client Test</title>

    <!-- Include the SignalR JavaScript client library -->
    <script src="https://cdn.jsdelivr.net/npm/@microsoft/signalr@latest/dist/browser/signalr.min.js"></script>
</head>
<body>
<h1>SignalR Client Test</h1>

<button id="connectBtn">Connect</button>
<button id="disconnectBtn" disabled>Disconnect</button>
<h2>Received Messages:</h2>
<ul id="messagesList"></ul>

<h2>Send a Custom Message:</h2>
<textarea id="customMessage" rows="4" cols="50">{"action":"SubAdd","subs":["0~Binance~BTC~USDT"]}</textarea><br>
<button id="sendCustomBtn">Send Custom Message</button>

<script>
    let connection = new signalR.HubConnectionBuilder()
        .withUrl("https://localhost:5001/hub")
        .configureLogging(signalR.LogLevel.Information)
        .build();

    const connectBtn = document.getElementById("connectBtn");
    const disconnectBtn = document.getElementById("disconnectBtn");
    const messagesList = document.getElementById("messagesList");
    const sendCustomBtn = document.getElementById("sendCustomBtn");

    connectBtn.addEventListener("click", function () {
        connection
            .start()
            .then(function () {
                console.log("connected");
                connectBtn.disabled = true;
                disconnectBtn.disabled = false;
            })
            .catch((err) => console.error(err.toString()));
    });

    disconnectBtn.addEventListener("click", function () {
        connection.stop().then(function () {
            connectBtn.disabled = false;
            disconnectBtn.disabled = true;
        });
    });

    sendCustomBtn.addEventListener("click", function() {
        const customMessageText = document.getElementById("customMessage").value;
        let customMessage;
        try {
            customMessage = JSON.parse(customMessageText);
            connection.invoke("SendMessage", customMessage).catch(err => console.error(err.toString()));
        } catch (error) {
            console.error("Invalid JSON message:", error);
            alert("Please enter a valid JSON message.");
        }
    });

    connection.on("ReceiveMessage", function (message) {
        let li = document.createElement("li");
        li.textContent = message;
        messagesList.appendChild(li);
    });
</script>
</body>
</html>

I was also thinking that Microsoft Orleans grain would be perfect in this case because if there are multiple users trying to subscribe to the same exchange/symbol, we could reuse that grain. And also the load will spread across nodes equally.

1

There are 1 best solutions below

0
Jason Pan On BEST ANSWER

You could use timers to periodically send data updates instead of an infinite loop. This approach ensures that data is sent at regular intervals without blocking the thread.

I have created a sample for you, you can check it.

My Test Result

enter image description here

My MarketData.cshtml

@{
    ViewData["Title"] = "MarketData";
}
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SignalR Client Test</title>
    <script src="~/js/signalr/dist/browser/signalr.min.js"></script>
</head>
<body>
    <h1>SignalR MarketData</h1>

    <button id="connectBtn">Connect</button>
    <button id="disconnectBtn" disabled>Disconnect</button>
    <button id="subscribeBtn" disabled>Subscribe</button>
    <button id="unsubscribeBtn" disabled>Unsubscribe</button>
    <h2>Received Messages:</h2>
    <ul id="messagesList"></ul>

    <h2>Subscribe to a Stream:</h2>
    <input type="text" id="streamInput" placeholder="Enter stream ID (e.g., 'Binance~BTC~USDT')" />
    <button id="subscribeToStreamBtn">Subscribe to Stream</button>

    <h2>Unsubscribe from Stream:</h2>
    <button id="unsubscribeFromStreamBtn">Unsubscribe from Stream</button>

    <script>
        let connection = new signalR.HubConnectionBuilder()
            .withUrl("https://localhost:44310/mainhub")
            .configureLogging(signalR.LogLevel.Information)
            .build();

        const connectBtn = document.getElementById("connectBtn");
        const disconnectBtn = document.getElementById("disconnectBtn");
        const subscribeBtn = document.getElementById("subscribeBtn");
        const unsubscribeBtn = document.getElementById("unsubscribeBtn");
        const messagesList = document.getElementById("messagesList");
        const streamInput = document.getElementById("streamInput");
        const subscribeToStreamBtn = document.getElementById("subscribeToStreamBtn");
        const unsubscribeFromStreamBtn = document.getElementById("unsubscribeFromStreamBtn");

        connectBtn.addEventListener("click", function () {
            connection
                .start()
                .then(function () {
                    console.log("connected");
                    connectBtn.disabled = true;
                    disconnectBtn.disabled = false;
                    subscribeBtn.disabled = false;
                })
                .catch((err) => console.error(err.toString()));
        });

        disconnectBtn.addEventListener("click", function () {
            connection.stop().then(function () {
                connectBtn.disabled = false;
                disconnectBtn.disabled = true;
                subscribeBtn.disabled = true;
                unsubscribeBtn.disabled = true;
            });
        });

        subscribeBtn.addEventListener("click", function () {
            connection.invoke("SubscribeToStream", streamInput.value)
                .then(function () {
                    subscribeBtn.disabled = true;
                    unsubscribeBtn.disabled = false;
                })
                .catch(err => console.error(err.toString()));
        });

        unsubscribeBtn.addEventListener("click", function () {
            connection.invoke("UnsubscribeFromStream")
                .then(function () {
                    subscribeBtn.disabled = false;
                    unsubscribeBtn.disabled = true;
                })
                .catch(err => console.error(err.toString()));
        });

        subscribeToStreamBtn.addEventListener("click", function () {
            const streamId = streamInput.value;
            connection.invoke("SubscribeToStream", streamId)
                .then(function () {
                    console.log("Subscribed to stream:", streamId);
                    subscribeToStreamBtn.disabled = true;
                    unsubscribeFromStreamBtn.disabled = false;
                })
                .catch(err => console.error(err.toString()));
        });

        unsubscribeFromStreamBtn.addEventListener("click", function () {
            connection.invoke("UnsubscribeFromStream")
                .then(function () {
                    console.log("Unsubscribed from the stream");
                    subscribeToStreamBtn.disabled = false;
                    unsubscribeFromStreamBtn.disabled = true;
                })
                .catch(err => console.error(err.toString()));
        });

        connection.on("ReceiveStreamData", function (data) {
            let li = document.createElement("li");
            li.textContent = data;
            messagesList.appendChild(li);
        });
    </script>
</body>
</html>

My Hub class

using Microsoft.AspNetCore.SignalR;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SignalRMiddleawre.Hubs
{
    public partial class MainHub : Hub
    {
        private readonly Dictionary<string, Timer> _subscriptionTimers = new Dictionary<string, Timer>();
        private readonly object _lock = new object();

        public override Task OnDisconnectedAsync(Exception exception)
        {
            // Cleanup resources when a user disconnects
            string userId = Context.ConnectionId;
            if (_subscriptionTimers.TryGetValue(userId, out var timer))
            {
                timer.Dispose();
                _subscriptionTimers.Remove(userId);
            }

            return base.OnDisconnectedAsync(exception);
        }

        public async Task SubscribeToStream(string streamId)
        {
            // Start streaming data for the specified streamId
            string userId = Context.ConnectionId;

            // Stop any existing timer for this user
            lock (_lock)
            {
                if (_subscriptionTimers.TryGetValue(userId, out var existingTimer))
                {
                    existingTimer.Dispose();
                    _subscriptionTimers.Remove(userId);
                }
            }

            var newTimer = new Timer(async _ =>
            {
                try
                {
                    // Replace this with logic to fetch actual data from a reliable source
                    var data = await FetchMarketData(streamId);

                    // Send the data update to the user
                    await Clients.Caller.SendAsync("ReceiveStreamData", data);
                }
                catch (Exception ex)
                {
                    // Handle errors gracefully
                    Console.WriteLine("Error fetching market data: " + ex.Message);
                }
            }, null, TimeSpan.Zero, TimeSpan.FromSeconds(5)); // Adjust the interval as needed

            lock (_lock)
            {
                _subscriptionTimers[userId] = newTimer;
            }
        }

        public async Task UnsubscribeFromStream()
        {
            // Stop the subscription for the current user
            string userId = Context.ConnectionId;
            if (_subscriptionTimers.TryGetValue(userId, out var timer))
            {
                timer.Dispose();
                _subscriptionTimers.Remove(userId);
            }

            await Task.CompletedTask;
        }

        private async Task<string> FetchMarketData(string streamId)
        {
            // Replace this with logic to fetch actual market data
            // from a reliable source based on the provided streamId
            await Task.Delay(100); // Simulate data fetch
            return "Market Data for " + streamId + ": " + new Random().Next(1000).ToString();
        }


    }
}