Add and use optimized http log syncing script
The previous script synced all hosts serially and ran rsync once for each log file. This commit reimplements the shell script in Python, with these changes:

- Run rsync on whole directories of log files, with much reduced overhead.
- Use a pool of five workers which process hosts in parallel.

Additionally, remove download-rdu01.vpn.fedoraproject.org from the list of synced hosts.

Signed-off-by: Nils Philippsen <nils@redhat.com>
This commit is contained in:
parent
ea1c19d522
commit
f703e7a771
4 changed files with 282 additions and 140 deletions
203
roles/web-data-analysis/files/sync-http-logs.py
Normal file
203
roles/web-data-analysis/files/sync-http-logs.py
Normal file
|
@ -0,0 +1,203 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import datetime as dt
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import uuid
|
||||
import yaml
|
||||
from multiprocessing import Pool
|
||||
from pathlib import Path
|
||||
from tempfile import mktemp
|
||||
|
||||
from fedora_messaging import api, message
|
||||
|
||||
|
||||
# rsync invocation: program path, flag string, and the resulting argv prefix
# that every rsync call starts with.
RSYNC_PROG = "/usr/bin/rsync"
RSYNC_FLAGS = "-avSHP --no-motd --timeout=1200 --contimeout=1200"
RSYNC_CMD = [RSYNC_PROG, *RSYNC_FLAGS.split()]
DEBUG = True

# Values attached to bus messages: a fresh id per run and this machine's FQDN.
RUN_ID = str(uuid.uuid4())
LOGHOST = socket.getfqdn()
MSGTOPIC_PREFIX = "logging.stats"

# Where the host list is read from and where synced logs are stored.
CONFIG_FILE = Path("/etc/sync-http-logs.yaml")
BASE_LOGDIR = Path("/var/log/hosts")
DAYS_TO_FETCH = 3
RETRY_ATTEMPTS = 3
PARALLEL_WORKERS = 5

# Module logger writing to the console; verbosity follows DEBUG.
log = logging.getLogger(__name__)
if DEBUG:
    log.setLevel(logging.DEBUG)
else:
    log.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
log.addHandler(console_handler)
|
||||
|
||||
|
||||
def send_bus_msg(topic, **kwargs):
    """Publish a message on the message bus.

    The topic is prefixed with MSGTOPIC_PREFIX; the keyword arguments are
    used as the message body unchanged.
    """
    full_topic = f"{MSGTOPIC_PREFIX}.{topic}"
    outgoing = message.Message(topic=full_topic, body=kwargs)
    api.publish(outgoing)
|
||||
|
||||
|
||||
def send_sync_msg(topic, **kwargs):
    """Publish a sync-related bus message.

    Adds "host" (LOGHOST) and "run_id" (RUN_ID) to the body unless the
    caller supplied them, then hands everything to send_bus_msg().
    """
    for key, fallback in (("host", LOGHOST), ("run_id", RUN_ID)):
        if key not in kwargs:
            kwargs[key] = fallback
    send_bus_msg(topic, **kwargs)
|
||||
|
||||
|
||||
def link_force_atomic(src, dst):
    """(Hard-)link src to dst atomically.

    As one can't create a link over an existing destination file, this
    function creates it as a temporary file in the same directory first and
    then renames it over the specified destination file.

    No temporary file is left behind, even if dst already is a hard link to
    src or if renaming fails.
    """
    dst = Path(dst)
    dstdir = dst.parent

    while True:
        # mktemp() merely proposes an unused name; os.link() is what claims
        # it and fails if something else created that name in the meantime.
        tmpdst = mktemp(dir=dstdir)
        try:
            os.link(src, tmpdst)
        except FileExistsError:
            # the name got taken, just try another one
            continue
        else:
            break

    try:
        os.rename(tmpdst, dst)
    finally:
        # rename() does nothing (successfully) if both names already refer to
        # the same file, which would leave the temporary link lying around.
        # After a real rename the temporary name is gone, which is fine.
        try:
            os.unlink(tmpdst)
        except FileNotFoundError:
            pass
