Thursday 3 March 2016

Scalable querying of multiple Azure DocumentDB databases and collections - quick blog

Azure DocumentDB is highly scalable, but it seems that every 10GB or so you need a new "Collection". My guess is that each Collection gets its own VM, which is where the scale comes from!
So if you want to run a query, you have to send a request (over HTTP) to every one of your collections and then put all the answers together.
This service does that for you:
http://documentdbservices.azurewebsites.net/api/Document/?endpointUrl={YOUR ENDPOINT}&authorizationKey={YOUR KEY}&query={YOUR QUERY}

You must URL-encode the parameters. I used this service: http://www.url-encode-decode.com/.
Actually, I only had to encode the authorizationKey, because it contains a fair number of special characters. The browser took care of everything else.
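If you'd rather not paste your key into a website, the encoding can also be done in code. Below is a minimal sketch using .NET's Uri.EscapeDataString; QueryUrlBuilder is a hypothetical helper name, and it only builds the request URL for the service above.

```csharp
using System;

public static class QueryUrlBuilder
{
    // The service address from this post; the parameter values are supplied by the caller.
    private const string ServiceUrl = "http://documentdbservices.azurewebsites.net/api/Document/";

    // Percent-encodes every parameter so characters such as '+', '/' and '=' in the
    // authorization key are not misread when the query string is decoded
    // (an unencoded '+' would otherwise be decoded as a space).
    public static string Build(string endpointUrl, string authorizationKey, string query)
    {
        return ServiceUrl
            + "?endpointUrl=" + Uri.EscapeDataString(endpointUrl)
            + "&authorizationKey=" + Uri.EscapeDataString(authorizationKey)
            + "&query=" + Uri.EscapeDataString(query);
    }
}
```

For example, QueryUrlBuilder.Build("https://myaccount.documents.azure.com:443/", "<your key>", "SELECT * FROM c") returns a URL you can paste straight into the browser; the account name there is a placeholder.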

I'll be publishing a proper web interface, and maybe even putting the code in Git, over the next couple of weeks, but for those who can't wait for the excitement, here's a brief overview of how it works.

The crux of it

The class below does all the work.
A couple of properties store the endpoint URL and key for the connection, and I provide both instance and static methods (the instance methods use the stored connection details). This should allow a user of the class to cache instances/connections to reduce overhead.

The real parallelism happens here:

            foreach (Database item in databases)
            {
                IEnumerable<DocumentCollection> dcollections = client.CreateDocumentCollectionFeedReader(item.CollectionsLink);
                foreach (DocumentCollection dc in dcollections)
                {
                    QueryDocumentCollection(query, client, retStack, tasks, item, dc);
                }
            }
            Task.WaitAll(tasks.ToArray());
This calls QueryDocumentCollection for each collection in every database and then waits for all of the queries to complete.
QueryDocumentCollection creates a task that calls ExecuteQuery to build the query, and then a "continuation" task (AddCollection) that enumerates the results. Because CreateDocumentQuery is lazy, it is this enumeration that actually sends the HTTP request.
The continuation task is added to the list of tasks that the method above waits on with Task.WaitAll(tasks.ToArray()).

            Task<IQueryable<dynamic>> task = new Task<IQueryable<dynamic>>(() => ExecuteQuery(client,
                item,
                dc,
                query
                ));
            Task continuation = task.ContinueWith((prevTask) => AddCollection(retStack, prevTask));
            tasks.Add(continuation);
            task.Start();
So each collection's HTTP request runs on its own thread-pool task, allowing the query to run in parallel across all the collections.
The ConcurrentStack<Document> is used to put all the results back together in a thread-safe way.
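To experiment with the same fan-out and merge without an Azure account, here is a minimal sketch that uses Task.Run in place of new Task(...) + ContinueWith + Start. It assumes a hypothetical queryCollection delegate standing in for ExecuteQuery plus enumeration, with plain strings standing in for Documents; it makes no DocumentDB SDK calls.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutSketch
{
    // Hypothetical: queryCollection stands in for ExecuteQuery plus enumerating its
    // results; strings stand in for Documents. No DocumentDB SDK calls are made.
    public static IList<string> QueryAll(IEnumerable<string> collectionIds,
        Func<string, IEnumerable<string>> queryCollection)
    {
        // Thread-safe container that every task pushes its results into.
        var results = new ConcurrentStack<string>();
        var tasks = new List<Task>();

        foreach (string id in collectionIds)
        {
            // Task.Run creates and starts the task in one step.
            tasks.Add(Task.Run(() =>
            {
                // Enumerate inside the task so a lazy query executes on this thread.
                foreach (string doc in queryCollection(id))
                {
                    results.Push(doc);
                }
            }));
        }

        // Block until every collection has answered; any failure is rethrown
        // wrapped in an AggregateException.
        Task.WaitAll(tasks.ToArray());
        return results.ToList();
    }
}
```

Enumerating inside the task is the important part: a task that only builds a lazy query finishes almost immediately, and the real work then happens wherever the results are enumerated. Note also that the merged list comes back in no particular collection order.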



===============the full class =====================
  /// <summary>
    /// This class provides utilities to run queries across multiple databases
    /// and collections.
    /// </summary>
    public class DistributedQueryUtils
    {
        public Uri EndpointUrl { get; set; }

        public String AuthorizationKey { get; set; }
        private DocumentClient GetDocumentClient()
        {
            var client = new DocumentClient(EndpointUrl, AuthorizationKey);
            return client;
        }

        static Dictionary<String, Microsoft.Azure.Documents.DocumentCollection> collections = new Dictionary<string, DocumentCollection>();

        public async Task<DocumentCollection> GetDocumentCollectionAsync(DocumentClient client, Database database, String collection)
        {
            DocumentCollection documentCollection = null;
            // check to see if we've already cached it
            lock (collections)
            {
                if (collections.ContainsKey(collection))
                {
                    documentCollection = collections[collection];
                }
            }
            if (null == documentCollection)
            {
                documentCollection = client.CreateDocumentCollectionQuery("dbs/" + database.Id).Where(c => c.Id == collection).AsEnumerable().FirstOrDefault();
                // If the document collection does not exist, create a new collection
                if (documentCollection == null)
                {
                    documentCollection = await client.CreateDocumentCollectionAsync("dbs/" + database.Id,
                        new DocumentCollection
                        {
                            Id = collection
                        });

                }
                // lock the shared dictionary (not the collection object) so concurrent writers are serialized
                lock (collections)
                {
                    if (collections.ContainsKey(collection))
                    {
                        collections[collection] = documentCollection;
                    }
                    else
                    {
                        collections.Add(collection, documentCollection);
                    }
                }
            }

            return documentCollection;

        }

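        public IList<Document> QueryAllDatabase(String query)
        {
            // DocumentClient is IDisposable; dispose the per-call client once the results are in
            using (DocumentClient client = GetDocumentClient())
            {
                return QueryAllDatabases(query, client);
            }
        }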
        public static IList<Document>  QueryAllDatabases(String query,
            DocumentClient client)
        {
            IList<Document> docs = QueryAllDocumentCollections(null, query, client);
           
            return docs;
        }

        public IList<Document> QueryAllDocumentCollections(String databaseName,
           String query)
        {
            // dispose the per-call client once the results are in
            using (DocumentClient client = GetDocumentClient())
            {
                return QueryAllDocumentCollections(databaseName,
                    query,
                    client);
            }
        }

        /// <summary>
        /// Query all document collections in the database name passed in OR 
        /// null for all databases.
        /// </summary>
        /// <param name="databaseName"></param>
        /// <param name="query"></param>
        /// <param name="client"></param>
        /// <returns></returns>
        public static IList<Document> QueryAllDocumentCollections(String databaseName, 
            String query, 
            DocumentClient client)
        {

            // the databases whose collections will be queried
            IEnumerable<Database> databases;
            List<Document> ret = new List<Document>();
            // for thread safety
            ConcurrentStack<Document> retStack = new ConcurrentStack<Document>();
            // get the database matching the name or all
            if (null != databaseName)
            {
                databases = client.CreateDatabaseQuery().Where(db => db.Id == databaseName).AsEnumerable();
            }
            else
            {
                databases = client.CreateDatabaseQuery();
            }
            List<Task> tasks = new List<Task>();
            // create a query for each collection on each database
            foreach (Database item in databases)
            {

                IEnumerable<DocumentCollection> dcollections = client.CreateDocumentCollectionFeedReader(item.CollectionsLink);
                foreach (DocumentCollection dc in dcollections)
                {
                    QueryDocumentCollection(query, client, retStack, tasks, item, dc);

                }
            }
            Task.WaitAll(tasks.ToArray());
            ret.AddRange(retStack);
            return ret;
        }

        /// <summary>
        /// Query the document collection -
        /// fill the retStack with the Documents matching the query.
        /// The operation is not complete until the list of tasks has finished
        /// - you need to Task.WaitAll(tasks.ToArray());
        /// </summary>
        /// <param name="query"></param>
        /// <param name="client"></param>
        /// <param name="retStack"></param>
        /// <param name="tasks"></param>
        /// <param name="item"></param>
        /// <param name="dc"></param>
        private static void QueryDocumentCollection(String query,
            DocumentClient client,
            ConcurrentStack<Document> retStack,
            List<Task> tasks,
            Database item,
            DocumentCollection dc)
        {
            Task<IQueryable<dynamic>> task = new Task<IQueryable<dynamic>>(() => ExecuteQuery(client,
                item,
                dc,
                query
                ));
            Task continuation = task.ContinueWith((prevTask) => AddCollection(retStack, prevTask));
            tasks.Add(continuation);
            task.Start();
        }
        static IQueryable<dynamic> ExecuteQuery(DocumentClient client,
            Database db,
            DocumentCollection dc,
            String query)
        {
            return client.CreateDocumentQuery("dbs/" + db.Id + "/colls/" + dc.Id
                   , query);
        }

        static void AddCollection(ConcurrentStack<Document> retDocs,
            Task<IQueryable<dynamic>> docsToAdd)
        {
            if (!docsToAdd.IsFaulted)
            {
                foreach (Document item in docsToAdd.Result.AsEnumerable())
                {
                    retDocs.Push(item);

                }

            }
            else
            {
                throw docsToAdd.Exception;
            }
        }
    }

Web API code

        public HttpResponseMessage Get(String endpointUrl,
            String authorizationKey,
            String query)
        {
            DistributedQueryUtils dbQuery = new DistributedQueryUtils()
            {
                AuthorizationKey = authorizationKey,
                EndpointUrl = new Uri(endpointUrl)
            };

            IList<Document> res = dbQuery.QueryAllDatabase(query);

            // Wrap the documents in a JSON array so the body is valid JSON;
            // appending them back to back would produce concatenated objects.
            StringBuilder builder = new StringBuilder();
            builder.Append('[');
            for (int i = 0; i < res.Count; i++)
            {
                if (i > 0)
                {
                    builder.Append(',');
                }
                // Appending a Document writes its ToString() output (its JSON)
                builder.Append(res[i]);
            }
            builder.Append(']');

            var response = this.Request.CreateResponse(HttpStatusCode.OK);
            response.Content = new StringContent(builder.ToString(), Encoding.UTF8, "application/json");

            return response;
        }
Nothing too rocket science here. Ideally I would have liked to write the Documents directly to the HTTP response stream rather than building one big string, but I gave up after a while.
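For the streaming idea, here is a minimal sketch that writes documents to a Stream as one JSON array, one document at a time, instead of building a single big string. It assumes each document is already available as a JSON string (for example from Document.ToString(), which the controller above relies on). JsonArrayWriter is a hypothetical name, and hooking it up to the Web API response (for instance via PushStreamContent) is not shown.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class JsonArrayWriter
{
    // Writes already-serialized JSON documents to the output as a single JSON array,
    // without holding the whole response body in memory.
    public static void Write(Stream output, IEnumerable<string> jsonDocuments)
    {
        // UTF-8 without a BOM; leaveOpen so the caller (e.g. the HTTP response) still owns the stream.
        using (var writer = new StreamWriter(output, new UTF8Encoding(false), 4096, leaveOpen: true))
        {
            writer.Write('[');
            bool first = true;
            foreach (string json in jsonDocuments)
            {
                if (!first)
                {
                    writer.Write(',');
                }
                writer.Write(json);
                first = false;
            }
            writer.Write(']');
        } // disposing the writer flushes it but leaves the stream open
    }
}
```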
Also, I will probably cache the DistributedQueryUtils instances to reduce HTTP requests, and add overloads to search just a specific database or collection.
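One way the instance caching could look is sketched below: a ConcurrentDictionary keyed by (endpoint, key) that hands back the same instance for the same connection details. QueryUtilsStub is a hypothetical stand-in for DistributedQueryUtils with the same two properties, and QueryUtilsCache is likewise an illustrative name.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for DistributedQueryUtils, holding the same two
// connection properties.
public sealed class QueryUtilsStub
{
    public Uri EndpointUrl { get; set; }
    public string AuthorizationKey { get; set; }
}

public static class QueryUtilsCache
{
    // One shared instance per (endpoint, key) pair; Tuple<,> compares by value.
    private static readonly ConcurrentDictionary<Tuple<string, string>, QueryUtilsStub> Cache =
        new ConcurrentDictionary<Tuple<string, string>, QueryUtilsStub>();

    public static QueryUtilsStub Get(string endpointUrl, string authorizationKey)
    {
        // GetOrAdd may run the factory more than once under contention,
        // but only one instance is stored, and every caller gets that one.
        return Cache.GetOrAdd(Tuple.Create(endpointUrl, authorizationKey),
            k => new QueryUtilsStub { EndpointUrl = new Uri(k.Item1), AuthorizationKey = k.Item2 });
    }
}
```

Caching the instance only saves connections if the instance also holds on to a single DocumentClient. As written, the class creates a new client in every call to GetDocumentClient, so that would need to change as well.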
