Using NetAdvantage jQuery Chart, Infragistics Motion Framework and SignalR to Display Live Twitter Rates

Graham Murray / Friday, November 4, 2011

Not only can the Infragistics NetAdvantage jQuery Chart display live data streams and high volume data, but it also supports the Infragistics Motion Framework™, which helps you display changes to data over time. In this post I will show you how you can connect to the Twitter Streaming API and record the tweet rates of various keywords, and then use SignalR to push this data down to the chart. First we will see how the chart reacts normally, and then how it can use the Motion Framework to animate changes to the data.

Setting Up the Project

Start by creating an ASP.NET MVC 3 Web Application from the New Project menu.


Select Internet Application, View engine: Razor and hit OK.
In one of my previous posts on the chart, we used the chart’s MVC API to configure it, but for this post I’ll show you how you can create and update the chart using its jQuery Widget API, so we can skip adding the reference to Infragistics.Web.Mvc.
The project template for an MVC 3 Internet Application will have already created a folder named Content and a folder named Scripts in your project. Next, we need to add the appropriate NetAdvantage jQuery bits to these folders.
If you used the default install path for the product, you should find the CSS files to add to the Content\themes folder at:
C:\Program Files (x86)\Infragistics\NetAdvantage 2011.2\jQuery\themes\min
And the requisite script files to add to the Scripts\IG folder at:
C:\Program Files (x86)\Infragistics\NetAdvantage 2011.2\jQuery\js\combined\min


Now we can add the requisite script references to _Layout.cshtml to be able to use the Chart from any of the views in the application:

<!-- Required for Chart -->  
    <link href="@Url.Content("~/Content/themes/base/jquery-ui.min.css")" rel="stylesheet" type="text/css" />  
    <link href="@Url.Content("~/Content/themes/ig/jquery.ui.custom.min.css")" rel="stylesheet" type="text/css" />  
    <script src="@Url.Content("~/Scripts/jquery-ui-1.8.11.min.js")" type="text/javascript"></script>  
    <link href="@Url.Content("~/Content/themes/base/ig.ui.chart.min.css")" rel="stylesheet" type="text/css" />  
    <script src="@Url.Content("~/Scripts/IG/ig.ui.chart.min.js")" type="text/javascript"></script> 

 

Setting Up SignalR

We are going to use SignalR to push data about the tweet rates down to the client, so let’s set up SignalR in the project next.
SignalR is a cool library for ASP.NET and JavaScript that enables neat real-time web application scenarios. SignalR is available on NuGet, so right-click the project node and select Manage NuGet Packages.
In the resulting dialog, select Online and then enter SignalR in the search field. Select SignalR and then hit Install.


SignalR requires a more recent version of jQuery than is included in an MVC project by default, so it will add jquery-1.6.4.js to the Scripts folder, and will likely update jQuery UI to 1.8.16, so we need to replace the existing references to jQuery in _Layout.cshtml with:

<script src="@Url.Content("~/Scripts/jquery-1.6.4.min.js")" type="text/javascript"></script>  
<script src="@Url.Content("~/Scripts/jquery-ui-1.8.16.min.js")" type="text/javascript"></script>  

And then also add the reference to SignalR and the SignalR hubs:

<script src="@Url.Content("~/Scripts/jquery.signalR.min.js")" type="text/javascript"></script>  
<script src="@Url.Content("~/signalr/hubs")" type="text/javascript"></script>

 

Setting Up the Client

Now that we have SignalR set up, we can declare the client side of this piece. Replace the content of Index.cshtml with this:

<script>
    $(function () {
        var tweetMeterHub = $.connection.tweetMeterHub, joined = false, tweetData = [], currMax = 100, maxOutOfDate = 0;

        tweetMeterHub.rateUpdate = function (rateMessage) {
            var items = rateMessage.Rates, output = "", first = true, maxRate = 0;
            $.each(items, function (i, item) {
                if (first) {
                    first = false;
                } else {
                    output += ", ";
                }
                output += item.Keyword + " = " + item.Rate.toString();
            });

            for (var i = 0; i < tweetData.length; i++) {
                var key = tweetData[i].label;
                $.each(items, function (j, item) {
                    if (item.Keyword == key.toLowerCase()) {
                        maxRate = Math.max(maxRate, item.Rate);
                        tweetData[i].rate = item.Rate;
                    }
                });
            }

            if (maxRate > currMax) {
                currMax += 100;
                $('#chart1').igDataChart("option", "axes", [{ name: "yAxis", maximumValue: currMax}]);
            } else if (currMax - maxRate > 100) {
                maxOutOfDate++;
                if (maxOutOfDate > 5) {
                    maxOutOfDate = 0;
                    currMax -= 100;
                    $('#chart1').igDataChart("option", "axes", [{ name: "yAxis", maximumValue: currMax}]);
                }
            }

            $('#chart1').igDataChart("notifyClearItems", "tweetMeter");
            $('#infoBoxContent').prepend('<li>' + output + '</li>');
        };

        $("#addKeyword").click(function () {
            tweetData.push({ label: $('#keyword').val(), rate: 0 });
            $("#chart1").igDataChart("notifyInsertItem", "tweetMeter", tweetData.length - 1, tweetData[tweetData.length - 1]);

            if (!joined) {
                joined = true;
                tweetMeterHub.join();
            }
            tweetMeterHub.addKeyword($('#keyword').val());
        });

        $("#removeKeyword").click(function () {
            var removed = null;
            for (var i = 0; i < tweetData.length; i++) {
                if (tweetData[i].label == $("#keyword").val()) {
                    removed = tweetData[i];
                    tweetData.splice(i, 1);
                    break;
                }
            }
            if (removed) {
                $("#chart1").igDataChart("notifyRemoveItem", "tweetMeter", i, removed);
            }

            if (!joined) {
                joined = true;
                tweetMeterHub.join();
            }
            tweetMeterHub.removeKeyword($('#keyword').val());
        });

        $("#chart1").igDataChart({
            width: "500px",
            height: "500px",
            dataSource: tweetData,
            axes: [{
                name: "xAxis",
                type: "categoryX",
                label: "label",
                labelExtent: 50
            }, {
                name: "yAxis",
                type: "numericY",
                minimumValue: 0,
                maximumValue: 100
            }],
            series: [{
                name: "tweetMeter",
                type: "column",
                xAxis: "xAxis",
                yAxis: "yAxis",
                title: "Tweet Meter",
                valueMemberPath: "rate"
            }]
        });

        $.connection.hub.start();
    });
</script>

<div id="chart1"></div>
<div id="infoBoxContainer" class="ui-widget-content ui-corner-all" style='width: 500px; height: 200px; overflow: auto'>
    <ul id='infoBoxContent'>

    </ul>
</div>

<input type='text' id='keyword' />
<input type='button' id='addKeyword' value='Add Keyword' />
<input type='button' id='removeKeyword' value='Remove Keyword' />

Let’s examine this a bit at a time to see what’s going on. First, we have:

var tweetMeterHub = $.connection.tweetMeterHub, joined = false, tweetData = [], currMax = 100, maxOutOfDate = 0;

        tweetMeterHub.rateUpdate = function (rateMessage) {
            var items = rateMessage.Rates, output = "", first = true, maxRate = 0, i, key;
            $.each(items, function (i, item) {
                if (first) {
                    first = false;
                } else {
                    output += ", ";
                }
                output += item.Keyword + " = " + item.Rate.toString();
            });

            for (i = 0; i < tweetData.length; i++) {
                key = tweetData[i].label;
                $.each(items, function (j, item) {
                    if (item.Keyword == key.toLowerCase()) {
                        maxRate = Math.max(maxRate, item.Rate);
                        tweetData[i].rate = item.Rate;
                    }
                });
            }

            if (maxRate > currMax) {
                currMax += 100;
                $('#chart1').igDataChart("option", "axes", [{ name: "yAxis", maximumValue: currMax}]);
            } else if (currMax - maxRate > 100) {
                maxOutOfDate++;
                if (maxOutOfDate > 5) {
                    maxOutOfDate = 0;
                    currMax -= 100;
                    $('#chart1').igDataChart("option", "axes", [{ name: "yAxis", maximumValue: currMax}]);
                }
            }

            $('#chart1').igDataChart("notifyClearItems", "tweetMeter");
            $('#infoBoxContent').prepend('<li>' + output + '</li>');
        };

This is a method that SignalR will call from the server. We pass it a list of keywords, each paired with the number of times it was used on Twitter in the last minute (strictly, we are measuring how often it comes through on the Streaming API, which may be somewhat filtered to ensure quality statuses). The method logs the rates it receives as text, and then updates the entries in the tweetData array with the new rate values. If the scale of the values has changed enough, it adjusts the maximumValue of the yAxis in the chart. Also notice that, at the end, we notify the chart that all the data values in its array have changed:

$('#chart1').igDataChart("notifyClearItems", "tweetMeter");

This will cause it to examine the new values and update the visual of the chart.
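The rescaling rule inside rateUpdate is easier to reason about when it is pulled out of the handler. Here is a minimal sketch, not part of the original sample: createAxisScaler, step and patience are hypothetical names, and the function only computes the next axis maximum, leaving the igDataChart call to the caller. It follows the same logic as the handler: grow by one step as soon as a rate exceeds the current maximum, and shrink by one step only after more than `patience` updates in which the maximum was more than a full step too large.

```javascript
// Hypothetical helper: returns a function that maps the highest rate seen
// in an update to the y-axis maximum the chart should use.
function createAxisScaler(step, patience) {
    var currMax = step, outOfDate = 0;
    return function (maxRate) {
        if (maxRate > currMax) {
            // Data no longer fits: grow by one step right away.
            currMax += step;
        } else if (currMax - maxRate > step) {
            // Axis is too tall: count the update, shrink only after a while.
            outOfDate++;
            if (outOfDate > patience) {
                outOfDate = 0;
                currMax -= step;
            }
        }
        return currMax;
    };
}
```

With a step of 100 and a patience of 5, a single spike raises the axis immediately, while a quiet period has to last six updates before the axis drops back, which keeps the axis from jumping around on every message.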
Next, we have:

$("#addKeyword").click(function () {
            tweetData.push({ label: $('#keyword').val(), rate: 0 });
            $("#chart1").igDataChart("notifyInsertItem", "tweetMeter", tweetData.length - 1, tweetData[tweetData.length - 1]);

            if (!joined) {
                joined = true;
                tweetMeterHub.join();
            }
            tweetMeterHub.addKeyword($('#keyword').val());
        });

        $("#removeKeyword").click(function () {
            var removed = null, i = 0;
            for (i = 0; i < tweetData.length; i++) {
                if (tweetData[i].label == $("#keyword").val()) {
                    removed = tweetData[i];
                    tweetData.splice(i, 1);
                    break;
                }
            }
            if (removed) {
                $("#chart1").igDataChart("notifyRemoveItem", "tweetMeter", i, removed);
            }

            if (!joined) {
                joined = true;
                tweetMeterHub.join();
            }
            tweetMeterHub.removeKeyword($('#keyword').val());
        });

Here we define handlers for two buttons: one adds a keyword to the set we are tracking in the chart, and the other removes one. Each handler adds the keyword to, or removes it from, the tweetData array, which is bound to the chart, and then notifies the chart of how the array has changed.
Each handler also calls the join method on the server if we have not joined yet, and then calls addKeyword or removeKeyword to tell the server to add or remove the keyword from the list of keywords it is streaming tweets for from the Twitter servers.
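The remove handler has to hand the chart both the index the item used to occupy and the item itself, so both need to be captured at the moment of the splice. A minimal sketch of that step as a standalone function (removeByLabel is a hypothetical name, not part of the sample or of the chart API):

```javascript
// Remove the first entry whose label matches and report where it was.
// Returns null when no entry matches.
function removeByLabel(data, label) {
    for (var i = 0; i < data.length; i++) {
        if (data[i].label === label) {
            // splice returns an array holding the removed element.
            return { index: i, item: data.splice(i, 1)[0] };
        }
    }
    return null;
}

// Possible use inside the click handler (needs the page, so shown as a comment):
// var removed = removeByLabel(tweetData, $("#keyword").val());
// if (removed) {
//     $("#chart1").igDataChart("notifyRemoveItem", "tweetMeter",
//         removed.index, removed.item);
// }
```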
Next, we have the chart definition:

$("#chart1").igDataChart({
            width: "500px",
            height: "500px",
            dataSource: tweetData,
            axes: [{
                name: "xAxis",
                type: "categoryX",
                label: "label",
                labelExtent: 50
            }, {
                name: "yAxis",
                type: "numericY",
                minimumValue: 0,
                maximumValue: 100
            }],
            series: [{
                name: "tweetMeter",
                type: "column",
                xAxis: "xAxis",
                yAxis: "yAxis",
                title: "Tweet Meter",
                valueMemberPath: "rate"
            }]
        });

This is similar enough to how we defined the chart in my last post that you should refer to it for an explanation of the above. The salient difference here is that we have set type to “column” in order to get a column chart.

$.connection.hub.start();

Here, we are telling SignalR to start up the hub connections.
And finally:

<div id="chart1"></div>
<div id="infoBoxContainer" class="ui-widget-content ui-corner-all" style='width: 500px; height: 200px; overflow: auto'>
    <ul id='infoBoxContent'>

    </ul>
</div>

<input type='text' id='keyword' />
<input type='button' id='addKeyword' value='Add Keyword' />
<input type='button' id='removeKeyword' value='Remove Keyword' />

Here we define the DOM element for the chart, a box to log our textual rate values into, and a text input and buttons that let us add and remove keywords from the chart.

Defining the Server Side

Now that we have the client completely set up, we just need to implement the SignalR hub that we have been assuming is present. This hub needs to let clients add and remove the keywords that we are pulling from the streaming Twitter feed, and it needs to connect to the Twitter Streaming API and receive the stream of status updates.
Create a class called TweetMeterHub, and add this content to the file:

public class TweetMeterHub
        : Hub, IDisconnect
    {
        private static Dictionary<string, User> _connectedClients = 
            new Dictionary<string, User>();
        private static Dictionary<string, KeywordInfo> _allKeywords =
            new Dictionary<string, KeywordInfo>();
        private static object _stateLock = new Object();
        private static bool _running = false;
        private static bool _shouldStop = false;
        private static bool _shouldRestart = false;
        private static bool _reportRunning = false;

        private void AssertReportRunning()
        {
            lock (_stateLock)
            {
                if (_reportRunning)
                {
                    return;
                }

                if (ThreadPool.QueueUserWorkItem(_ =>
                {
                    Report();
                }))
                {
                    _reportRunning = true;
                }
            }
        }

        private void AssertReportStopped()
        {
            lock (_stateLock)
            {
                if (!_reportRunning)
                {
                    return;
                }

                _reportRunning = false;
            }
        }

        private void Report()
        {
            while (true)
            {
                List<RateMessage> toSend = new List<RateMessage>();
                lock (_stateLock)
                {
                    if (!_reportRunning)
                    {
                        break;
                    }

                    foreach (var user in _connectedClients.Values)
                    {
                        var m = new RateMessage();
                        m.ClientID = user.ID;
                        m.Rates = new RateInfo[user.TrackedKeywords.Count];
                        int index = 0;
                        foreach (var word in user.TrackedKeywords)
                        {
                            RateInfo info = new RateInfo();
                            info.Keyword = word;
                            info.Rate = 0;

                            KeywordInfo val;
                            if (_allKeywords.TryGetValue(word, out val))
                            {
                                info.Rate = val.TweetsPerMinute;
                            }
                            m.Rates[index] = info;
                            index++;
                        }
                        toSend.Add(m);
                    }
                }

                foreach (var item in toSend)
                {
                    Clients[item.ClientID].rateUpdate(item);
                }

                Thread.Sleep(2000);
            }
        }

        private void AssertRunning()
        {
            lock (_stateLock)
            {
                if (_running)
                {
                    return;
                }

                if (ThreadPool.QueueUserWorkItem(_ =>
                {
                    Process();
                }))
                {
                    Debug.WriteLine("running");
                    _running = true;
                }
            }
        }

        private void AssertStopped(bool shouldRestart)
        {
            Debug.WriteLine("asserting stopped");
            lock (_stateLock)
            {
                _shouldRestart = shouldRestart;
                if (!_running)
                {
                    return;
                }
                Debug.WriteLine("indicating should stop");
                _shouldStop = true;
            }
        }

        private void Process()
        {
            Debug.WriteLine("calling process");
            while (true)
            {
                ListenForTweets();
                lock (_stateLock)
                {
                    if (_running)
                    {
                        continue;
                    }

                    if (_shouldRestart)
                    {
                        _shouldRestart = false;
                        _running = true;
                    }
                    else
                    {
                        break;
                    }
                }
            }
        }

        private string GetFilters()
        {
            lock (_stateLock)
            {
                string filters = "track=";
                bool first = true;
                foreach (var item in _allKeywords.Values)
                {
                    if (first)
                    {
                        first = false;
                    }
                    else
                    {
                        filters += ",";
                    }

                    filters += item.Keyword;
                }

                return filters;
            }
        }

        private void ListenForTweets()
        {
            var url = "https://stream.twitter.com/1/statuses/filter.json";
            var filters = GetFilters();
            var filtersData = Encoding.UTF8.GetBytes(filters);
            var request = WebRequest.Create(url);
            request.Method = "POST";
            request.ContentLength = filtersData.Length;
            request.ContentType = "application/x-www-form-urlencoded";
            var stream = request.GetRequestStream();
            stream.Write(filtersData, 0, filtersData.Length);
            stream.Close();

            var serializer = new DataContractJsonSerializer(typeof(TweetInfo));

            request.Credentials = new NetworkCredential("[FILL IN USER NAME]", "[FILL IN PASSWORD]");
            Debug.WriteLine("starting twitter pull");
            request.BeginGetResponse((resp) =>
            {
                var state = (WebRequest)resp.AsyncState;
                using (var response = state.EndGetResponse(resp))
                {
                    using (var reader = new StreamReader(response.GetResponseStream()))
                    {
                        while (!reader.EndOfStream)
                        {
                            lock (_stateLock)
                            {
                                if (_shouldStop)
                                {
                                    Debug.WriteLine("stopping twitter pull");
                                    _shouldStop = false;
                                    _running = false;
                                    request.Abort();
                                    response.Close();
                                    reader.Close();
                                    return;
                                }
                            }

                            var line = reader.ReadLine();
                            if (line.Trim().Length < 1)
                            {
                                continue;
                            }
                            TweetInfo info = null;
                            using (MemoryStream jsonStream = new MemoryStream(Encoding.Unicode.GetBytes(line)))
                            {
                                info = (TweetInfo)serializer.ReadObject(jsonStream);
                            }

                            Debug.WriteLine(info.Text);

                            lock (_stateLock)
                            {
                                foreach (var item in _allKeywords.Values)
                                {
                                    if (info.Text.ToLower().Contains(item.Keyword))
                                    {
                                        var now = DateTime.Now;
                                        item.TweetTimes.Enqueue(now);
                                        while (item.TweetTimes.Peek() < now.Subtract(TimeSpan.FromMinutes(1)))
                                        {
                                            item.TweetTimes.Dequeue();
                                        }
                                        item.TweetsPerMinute = item.TweetTimes.Count;
                                    }
                                }
                            }
                        }
                    }
                }
            }, request);
        }

        public void Join()
        {
            lock (_stateLock)
            {
                string id = Context.ClientId;
                User user = null;
                if (_connectedClients.TryGetValue(id, out user))
                {
                    Disconnect();
                }
                user = new User() { ID = id, TrackedKeywords = new List<string>() };
                _connectedClients.Add(user.ID, user);
            }
        }

        public void AddKeyword(string keyword)
        {
            keyword = keyword.ToLower();
            lock (_stateLock)
            {
                bool shouldStart = false;
                if (_allKeywords.Count == 0 && !_running)
                {
                    shouldStart = true;
                }

                string id = Context.ClientId;
                User user = null;
                if (_connectedClients.TryGetValue(id, out user))
                {
                    if (!user.TrackedKeywords.Contains(keyword))
                    {
                        user.TrackedKeywords.Add(keyword);
                        KeywordInfo info = null;
                        if (!_allKeywords.TryGetValue(keyword, out info))
                        {
                            info = new KeywordInfo();
                            info.Keyword = keyword;
                            info.TweetsPerMinute = 0;
                            info.RefCount = 0;
                            _allKeywords.Add(keyword, info);
                        }

                        info.RefCount++;

                        if (shouldStart)
                        {
                            AssertRunning();
                            AssertReportRunning();
                        }
                        else
                        {
                            AssertStopped(true);
                        }
                    }
                }
            }
        }

        public void RemoveKeyword(string keyword)
        {
            keyword = keyword.ToLower();
            lock (_stateLock)
            {
                string id = Context.ClientId;
                User user = null;
                if (_connectedClients.TryGetValue(id, out user))
                {
                    if (user.TrackedKeywords.Contains(keyword))
                    {
                        user.TrackedKeywords.Remove(keyword);
                        KeywordInfo info = null;
                        if (_allKeywords.TryGetValue(keyword, out info))
                        {
                            info.RefCount--;
                            if (info.RefCount == 0)
                            {
                                _allKeywords.Remove(keyword);
                            }
                        }
                    }
                }

                if (_allKeywords.Count == 0)
                {
                    AssertStopped(false);
                }
                else
                {
                    AssertStopped(true);
                }
            }
        }

        public void Disconnect()
        {
            lock (_stateLock)
            {
                string id = Context.ClientId;
                User user = null;

                if (_connectedClients.TryGetValue(id, out user))
                {
                    RemoveUserItems(user);
                    _connectedClients.Remove(id);
                }

                if (_connectedClients.Count == 0)
                {
                    AssertStopped(false);
                    AssertReportStopped();
                }
            }
        }

        private void RemoveUserItems(User user)
        {
            lock (_stateLock)
            {
                foreach (var item in user.TrackedKeywords)
                {
                    KeywordInfo info = null;
                    if (_allKeywords.TryGetValue(item, out info))
                    {
                        info.RefCount--;
                        if (info.RefCount == 0)
                        {
                            _allKeywords.Remove(item);
                        }
                    }
                }

                if (_allKeywords.Count == 0)
                {
                    AssertStopped(false);
                }
                else
                {
                    AssertStopped(true);
                }
            }
        }
    }

    public class User
    {
        public User()
        {
            TrackedKeywords = new List<string>();
        }

        public string ID { get; set; }
        public List<string> TrackedKeywords { get; set; }
    }

    public class KeywordInfo
    {
        public KeywordInfo()
        {
            TweetTimes = new Queue<DateTime>();
        }

        public string Keyword { get; set; }
        public int TweetsPerMinute { get; set; }
        public Queue<DateTime> TweetTimes { get; set; }
        public int RefCount { get; set; }
    }

    [DataContract]
    public class TweetInfo
    {
        [DataMember(Name = "id")]
        public long Id { get; set; }

        [DataMember(Name = "text")]
        public string Text { get; set; }
    }

    public class RateMessage
    {
        public string ClientID { get; set; }
        public RateInfo[] Rates { get; set; }
    }

    public class RateInfo
    {
        public string Keyword { get; set; }
        public int Rate { get; set; }
    }

You will need to add a reference to System.Runtime.Serialization and these using directives:

using SignalR.Hubs;
using System.Threading;
using System.Diagnostics;
using System.Text;
using System.Net;
using System.Runtime.Serialization.Json;
using System.IO;
using System.Runtime.Serialization;

Now, what on earth is going on in there? Let’s break it down. First we have:

private static Dictionary<string, User> _connectedClients =
    new Dictionary<string, User>();
private static Dictionary<string, KeywordInfo> _allKeywords =
    new Dictionary<string, KeywordInfo>();
private static object _stateLock = new Object();
private static bool _running = false;
private static bool _shouldStop = false;
private static bool _shouldRestart = false;
private static bool _reportRunning = false;

These are just some variables we need. Notice that we are going to be doing some locking, since the Hub can and will be accessed concurrently.

private void AssertReportRunning()
        {
            lock (_stateLock)
            {
                if (_reportRunning)
                {
                    return;
                }

                if (ThreadPool.QueueUserWorkItem(_ =>
                {
                    Report();
                }))
                {
                    _reportRunning = true;
                }
            }
        }

This makes sure that the thread that reports rates to the user every 2 seconds is running.

private void AssertReportStopped()
        {
            lock (_stateLock)
            {
                if (!_reportRunning)
                {
                    return;
                }

                _reportRunning = false;
            }
        }

This makes sure that the thread that reports rates will stop at the earliest opportunity.

private void Report()
        {
            while (true)
            {
                List<RateMessage> toSend = new List<RateMessage>();
                lock (_stateLock)
                {
                    if (!_reportRunning)
                    {
                        break;
                    }

                    foreach (var user in _connectedClients.Values)
                    {
                        var m = new RateMessage();
                        m.ClientID = user.ID;
                        m.Rates = new RateInfo[user.TrackedKeywords.Count];
                        int index = 0;
                        foreach (var word in user.TrackedKeywords)
                        {
                            RateInfo info = new RateInfo();
                            info.Keyword = word;
                            info.Rate = 0;

                            KeywordInfo val;
                            if (_allKeywords.TryGetValue(word, out val))
                            {
                                info.Rate = val.TweetsPerMinute;
                            }
                            m.Rates[index] = info;
                            index++;
                        }
                        toSend.Add(m);
                    }
                }

                foreach (var item in toSend)
                {
                    Clients[item.ClientID].rateUpdate(item);
                }

                Thread.Sleep(2000);
            }
        }

This thread will collect the rates that each user is listening for and report the values to that user every 2 seconds. Our other thread records the rates; this one just reports them by calling:

Clients[item.ClientID].rateUpdate(item);

This invokes the client-side JavaScript method that we defined earlier, passing it the list of rates that we have collected.
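For reference, the object that arrives in rateUpdate has the shape of the RateMessage class, with the same property names the client handler reads (Rates, Keyword, Rate). Here is a minimal sketch with made-up values: the client ID, keywords and rates are invented for illustration, and formatRates is a hypothetical helper that builds the same log line as the handler using map and join.

```javascript
// Invented sample payload in the shape of RateMessage / RateInfo.
var sampleMessage = {
    ClientID: "example-client-id",
    Rates: [
        { Keyword: "jquery", Rate: 12 },
        { Keyword: "signalr", Rate: 3 }
    ]
};

// Build the "keyword = rate, keyword = rate" text that gets logged.
function formatRates(rateMessage) {
    return rateMessage.Rates.map(function (item) {
        return item.Keyword + " = " + item.Rate;
    }).join(", ");
}
```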

private void AssertRunning()
        {
            lock (_stateLock)
            {
                if (_running)
                {
                    return;
                }

                if (ThreadPool.QueueUserWorkItem(_ =>
                {
                    Process();
                }))
                {
                    Debug.WriteLine("running");
                    _running = true;
                }
            }
        }

This makes sure that the thread that collects the Twitter rates is running.

private void AssertStopped(bool shouldRestart)
        {
            Debug.WriteLine("asserting stopped");
            lock (_stateLock)
            {
                _shouldRestart = shouldRestart;
                if (!_running)
                {
                    return;
                }
                Debug.WriteLine("indicating should stop");
                _shouldStop = true;
            }
        }

This makes sure that the thread collecting Twitter rates stops at the earliest opportunity, and then potentially restarts. Whenever the set of keywords that we are tracking changes, we stop and restart the request that we are making of Twitter, so in those instances we call AssertStopped(true).

private void Process()
        {
            Debug.WriteLine("calling process");
            while (true)
            {
                ListenForTweets();
                lock (_stateLock)
                {
                    if (_running)
                    {
                        continue;
                    }

                    if (_shouldRestart)
                    {
                        _shouldRestart = false;
                        _running = true;
                    }
                    else
                    {
                        break;
                    }
                }
            }
        }

This is our thread that is going to connect to Twitter and listen for the status updates on our keywords. Notice that we may fall out of here and then restart if the _shouldRestart flag has been set to true.
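The interplay of the _running, _shouldStop and _shouldRestart flags is easier to follow in isolation. Here is a minimal sketch of the same idea with the Twitter connection replaced by a delegate. RestartableLoop, RequestStop, Run and Passes are hypothetical names of mine, not part of the sample, and the listener is assumed to poll the stop callback the way ListenForTweets polls _shouldStop.

```csharp
using System;

// Hypothetical stand-in for the hub's start/stop/restart bookkeeping.
public class RestartableLoop
{
    private readonly object _stateLock = new object();
    private bool _running;
    private bool _shouldStop;
    private bool _shouldRestart;

    // Number of times the listener has been started.
    public int Passes { get; private set; }

    // Mirrors AssertStopped: ask the current pass to end, optionally restarting.
    public void RequestStop(bool shouldRestart)
    {
        lock (_stateLock)
        {
            _shouldRestart = shouldRestart;
            if (_running)
            {
                _shouldStop = true;
            }
        }
    }

    // Mirrors Process: run the listener again and again until a stop
    // without restart has been requested and acknowledged.
    public void Run(Action<Func<bool>> listenOnce)
    {
        lock (_stateLock)
        {
            _running = true;
        }

        while (true)
        {
            Passes++;
            listenOnce(StopRequested);
            lock (_stateLock)
            {
                if (_running)
                {
                    continue; // the listener ended on its own, so reconnect
                }
                if (!_shouldRestart)
                {
                    break;
                }
                _shouldRestart = false;
                _running = true;
            }
        }
    }

    // The listener polls this; a true result acknowledges the stop request.
    private bool StopRequested()
    {
        lock (_stateLock)
        {
            if (!_shouldStop)
            {
                return false;
            }
            _shouldStop = false;
            _running = false;
            return true;
        }
    }
}
```

A listener passed to Run would read from its stream in a loop and return as soon as the callback it was given returns true.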

private string GetFilters()
        {
            lock (_stateLock)
            {
                string filters = "track=";
                bool first = true;
                foreach (var item in _allKeywords.Values)
                {
                    if (first)
                    {
                        first = false;
                    }
                    else
                    {
                        filters += ",";
                    }

                    filters += item.Keyword;
                }

                return filters;
            }
        }

Here we are going to get a tracking string to pass to the Twitter streaming API to tell it which keywords we want tracked. _allKeywords accumulates the full list of keywords that all the users want to track, so that we only use one connection to the streaming API.
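The same tracking string can be built a little more compactly. The following is a minimal sketch rather than the code from the sample: TrackFilter.Build is a hypothetical helper name, and escaping each keyword with Uri.EscapeDataString is an assumption on my part, added because the string is posted as application/x-www-form-urlencoded data and a keyword could contain characters such as spaces or ampersands.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the sample project.
public static class TrackFilter
{
    // Builds "track=a,b,c" from the keywords, escaping each keyword so that
    // characters such as spaces or '&' cannot corrupt the form-encoded body.
    public static string Build(IEnumerable<string> keywords)
    {
        return "track=" + string.Join(",", keywords.Select(k => Uri.EscapeDataString(k)));
    }
}
```

In the hub this would be called under the lock with the keys of _allKeywords, for example TrackFilter.Build(_allKeywords.Keys).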

private void ListenForTweets()
        {
            var url = "https://stream.twitter.com/1/statuses/filter.json";
            var filters = GetFilters();
            var filtersData = Encoding.UTF8.GetBytes(filters);
            var request = WebRequest.Create(url);
            request.Method = "POST";
            request.ContentLength = filtersData.Length;
            request.ContentType = "application/x-www-form-urlencoded";
            var stream = request.GetRequestStream();
            stream.Write(filtersData, 0, filtersData.Length);
            stream.Close();

            var serializer = new DataContractJsonSerializer(typeof(TweetInfo));

            request.Credentials = new NetworkCredential("[Insert UserName Here]", "[Insert Password Here]");
            Debug.WriteLine("starting twitter pull");
            request.BeginGetResponse((resp) =>
            {
                var state = (WebRequest)resp.AsyncState;
                using (var response = state.EndGetResponse(resp))
                {
                    using (var reader = new StreamReader(response.GetResponseStream()))
                    {
                        while (!reader.EndOfStream)
                        {
                            lock (_stateLock)
                            {
                                if (_shouldStop)
                                {
                                    Debug.WriteLine("stopping twitter pull");
                                    _shouldStop = false;
                                    _running = false;
                                    request.Abort();
                                    response.Close();
                                    reader.Close();
                                    return;
                                }
                            }

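                            var line = reader.ReadLine();
                            // Skip the empty heartbeat lines that keep the connection alive.
                            if (line == null || line.Trim().Length < 1)
                            {
                                continue;
                            }
                            TweetInfo info = null;
                            using (MemoryStream jsonStream = new MemoryStream(Encoding.Unicode.GetBytes(line)))
                            {
                                info = (TweetInfo)serializer.ReadObject(jsonStream);
                            }

                            // A message without a text field is not a status update, so there is nothing to match.
                            if (info == null || info.Text == null)
                            {
                                continue;
                            }

                            Debug.WriteLine(info.Text);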

                            lock (_stateLock)
                            {
                                foreach (var item in _allKeywords.Values)
                                {
                                    if (info.Text.ToLower().Contains(item.Keyword))
                                    {
                                        var now = DateTime.Now;
                                        item.TweetTimes.Enqueue(now);
                                        while (item.TweetTimes.Peek() < now.Subtract(TimeSpan.FromMinutes(1)))
                                        {
                                            item.TweetTimes.Dequeue();
                                        }
                                        item.TweetsPerMinute = item.TweetTimes.Count;
                                    }
                                }
                            }
                        }
                    }
                }
            }, request);
        }

Here’s the method where we actually create the long-running HTTP connection to the Twitter Streaming API and continually take data off it to record in our rate counters. We send the tracking string to the API via a POST and then read the response stream line by line. Twitter sends empty heartbeat lines to keep the connection alive, which we ignore.
The Streaming API requires you to log on with a Twitter username and password, so you will have to provide one there. We are using the DataContractJsonSerializer to turn the JSON-based status updates into CLR objects so that we can examine them easily.
We only care about the text of the tweet, on which we just use String.Contains to see if it contains our keyword (remember that the statuses for all the keywords come in on the same stream). For each keyword we simply maintain a buffer of the last minute’s worth of tweet times, so that we can always answer how many tweets for a particular keyword have come in during the last minute.
The _allKeywords hash is kept up to date with the latest rates, and it is this hash that is referenced by our reporting thread that sends the rates down to the clients every 2 seconds.
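The per-keyword buffer is a sliding window over tweet times. Here is a minimal sketch of that technique on its own. SlidingWindowCounter is a hypothetical class, not part of the sample, and it differs from the code above in one respect: it discards expired times when the count is read rather than when a tweet arrives, so the rate also falls once tweets for a keyword stop coming in. It is not thread safe, so I am assuming the caller holds a lock, as the hub does with _stateLock.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the sample project.
public class SlidingWindowCounter
{
    private readonly Queue<DateTime> _times = new Queue<DateTime>();
    private readonly TimeSpan _window;

    public SlidingWindowCounter(TimeSpan window)
    {
        _window = window;
    }

    // Records one event (for us, one matching tweet) at the given time.
    // Times are expected to arrive in non-decreasing order.
    public void Record(DateTime time)
    {
        _times.Enqueue(time);
    }

    // Returns how many recorded events fall inside the window ending at 'now'.
    // Expired entries are dropped here, so the count decays without new events.
    public int Count(DateTime now)
    {
        DateTime cutoff = now - _window;
        while (_times.Count > 0 && _times.Peek() < cutoff)
        {
            _times.Dequeue();
        }
        return _times.Count;
    }
}
```

With a one-minute window, the reporting thread could call Count(DateTime.Now) every 2 seconds to get the current tweets per minute.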

public void Join()
        {
            lock (_stateLock)
            {
                string id = Context.ClientId;
                User user = null;
                if (_connectedClients.TryGetValue(id, out user))
                {
                    Disconnect();
                }
                user = new User() { ID = id, TrackedKeywords = new List<string>() };
                _connectedClients.Add(user.ID, user);
            }
        }

This method gets called when a client wants to join the hub. We create an entry in a hash to track which keywords the client wants to listen to.

public void AddKeyword(string keyword)
        {
            keyword = keyword.ToLower();
            lock (_stateLock)
            {
                bool shouldStart = false;
                if (_allKeywords.Count == 0 && !_running)
                {
                    shouldStart = true;
                }

                string id = Context.ClientId;
                User user = null;
                if (_connectedClients.TryGetValue(id, out user))
                {
                    if (!user.TrackedKeywords.Contains(keyword))
                    {
                        user.TrackedKeywords.Add(keyword);
                        KeywordInfo info = null;
                        if (!_allKeywords.TryGetValue(keyword, out info))
                        {
                            info = new KeywordInfo();
                            info.Keyword = keyword;
                            info.TweetsPerMinute = 0;
                            info.RefCount = 0;
                            _allKeywords.Add(keyword, info);
                        }

                        info.RefCount++;

                        if (shouldStart)
                        {
                            AssertRunning();
                            AssertReportRunning();
                        }
                        else
                        {
                            AssertStopped(true);
                        }
                    }
                }
            }
        }

This method gets called when a client wants to add a keyword to the chart. We add it to the keywords for that client and to the _allKeywords hash, and then make sure that the Twitter stream thread either starts for the first time or is restarted to accommodate the new keyword.

public void RemoveKeyword(string keyword)
        {
            keyword = keyword.ToLower();
            lock (_stateLock)
            {
                string id = Context.ClientId;
                User user = null;
                if (_connectedClients.TryGetValue(id, out user))
                {
                    if (user.TrackedKeywords.Contains(keyword))
                    {
                        user.TrackedKeywords.Remove(keyword);
                        KeywordInfo info = null;
                        if (_allKeywords.TryGetValue(keyword, out info))
                        {
                            info.RefCount--;
                            if (info.RefCount == 0)
                            {
                                _allKeywords.Remove(keyword);
                            }
                        }
                    }
                }

                if (_allKeywords.Count == 0)
                {
                    AssertStopped(false);
                }
                else
                {
                    AssertStopped(true);
                }
            }
        }

This method gets called when a client wants to remove a keyword from the chart. It removes the keyword from the tracked keywords for that client, and removes it from the _allKeywords hash. Then it makes sure that the Twitter connection restarts because we have changed the keywords we want to listen for.
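AddKeyword and RemoveKeyword together amount to reference counting: a keyword stays in the shared set for as long as at least one client tracks it. Here is a minimal sketch of just that bookkeeping. KeywordRegistry is a hypothetical class, not part of the sample. As a variation on the code above, its methods report whether the set of keywords actually changed, which a caller could use to restart the Twitter connection only when the filters differ.

```csharp
using System.Collections.Generic;

// Hypothetical helper, not part of the sample project.
public class KeywordRegistry
{
    private readonly Dictionary<string, int> _refCounts = new Dictionary<string, int>();

    // Number of distinct keywords currently tracked by at least one client.
    public int Count
    {
        get { return _refCounts.Count; }
    }

    // Adds one reference to the keyword. Returns true if the keyword was not
    // tracked before, meaning the shared set of keywords has changed.
    public bool Add(string keyword)
    {
        int count;
        bool isNew = !_refCounts.TryGetValue(keyword, out count);
        _refCounts[keyword] = count + 1; // count is 0 when the keyword is new
        return isNew;
    }

    // Releases one reference to the keyword. Returns true if that was the last
    // reference and the keyword was removed from the shared set.
    public bool Remove(string keyword)
    {
        int count;
        if (!_refCounts.TryGetValue(keyword, out count))
        {
            return false;
        }
        if (count > 1)
        {
            _refCounts[keyword] = count - 1;
            return false;
        }
        _refCounts.Remove(keyword);
        return true;
    }
}
```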

public void Disconnect()
        {
            lock (_stateLock)
            {
                string id = Context.ClientId;
                User user = null;

                if (_connectedClients.TryGetValue(id, out user))
                {
                    RemoveUserItems(user);
                    _connectedClients.Remove(id);
                }

                if (_connectedClients.Count == 0)
                {
                    AssertStopped(false);
                    AssertReportStopped();
                }
            }
        }

private void RemoveUserItems(User user)
        {
            lock (_stateLock)
            {
                foreach (var item in user.TrackedKeywords)
                {
                    KeywordInfo info = null;
                    if (_allKeywords.TryGetValue(item, out info))
                    {
                        info.RefCount--;
                        if (info.RefCount == 0)
                        {
                            _allKeywords.Remove(item);
                        }
                    }
                }

                if (_allKeywords.Count == 0)
                {
                    AssertStopped(false);
                }
                else
                {
                    AssertStopped(true);
                }
            }
        }

This gets called when a client disconnects from the hub, so we make sure that the client’s keywords are removed and the Twitter connection is refreshed.

public class User
    {
        public User()
        {
            TrackedKeywords = new List<string>();
        }

        public string ID { get; set; }
        public List<string> TrackedKeywords { get; set; }
    }

    public class KeywordInfo
    {
        public KeywordInfo()
        {
            TweetTimes = new Queue<DateTime>();
        }

        public string Keyword { get; set; }
        public int TweetsPerMinute { get; set; }
        public Queue<DateTime> TweetTimes { get; set; }
        public int RefCount { get; set; }
    }

    [DataContract]
    public class TweetInfo
    {
        [DataMember(Name = "id")]
        public long Id { get; set; }

        [DataMember(Name = "text")]
        public string Text { get; set; }
    }

    public class RateMessage
    {
        public string ClientID { get; set; }
        public RateInfo[] Rates { get; set; }
    }

    public class RateInfo
    {
        public string Keyword { get; set; }
        public int Rate { get; set; }
    }

These classes hold the data that we need. Note that RateMessage and RateInfo are automatically JSON serialized by SignalR when we pass them as parameters above (cool!!).
TweetInfo is marked as a DataContract because we are using the DataContractJsonSerializer to deserialize the tweets.
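To see the deserialization on its own, here is a minimal sketch that turns one line of JSON into a TweetInfo. TweetParser is a hypothetical helper, not part of the sample. TweetInfo is repeated so that the snippet stands alone, and the bytes are encoded as UTF-8 here, which the serializer detects just as it detects the UTF-16 bytes used in ListenForTweets.

```csharp
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
using System.Text;

[DataContract]
public class TweetInfo
{
    [DataMember(Name = "id")]
    public long Id { get; set; }

    [DataMember(Name = "text")]
    public string Text { get; set; }
}

// Hypothetical helper, not part of the sample project.
public static class TweetParser
{
    private static readonly DataContractJsonSerializer Serializer =
        new DataContractJsonSerializer(typeof(TweetInfo));

    // Deserializes one line from the stream. JSON members that TweetInfo does
    // not declare are ignored, and declared members that are missing keep
    // their default values.
    public static TweetInfo Parse(string line)
    {
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(line)))
        {
            return (TweetInfo)Serializer.ReadObject(stream);
        }
    }
}
```

Because only id and text are declared as data members, the rest of each status update is simply skipped.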
At this point we are ready to run the application.
Please note that the above does not necessarily represent production-ready code. To avoid using the Twitter Streaming API in potentially disallowed ways, please read through the Twitter Streaming API documentation and make adjustments based on their recommendations before using something like this in production. Also note that the chart is currently a CTP feature.

Running the Sample


Now go ahead and run the sample. Try entering the text “baseball” and then select add keyword. Add “football” as a keyword also.


Note that the values are updating every 2 seconds, and each time the value will snap immediately to the next value. We can do better than that, right?
That’s where the Motion Framework comes in. Add this line to the definition of the series in the chart:

transitionDuration: 1800

And now run it again.
The above indicates that transitions in the chart’s shape should take 1800ms to complete. You should see the values smoothly animating to the next ones. Pretty cool, huh? And all it took was one line of code!

You can download the project used in this sample here.
-Graham