|
||||
|
||||
|
||||
def seed_dated_logfiles(date, srcdir, dstdir):
    """Hard-link undated log files from srcdir into dstdir under dated names.

    A file named "<name>log.<ext>" is linked as "<name>log-YYYYMMDD.<ext>",
    using the given date. Symlinks and anything that isn't a file are skipped.
    """
    stamp = date.strftime('%Y%m%d')
    pattern = re.compile(r"^(?P<name>.*)log\.(?P<ext>[^\.]+)$")

    for candidate in srcdir.glob("*log.*"):
        if not candidate.is_symlink() and candidate.is_file():
            found = pattern.match(candidate.name)
            if found:
                stem, ext = found.group("name", "ext")
                link_force_atomic(candidate, dstdir / f"{stem}log-{stamp}.{ext}")
|
||||
|
||||
|
||||
def link_back_undated_logfiles(date, srcdir, dstdir):
    """Hard-link dated log files from srcdir into dstdir under undated names.

    A file named "<name>log-YYYYMMDD.<ext>" carrying the given date is linked
    as "<name>log.<ext>". Files tagged with other dates, symlinks and
    anything that isn't a file are skipped.
    """
    stamp = date.strftime('%Y%m%d')
    pattern = re.compile(rf"^(?P<name>.*)log-{stamp}\.(?P<ext>[^\.]+)$")

    for candidate in srcdir.glob("*log-*.*"):
        if not candidate.is_symlink() and candidate.is_file():
            found = pattern.match(candidate.name)
            if found:
                stem, ext = found.group("name", "ext")
                link_force_atomic(candidate, dstdir / f"{stem}log.{ext}")
|
||||
|
||||
|
||||
def sync_http_logs(synced_host):
    """Sync the HTTP logs of the last DAYS_TO_FETCH days from one host.

    For each day, the dated log files are fetched with a single rsync call
    into <BASE_LOGDIR>/<host>/<year>/<month>/<day>/http_with_date and then
    hard-linked under their undated names into the sibling "http" directory.
    A failing rsync is retried up to RETRY_ATTEMPTS times in total. Progress
    and failures are logged and published as bus messages.

    Hosts whose name can't be resolved (EAI_NONAME) are skipped; any other
    resolver error is re-raised.
    """
    # Skip hosts not known to DNS
    try:
        socket.gethostbyname(synced_host)
    except socket.gaierror as e:
        if e.errno == socket.EAI_NONAME:
            log.info("Skipping sync from unknown host %s.", synced_host)
            send_sync_msg("sync.host.skip", synced_host=synced_host, reason="Synced host unknown")
            return
        raise

    log.info("Started sync from %s.", synced_host)
    send_sync_msg("sync.host.start", synced_host=synced_host)

    # Determine "today" once so a run crossing midnight doesn't shift the
    # set of dates being fetched halfway through.
    today = dt.date.today()

    for days_in_past in range(1, DAYS_TO_FETCH + 1):
        date = today - dt.timedelta(days=days_in_past)
        date_str = date.strftime("%Y%m%d")
        year = str(date.year)
        month = f"{date.month:02d}"
        day = f"{date.day:02d}"
        log_date = f"{year}-{month}-{day}"
        target_dir_root = BASE_LOGDIR / synced_host / year / month / day
        target_dir_undated = target_dir_root / "http"
        target_dir_dated = target_dir_root / "http_with_date"

        send_sync_msg("sync.host.logdate.start", synced_host=synced_host, log_date=log_date)

        target_dir_undated.mkdir(parents=True, exist_ok=True)
        target_dir_dated.mkdir(exist_ok=True)

        # Link the existing undated files under their dated names before
        # running rsync on the dated directory.
        log.info(
            "... host %s, log date %s, seeding dated logfiles from undated", synced_host, log_date
        )
        seed_dated_logfiles(date, target_dir_undated, target_dir_dated)

        for attempt in range(1, RETRY_ATTEMPTS + 1):
            log.info("... host %s, log date %s, attempt %d", synced_host, log_date, attempt)
            try:
                exitcode = subprocess.call(
                    RSYNC_CMD + [f"{synced_host}::log/httpd/*{date_str}*", str(target_dir_dated)],
                    stdout=subprocess.DEVNULL,
                    timeout=7200,
                )
            except subprocess.TimeoutExpired:
                reason = "Timeout expired."
            else:
                if exitcode:
                    reason = f"Error code: {exitcode}"
                else:
                    break

            # Only reached on failure. The attempt counter runs from 1 to
            # RETRY_ATTEMPTS, so only the last one is final.
            if attempt < RETRY_ATTEMPTS:
                topic = "sync.host.logdate.fail.retry"
            else:
                topic = "sync.host.logdate.fail.final"
                log.info(
                    "rsync from %s failed for %s for the last time",
                    synced_host,
                    log_date,
                )
            send_sync_msg(
                topic, synced_host=synced_host, log_date=log_date, reason=reason
            )

        # Done regardless of the rsync outcome, so whatever was transferred
        # becomes visible under the undated names.
        log.info(
            "... host %s, log date %s, linking back undated logfiles from dated",
            synced_host,
            log_date,
        )
        link_back_undated_logfiles(date, target_dir_dated, target_dir_undated)

        send_sync_msg("sync.host.logdate.finish", synced_host=synced_host, log_date=log_date)

    log.info("Finished sync from %s.", synced_host)
    send_sync_msg("sync.host.finish", synced_host=synced_host)
