腳本功能:
1. 將指定的報告文件按照指定的字段、切庫切表策略切分
2. 將切分后的文件并發導入到對應的Mongodb中
3. 生成日志文件和done標識文件
使用手冊:
-h 打印幫助信息,并退出";
-f 需要切分的數據文件";
-g 清理昨日或歷史全部數據: 1 昨日數據 2 歷史全部數據";
-k 拆分字段在文件中列數,從1開始";
-o 需要切分的數據文件格式 tsv或csv ";
-d 切分的庫數目";
-t 切分的表數目";
-m 切分后,需要入庫的mongodb未拆分庫名,比如拆分前cpc, 拆分后cpc_01";
-c 切分后,需要入庫的mongodb未拆分庫名,比如拆分前cpc, 拆分后cpc_0102";
-a 入庫fieldFile";
-p 配置文件",
使用步驟:
1. 在配置文件中設置日志、切割后數據臨時路徑$LOG_HOME 和 $DATA_SPLIT_HOME目錄,如果不存在,則手動創建;
在配置文件中設置目標Mongodb參數信息,用來作為導入數據的目標庫;
在配置文件中設置Mongodb程序的主目錄$MONGO;
2. 按照具體的參數意義,仿照下面的格式執行腳本:
舉例:./mongo-split-importer.sh -f /data/shell/test.ata -g 1 -o tsv -k 3 -d 3 -t 3 -m idea -c idea -p ../conf/demeter_conf_qa.sh -a ../conf/idea-head-file
-f 切分目標文件 -o 文件格式 tsv -k 切割字段,第三個 -d 切割成3個庫 -t 每個庫3個表
-m 導入的mongodb未拆分名稱idea -c 導入的mongodb未拆分表名idea -p 環境配置文件 -a 導入目標表的fieldFile文件 -g 清理昨日數據
mongo-split-importer.sh執行腳本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
|
#!/bin/bash SPLITFILE= "" #目標切割文件 FILEFORMAT= "" # 目標切割文件格式 , \t FILEFORMATNAME= "" #切割目標文件格式名稱 csv tsv SPLITKEY=1 SPLITDBNUM= "" #目標切割庫數目 SPLITTBNUM= "" #目標切割表數目 IMPORTDBNAME= "" # 目標入庫未分割庫名 IMPORTTBNAME= "" #目標入庫未切割表名 PROFILE= "" #配置文件 FIELDFILE= "" #入庫fieldFile CLEAN=0 #清理數據, 0:默認不清理, 1 : 清理昨日的數據 2: 清理所有以前的數據 SPILTTMPDIR= "" #目標切割文件存放臨時目錄 FULLPATH=$(cd `dirname $0`;pwd -P) SCRIPTFILE=`basename $0` TOTLE_RECORD_NUM=0 #文件切割前的記錄條目 SUBFILE_RECORD_NUM=0 #切割后所有文件匯總的記錄條目 _mongo_count= "-1" #------------------------------------------------函數--------------------------------------------------------------- function usage(){ echo "$SCRIPTFILE - 分庫分表后將數據導數據到mongodb" echo "SYNOPSIS" echo "OPTIONS" echo " -h 打印幫助信息,并退出" ; echo " -f 需要切分的數據文件" ; echo " -g 是否清理歷史數據,默認不清理 1:清理昨日數據 2:清理以前所有數據" ; echo " -k 拆分字段在文件中列數,從1開始" ; echo " -o 需要切分的數據文件格式 tsv或csv " ; echo " -d 切分的庫數目" ; echo " -t 切分的表數目" ; echo " -m 切分后,需要入庫的mongodb未拆分庫名,比如拆分前cpc, 拆分后cpc_01" ; echo " -c 切分后,需要入庫的mongodb未拆分庫名,比如拆分前cpc, 拆分后cpc_0102" ; echo " -a 入庫fieldFile" ; echo " -p 配置文件,絕對或相對路徑文件" , exit } function setFileFormat(){ FILEFORMATNAME=$1 case $1 in csv) FILEFORMAT= "," ;; tsv) FILEFORMAT= "\t" ;; *) echo "unknow profile -o $1" ; usage;; esac } while getopts ':hf:g:o:k:d:t:a:p:m:c:' OPTION do case $OPTION in h) usage;; f) SPLITFILE= $OPTARG ;; g)CLEAN= $OPTARG ;; o) setFileFormat $OPTARG ;; k) SPLITKEY= $OPTARG ;; d) SPLITDBNUM= $OPTARG ;; t) SPLITTBNUM= $OPTARG ;; a) FIELDFILE= $OPTARG ;; p) PROFILE= $OPTARG ;; m) IMPORTDBNAME= $OPTARG ;; c) IMPORTTBNAME= $OPTARG ;; :) echo "選項 \"-$OPTARG\" 后面缺少對應值, 將使用默認值" ;; \?)echo " 錯誤的選項 -$OPTARG, 將退出" ; usage;; esac done #記錄日志信息 function logInfo(){ echo "[`date +" %Y - %m - %d %H : %M : %S "`] $@ " | tee -a $LOGFILE } function checkError(){ if [ $? -ne 0 ]; then echo "[`date +" %Y - %m - %d %H : %M : %S , %s "`][$SCRIPTFILE, $$] ERROR OCCURS! - $1" | tee -a $ERRORFILE exit 1; fi } function check_ready() { tmp_done_file=` printf "$reportDoneFile" "$TABLE" "$1" ` while [ "$isok" = "false" ]; do rsync --list-only ${tmp_done_file} if [ $? -eq 0 ]; then isok= "true" ; break; fi if [ "$isok" = "false" ]; then sleep 300 fi time_now=`date + %s ` if [ `expr ${time_now} - ${time_start}` -ge $max_interval ]; then return 255; fi done return 0; } #從數據庫列表里選擇主庫 function selectMongoMaster(){ tmp= "TARGET_MONGO_HOST_LIST_0$1" TMP_HOST=${!tmp} echo $TMP_HOST #replica set for DUBHE_MONGO_HOST in $TMP_HOST ; do if [ $? -eq 0 ] ; then break; fi done # single server #for DUBHE_MONGO_HOST in $TMP_HOST; do #TARGET_MONGO_HOST=$DUBHE_MONGO_HOST #echo $TARGET_MONGO_HOST #done } #切割 function split () { logInfo "spilt data file" echo "split db num" $SPLITDBNUM echo "split tb num" $SPLITTBNUM echo "Start to split file: " $SPLITFILE awk ' BEGIN { FS= "'${FILEFORMAT}'" ; } ARGIND==1{ #分庫分表 DBN=$ '${SPLITKEY}' % '${SPLITDBNUM}' + 1; TBN= int ($ '${SPLITKEY}' / '${SPLITDBNUM}' ) TBN=TBN % '${SPLITTBNUM}' + 1; DBN= "0" DBN; TBN= "0" TBN; print $0 > "'${SPILTTMPDIR}'" "/" "'${IMPORTTBNAME}'" "_" DBN "" TBN } END { } ' ${SPLITFILE}; ls $SPILTTMPDIR echo "Split file successfully : " $SPLITFILE } #導入 function import () { #importData local iter=1; while [ $iter -le $SPLITDBNUM ]; do thread_import $iter & iter=`expr $iter + 1` done #wait for child-threads wait ; } #導入子線程 function thread_import() { local num=1; targetFileName= $IMPORTTBNAME "_0" $1 "0" $num targetFile= $SPILTTMPDIR / $IMPORTTBNAME "_0" $1 "0" $num targetDB= $IMPORTDBNAME "_0" $1 targetCollection= $IMPORTTBNAME "_0" $1 "0" $num if [ ! -f $targetFile ]; then logInfo "spilt file does not exits : " $targetFile num=`expr $num + 1` continue fi user= "TARGET_MONGO_USER_0" $1 TMP_USER=${!user} password= "TARGET_MONGO_PWD_0" $1 TMP_PASSWORD=${!password} #選擇master selectMongoMaster $1; #clean dirty data if [ $CLEAN -gt 0 ]; then logInfo "$qdate $targetDB.$targetCollection cleaning up dirty data in mongodb" clean_dirty_data checkError "whether error occurs during cleaning dirty data from mongodb" fi #import data import2mongo $1 $targetFile $targetDB $targetCollection #record done file statusfile= "$STATUS_LOG_HOME/$targetFileName.done.`date -d $qdate +" %Y - %m - %d "`" touch $statusfile num=`expr $num + 1` done logInfo "thread $1 ends" } #把指定的文件導到指定的庫指定的表,并建立索引,mongodb自身會判斷索引是否存在 #不存在的情況下才創建新索引 function import2mongo(){ if [ "$FIELDFILE" != "" ]; then MONGO_FIELD_FILE= $FIELDFILE else MONGO_FIELD_FILE= $FULLPATH /../conf/${IMPORTTBNAME}-head-file fi DATAFILE=$2 if [ ! -f $DATAFILE ]; then logInfo "mongodb [${DB}.${COLL}] imported 0 objects" return 0 fi TMPLOGFILE= $INFO_LOG_HOME / $DB . $COLL .tmp. log tmp=$? if [ "$tmp" != "0" ]; then return $tmp fi #data check _mongo_count=`tail $TMPLOGFILE | grep imported` _mongo_count=`expr 0 $_mongo_count + 0` #start to ensure index ensureIndex logInfo "mongodb [${DB}.${COLL}] imported $_mongo_count objects" return $tmp } function ensureIndex(){ } #垃圾數據清理 function clean_dirty_data(){ day=`date -d ${1:- ' -1day' } + "%y%m%d" ` if [ $CLEAN -eq 1 ]; then _mongo_condition= "{\"_id\":{\"\$gte\":\"${day}_0\",\"\$lte\":\"${day}_9\"}}" else _mongo_condition= "{\"_id\":{\"\$lte\":\"${day}_9\"}}" fi logInfo "waiting for the clean task.." echo $_mongo_condition tmp=$? if [ "$tmp" != "0" ]; then return $tmp fi sleep 5s logInfo "dirty data cleaned: " $targetDB $targetCollection $dirtyCount echo "dirty data cleaned: " $targetDB $targetCollection $dirtyCount return $tmp } #parameter check function checkParams() { if [ 1 -ne $CLEAN -a 2 -ne $CLEAN ]; then logInfo "-g the parameter clean is not in [1, 2] : " $CLEAN return 1; fi if [ $FILEFORMAT != "," -a $FILEFORMAT != "\t" ]; then logInfo "-o the parameter file format is not in [csv, tsv] : " $FILEFORMAT return 1; fi if [ $SPLITKEY -lt 1 ]; then logInfo "-k split key must not be less than 1 : " $SPLITKEY return 1; fi if [ $SPLITDBNUM -lt 1 ]; then logInfo "-d database number must not be less than 1 : " $SPLITDBNUM return 1; fi if [ $SPLITTBNUM -lt 1 ]; then logInfo "-t collection number must not be less than 1 : " $SPLITTBNUM return 1; fi if [ ! -f $FIELDFILE ]; then logInfo "-a field file is not a common file or not exits : " $FIELDFILE return 1; fi if [ "" = $IMPORTDBNAME ] ; then logInfo "-m import database name is empty : " $IMPORTDBNAME return 1; fi if [ "" = $IMPORTTBNAME ] ; then logInfo "-m import table name is empty : " $IMPORTTBNAME return 1; fi } #主函數 function main() { set +x echo "check split file and profile: " $SPLITFILE $PROFILE if [ ! -f $SPLITFILE ]; then echo "-f split file is not a common file or not exits : " $SPLITFILE return 1; fi if [ ! -f $PROFILE ]; then echo "-p profile file is not a common file or not exits : " $PROFILE return 1; fi source $PROFILE qdate=`date + "%Y-%m-%d" ` last_day=`date -d "-1day" + "%Y-%m-%d" ` BASEFILENAME=$(basename $SPLITFILE ) echo "base split file name is : " $BASEFILENAME if [ ! -d $LOG_HOME ] ; then logInfo " log home is not a common directory or not exits : " $LOG_HOME return 1; fi LOGFILE= $INFO_LOG_HOME / $BASEFILENAME . $qdate . log if [ -f $LOGFILE ]; then mv $LOGFILE $LOGFILE . $last_day fi touch $LOGFILE ERRORFILE= $ERROR_LOG_HOME / $BASEFILENAME .error. log if [ -f $ERRORFILE ]; then mv $ERRORFILE $ERRORFILE . $last_day fi touch $ERRORFILE #空行 echo echo logInfo "start to check parameters!" checkParams checkError "whether error occurs during check parameters : $SPLITFILE" #空行 echo echo logInfo "start to split file: " $SPLITFILE if [ ! -d $DATA_SPLIT_HOME ] ; then logInfo " data split home is not a common directory or not exits : " $DATA_SPLIT_HOME return 1; fi SPILTTMPDIR= $DATA_SPLIT_HOME / $BASEFILENAME echo "split temple directory : " $SPILTTMPDIR if [ -d ${SPILTTMPDIR} ]; then rm -rf ${SPILTTMPDIR} fi mkdir -p ${SPILTTMPDIR} split checkError "whether error occurs during split data : $SPLITFILE" logInfo "split data completely : $SPLITFILE" statusfile= $STATUS_LOG_HOME / $BASEFILENAME ".split.done." $qdate touch ${statusfile} #空行 echo echo logInfo "start to import split file to mongodb" import logInfo "import data completely : $SPLITFILE" statusfile= $STATUS_LOG_HOME / $BASEFILENAME ".import.done." $qdate touch ${statusfile} #空行 echo echo #remove temple directory # if [ -d ${SPILTTMPDIR} ]; then # rm -rf ${SPILTTMPDIR} # fi } #-------------------------------------------------入口---------------------------------------------------------------- source /etc/profile |
demeter_conf_cpc_qa.sh 腳本:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
#!/bin/bash source /etc/profile #logger path INFO_LOG_HOME= "${LOG_HOME}/info" STATUS_LOG_HOME= "${LOG_HOME}/status" if [ ! -d $ERROR_LOG_HOME ]; then if [ ! -d $INFO_LOG_HOME ]; then mkdir -p $INFO_LOG_HOME fi if [ ! -d $STATUS_LOG_HOME ]; then mkdir -p $STATUS_LOG_HOME fi if [ ! -d $DATA_HOME ]; then mkdir -p $DATA_HOME fi #data path for source and target data path DATA_SPLIT_HOME=/data/demeter/sdata #import target mongodbs TARGET_MONGO_PORT_01=XXX TARGET_MONGO_USER_01=XXX TARGET_MONGO_PWD_01=XXX TARGET_MONGO_HOST_LIST_01="test01.mongodb01: $TARGET_MONGO_PORT_01 test01.mongodb02: $TARGET_MONGO_PORT_01 test01.mongodb03:$ TARGET_MONGO_PORT_01" TARGET_MONGO_PORT_02=XXX TARGET_MONGO_USER_02=XXX TARGET_MONGO_PWD_02=XXX TARGET_MONGO_HOST_LIST_02="testt02.mongodb01: $TARGET_MONGO_PORT_02 test02.mongodb02: $TARGET_MONGO_PORT_02 test02.mongodb03:$ TARGET_MONGO_PORT_02" TARGET_MONGO_PORT_03=XXX TARGET_MONGO_USER_03=XXX TARGET_MONGO_PWD_03=XXX TARGET_MONGO_HOST_LIST_03="test03.mongodb01: $TARGET_MONGO_PORT_03 test03.mongodb02: $TARGET_MONGO_PORT_03 test03.mongodb03:$ TARGET_MONGO_PORT_03" #mongodb utils MONGO=/opt/mongodb |
xuri-cpc-head-file
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
a b c d e f g h i j k l m n 0 p q r s |
host:
1
2
3
4
5
6
7
8
9
|
XX.XX.XX.XX test01.mongodb01 XX.XX.XX.XX test01.mongodb02 XX.XX.XX.XX testt01.mongodb03 XX.XX.XX.XX test02.mongodb01 XX.XX.XX.XX test02.mongodb02 XX.XX.XX.XX test02.mongodb03 XX.XX.XX.XX test03.mongodb01 XX.XX.XX.XX test03.mongodb02 XX.XX.XX.XX test03.mongodb03 |
文章列表