管道概念
POSIX多線程的使用方式中, 有一種很重要的方式-----流水線(亦稱為“管道”)方式,“數據元素”流串行地被一組線程按順序執行。它的使用架構可參考下圖:
以面向對象的思想去理解,整個流水線,可以理解為一個數據傳輸的管道;該管道中的每一個工作線程,可以理解為一個整個流水線的一個工作階段stage,這些工作線程之間的合作是一環扣一環的。靠輸入口越近的工作線程,是時序較早的工作階段stage,它的工作成果會影響下一個工作線程階段(stage)的工作結果,即下個階段依賴于上一個階段的輸出,上一個階段的輸出成為本階段的輸入。這也是pipeline的一個共有特點!
為了回應用戶對簡單數據訪問的需求,MongoDB2.2版本引入新的功能聚合框架(Aggregation Framework) ,它是數據聚合的一個新框架,其概念類似于數據處理的管道。 每個文檔通過一個由多個節點組成的管道,每個節點有自己特殊的功能(分組、過濾等),文檔經過管道處理后,最后輸出相應的結果。管道基本的功能有兩個:
一是對文檔進行“過濾”,也就是篩選出符合條件的文檔;
二是對文檔進行“變換”,也就是改變文檔的輸出形式。
其他的一些功能還包括按照某個指定的字段分組和排序等。而且在每個階段還可以使用表達式操作符計算平均值和拼接字符串等相關操作。管道提供了一個MapReduce 的替代方案,MapReduce使用相對來說比較復雜,而管道的擁有固定的接口(操作符表達),使用比較簡單,對于大多數的聚合任務管道一般來說是首選方法。
該框架使用聲明性管道符號來支持類似于SQL Group By操作的功能,而不再需要用戶編寫自定義的JavaScript例程。
大部分管道操作會在“aggregate”子句后會跟上“$match”打頭。它們用在一起,就類似于SQL的from和where子句,或是MongoDB的find函數。“$project”子句看起來也非常類似SQL或MongoDB中的某個概念(和SQL不同的是,它位于表達式尾端)。
接下來介紹的操作在MongoDB聚合框架中是獨一無二的。與大多數關系數據庫不同,MongoDB天生就可以在行/文檔內存儲數組。盡管該特性對于全有全無的數據訪問十分便利,但是它對于需要組合投影、分組和過濾操作來編寫報告的工作,卻顯得相當復雜。“$unwind”子句將數組分解為單個的元素,并與文檔的其余部分一同返回。
“$group”操作與SQL的Group By子句用途相同,但是使用起來卻更像是LINQ中的分組運算符。與取回一行平面數據不同,“$group”操作的結果集會呈現為一個持續的嵌套結構。正因如此,使用“$group”可以返回聚合信息,例如對于每個分組中的實際文檔,計算文檔整體或部分的數目和平均值。
管道操作符
管道是由一個個功能節點組成的,這些節點用管道操作符來進行表示。聚合管道以一個集合中的所有文檔作為開始,然后這些文檔從一個操作節點 流向下一個節點 ,每個操作節點對文檔做相應的操作。這些操作可能會創建新的文檔或者過濾掉一些不符合條件的文檔,在管道中可以對文檔進行重復操作。
先看一個管道聚合的例子:
管道操作符的種類:
Name | Description |
Reshapes a document stream. $project can rename, add, or remove fields as well as create computed values and sub-documents. | |
Filters the document stream, and only allows matching documents to pass into the next pipeline stage.$match uses standard MongoDB queries. | |
Restricts the number of documents in an aggregation pipeline. | |
Skips over a specified number of documents from the pipeline and returns the rest. | |
Takes an array of documents and returns them as a stream of documents. | |
Groups documents together for the purpose of calculating aggregate values based on a collection of documents. | |
Takes all input documents and returns them in a stream of sorted documents. | |
Returns an ordered stream of documents based on proximity to a geospatial point. |
管道操作符詳細使用說明
1. $project: 數據投影,主要用于重命名、增加和刪除字段
例如:
db.article.aggregate(
{ $project : {
title : 1 ,
author : 1 ,
}}
);
這樣的話結果中就只還有_id,tilte和author三個字段了,默認情況下_id字段是被包含的,如果要想不包含_id話可以這樣:
db.article.aggregate(
{ $project : {
_id : 0 ,
title : 1 ,
author : 1
}});
也可以在$project內使用算術類型表達式操作符,例如:
db.article.aggregate(
{ $project : {
title : 1,
doctoredPageViews : { $add:["$pageViews", 10] }
}});
通過使用$add給pageViews字段的值加10,然后將結果賦值給一個新的字段:doctoredPageViews
注:必須將$add計算表達式放到中括號里面
除此之外使用$project還可以重命名字段名和子文檔的字段名:
db.article.aggregate(
{ $project : {
title : 1 ,
page_views : "$pageViews" ,
bar : "$other.foo"
}});
也可以添加子文檔:
db.article.aggregate(
{ $project : {
title : 1 ,
stats : {
pv : "$pageViews",
foo : "$other.foo",
dpv : { $add:["$pageViews", 10] }
}
}});
產生了一個子文檔stats,里面包含pv,foo,dpv三個字段。
2.$match: 濾波操作,篩選符合條件文檔,作為下一階段的輸入
$match的語法和查詢表達式(db.collection.find())的語法相同
db.articles.aggregate( [
{ $match : { score : { $gt : 70, $lte : 90 } } },
{ $group: { _id: null, count: { $sum: 1 } } }
] );
$match用于獲取分數大于70小于或等于90記錄,然后將符合條件的記錄送到下一階段$group管道操作符進行處理。
注意:1.不能在$match操作符中使用$where表達式操作符。
2.$match盡量出現在管道的前面,這樣可以提早過濾文檔,加快聚合速度。
3.如果$match出現在最前面的話,可以使用索引來加快查詢。
3. $limit: 限制經過管道的文檔數量
$limit的參數只能是一個正整數
db.article.aggregate(
{ $limit : 5 });
這樣的話經過$limit管道操作符處理后,管道內就只剩下前5個文檔了
4. $skip: 從待操作集合開始的位置跳過文檔的數目
$skip參數也只能為一個正整數
db.article.aggregate(
{ $skip : 5 });
經過$skip管道操作符處理后,前五個文檔被“過濾”掉
5.$unwind:將數組元素拆分為獨立字段
例如:article文檔中有一個名字為tags數組字段:
> db.article.find()
{ "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
"author" : "Jone", "title" : "Abook",
"tags" : [ "good", "fun", "good" ] }
使用$unwind操作符后:
> db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tags"})
{
"result" : [
{
"_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
"author" : "Jone",
"title" : "A book",
"tags" : "good"
},
{
"_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
"author" : "Jone",
"title" : "A book",
"tags" : "fun"
},
{
"_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
"author" : "Jone",
"title" : "A book",
"tags" : "good"
}
],
"ok" : 1
}
注意:a.{$unwind:"$tags"})不要忘了$符號
b.如果$unwind目標字段不存在的話,那么該文檔將被忽略過濾掉,例如:
> db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$tag"})
{ "result" : [ ], "ok" : 1 }
將$tags改為$tag因不存在該字段,該文檔被忽略,輸出的結果為空
c.如果$unwind目標字段不是一個數組的話,將會產生錯誤,例如:
> db.article.aggregate({$project:{author:1,title:1,tags:1}},{$unwind:"$title"})
Error: Printing Stack Trace
at printStackTrace (src/mongo/shell/utils.js:37:15)
at DBCollection.aggregate (src/mongo/shell/collection.js:897:9)
at (shell):1:12
Sat Nov 16 19:16:54.488 JavaScript execution failed: aggregate failed: {
"errmsg" : "exception: $unwind: value at end of field path must be an array",
"code" : 15978,
"ok" : 0
} at src/mongo/shell/collection.js:L898
d.如果$unwind目標字段數組為空的話,該文檔也將會被忽略。
6.$group 對數據進行分組
$group的時候必須要指定一個_id域,同時也可以包含一些算術類型的表達式操作符:
db.article.aggregate(
{ $group : {
_id : "$author",
docsPerAuthor : { $sum : 1 },
viewsPerAuthor : { $sum : "$pageViews" }
}});
注意: 1.$group的輸出是無序的。
2.$group操作目前是在內存中進行的,所以不能用它來對大量個數的文檔進行分組。
7.$sort : 對文檔按照指定字段排序
使用方式如下:
db.users.aggregate( { $sort : { age : -1, posts: 1 } });
按照年齡進行降序操作,按照posts進行升序操作
注意:1.如果將$sort放到管道前面的話可以利用索引,提高效率
2.MongoDB 24.對內存做了優化,在管道中如果$sort出現在$limit之前的話,$sort只會對前$limit個文檔進行操作,這樣在內存中也只會保留前$limit個文檔,從而可以極大的節省內存
3.$sort操作是在內存中進行的,如果其占有的內存超過物理內存的10%,程序會產生錯誤
8.$goNear
$goNear會返回一些坐標值,這些值以按照距離指定點距離由近到遠進行排序
具體使用參數見下表:
Field | Type | Description |
near | GeoJSON point orlegacy coordinate pairs | The point for which to find the closest documents. |
distanceField | string | The output field that contains the calculated distance. To specify a field within a subdocument, use dot notation. |
limit | number | Optional. The maximum number of documents to return. The default value is 100. See also the num option. |
num | number | Optional. The num option provides the same function as the limitoption. Both define the maximum number of documents to return. If both options are included, the num value overrides the limit value. |
maxDistance | number | Optional. A distance from the center point. Specify the distance in radians. MongoDB limits the results to those documents that fall within the specified distance from the center point. |
query | document | Optional. Limits the results to the documents that match the query. The query syntax is the usual MongoDB read operation query syntax. |
spherical | Boolean | Optional. If true, MongoDB references points using a spherical surface. The default value is false. |
distanceMultiplier | number | Optional. The factor to multiply all distances returned by the query. For example, use the distanceMultiplier to convert radians, as returned by a spherical query, to kilometers by multiplying by the radius of the Earth. |
includeLocs | string | Optional. This specifies the output field that identifies the location used to calculate the distance. This option is useful when a location field contains multiple locations. To specify a field within a subdocument, usedot notation. |
uniqueDocs | Boolean | Optional. If this value is true, the query returns a matching document once, even if more than one of the document’s location fields match the query. If this value is false, the query returns a document multiple times if the document has multiple matching location fields. See $uniqueDocsfor more information. |
例如:
db.places.aggregate([
{
$geoNear: {
near: [40.724, -73.997],
distanceField: "dist.calculated",
maxDistance: 0.008,
query: { type: "public" },
includeLocs: "dist.location",
uniqueDocs: true,
num: 5
}
}
])
其結果為:
{
"result" : [
{ "_id" : 7,
"name" : "Washington Square",
"type" : "public",
"location" : [
[ 40.731, -73.999 ],
[ 40.732, -73.998 ],
[ 40.730, -73.995 ],
[ 40.729, -73.996 ]
],
"dist" : {
"calculated" : 0.0050990195135962296,
"location" : [ 40.729, -73.996 ]
}
},
{ "_id" : 8,
"name" : "Sara D. Roosevelt Park",
"type" : "public",
"location" : [
[ 40.723, -73.991 ],
[ 40.723, -73.990 ],
[ 40.715, -73.994 ],
[ 40.715, -73.994 ]
],
"dist" : {
"calculated" : 0.006082762530298062,
"location" : [ 40.723, -73.991 ]
}
}
],
"ok" : 1}
其中,dist.calculated中包含了計算的結果,而dist.location中包含了計算距離時實際用到的坐標
注意: 1.使用$goNear只能在管道處理的開始第一個階段進行
2.必須指定distanceField,該字段用來決定是否包含距離字段
3.$gonNear和geoNear命令比較相似,但是也有一些不同:distanceField在$geoNear中是必選的,而在geoNear中是可選的;includeLocs在$geoNear中是string類型,而在geoNear中是boolen類型。
管道表達式
管道操作符作為“鍵”,所對應的“值”叫做管道表達式。例如上面例子中{$match:{status:"A"}},$match稱為管道操作符,而{status:"A"}稱為管道表達式,它可以看作是管道操作符的操作數(Operand),每個管道表達式是一個文檔結構,它是由字段名、字段值、和一些表達式操作符組成的,例如上面例子中管道表達式就包含了一個表達式操作符$sum進行累加求和。
每個管道表達式只能作用于處理當前正在處理的文檔,而不能進行跨文檔的操作。管道表達式對文檔的處理都是在內存中進行的。除了能夠進行累加計算的管道表達式外,其他的表達式都是無狀態的,也就是不會保留上下文的信息。累加性質的表達式操作符通常和$group操作符一起使用,來統計該組內最大值、最小值等,例如上面的例子中我們在$group管道操作符中使用了具有累加的$sum來計算總和。
除了$sum以為,還有以下性質的表達式操作符:
組聚合操作符
Name | Description |
Returns an array of all the unique values for the selected field among for each document in that group. | |
Returns the first value in a group. | |
Returns the last value in a group. | |
Returns the highest value in a group. | |
Returns the lowest value in a group. | |
Returns an average of all the values in a group. | |
Returns an array of all values for the selected field among for each document in that group. | |
Returns the sum of all the values in a group. |
Bool類型聚合操作符
Name | Description |
Returns true only when all values in its input array are true. | |
Returns true when any value in its input array are true. | |
Returns the boolean value that is the opposite of the input value. |
比較類型聚合操作符
Name | Description |
Compares two values and returns the result of the comparison as an integer. | |
Takes two values and returns true if the values are equivalent. | |
Takes two values and returns true if the first is larger than the second. | |
Takes two values and returns true if the first is larger than or equal to the second. | |
Takes two values and returns true if the second value is larger than the first. | |
Takes two values and returns true if the second value is larger than or equal to the first. | |
Takes two values and returns true if the values are not equivalent. |
算術類型聚合操作符
Name | Description |
Computes the sum of an array of numbers. | |
Takes two numbers and divides the first number by the second. | |
Takes two numbers and calcualtes the modulo of the first number divided by the second. | |
Computes the product of an array of numbers. | |
Takes two numbers and subtracts the second number from the first. |
字符串類型聚合操作符
Name | Description |
Concatenates two strings. | |
Compares two strings and returns an integer that reflects the comparison. | |
Takes a string and returns portion of that string. | |
Converts a string to lowercase. | |
Converts a string to uppercase. |
日期類型聚合操作符
Name | Description |
Converts a date to a number between 1 and 366. | |
Converts a date to a number between 1 and 31. | |
Converts a date to a number between 1 and 7. | |
Converts a date to the full year. | |
Converts a date into a number between 1 and 12. | |
Converts a date into a number between 0 and 53 | |
Converts a date into a number between 0 and 23. | |
Converts a date into a number between 0 and 59. | |
Converts a date into a number between 0 and 59. May be 60 to account for leap seconds. | |
Returns the millisecond portion of a date as an integer between 0 and 999. |
條件類型聚合操作符
Name | Description |
A ternary operator that evaluates one expression, and depending on the result returns the value of one following expressions. | |
Evaluates an expression and returns a value. |
注:以上操作符都必須在管道操作符的表達式內來使用。
各個表達式操作符的具體使用方式參見:
http://docs.mongodb.org/manual/reference/operator/aggregation-group/
聚合管道的優化
1.$sort + $skip + $limit順序優化
如果在執行管道聚合時,如果$sort、$skip、$limit依次出現的話,例如:
{ $sort: { age : -1 } },
{ $skip: 10 },
{ $limit: 5 }
那么實際執行的順序為:
{ $sort: { age : -1 } },
{ $limit: 15 },
{ $skip: 10 }
$limit會提前到$skip前面去執行。
此時$limit = 優化前$skip+優化前$limit
這樣做的好處有兩個:1.在經過$limit管道后,管道內的文檔數量個數會“提前”減小,這樣會節省內存,提高內存利用效率。2.$limit提前后,$sort緊鄰$limit這樣的話,當進行$sort的時候當得到前“$limit”個文檔的時候就會停止。
2.$limit + $skip + $limit + $skip Sequence Optimization
如果聚合管道內反復出現下面的聚合序列:
{ $limit: 100 },
{ $skip: 5 },
{ $limit: 10},
{ $skip: 2 }
首先進行局部優化為:可以按照上面所講的先將第二個$limit提前:
{ $limit: 100 },
{ $limit: 15},
{ $skip: 5 },
{ $skip: 2 }
進一步優化:兩個$limit可以直接取最小值 ,兩個$skip可以直接相加:
{ $limit: 15 },
{ $skip: 7 }
3.Projection Optimization
過早的使用$project投影,設置需要使用的字段,去掉不用的字段,可以大大減少內存。除此之外也可以過早使用
我們也應該過早使用$match、$limit、$skip操作符,他們可以提前減少管道內文檔數量,減少內存占用,提供聚合效率。
除此之外,$match盡量放到聚合的第一個階段,如果這樣的話$match相當于一個按條件查詢的語句,這樣的話可以使用索引,加快查詢效率。
聚合管道的限制
1.類型限制
在管道內不能操作 Symbol, MinKey, MaxKey, DBRef, Code, CodeWScope類型的數據( 2.4版本解除了對二進制數據的限制).
2.結果大小限制
管道線的輸出結果不能超過BSON 文檔的大小(16M),如果超出的話會產生錯誤.
3.內存限制
如果一個管道操作符在執行的過程中所占有的內存超過系統內存容量的10%的時候,會產生一個錯誤。
當$sort和$group操作符執行的時候,整個輸入都會被加載到內存中,如果這些占有內存超過系統內存的%5的時候,會將一個warning記錄到日志文件。同樣,所占有的內存超過系統內存容量的10%的時候,會產生一個錯誤。
分片上使用聚合管道
聚合管道支持在已分片的集合上進行聚合操作。當分片集合上進行聚合操縱的時候,聚合管道被分為兩成兩個部分,分別在mongod實例和mongos上進行操作。
聚合管道使用
首先下載測試數據:http://media.mongodb.org/zips.json 并導入到數據庫中。
1.查詢各州的人口數
var connectionString = ConfigurationManager.AppSettings["MongodbConnection"];
var client = new MongoClient(connectionString);
var DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
string collName = ConfigurationManager.AppSettings["collName"];
MongoServer mongoDBConn = client.GetServer();
MongoDatabase db = mongoDBConn.GetDatabase(DatabaseName);
MongoCollection<BsonDocument> table = db[collName];
var group = new BsonDocument
{
{"$group", new BsonDocument
{
{
"_id","$state"
},
{
"totalPop", new BsonDocument
{
{ "$sum","$pop" }
}
}
}
}
};
var sort = new BsonDocument
{
{"$sort", new BsonDocument{ { "_id",1 }}}
};
var pipeline = new[] { group, sort };
var result = table.Aggregate(pipeline);
var matchingExamples = result.ResultDocuments.Select(x => x.ToDynamic()).ToList();
foreach (var example in matchingExamples)
{
var message = string.Format("{0}- {1}", example["_id"], example["totalPop"]);
Console.WriteLine(message);
}
2.計算每個州平均每個城市打人口數
> db.zipcode.aggregate({$group:{_id:{state:"$state",city:"$city"},pop:{$sum:"$pop"}}},
{$group:{_id:"$_id.state",avCityPop:{$avg:"$pop"}}},
{$sort:{_id:1}})
var group1 = new BsonDocument
{
{"$group", new BsonDocument
{
{
"_id",new BsonDocument
{
{"state","$state"},
{"city","$city"}
}
},
{
"pop", new BsonDocument
{
{ "$sum","$pop" }
}
}
}
}
};
var group2 = new BsonDocument
{
{"$group", new BsonDocument
{
{
"_id","$_id.state"
},
{
"avCityPop", new BsonDocument
{
{ "$avg","$pop" }
}
}
}
}
};
var pipeline1 = new[] { group1,group2, sort };
var result1 = table.Aggregate(pipeline1);
var matchingExamples1 = result1.ResultDocuments.Select(x => x.ToDynamic()).ToList();
foreach (var example in matchingExamples1)
{
var message = string.Format("{0}- {1}", example["_id"], example["avCityPop"]);
Console.WriteLine(message);
}
3.計算每個州人口最多和最少的城市名字
>db.zipcode.aggregate({$group:{_id:{state:"$state",city:"$city"},pop:{$sum:"$pop"}}},
{$sort:{pop:1}},
{$group:{_id:"$_id.state",biggestCity:{$last:"$_id.city"},biggestPop:{$last:"$pop"},smallestCity:{$first:"$_id.city"},smallestPop:{$first:"$pop"}}},
{$project:{_id:0,state:"$_id",biggestCity:{name:"$biggestCity",pop:"$biggestPop"},smallestCity:{name:"$smallestCity",pop:"$smallestPop"}}})
var sort1 = new BsonDocument
{
{"$sort", new BsonDocument{ { "pop",1 }}}
};
var group3 = new BsonDocument
{
{
"$group", new BsonDocument
{
{
"_id","$_id.state"
},
{
"biggestCity",new BsonDocument
{
{"$last","$_id.city"}
}
},
{
"biggestPop",new BsonDocument
{
{"$last","$pop"}
}
},
{
"smallestCity",new BsonDocument
{
{"$first","$_id.city"}
}
},
{
"smallestPop",new BsonDocument
{
{"$first","$pop"}
}
}
}
}
};
var project = new BsonDocument
{
{
"$project", new BsonDocument
{
{"_id",0},
{"state","$_id"},
{"biggestCity",new BsonDocument
{
{"name","$biggestCity"},
{"pop","$biggestPop"}
}},
{"smallestCity",new BsonDocument
{
{"name","$smallestCity"},
{"pop","$smallestPop"}
}
}
}
}
};
var pipeline2 = new[] { group1,sort1 ,group3, project };
var result2 = table.Aggregate(pipeline2);
var matchingExamples2 = result2.ResultDocuments.Select(x => x.ToDynamic()).ToList();
foreach (var example in matchingExamples2)
{
Console.WriteLine(example.ToString());
//var message = string.Format("{0}- {1}", example["_id"], example["avCityPop"]);
//Console.WriteLine(message);
}
總結
對于大多數的聚合操作,聚合管道可以提供很好的性能和一致的接口,使用起來比較簡單, 和MapReduce一樣,它也可以作用于分片集合,但是輸出的結果只能保留在一個文檔中,要遵守BSON Document大小限制(當前是16M)。
管道對數據的類型和結果的大小會有一些限制,對于一些簡單的固定的聚集操作可以使用管道,但是對于一些復雜的、大量數據集的聚合任務還是使用MapReduce。
相關文章:
http://mikaelkoskinen.net/mongodb-aggregation-framework-examples-in-c/
文章列表