|
||||
|
||||
|
||||
def init_worker():
    """Make the calling process ignore SIGINT (e.g. Ctrl-C)."""
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)
|
||||
|
||||
|
||||
# main

with open(CONFIG_FILE) as config_file:
    # safe_load() only constructs plain Python objects. Calling yaml.load()
    # without a Loader is deprecated and fails outright on newer PyYAML.
    config = yaml.safe_load(config_file)

# NOTE(review): the pool is created before the first bus message is sent,
# presumably so the forked workers don't inherit messaging state from the
# parent -- confirm before reordering.
worker_pool = Pool(PARALLEL_WORKERS, initializer=init_worker)

send_sync_msg("sync.start")
log.info("=== START OF RUN ===")

log.debug("Mapping synced hosts to pool workers.")
try:
    # Blocks until every host has been processed by some worker.
    worker_pool.map(sync_http_logs, config["synced_hosts"])
except KeyboardInterrupt:
    # Workers ignore SIGINT (see init_worker), so stopping them is up to us.
    log.warning("Interrupted!")
    worker_pool.terminate()
else:
    worker_pool.close()
# Reap the worker processes; the pool was closed or terminated above.
worker_pool.join()

send_sync_msg("sync.finish")
log.info("=== FINISH OF RUN ===")
|
|
@ -1,138 +0,0 @@
|
|||
#!/bin/bash
|
||||
|
||||
# Flags passed to every rsync invocation (expanded unquoted, so they split
# into separate words).
RSYNC_FLAGS="-avSHP --no-motd --timeout=1200 --contimeout=1200"
# 1: print which file is being fetched
DEBUG=1

