Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code

Side by Side Diff: sitescripts/stats/bin/logprocessor.py

Issue 5182947690807296: Centralize stats processing, have the stats server pull in logs (Closed)
Patch Set: Created Dec. 22, 2013, 10:01 p.m.
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # coding: utf-8 1 # coding: utf-8
2 2
3 # This file is part of the Adblock Plus web scripts, 3 # This file is part of the Adblock Plus web scripts,
4 # Copyright (C) 2006-2013 Eyeo GmbH 4 # Copyright (C) 2006-2013 Eyeo GmbH
5 # 5 #
6 # Adblock Plus is free software: you can redistribute it and/or modify 6 # Adblock Plus is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License version 3 as 7 # it under the terms of the GNU General Public License version 3 as
8 # published by the Free Software Foundation. 8 # published by the Free Software Foundation.
9 # 9 #
10 # Adblock Plus is distributed in the hope that it will be useful, 10 # Adblock Plus is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of 11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details. 13 # GNU General Public License for more details.
14 # 14 #
15 # You should have received a copy of the GNU General Public License 15 # You should have received a copy of the GNU General Public License
16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>. 16 # along with Adblock Plus. If not, see <http://www.gnu.org/licenses/>.
17 17
18 import os, sys, codecs, re, math, urllib, urlparse, socket, json 18 import argparse
19 import codecs
20 from collections import OrderedDict
21 from datetime import datetime, timedelta
22 import errno
23 import gzip
24 import json
25 import math
26 import multiprocessing
27 import numbers
28 import os
29 import re
19 import pygeoip 30 import pygeoip
20 from collections import OrderedDict 31 import socket
32 import subprocess
33 import sys
34 import traceback
35 import urllib
36 import urlparse
37
21 import sitescripts.stats.common as common 38 import sitescripts.stats.common as common
22 from sitescripts.utils import get_config, setupStderr 39 from sitescripts.utils import get_config, setupStderr
23 from datetime import datetime, timedelta
24 40
25 log_regexp = None 41 log_regexp = None
26 mirror_name = None
27 gecko_apps = None 42 gecko_apps = None
28 43
44 def open_stats_file(path):
45 parseresult = urlparse.urlparse(path)
46 if parseresult.scheme == "ssh" and parseresult.username and parseresult.hostna me and parseresult.path:
47 command = [
48 "ssh", "-q", "-o", "NumberOfPasswordPrompts 0", "-T", "-k",
49 "-l", parseresult.username,
50 parseresult.hostname,
51 parseresult.path.lstrip("/")
52 ]
53 if parseresult.port:
54 command[1:1] = ["-P", str(parseresult.port)]
55 result = subprocess.Popen(command, stdout=subprocess.PIPE).stdout
56 elif parseresult.scheme in ("http", "https"):
57 result = urllib.urlopen(path)
58 elif os.path.exists(path):
59 result = open(path, "rb")
60 else:
61 raise IOError("Path '%s' not recognized" % path)
62
63 if path.endswith(".gz"):
64 # Built-in gzip module doesn't support streaming (fixed in Python 3.2)
65 result = subprocess.Popen(["gzip", "-cd"], stdin=result, stdout=subprocess.P IPE).stdout
66 return result
67
68 def get_stats_files():
69 config = get_config()
70
71 prefix = "mirror_"
72 options = filter(lambda o: o.startswith(prefix), config.options("stats"))
73 for option in options:
74 if config.has_option("stats", option):
75 value = config.get("stats", option)
76 if " " in value:
77 yield [option[len(prefix):]] + value.split(None, 1)
78 else:
79 print >>sys.stderr, "Option '%s' has invalid value: '%s'" % (option, val ue)
80 else:
81 print >>sys.stderr, "Option '%s' not found in the configuration" % option
82
29 def cache_lru(func): 83 def cache_lru(func):
30 """ 84 """
31 Decorator that memoizes the return values of a single-parameter function in 85 Decorator that memoizes the return values of a single-parameter function in
32 case it is called again with the same parameter. The 1024 most recent 86 case it is called again with the same parameter. The 1024 most recent
33 results are saved. 87 results are saved.
34 """ 88 """
35 89
36 results = OrderedDict() 90 results = OrderedDict()
37 results.entries_left = 1024 91 results.entries_left = 1024
38 92
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
150 if ua == "Adblock Plus": 204 if ua == "Adblock Plus":
151 return "ABP", "" 205 return "ABP", ""
152 206
153 return "Other", "" 207 return "Other", ""
154 208
155 def process_ip(ip, geo, geov6): 209 def process_ip(ip, geo, geov6):
156 match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip) 210 match = re.search(r"^::ffff:(\d+\.\d+\.\d+\.\d+)$", ip)
157 if match: 211 if match:
158 ip = match.group(1) 212 ip = match.group(1)
159 213
160 if ":" in ip: 214 try:
161 country = geov6.country_code_by_addr(ip) 215 if ":" in ip:
162 else: 216 country = geov6.country_code_by_addr(ip)
163 country = geo.country_code_by_addr(ip) 217 else:
218 country = geo.country_code_by_addr(ip)
219 except:
220 traceback.print_exc()
221 country = ""
222
164 if country in (None, "", "--"): 223 if country in (None, "", "--"):
165 country = "unknown" 224 country = "unknown"
166 country = country.lower() 225 country = country.lower()
167 226
168 return ip, country 227 return ip, country
169 228
170 @cache_last 229 @cache_last
171 def parse_time(timestr, tz_hours, tz_minutes): 230 def parse_time(timestr, tz_hours, tz_minutes):
172 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S") 231 result = datetime.strptime(timestr, "%d/%b/%Y:%H:%M:%S")
173 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_h ours)) 232 result -= timedelta(hours = tz_hours, minutes = math.copysign(tz_minutes, tz_h ours))
(...skipping 124 matching lines...) Expand 10 before | Expand all | Expand 10 after
298 357
299 # Only leave the major and minor release number for application 358 # Only leave the major and minor release number for application
300 applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion) 359 applicationVersion = re.sub(r"^(\d+\.\d+).*", r"\1", applicationVersion)
301 360
302 return version, application, applicationVersion 361 return version, application, applicationVersion
303 362
304 def parse_update_flag(query): 363 def parse_update_flag(query):
305 return "update" if query == "update" else "install" 364 return "update" if query == "update" else "install"
306 365
307 def parse_record(line, ignored, geo, geov6): 366 def parse_record(line, ignored, geo, geov6):
308 global log_regexp, mirror_name 367 global log_regexp
309 if log_regexp == None: 368 if log_regexp == None:
310 log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" " ([^"]*)")?') 369 log_regexp = re.compile(r'(\S+) \S+ \S+ \[([^]\s]+) ([+\-]\d\d)(\d\d)\] "GET ([^"\s]+) [^"]+" (\d+) (\d+) "[^"]*" "([^"]*)"(?: "[^"]*" \S+ "[^"]*" "[^"]*" " ([^"]*)")?')
311 if mirror_name == None:
312 mirror_name = re.sub(r"\..*", "", socket.gethostname())
313 370
314 match = re.search(log_regexp, line) 371 match = re.search(log_regexp, line)
315 if not match: 372 if not match:
316 return None 373 return None
317 374
318 status = int(match.group(6)) 375 status = int(match.group(6))
319 if status != 200: 376 if status != 200:
320 return None 377 return None
321 378
322 info = { 379 info = {
323 "mirror": mirror_name,
324 "size": int(match.group(7)), 380 "size": int(match.group(7)),
325 } 381 }
326 382
327 info["ip"], info["country"] = process_ip(match.group(1), geo, geov6) 383 info["ip"], info["country"] = process_ip(match.group(1), geo, geov6)
328 info["time"], info["month"], info["day"], info["weekday"], info["hour"] = pars e_time(match.group(2), int(match.group(3)), int(match.group(4))) 384 info["time"], info["month"], info["day"], info["weekday"], info["hour"] = pars e_time(match.group(2), int(match.group(3)), int(match.group(4)))
329 info["file"], info["query"] = parse_path(match.group(5)) 385 info["file"], info["query"] = parse_path(match.group(5))
330 info["ua"], info["uaversion"] = parse_ua(match.group(8)) 386 info["ua"], info["uaversion"] = parse_ua(match.group(8))
331 info["fullua"] = "%s %s" % (info["ua"], info["uaversion"]) 387 info["fullua"] = "%s %s" % (info["ua"], info["uaversion"])
332 info["clientid"] = match.group(9) 388 info["clientid"] = match.group(9)
333 389
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
375 continue 431 continue
376 432
377 value = info[field] 433 value = info[field]
378 if field not in section: 434 if field not in section:
379 section[field] = {} 435 section[field] = {}
380 if value not in section[field]: 436 if value not in section[field]:
381 section[field][value] = {} 437 section[field][value] = {}
382 438
383 add_record(info, section[field][value], ignore_fields + (field,)) 439 add_record(info, section[field][value], ignore_fields + (field,))
384 440
385 def parse_stdin(geo, geov6, verbose): 441 def parse_fileobj(mirror_name, fileobj, geo, geov6, ignored):
386 data = {} 442 data = {}
387 ignored = set() 443 for line in fileobj:
388 for line in sys.stdin:
389 info = parse_record(line, ignored, geo, geov6) 444 info = parse_record(line, ignored, geo, geov6)
390 if info == None: 445 if info == None:
391 continue 446 continue
392 447
448 info["mirror"] = mirror_name
393 if info["month"] not in data: 449 if info["month"] not in data:
394 data[info["month"]] = {} 450 data[info["month"]] = {}
395 section = data[info["month"]] 451 section = data[info["month"]]
396 452
397 if info["file"] not in section: 453 if info["file"] not in section:
398 section[info["file"]] = {} 454 section[info["file"]] = {}
399 section = section[info["file"]] 455 section = section[info["file"]]
400 456
401 add_record(info, section) 457 add_record(info, section)
458 return data
402 459
403 if verbose: 460 def merge_objects(object1, object2):
404 print "Ignored files" 461 for key, value in object2.iteritems():
405 print "=============" 462 key = unicode(key)
406 print "\n".join(sorted(ignored)) 463 if key in object1:
407 return data 464 if isinstance(value, numbers.Number):
465 object1[key] += value
466 else:
467 merge_objects(object1[key], value)
468 else:
469 object1[key] = value
470
471 def save_stats(server_type, data):
472 base_dir = os.path.join(get_config().get("stats", "dataDirectory"), common.fil ename_encode(server_type))
473 for month, month_data in data.iteritems():
474 for name, file_data in month_data.iteritems():
475 path = os.path.join(base_dir, common.filename_encode(month), common.filena me_encode(name + ".json"))
476 if os.path.exists(path):
477 with codecs.open(path, "rb", encoding="utf-8") as fileobj:
478 existing = json.load(fileobj)
479 else:
480 existing = {}
481
482 merge_objects(existing, file_data)
483
484 dir = os.path.dirname(path)
485 try:
486 os.makedirs(dir)
487 except OSError, e:
488 if e.errno != errno.EEXIST:
489 raise
490
491 with codecs.open(path, "wb", encoding="utf-8") as fileobj:
492 json.dump(existing, fileobj, indent=2, sort_keys=True)
493
494 def parse_source((mirror_name, server_type, log_file)):
495 try:
496 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CA CHE)
497 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMOR Y_CACHE)
498
499 ignored = set()
500 fileobj = open_stats_file(log_file)
501 try:
502 data = parse_fileobj(mirror_name, fileobj, geo, geov6, ignored)
503 finally:
504 fileobj.close()
505 return server_type, log_file, data, ignored
506 except:
507 print >>sys.stderr, "Unable to process log file '%s'" % log_file
508 traceback.print_exc()
509 return None, None, None, None
510
511 def parse_sources(sources, verbose):
512 pool = multiprocessing.Pool()
Sebastian Noack 2013/12/23 10:01:41 You should call pool.close(), when you are done.
513 for server_type, log_file, data, ignored in pool.imap(parse_source, sources, c hunksize=1):
514 if server_type == None:
515 continue
516
517 save_stats(server_type, data)
518 if verbose:
519 print "Ignored files for %s" % log_file
520 print "============================================================"
521 print "\n".join(sorted(ignored))
408 522
409 if __name__ == "__main__": 523 if __name__ == "__main__":
410 setupStderr() 524 setupStderr()
411 525
412 verbose = (len(sys.argv) >= 2 and sys.argv[1] == "verbose") 526 parser = argparse.ArgumentParser(description="Processes log files and merges t hem into the stats database")
413 geo = pygeoip.GeoIP(get_config().get("stats", "geoip_db"), pygeoip.MEMORY_CACH E) 527 parser.add_argument("--verbose", dest="verbose", action="store_const", const=T rue, default=False, help="Verbose mode, ignored requests will be listed")
414 geov6 = pygeoip.GeoIP(get_config().get("stats", "geoipv6_db"), pygeoip.MEMORY_ CACHE) 528 parser.add_argument("mirror_name", nargs="?", help="Name of the mirror server that the file belongs to")
415 result = parse_stdin(geo, geov6, verbose) 529 parser.add_argument("server_type", nargs="?", help="Server type like download, update or subscription")
530 parser.add_argument("log_file", nargs="?", help="Log file path, can be a local file path, http:// or ssh:// URL")
531 args = parser.parse_args()
416 532
417 with codecs.open(get_config().get("stats", "tempFile"), "wb", encoding="utf-8" ) as file: 533 if args.mirror_name and args.server_type and args.log_file:
418 json.dump(result, file, indent=2, sort_keys=True) 534 sources = [(args.mirror_name, args.server_type, args.log_file)]
535 else:
536 sources = get_stats_files()
537 parse_sources(sources, args.verbose)
OLDNEW
« no previous file with comments | « sitescripts/stats/bin/datamerger.py ('k') | sitescripts/stats/bin/pagegenerator.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld