Sunday, 19 June 2016

Azure Stream Analytics Jobs and Reference data

If you are using Azure Stream Analytics Jobs and are utilizing "Reference Data" streams (Which are essentially a blob storage for now), then you may run into following scenario.

If your data repository is other than Azure Blob storage (which has a very high chance because Blob storage is not for that purpose) and you need to reference part of that data in your Azure Stream Analytics Jobs and if that reference data is slowly changing, then you need to synchronize data across the primary data store and Azure Blob Storage. There are multiple articles that can help you achieve that like this. However there are some finer points:

1. Though most of the articles on the web seem to indicate that reference data can be indicated to have changed at "minute" time grain level, I have not been able to get that working. New Azure portal (portal.azure.com) does not even show "minute" as an option to specify in Blob storage path. The old portal (manage.windowsazure.com) does show the option but if you try to save your input source, it fails :). Your best bet for time grain is "hour".

2. You may think of using Azure Data Copy (in case your primary data source is supported as input), however there are couple of gotchas:

a. Azure Data Copy activity creates the output file with a naming convention   that does not play well with Azure Stream Analytics Jobs. You can not specify the destination file name in Azure Data Copy activity - it likes to create a file named "data_{guid}" whereas Azure Stream Analytics Job is looking for a fixed file name as reference data stream.

b. If you let your synchronization process run every "UTC hour", then your Azure Stream Analytics Jobs will always be ahead of the synchronization process and therefore it will add delays to data refresh. Try completing your synchronization process before it strikes next hour on UTC clock. I personally liked using 15 min synchronization.

So that is that. Hope it helps.

Sunday, 12 June 2016

"Cannot have more than 5 receivers" - Azure Stream Analytics Jobs

If you try to write a little large or complex Azure Stream Analytics Job, whether due to logic requirement or whether you are trying to minimize the number of jobs to manage, there is a high chance that you will be required to utilize the UNION operator in your query so that you can club outputs. Well, that is a good option but there are certain restrictions around it.

Link#1 explains such issue. If you use EventHub as an input to the Azure Stream Analytics Job then you can not have more than 5 input streams.

Consider the below query as an example:

WITH Result1
(
   SELECT Min(S.Val) AS Col1,
               Max(S.Val) AS Col2,
               Avg(S.Val) Col3,
               S.Name AS Col4,
               S.Id AS Col5,
               S.F1 AS Col6,
               S.F2 AS Col7
   From
          InputStream S
   TIMESTAMP BY S.[TimeStamp]
   GROUP BY
          S.Name, S.Id, S.F1, S.F2, SlidingWindow(Min, 5)
),
Result2
(
   SELECT S1.Col4, 'Name' as 'Name'
   FROM Result1 S1
   UNION
   SELECT S1.Col5, 'ID' AS 'Id'
   FROM Result1 S1
   UNION
   SELECT S1.Col6, 'F1' AS 'Name'
   FROM Result1 S1
   UNION
   SELECT S1.Col7, 'F2' AS 'Name'
   FROM Result1 S1
   UNION
   SELECT S1.Col1, 'Min' AS 'Min'
   FROM Result1 S1
   UNION
   SELECT S1.Col2, 'Max' AS 'Max'
   FROM Result1 S1
   UNION
   SELECT S1.Col3, 'Avg' AS 'Avg'
   FROM Result1 S1
)
SELECT * INTO OutputStream FROM Result2

If this query utilizes EventHub as an Input source, this query will have problem in starting.

One possible solution is to break the Result2 into 2-3 queries and have their outputs sent to different output streams. There is no restriction (as yet) that stops you to add one output destination multiple times with different names. Let us check that:

WITH Result1
(
   SELECT Min(S.Val) AS Col1,
               Max(S.Val) AS Col2,
               Avg(S.Val) Col3,
               S.Name AS Col4,
               S.Id AS Col5,
               S.F1 AS Col6,
               S.F2 AS Col7
   From
          InputStream S
   TIMESTAMP BY S.[TimeStamp]
   GROUP BY
          S.Name, S.Id, S.F1, S.F2, SlidingWindow(Min, 5)
),
Result2
(
   SELECT S1.Col4, 'Name' as 'Name'
   FROM Result1 S1
   UNION
   SELECT S1.Col5, 'ID' AS 'Id'
   FROM Result1 S1
   UNION
   SELECT S1.Col6, 'F1' AS 'Name'
   FROM Result1 S1
   UNION
   SELECT S1.Col7, 'F2' AS 'Name'
   FROM Result1 S1  
),
Result3
(

  SELECT S1.Col1, 'Min' AS 'Min'
   FROM Result1 S1
   UNION
   SELECT S1.Col2, 'Max' AS 'Max'
   FROM Result1 S1
   UNION
   SELECT S1.Col3, 'Avg' AS 'Avg'
   FROM Result1 S1
)
SELECT * INTO OutputStream1 FROM Result2
SELECT * INTO OutputStream2 FROM Result3

That's one of the optional ways. Of course, it would be nicer if Azure Stream Analytics Jobs don't have such restrictions.






Sunday, 5 June 2016

Searching JSON objects

With the increased usage of JSON format to describe data for various purpose (storage like Azure Document DB, Mongo DB, Raven DB etc., exchange of data over the wire like WebApi responses), you would inevitably run into scenarios where you need to perform actions like search data, aggregate data etc., that you have been performing on other data formats like XML.

JSON.NET gives you a great library to perform a subset of such operations e.g. searching your JSON object.

Here is an example:

var objString = @"{
                ""id"":""testId"",
                ""data"":[
                    {
                        ""name"":""item1""
                    },
                    {
                        ""name"":""item2""
                    },
                ],
                ""data1"":{
                    ""id"": ""data1"",
                    ""value"":{
                        ""child"": ""data1child"",
                        ""name"":""data1child""
                    }
                }
            }";

            Newtonsoft.Json.Linq.JObject jobject = Newtonsoft.Json.Linq.JObject.Parse(objString);
            foreach (Newtonsoft.Json.Linq.JToken token in jobject.SelectTokens("$..name"))
            {
                Console.WriteLine(token);
            }


The result would be:

item1
item2
data1child


Let's check out its performance over a large object. I added below lines to the above code.

var itemString = @"{
                        ""name"":""item{0}""
                    }";
            for(int i = 0; i < 10000; i++)
            {
                jobject["data" + i.ToString()] = Newtonsoft.Json.Linq.JObject.Parse(itemString.Replace("{0}", i.ToString()));
            }

            Stopwatch sw = new Stopwatch();
            sw.Start();
            var searchResults = jobject.SelectTokens("$..name");
            sw.Stop();
            Console.WriteLine(sw.ElapsedMilliseconds);


The result is less than 1 millisecond. Not bad!!