# Values attached to bus messages: a fresh id per run and this machine's name.
RUN_ID=$(uuidgen -r)
LOGHOST=$(hostname)
MSGTOPIC_PREFIX='logging.stats'
|
||||
|
||||
# Publish a message on the message bus via "fedora-messaging publish".
#
# Usage: send_bus_msg TOPIC [KEY[:TYPE]=VALUE ...]
#
# TOPIC gets prefixed with $MSGTOPIC_PREFIX. Every KEY=VALUE argument becomes
# a member of the JSON message body. Values are emitted as JSON strings
# (backslashes and double quotes escaped) unless TYPE is "int", in which case
# they are inserted verbatim.
send_bus_msg() {
    local full_topic="${MSGTOPIC_PREFIX}.$1"
    shift

    local timestamp msg_id
    timestamp="$(TZ=UTC date -Iseconds)"
    msg_id="$(uuidgen -r)"

    local members="" delim=""
    local arg spec name kind val

    for arg in "$@"; do
        # split "key[:type]=value" into its pieces
        spec="${arg%%=*}"
        name="${spec%%:*}"
        kind="${spec#${name}}"
        kind="${kind#:}"
        val="${arg#*=}"

        if [ "$kind" != "int" ]; then
            # quote strings
            val="${val//\\/\\\\}"
            val="${val//\"/\\\"}"
            val="\"${val}\""
        fi
        members+="${delim}\"${name}\": ${val}"
        delim=", "
    done
    local json_body="{${members}}"

    fedora-messaging publish - << EOF >/dev/null
{"body": ${json_body}, "headers": {"fedora_messaging_schema": "base.message", "fedora_messaging_severity": 20, "sent-at": "${timestamp}"}, "id": "${msg_id}", "queue": "queue", "topic": "${full_topic}"}
EOF
}
|
||||
|
||||
# Sync the compressed HTTP logs of the last three days from one host.
#
# Usage: syncHttpLogs HOST [old]
#
# For each day, the matching *.xz files are listed on HOST's "log" rsync
# module and fetched one by one into /var/log/hosts/HOST/YYYY/MM/DD/http,
# with the date tag stripped from the file name. Each file is tried up to
# three times. Pass "old" as second argument for hosts which tag their files
# YYYY-MM-DD rather than YYYYMMDD.
function syncHttpLogs {
    local rc

    HOST=$1
    send_bus_msg sync.host.start host="$LOGHOST" run_id="$RUN_ID" synced_host="$HOST"
    # in case we missed a run or two.. try to catch up the last 3 days.
    for d in 1 2 3; do
        # some machines store stuff in old format. some new.
        if [ "$2" = "old" ]; then
            YESTERDAY=$(/bin/date -d "-$d days" +%Y-%m-%d)
        else
            YESTERDAY=$(/bin/date -d "-$d days" +%Y%m%d)
        fi
        YEAR=$(/bin/date -d "-$d days" +%Y)
        MONTH=$(/bin/date -d "-$d days" +%m)
        DAY=$(/bin/date -d "-$d days" +%d)
        send_bus_msg sync.host.logdate.start host="$LOGHOST" run_id="$RUN_ID" synced_host="$HOST" log_date="${YEAR}-${MONTH}-${DAY}"
        /bin/mkdir -p /var/log/hosts/$HOST/$YEAR/$MONTH/$DAY/http
        cd /var/log/hosts/$HOST/$YEAR/$MONTH/$DAY/http/
        # $RSYNC_FLAGS is left unquoted on purpose so it splits into words.
        RSYNC_OUTPUT=$(/usr/bin/rsync $RSYNC_FLAGS --list-only $HOST::log/httpd/*$YESTERDAY* | grep xz$ | awk '{ print $5 }' )
        for f in ${RSYNC_OUTPUT}; do
            DEST=$(echo $f | /bin/sed s/-$YESTERDAY//)
            if [[ ${DEBUG} -eq 1 ]]; then
                echo "${HOST}: Getting ${f} and saving to ${DEST}"
            fi
            # $i is the number of attempts left after the current one
            for i in 2 1 0; do
                timeout 2h /usr/bin/rsync $RSYNC_FLAGS $HOST::log/httpd/$f ./$DEST &> /dev/null && break
                # Save the status right away, any further command (even a
                # test) overwrites $?.
                rc=$?
                if [[ $i -gt 0 ]]; then
                    send_bus_msg sync.host.logdate.fail.retry host="$LOGHOST" run_id="$RUN_ID" synced_host="$HOST" log_date="${YEAR}-${MONTH}-${DAY}" failure="Error code: $rc"
                    echo "rsync from $HOST for file $f failed, will repeat $i times"
                else
                    send_bus_msg sync.host.logdate.fail.final host="$LOGHOST" run_id="$RUN_ID" synced_host="$HOST" log_date="${YEAR}-${MONTH}-${DAY}" failure="Error code: $rc"
                fi
            done
        done
        send_bus_msg sync.host.logdate.finish host="$LOGHOST" run_id="$RUN_ID" synced_host="$HOST" log_date="${YEAR}-${MONTH}-${DAY}"
    done
    send_bus_msg sync.host.finish host="$LOGHOST" run_id="$RUN_ID" synced_host="$HOST"
}
|
||||
|
||||
send_bus_msg sync.start host="$LOGHOST" run_id="$RUN_ID"

# Hosts to sync, processed one after the other in this order. Commented-out
# entries are disabled.
SYNCED_HOSTS=(
    proxy01.iad2.fedoraproject.org
    proxy02.vpn.fedoraproject.org
    proxy03.vpn.fedoraproject.org
    proxy04.vpn.fedoraproject.org
    proxy05.vpn.fedoraproject.org
    proxy06.vpn.fedoraproject.org
    # proxy08.vpn.fedoraproject.org
    proxy09.vpn.fedoraproject.org # proxy09 is acting up
    proxy10.iad2.fedoraproject.org
    proxy11.vpn.fedoraproject.org
    proxy12.vpn.fedoraproject.org
    proxy13.vpn.fedoraproject.org
    proxy14.vpn.fedoraproject.org
    proxy30.vpn.fedoraproject.org
    proxy31.vpn.fedoraproject.org
    proxy32.vpn.fedoraproject.org
    proxy33.vpn.fedoraproject.org
    proxy34.vpn.fedoraproject.org
    proxy35.vpn.fedoraproject.org
    proxy36.vpn.fedoraproject.org
    proxy37.vpn.fedoraproject.org
    proxy38.vpn.fedoraproject.org
    proxy39.vpn.fedoraproject.org
    proxy40.vpn.fedoraproject.org
    proxy101.iad2.fedoraproject.org
    proxy110.iad2.fedoraproject.org
    # proxy01.stg.iad2.fedoraproject.org
    datagrepper01.iad2.fedoraproject.org
    # datagrepper02.iad2.fedoraproject.org
    # datagrepper01.stg.iad2.fedoraproject.org
    # badges-web01.iad2.fedoraproject.org
    # badges-web02.iad2.fedoraproject.org
    # badges-web01.stg.iad2.fedoraproject.org
    # packages03.iad2.fedoraproject.org
    # packages04.iad2.fedoraproject.org
    # packages03.stg.iad2.fedoraproject.org
    blockerbugs01.iad2.fedoraproject.org
    # blockerbugs02.iad2.fedoraproject.org
    # blockerbugs01.stg.iad2.fedoraproject.org
    value01.iad2.fedoraproject.org
    people02.vpn.fedoraproject.org
    noc01.iad2.fedoraproject.org
    dl01.iad2.fedoraproject.org
    dl02.iad2.fedoraproject.org
    dl03.iad2.fedoraproject.org
    dl04.iad2.fedoraproject.org
    dl05.iad2.fedoraproject.org
    download-rdu01.vpn.fedoraproject.org
    download-ib01.vpn.fedoraproject.org
    download-cc-rdu01.vpn.fedoraproject.org
    sundries01.iad2.fedoraproject.org
    # sundries02.iad2.fedoraproject.org
    # sundries01.stg.iad2.fedoraproject.org
)

for synced_host in "${SYNCED_HOSTS[@]}"; do
    syncHttpLogs "$synced_host"
done

send_bus_msg sync.finish host="$LOGHOST" run_id="$RUN_ID"
|
||||
## eof
|
|
@ -151,9 +151,30 @@
|
|||
- web-data
|
||||
- cron
|
||||
|
||||
- name: install a sync httpd logs cron script only on log01
|
||||
copy: src=syncHttpLogs.sh dest=/etc/cron.daily/syncHttpLogs.sh mode=0755
|
||||
- name: remove old syncHttpLogs.sh cron script only on log01
|
||||
file:
|
||||
path: /etc/cron.daily/syncHttpLogs.sh
|
||||
state: absent
|
||||
when: inventory_hostname.startswith('log01')
|
||||
tags:
|
||||
- web-data
|
||||
- cron
|
||||
|
||||
- name: write configuration file for script to sync httpd logs
|
||||
template:
|
||||
src: sync-http-logs.yaml.j2
|
||||
dest: /etc/sync-http-logs.yaml
|
||||
when: inventory_hostname.startswith('log01')
|
||||
tags:
|
||||
- web-data
|
||||
- config
|
||||
|
||||
- name: install a script to sync httpd logs via cron only on log01
|
||||
copy:
|
||||
src: sync-http-logs.py
|
||||
dest: /etc/cron.daily/sync-http-logs.py
|
||||
mode: 0755
|
||||
when: inventory_hostname.startswith('log01')
|
||||
tags:
|
||||
- web-data
|
||||
- cron
|
||||
|
|
56
roles/web-data-analysis/templates/sync-http-logs.yaml.j2
Normal file
56
roles/web-data-analysis/templates/sync-http-logs.yaml.j2
Normal file
|
@ -0,0 +1,56 @@
|
|||
---
|
||||
synced_hosts:
|
||||
- proxy01.iad2.fedoraproject.org
|
||||
- proxy02.vpn.fedoraproject.org
|
||||
- proxy03.vpn.fedoraproject.org
|
||||
- proxy04.vpn.fedoraproject.org
|
||||
- proxy05.vpn.fedoraproject.org
|
||||
- proxy06.vpn.fedoraproject.org
|
||||
# - proxy08.vpn.fedoraproject.org
|
||||
# proxy09 is acting up
|
||||
- proxy09.vpn.fedoraproject.org
|
||||
- proxy10.iad2.fedoraproject.org
|
||||
- proxy11.vpn.fedoraproject.org
|
||||
- proxy12.vpn.fedoraproject.org
|
||||
- proxy13.vpn.fedoraproject.org
|
||||
- proxy14.vpn.fedoraproject.org
|
||||
- proxy30.vpn.fedoraproject.org
|
||||
- proxy31.vpn.fedoraproject.org
|
||||
- proxy32.vpn.fedoraproject.org
|
||||
- proxy33.vpn.fedoraproject.org
|
||||
- proxy34.vpn.fedoraproject.org
|
||||
- proxy35.vpn.fedoraproject.org
|
||||
- proxy36.vpn.fedoraproject.org
|
||||
- proxy37.vpn.fedoraproject.org
|
||||
- proxy38.vpn.fedoraproject.org
|
||||
- proxy39.vpn.fedoraproject.org
|
||||
- proxy40.vpn.fedoraproject.org
|
||||
- proxy101.iad2.fedoraproject.org
|
||||
- proxy110.iad2.fedoraproject.org
|
||||
# - proxy01.stg.iad2.fedoraproject.org
|
||||
- datagrepper01.iad2.fedoraproject.org
|
||||
# - datagrepper02.iad2.fedoraproject.org
|
||||
# - datagrepper01.stg.iad2.fedoraproject.org
|
||||
# - badges-web01.iad2.fedoraproject.org
|
||||
# - badges-web02.iad2.fedoraproject.org
|
||||
# - badges-web01.stg.iad2.fedoraproject.org
|
||||
# - packages03.iad2.fedoraproject.org
|
||||
# - packages04.iad2.fedoraproject.org
|
||||
# - packages03.stg.iad2.fedoraproject.org
|
||||
- blockerbugs01.iad2.fedoraproject.org
|
||||
# - blockerbugs02.iad2.fedoraproject.org
|
||||
# - blockerbugs01.stg.iad2.fedoraproject.org
|
||||
- value01.iad2.fedoraproject.org
|
||||
- people02.vpn.fedoraproject.org
|
||||
- noc01.iad2.fedoraproject.org
|
||||
- dl01.iad2.fedoraproject.org
|
||||
- dl02.iad2.fedoraproject.org
|
||||
- dl03.iad2.fedoraproject.org
|
||||
- dl04.iad2.fedoraproject.org
|
||||
- dl05.iad2.fedoraproject.org
|
||||
# - download-rdu01.vpn.fedoraproject.org
|
||||
- download-ib01.vpn.fedoraproject.org
|
||||
- download-cc-rdu01.vpn.fedoraproject.org
|
||||
- sundries01.iad2.fedoraproject.org
|
||||
# - sundries02.iad2.fedoraproject.org
|
||||
# - sundries01.stg.iad2.fedoraproject.org
|
Loading…
Add table
Add a link
Reference in a new issue