BlinkDB Review
Introduction
BlinkDB is a massively parallel, sampling-based approximate query engine for running ad-hoc, interactive SQL queries on large volumes of data. Its core idea is to enable interactive queries over massive data by running queries on data samples and annotating the results with meaningful error bars, trading query accuracy for response time. To achieve this, BlinkDB uses two key ideas that differentiate it from previous work in this area: (1) an adaptive optimization framework that builds and maintains a set of multi-dimensional, multi-resolution samples from the original data over time, and (2) a dynamic sample selection strategy that selects an appropriately sized sample based on a query's accuracy and/or response time requirements.

Traditional data processing systems answer user queries by scanning large fractions of the database to compute the required statistics. This is inefficient: response time grows in proportion to the data size, and in interactive settings the response time in turn affects the value of the answer. Another problem is that uniform samples perform poorly on skewed distributions and may not contain certain infrequent subgroups at all, which leads to missing rows in the final output of queries. To address these problems, BlinkDB's multi-dimensional, multi-resolution strategy makes three main contributions:

1. It provides faster convergence, minimizes missing results in the output, and provides error/latency guarantees for ad-hoc workloads.
2. It casts the decision of which stratified samples to build as an optimization problem that takes into account: (i) the skew of the data distribution, (ii) the query templates, and (iii) the storage overhead of each sample.
3. It develops a run-time dynamic sample selection strategy that uses multiple smaller samples to quickly estimate query selectivity and choose the best samples for satisfying the response time and error guarantees.

In summary, BlinkDB is a massively parallel query processing system that incorporates these ideas. In this project, I will try to understand the overall design of BlinkDB and to work through the experimental setup described in the paper, which uses both the TPC-H benchmark and a real-world workload derived from Conviva Inc. Hopefully the experiments will show that the multi-dimensional sampling approach improves query response times compared to single-dimensional samples; some result graphs are included at the end.
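To make the skew and missing-subgroup problem concrete, here is a small self-contained simulation (my own illustration in Python, not code from the paper; the table, the column values, and the cap K are all made up). A 1% uniform sample of a heavily skewed column often contains no rows at all for a rare value, so a GROUP BY over that sample would silently drop the group, while a per-value cap keeps every group represented:

```python
import random
from collections import Counter

random.seed(0)

# Synthetic, heavily skewed "table": one column 'city', ~1M rows, one rare city.
rows = ["Berkeley"] * 990_000 + ["Oakland"] * 9_950 + ["Agloe"] * 50
random.shuffle(rows)

# 1% uniform sample: the rare group is often missing entirely.
uniform = [r for r in rows if random.random() < 0.01]
print("uniform sample   :", Counter(uniform))

# Stratified sample with a per-value cap K: every group keeps up to K rows.
# (Kept rows are the first K seen; a real system would pick K rows at random.)
K = 100
kept, seen = [], Counter()
for r in rows:
    if seen[r] < K:
        kept.append(r)
        seen[r] += 1
print("stratified sample:", Counter(kept))
```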
System Overview
Settings and Assumptions
- Query with Joins.
BlinkDB supports two types of joins: (1) arbitrary joins are allowed whenever there is a stratified sample on one of the join tables that contains the join key in its column set; (2) in the absence of any suitable stratified sample, the join is still allowed as long as one of the two tables fits in memory. The second condition is the more common one in practice.
- Workload Characteristics.
Since BlinkDB targets ad-hoc queries, the paper assumes that query templates remain fairly stable over time when choosing which samples to create. It is worth mentioning that although BlinkDB creates its set of stratified samples based on past query templates, at runtime it can still use the set of available samples to answer any query, even one that does not match a historical template.
- Closed-Form Aggregates.
The paper focuses on a small set of aggregation operators: COUNT, SUM, MEAN, and MEDIAN/QUANTILE. BlinkDB estimates the error of these functions using standard closed-form error estimates (a small numeric example follows these assumptions). Closed-form estimates can also be derived for any combination of these basic aggregates, as well as for any algebraic function that is mean-like and asymptotically normal.
- Offline Sampling.
BlinkDB computes samples of the input data offline and reuses them across many queries. There is a small but non-zero probability that a given sample is not representative of the true data, in which case the requirements on response time or error rate may not be met. Rather than adopting more elaborate existing solutions to this problem, BlinkDB periodically replaces samples with new ones in the background, which is both economical and stable.
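To make the closed-form idea concrete (a standard textbook estimator, shown here as my own Python sketch rather than BlinkDB's implementation): for MEAN and SUM over a uniform sample of n rows out of N, the estimate and an approximate 95% confidence interval follow directly from the sample standard deviation and the central limit theorem; the finite-population correction is ignored for simplicity.

```python
import math
import random
import statistics

random.seed(1)
population = [random.lognormvariate(0, 1) for _ in range(100_000)]  # skewed values
N = len(population)

n = 2_000
sample = random.sample(population, n)

# MEAN estimate with a closed-form (CLT-based) 95% confidence interval.
mean_est = statistics.fmean(sample)
mean_se = statistics.stdev(sample) / math.sqrt(n)
print(f"MEAN ~ {mean_est:.3f} +/- {1.96 * mean_se:.3f}")

# SUM estimate: scale the per-row mean by the table size N; the standard
# error scales by the same factor.
print(f"SUM  ~ {N * mean_est:.0f} +/- {1.96 * N * mean_se:.0f}")

print(f"true MEAN = {statistics.fmean(population):.3f}, true SUM = {sum(population):.0f}")
```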
Architecture

Figure 1. Architecture of the BlinkDB framework

As Figure 1 shows, BlinkDB builds on the Apache Hive framework and adds two major components to it: (1) an offline sampling module that creates and maintains samples over time, and (2) a run-time sample selection module that creates an Error-Latency Profile (ELP) for ad-hoc queries. The ELP characterizes the rate at which the error (or response time) decreases (or increases) as the size of the sample on which the query operates increases, and is used to select the sample that best satisfies the user's constraints. BlinkDB augments the query parser, the optimizer, and a number of aggregation operators to allow queries to specify constraints on accuracy or execution time.

Offline Sample Creation and Maintenance

This component is responsible for creating and maintaining a set of uniform and stratified samples. BlinkDB uses uniform samples over the entire dataset to handle queries on groups of columns with relatively uniform distributions, and stratified samples (on one or more columns) to handle queries on groups of columns with less uniform distributions. It consists of three sub-components:

1. Offline sample creation. Based on statistics collected from the data and on historic query templates, BlinkDB computes a set of uniform samples and multiple sets of stratified samples from the underlying data. Intuitively, the optimization framework builds stratified samples over the column(s) that are (a) most useful for the query templates in the workload and (b) most skewed.
2. Sample maintenance. As new data arrives, BlinkDB periodically updates the initial set of samples. The update strategy is designed to minimize performance overhead and avoid service interruption. A monitoring module observes overall system performance, detects significant changes in the data distribution, and triggers periodic sample replacement as well as updates, deletions, or creation of new samples.
3. Storage optimizations. In addition to caching samples in memory, BlinkDB partitions each sample into many small files and leverages the block distribution strategy of HDFS to spread those files across the nodes of the cluster, maximizing disk throughput. It also reduces the storage overhead by recursively building larger samples as a union of smaller samples built on the same set of columns (a sketch of this layout appears at the end of this overview).

Run-time Sample Selection

Given a query, BlinkDB selects an optimal sample at runtime so as to meet the query's accuracy or response time constraints. It does this by dynamically running the query on smaller samples to estimate the query's selectivity, error rate, and response time, and then extrapolating to a sample size that will satisfy the user-specified error or response time goals.
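The storage optimization in point 3 above, and the cheap small-sample probing used by the run-time module, rest on the same layout: every smaller sample in a family is physically contained in the larger ones. A minimal sketch of that idea (my own illustration with made-up sizes, not BlinkDB code):

```python
import random

random.seed(2)
table = list(range(1_000_000))          # stand-in for the original table T

# One physical sample is stored: the largest member of the family.
largest = random.sample(table, 100_000)

# Smaller "resolutions" are just prefixes of the stored rows, so the whole
# family costs only as much storage as its largest member.  (In BlinkDB the
# prefixes correspond to whole HDFS blocks shared between logical samples.)
def logical_sample(k):
    return largest[:k]

for k in (1_000, 10_000, 100_000):
    s = logical_sample(k)
    print(f"logical sample of {k:>7} rows starts with {s[:3]}")
```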
Sample Creation

This section covers the details of creating multi-dimensional, multi-resolution samples.
Multi-resolution Stratified Samples

To scan skewed distributions quickly while still covering every rare subgroup, BlinkDB relies on stratified sampling. Table 1 summarizes the notation used in this section.

Table 1. Notation used in the sample creation discussion
Let φ be a subset of the columns of the original table T. For any such subset we define a sample family as a sequence of stratified samples over φ:

SFam(φ) = { S(φ, K_1), S(φ, K_2), ..., S(φ, K_m) },

where m is the number of samples in the family and K_1 < K_2 < ... < K_m are row-count caps. By maintaining multiple stratified samples for the same column subset φ, we allow a finer-granularity trade-off between query accuracy and response time.

A stratified sample S(φ, K_i) on the set of columns φ caps the frequency of every value x of φ at K_i. More precisely, for a tuple x = <x_1, x_2, ..., x_k>, where x_j is the value in column c_j, let F(φ, T, x) be the frequency of x on the column set φ in the original table T. If F(φ, T, x) <= K_i, then S(φ, K_i) contains all rows of T containing x, so BlinkDB can answer a query Q on that value exactly, since the sample contains all of the relevant rows from the original table. Otherwise, if F(φ, T, x) > K_i, then S(φ, K_i) contains K_i randomly chosen rows of T containing x, and the answer to Q for that value is estimated from those K_i random rows. It is worth mentioning that when a family contains samples of different sizes, there is no need to allocate storage for each sample independently: the smaller samples can be constructed from the larger ones, so the family needs only as much storage as its largest sample.
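A minimal sketch of the capping rule just described (my own code; it assumes per-group reservoir sampling as the mechanism for picking the "K random rows"): values of φ with frequency at most K keep all of their rows, and values with frequency above K keep exactly K randomly chosen rows.

```python
import random
from collections import defaultdict

def stratified_sample(rows, key, K, seed=0):
    """Cap the frequency of every distinct value of `key` at K rows.

    Values with frequency <= K keep all of their rows (queries on them can be
    answered exactly); values with frequency > K keep K rows chosen uniformly
    at random via per-group reservoir sampling.
    """
    rng = random.Random(seed)
    kept = defaultdict(list)   # value -> up to K sampled rows
    seen = defaultdict(int)    # value -> number of rows seen so far
    for row in rows:
        v = row[key]
        seen[v] += 1
        if len(kept[v]) < K:
            kept[v].append(row)
        else:
            j = rng.randrange(seen[v])     # standard reservoir-sampling step
            if j < K:
                kept[v][j] = row
    return [r for group in kept.values() for r in group]

# Tiny demo: 'city' is heavily skewed, yet every city survives the sample.
rows = [{"city": "SF", "x": i} for i in range(10_000)] + [{"city": "Agloe", "x": -1}]
sample = stratified_sample(rows, key="city", K=100)
print(len(sample), sorted({r["city"] for r in sample}))
```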
Two properties. Two properties define how "good" a sample family is (most of the terms appear in Table 1; S_opt denotes the smallest possible stratified sample on φ that satisfies the error or response time constraints of Q):

1. For a query with a response time constraint, the response time of the query running on S(φ, K'_i), the sample chosen from the family at run time, is within a factor c of the response time of the query running on the optimal-sized sample S_opt.
2. For a query with an error constraint, the standard deviation of the answer computed on S(φ, K'_i) is within a factor of roughly sqrt(c) of the standard deviation obtained on S_opt (since the standard deviation shrinks with the square root of the sample size).

Another factor that must be taken into consideration is the storage overhead, which is discussed in more detail below.

Optimization Framework

As discussed above, stratified samples on multiple columns that are frequently queried together can lead to significant improvements in both query accuracy and latency, especially when that set of columns has a skewed joint distribution. However, such samples increase the storage overhead, because (1) samples on multiple columns can be larger than single-column samples, since multiple columns usually contain more unique values than individual columns, and (2) there is an exponential number of column subsets, not all of which can fit within the storage budget. The choice of which sample families to build is therefore formulated as an optimization problem that takes three factors into account:

1. Non-uniformity (skew) of the data. Intuitively, the greater the skew of a set of columns, the more important it is to have a stratified sample on those columns. Let D(φ) denote the set of all distinct values appearing in φ, and F(φ, T, v) the frequency of value v in φ. The non-uniformity Δ(φ) is a metric of how skewed the frequencies F(φ, T, v) are across the values v in D(φ); it is computed relative to K, the cap corresponding to the largest sample in the family S(φ, K) (the exact formula is given in the paper).
2. Workload. BlinkDB uses a query workload defined as a set of m query templates and their weights, {(q_1, w_1), ..., (q_m, w_m)}, where 0 < w_i < 1 is the weight (relative frequency) of the i-th query template and φ_i is the set of columns appearing in the i-th template's WHERE and GROUP BY clauses.
3. Storage cost. The paper uses Store(φ) to denote the storage cost of building a sample family on the set of columns φ.

Given these three factors, BlinkDB solves a mixed integer linear program (MILP) that maximizes a goal function of the form

G = sum over templates i of w_i * y_i * Δ(φ_i),

subject to the total storage cost of the selected sample families staying within the storage budget, where the binary variable y_i indicates whether the i-th query template is covered, i.e., whether some selected sample family is built on a superset of φ_i. In other words, the coverage of each set of columns is weighted by its importance: a set of columns φ_i is more important to cover when (1) it appears frequently, which is captured by w_i, or (2) its joint distribution is more skewed (non-uniform), which is captured by Δ(φ_i). Thus the best solution maximizes the sum of w_i * y_i * Δ(φ_i) over all query templates, as captured by the goal function.
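The exact MILP from the paper is not reproduced above, but its flavor can be shown on a tiny instance solved by brute force (my own simplification, with invented weights, skews, and storage costs): choose column sets to build sample families on, subject to a storage budget, so that the weighted, skew-weighted coverage of the query templates is maximized; a template counts as covered if some chosen column set is a superset of its columns.

```python
from itertools import combinations

# Hypothetical inputs (illustrative numbers only).
templates = [            # (weight w_i, columns phi_i, non-uniformity Delta(phi_i))
    (0.5, frozenset({"city"}), 0.9),
    (0.3, frozenset({"os"}), 0.2),
    (0.2, frozenset({"city", "os"}), 0.7),
]
store = {                # candidate column set -> storage cost of its sample family
    frozenset({"city"}): 10,
    frozenset({"os"}): 8,
    frozenset({"city", "os"}): 25,
}
budget = 30

def goal(chosen):
    # y_i = 1 iff some chosen column set is a superset of phi_i;
    # the objective is the sum of w_i * Delta_i * y_i.
    return sum(w * delta for w, phi, delta in templates
               if any(phi <= cols for cols in chosen))

best_choice, best_value = frozenset(), 0.0
for r in range(len(store) + 1):
    for chosen in map(frozenset, combinations(store, r)):
        if sum(store[c] for c in chosen) <= budget and goal(chosen) > best_value:
            best_choice, best_value = chosen, goal(chosen)

print("build families on:", [sorted(c) for c in best_choice],
      "objective:", round(best_value, 3))
```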
In practice, BlinkDB restricts the candidate subsets to those that have appeared together in at least one of the query templates. This does not affect the optimality of the solution, because a column that never appears together with the rest of the columns in a subset can be safely removed without affecting any of the query templates. BlinkDB also limits candidate subsets to those consisting of no more than a small fixed number of columns, such as 3 or 4.

To handle data and workload variations, the optimization formulation is designed to avoid over-fitting samples to past queries by (i) only looking at the sets of columns that appear in the query templates, rather than optimizing for the specific constants in past queries, and (ii) considering infrequent subsets that have a high degree of skew. In addition, BlinkDB periodically updates its data and workload statistics to decide whether the current set of sample families is still effective or whether the optimization problem needs to be re-solved with the new input parameters. Specifically, BlinkDB allows the administrator to decide what percentage of the sample families (in terms of storage cost) can be discarded or added whenever the sample creation module is triggered by changes in the data or workload distribution. In the corresponding re-optimization problem, additional binary input parameters indicate whether each candidate sample family already exists in the system (parameter value 1) or does not (value 0).

BlinkDB Running

This section gives an overview of query execution in BlinkDB and of its approach to online sample selection. Given a query Q, the goal is to select, at run time, one sample that meets the specified time or error constraints and then to compute the answer over it. Selecting a sample involves first selecting a sample family (i.e., a dimension) and then selecting a sample resolution within that family.

Selecting the Sample Family

Choosing an appropriate sample family for a query depends primarily on the set of columns used for filtering and/or grouping. The WHERE clause may consist of conjunctive predicates (condition1 AND condition2), disjunctive predicates (condition1 OR condition2), or a combination of the two:
1. Queries with conjunctive predicates. Consider a query Q whose WHERE clause contains only conjunctive predicates, and let φ(Q) be the set of columns that appear in these predicates. If Q has multiple WHERE and/or GROUP BY clauses, φ(Q) is the union of the columns appearing in each of them. If BlinkDB finds one or more stratified sample families on a set of columns φ such that φ(Q) ⊆ φ, it simply picks the φ with the smallest number of columns and runs the query on SFam(φ). However, if there is no stratified sample on a column set that is a superset of φ(Q), BlinkDB runs Q in parallel on the smallest sample of every sample family currently maintained by the system, and then selects the sample with the highest ratio of (i) the number of rows selected by Q to (ii) the number of rows read by Q. (A small sketch of this selection logic follows the list.)
2. Queries with disjunctive predicates. For a query Q with disjunctions in its WHERE clause, BlinkDB rewrites Q as a union of queries Q_1, ..., Q_p, each of which contains only conjunctive predicates. Let φ_j be the set of columns in Q_j's predicates. Each query Q_j is then assigned an error or time constraint such that Q's overall error/time constraints are still satisfied when the results of the Q_j (1 <= j <= p), run in parallel, are aggregated.
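A minimal sketch of the conjunctive-case selection logic above (my own code; the family names and column sets are invented): given the columns in a query's predicates and the available stratified sample families, pick the covering family with the fewest columns, and fall back to the probing strategy (only stubbed here) when no family covers the query.

```python
def pick_sample_family(query_columns, families):
    """families: dict mapping a frozenset of columns -> a handle for its sample family."""
    covering = [cols for cols in families if query_columns <= cols]
    if covering:
        # Prefer the covering family with the fewest columns.
        return families[min(covering, key=len)]
    # No stratified family covers the query: BlinkDB would instead run the query
    # on the smallest sample of every family and keep the one with the highest
    # (rows selected) / (rows read) ratio; that probing step is omitted here.
    return None

families = {
    frozenset({"city"}): "SFam(city)",
    frozenset({"city", "os"}): "SFam(city, os)",
}
print(pick_sample_family(frozenset({"city"}), families))       # -> SFam(city)
print(pick_sample_family(frozenset({"browser"}), families))    # -> None (probe fallback)
```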
Selecting the Sample Size

To select an appropriately sized sample within the chosen family, based on the query's response time or error constraints, BlinkDB constructs an Error-Latency Profile (ELP) for the query. The ELP characterizes the rate at which the error decreases with increasing sample size, and is built simply by running the query on smaller samples to estimate the selectivity and to project the latency and error for larger samples. For a distributed query, the runtime scales with the sample size, with the scaling rate depending on the exact query structure, the physical placement of its inputs, and the underlying data distribution. As shown in Table 2 of the paper, the variation of the error depends primarily on the variance of the underlying data distribution and on the actual number of tuples processed in the sample, which in turn depends on the selectivity of the query's predicates.

Error profile: an error profile is created for every query with an error constraint. If Q specifies an error constraint, the error profile is used to predict the size of the smallest sample that satisfies Q's constraint.

Latency profile: similarly, a latency profile is created for every query with a response time constraint. If Q specifies a response time constraint, the sample family on which to run Q is selected as above. Let SFam(φ) be the selected family; the number of rows that Q reads when running on S(φ, K_m) is measured, and n denotes the maximum number of rows that Q can read without exceeding its response time constraint.

Although BlinkDB requires a query to operate on smaller samples in order to construct its ELP, the intermediate data produced in the process is effectively reused when the query later runs on larger samples. Physically, each progressively bigger logical sample (A, B, or C) consists of all of the data blocks of the smaller samples in the same family. BlinkDB maintains a transparent mapping between logical samples and data blocks: A maps to block (I), B maps to (I, II), and C maps to (I, II, III). Now consider a query Q on this data. BlinkDB first creates an ELP for Q by running it on the smallest sample A; in doing so it operates on A's data blocks to estimate the query parameters described above and caches all intermediate data produced in the process. If sample C is then chosen based on Q's error/latency requirements, BlinkDB only needs to operate on the additional data blocks, reusing the previously cached intermediate data.
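The ELP construction can be sketched numerically (my own back-of-the-envelope model, not BlinkDB's estimator; all numbers are invented): run the query once on a small sample, record the rows read, latency, and standard error, then project latency as proportional to rows read and error as proportional to 1/sqrt(rows read), and pick the smallest sample in the family that satisfies the constraint.

```python
import math

def project_elp(pilot_rows_read, pilot_latency_s, pilot_std_error, sample_sizes):
    """Project error and latency for larger samples from one pilot run.

    Crude model: latency grows linearly with rows read, while the standard
    error shrinks like 1/sqrt(rows read).
    """
    per_row_latency = pilot_latency_s / pilot_rows_read
    profile = []
    for n in sample_sizes:
        latency = per_row_latency * n
        error = pilot_std_error * math.sqrt(pilot_rows_read / n)
        profile.append((n, latency, error))
    return profile

def smallest_sample_meeting(profile, max_error=None, max_latency=None):
    for n, latency, error in sorted(profile):
        if (max_error is None or error <= max_error) and \
           (max_latency is None or latency <= max_latency):
            return n
    return None  # no sample in the family satisfies the constraints

# Pilot run on the smallest sample of the chosen family (made-up numbers).
profile = project_elp(pilot_rows_read=10_000, pilot_latency_s=0.4,
                      pilot_std_error=5.0, sample_sizes=[10_000, 100_000, 1_000_000])
print(profile)
print("error <= 1.0  ->", smallest_sample_meeting(profile, max_error=1.0))
print("latency <= 5s ->", smallest_sample_meeting(profile, max_latency=5.0))
```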
Implementation

First, the overall BlinkDB ecosystem:

Figure 2. The BlinkDB ecosystem

BlinkDB is built on top of the Hive query engine, supports both Hadoop MapReduce and Spark at the execution layer, and uses the Hadoop Distributed File System (HDFS) at the storage layer.

The implementation required changes in a few key components. The authors added a shim layer, the BlinkDB Query Interface, to the HiveQL parser to enable queries with response time and error bounds. This layer also detects new data input, which causes the Sample Creation and Maintenance module to create or update the set of random and multi-dimensional samples at multiple granularities. The HiveQL parser is further extended with a Sample Selection module that rewrites the query and iteratively assigns it an appropriately sized biased or random sample. An Uncertainty Propagation module modifies the pre-existing aggregation functions to return error bars and confidence intervals in addition to the result. Finally, the SQLite-based Hive Metastore is extended into the BlinkDB Metastore, which maintains a transparent mapping between the non-overlapping logical samples and the physical HDFS data blocks.

In BlinkDB, uniform samples are generally created in a few hundred seconds, since the creation time depends only on the disk/memory bandwidth and the degree of parallelism. Creating stratified samples on a set of columns, on the other hand, takes anywhere between 5 and 30 minutes, depending on the number of unique values to stratify on, which determines the number of reducers and the amount of data shuffled.

Evaluation and Results

The paper evaluates BlinkDB's performance on a 100-node EC2 cluster using two workloads: a workload from Conviva Inc. and the well-known TPC-H benchmark.

- Conviva workload. The Conviva data contains information about video streams viewed by Internet users. The query traces come from Conviva's SQL-based ad-hoc querying system, which is used for problem diagnosis and data analytics on a log of media accesses by Conviva users. These access logs are 1.7 TB in size and constitute a small fraction of the data collected across 30 days. Based on the underlying data distribution, the authors generated a 17 TB dataset for the experiments and partitioned it across 100 nodes. The data consists of a single large fact table with 104 columns, such as customer ID, city, media URL, genre, date, time, user OS, browser type, and request response time. The 17 TB dataset has about 5.5 billion rows.
- TPC-H workload. A smaller number of experiments were run on TPC-H to demonstrate the generality of the results with respect to a standard benchmark. All TPC-H experiments ran on the same 100-node cluster, on 1 TB of data. The 22 benchmark queries in TPC-H were mapped to 6 unique query templates.

The evaluation proceeds in four steps. First, BlinkDB is compared to query execution on the full-sized datasets, to demonstrate how even a small trade-off in the accuracy of the final answers can result in orders-of-magnitude improvements in query response time. Second, the accuracy and convergence properties of the optimal multi-dimensional, multi-granular stratified-sampling approach are evaluated against both random sampling and single-column stratified sampling. Third, the effectiveness of the cost models and error projections at meeting the user's accuracy/response time requirements is evaluated. Finally, BlinkDB's ability to scale gracefully with increasing cluster size is demonstrated. Some result graphs from the paper are included below:
Figure 3. Sampling vs. no sampling
Figure 4. Error comparison on Conviva
Figure 5. Error convergence on Conviva
Figure 6. Response time bounds
Figure 7. Scale-up

Conclusion

The paper presents BlinkDB, a parallel, sampling-based approximate query engine that supports ad-hoc queries with error and response time constraints. BlinkDB is based on two key ideas: (i) a multi-dimensional, multi-granularity sampling strategy that builds and maintains a large variety of samples, and (ii) a run-time dynamic sample selection strategy that uses smaller samples to estimate query selectivity and to choose the best samples for satisfying query constraints.

In this project, I studied the overall structure of BlinkDB and attempted some simulation work based on EC2; hopefully this review shares something interesting with others.