Multi-Server Log Summarization Pipeline with Local LLMs
A scalable, single-pass pipeline that collects daily Logwatch summaries from any number of remote nodes onto a central orchestration server. A lightweight local model (Llama 3.2 1B via Ollama) parses the aggregated logs, and the pipeline sends exactly one structured alert email through local SMTP if, and only if, it flags actionable system anomalies or signs of a security compromise.
🏗️ System Architecture Layout
The architecture inverts the usual push-based delivery model and uses a pull model instead. Rather than letting production machines push metrics into your core infrastructure, the processing node reaches outward over SSH and pulls the logs. The remote hosts therefore need no credentials for the processing node, so a compromise of a public web cluster is far less likely to expose the internal parsing system.
[ Remote Host 1 ] ----(Native cron.daily)----> Staged locally to /var/log/staged_logs/metrics.txt
[ Remote Host 2 ] ----(Native cron.daily)----> Staged locally to /var/log/staged_logs/metrics.txt
[ Remote Host N ] ----(Native cron.daily)----> Staged locally to /var/log/staged_logs/metrics.txt
        ▲
        │ (Secure Pull via centralized rsync shell loop using an external inventory file)
        │
[ Local Processing Node ] ────────────────────► Aggregates and runs local Python parsing loop
        │
        ▼
Executes Llama 3.2 1B (Ollama)
        │
        ▼
Local SMTP Alert Engine (Only on Anomaly)
🔧 Part 1: Installation & Optimization on the Local Processing Node
Step 1.1: Install Ollama Natively
Install Ollama natively with the official Linux install script so that it can use the CPU's vector extensions (such as AVX2 or AVX-512) and runs as a regular systemd service. Avoid sandboxed packages (such as Snap) for performance-critical CPU workloads:
bash
curl -fsSL https://ollama.com/install.sh | sh
Step 1.2: Download the Target Processing Model
Download Meta’s lightweight 1-billion-parameter model. It handles schema-constrained structured output while fitting in a small memory footprint (roughly under 1.5 GB of RAM):
bash
ollama pull llama3.2:1b
Step 1.3: Apply CPU Engine Optimization Variables
CPU-bound inference benefits from a fixed thread count, which avoids high-latency context switching between hyper-threads. Open a systemd override file for the Ollama service:
bash
sudo systemctl edit ollama.service
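In the editor that systemd opens, paste the following settings. Set OLLAMA_NUM_THREADS to the number of physical cores in your processor, not the number of logical threads. Whether the daemon honors this variable can depend on the Ollama release; the per-request num_thread option is another way to set the thread count: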
ini
[Service]
Environment="OLLAMA_NUM_THREADS=4"
Environment="OLLAMA_NUM_PARALLEL=1"
Save the file and apply the environment limitations to the background daemon:
bash
sudo systemctl daemon-reload && sudo systemctl restart ollama
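Since the thread setting depends on the physical core count, the following is a minimal sketch of one way to derive that number. It assumes the x86 Linux layout of /proc/cpuinfo, where each logical CPU record carries "physical id" and "core id" fields; the function name count_physical_cores is hypothetical. On platforms whose cpuinfo lacks those fields (some ARM boards, for example) it returns 0, and a tool such as lscpu is the better source.

```python
def count_physical_cores(cpuinfo_text: str) -> int:
    """Count unique (physical id, core id) pairs in /proc/cpuinfo-style text."""
    cores = set()
    physical_id = None
    core_id = None
    # Blank lines separate logical-CPU records; the extra "" flushes the last record.
    for line in cpuinfo_text.splitlines() + [""]:
        if not line.strip():
            if physical_id is not None and core_id is not None:
                cores.add((physical_id, core_id))
            physical_id = None
            core_id = None
            continue
        key, _, value = line.partition(":")
        key = key.strip()
        if key == "physical id":
            physical_id = value.strip()
        elif key == "core id":
            core_id = value.strip()
    return len(cores)
```

To use it, read /proc/cpuinfo as text and pass the contents to count_physical_cores; hyper-threaded siblings share the same pair and are counted once.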
Step 1.4: Build the Isolated Application Environment Workspace
To comply with PEP 668, which marks the system Python as an externally managed environment, create an isolated Python virtual environment inside your log processor directory:
bash
sudo apt update && sudo apt install -y python3-venv
mkdir -p ~/log_processor/inbound_logs
cd ~/log_processor
python3 -m venv .venv
~/log_processor/.venv/bin/pip install -U ollama pydantic
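A quick way to confirm that the environment is set up as intended is sketched below. It uses only the standard library; the helper names in_virtualenv and missing_packages are hypothetical and not part of the original pipeline.

```python
import importlib.util
import sys


def in_virtualenv() -> bool:
    """Return True when the interpreter runs inside a venv-style environment."""
    # Inside a venv, sys.prefix points at the environment directory while
    # sys.base_prefix still points at the base Python installation.
    return sys.prefix != sys.base_prefix


def missing_packages(names: list[str]) -> list[str]:
    """Return the top-level package names that cannot be located for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]
```

When run with ~/log_processor/.venv/bin/python3, in_virtualenv() should report True and missing_packages(["ollama", "pydantic"]) should come back empty if the pip step succeeded.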
🧱 Part 2: Single-Pass Configuration on Remote Edge Targets
Execute these steps across all target remote hosts managed by your pipeline.
Step 2.1: Establish the Read/Write Staging Workspace
Create a dedicated directory for the logs and give ownership to your automation user (replace youruser with the non-root username that the processing node uses for SSH):
bash
sudo mkdir -p /var/log/staged_logs
sudo chown -R youruser:youruser /var/log/staged_logs
sudo chmod 755 /var/log/staged_logs
Step 2.2: Reconfigure Native Logwatch Mechanics
Override the default global configuration. This changes what the system's cron.daily Logwatch job produces: instead of sending an unstructured report through system mail, it writes a single plain-text file directly into your staging directory:
bash
sudo nano /etc/logwatch/conf/logwatch.conf
Update or paste these overrides into the file:
ini
Output = file
Filename = /var/log/staged_logs/metrics.txt
Format = text
# Force file generation to adopt a clean permission bitmask configuration (644)
# This allows your SSH automation profile user to cleanly read and delete it post-transfer
UMask = 0022
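Save and close the editor. If your distribution's cron.daily script passes an explicit --output flag to logwatch (some Debian-family packages ship it with --output mail), that command-line flag takes precedence over this file, so adjust the script accordingly.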
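Because the pipeline depends on the staged file being regenerated every day, a freshness check can catch a host whose Logwatch job silently stopped. The sketch below is one possible check, not part of the original pipeline: the function name is_fresh and the 26-hour threshold are assumptions. It can run on the processing node against the pulled files, since rsync -a preserves modification times.

```python
import os
import time
from typing import Optional


def is_fresh(path: str, max_age_hours: float = 26.0, now: Optional[float] = None) -> bool:
    """Return True if path exists, is non-empty, and was modified recently."""
    try:
        stats = os.stat(path)
    except FileNotFoundError:
        return False
    # Allow the caller to inject a timestamp, which keeps the check testable.
    current = time.time() if now is None else now
    age_hours = (current - stats.st_mtime) / 3600
    return stats.st_size > 0 and age_hours <= max_age_hours
```

A missing, empty, or stale file returns False, which the caller could treat as an anomaly in its own right.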
💻 Part 3: Python Structuring Logic & Orchestration Architecture
Step 3.1: Create Your Host Inventory Document
Rather than hardcoding hostnames in the script, keep your nodes in a separate plain-text inventory file. This lets you scale to any number of endpoints by listing them one per line:
bash
nano ~/log_processor/hosts.txt
Add one server per line; hostnames, custom domain names, and IP addresses are all supported:
text
://example.com
://example.com
staging-cluster.internal.net
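The inventory format can be checked before the first run. The sketch below mirrors the rules the shell loop applies (blank lines and lines starting with # are skipped, stray whitespace and carriage returns are stripped) and additionally rejects entries that contain a URL scheme separator, since rsync needs a bare host. The function name parse_inventory and the sample hosts in the usage note are hypothetical.

```python
def parse_inventory(text: str) -> list[str]:
    """Return the usable host entries from inventory text, one per line."""
    hosts = []
    for raw_line in text.splitlines():
        # Remove all whitespace, including carriage returns from CRLF files.
        host = "".join(raw_line.split())
        if not host or host.startswith("#"):
            continue
        if "://" in host:
            # rsync expects user@host:path, so a URL-style entry cannot work.
            raise ValueError(f"entry must be a bare host, not a URL: {host!r}")
        hosts.append(host)
    return hosts
```

For instance, parse_inventory("# prod\nweb01.internal\n\n10.0.0.5\n") yields the two host entries and drops the comment and blank line.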
Step 3.2: Write the Structured Pydantic Parser Engine
bash
nano ~/log_processor/parse_logs.py
Paste the following complete program. It requests schema-constrained JSON output from your local llama3.2:1b model and validates the response with Pydantic:
python
import os
import smtplib
from email.message import EmailMessage
from datetime import datetime, timedelta
from pydantic import BaseModel, Field
from ollama import Client

# Connect to the local Ollama daemon on its default port
client = Client(host='http://127.0.0.1:11434')

# Schema that constrains the model's structured output
class AnomalyItem(BaseModel):
    server_name: str = Field(
        description="The server identifier extracted directly from the incoming header."
    )
    issue_description: str = Field(
        description="A concise summary sentence describing the specific failure, anomaly, or alert."
    )


class LogAnalysis(BaseModel):
    contains_actionable_anomaly: bool = Field(
        description="True ONLY if there are urgent security issues, disk errors, or kernel/application level crashes."
    )
    severity_level: str = Field(
        description="Set to 'LOW', 'MEDIUM', or 'CRITICAL'. Use 'NONE' if nothing is flagged."
    )
    anomalies_list: list[AnomalyItem] = Field(
        default_factory=list,
        description="A list of all alerts discovered across the text payload."
    )


def analyze_log_content(log_text):
    # Logwatch reports cover the previous day
    target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    prompt = f"""
    Analyze the following Logwatch summary text collected from our server infrastructure for the date {target_date}.
    Filter out all routine background noise (e.g. standard package updates, expected cron jobs, normal log rotations).
    Identify critical structural failures, database corruption, active brute force attempts, or security risks.
    Pay strict attention to the '=== SERVER_ID ===' headers to keep track of which server each log belongs to.
    LOG DATA:
    \"\"\"{log_text}\"\"\"
    """
    response = client.chat(
        model='llama3.2:1b',
        options={"temperature": 0.0},
        messages=[{"role": "user", "content": prompt}],
        format=LogAnalysis.model_json_schema()
    )
    # Validate the model's JSON reply against the schema
    return LogAnalysis.model_validate_json(response['message']['content'])


def send_alert_email(anomalies_list, severity):
    target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    # Build one bullet line per anomaly
    formatted_summary = ""
    for item in anomalies_list:
        formatted_summary += f"* [{item.server_name}] {item.issue_description}\n"

    msg = EmailMessage()
    msg['Subject'] = f"⚠️ [ALERT] Infrastructure Monitor ({target_date}) - Severity: {severity}"
    msg['From'] = 'alert-agent@example.com'
    msg['To'] = 'sysadmin@example.com'  # <-- UPDATE YOUR DESTINATION INBOX HERE
    msg.set_content(f"The local infrastructure analysis pipeline identified actionable anomalies for {target_date}:\n\n{formatted_summary}")

    try:
        with smtplib.SMTP('localhost') as server:
            server.send_message(msg)
        print("Alert email successfully dispatched via local SMTP relay.")
    except (smtplib.SMTPException, OSError) as e:
        # Covers SMTP protocol errors and connection failures
        print(f"SMTP Transfer Failure: {e}")


if __name__ == "__main__":
    log_directory = "/home/youruser/log_processor/inbound_logs"  # <-- FIX ACCORDING TO USER PATH
    combined_log_payload = ""
    processed_files = []

    if os.path.exists(log_directory):
        for filename in sorted(os.listdir(log_directory)):
            if filename.endswith(".txt"):
                file_path = os.path.join(log_directory, filename)
                # Use the file's base name (the host) as the server identifier
                base_name = os.path.splitext(filename)[0].upper()
                with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                    # Fence each file with a header so the LLM can tell servers apart
                    combined_log_payload += f"=== SERVER_ID: {base_name} ===\n" + f.read() + "\n\n"
                processed_files.append(file_path)

    if combined_log_payload.strip():
        result = analyze_log_content(combined_log_payload)
        # Delete inbound files only after analysis succeeded, so a failed run can be retried
        for file_path in processed_files:
            os.remove(file_path)
        if result.contains_actionable_anomaly and result.anomalies_list:
            print("Actionable anomalies flagged. Dispatching alert email...")
            send_alert_email(result.anomalies_list, result.severity_level)
        else:
            print("System baseline normal. No anomalies flagged.")
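One practical limit of concatenating every host into a single prompt is the model's context window: Ollama applies a per-request context length (the num_ctx option), and input beyond it is truncated. A possible mitigation, sketched here under assumptions, is to group the per-server sections into batches under a character budget and call analyze_log_content once per batch. The function name batch_sections and the 8000-character default are hypothetical and would need tuning against your actual context setting.

```python
def batch_sections(sections: dict[str, str], max_chars: int = 8000) -> list[str]:
    """Group per-server log sections into payloads of at most max_chars each."""
    batches = []
    current = ""
    for server_id, body in sections.items():
        block = f"=== SERVER_ID: {server_id} ===\n{body}\n\n"
        # Truncate a single oversized section rather than dropping it entirely.
        if len(block) > max_chars:
            block = block[:max_chars]
        # Start a new batch when adding this block would exceed the budget.
        if current and len(current) + len(block) > max_chars:
            batches.append(current)
            current = ""
        current += block
    if current:
        batches.append(current)
    return batches
```

Each returned string keeps the same header fencing the parser already uses, so the anomalies from all batches can be merged before deciding whether to send the single alert email.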
Step 3.3: Build the Shell Collection Utility Loop Script
bash
nano ~/log_processor/fetch_and_parse.sh
Paste this loop. It reads the external hosts.txt file line by line, pulls each host's staged log with rsync, removes the remote copy after a successful transfer, and then triggers the Python processing script:
bash
#!/bin/bash
BASE_DIR="/home/youruser/log_processor"   # <-- FIX USER PATHS
INBOX_DIR="$BASE_DIR/inbound_logs"
HOST_FILE="$BASE_DIR/hosts.txt"
SSH_PORT="22"         # <-- ADJUST SSH PORT AS NEEDED
SSH_USER="youruser"   # <-- ADJUST SSH USER AS NEEDED

mkdir -p "$INBOX_DIR"

if [ ! -f "$HOST_FILE" ]; then
    echo "Error: host inventory file does not exist: $HOST_FILE" >&2
    exit 1
fi

# Iterate over every line of the inventory file (including a final line without a newline)
while IFS= read -r host || [ -n "$host" ]; do
    # Strip carriage returns and whitespace first, so whitespace-only lines count as empty
    host=$(printf '%s' "$host" | tr -d '[:space:]')
    # Skip empty lines and comments
    [[ -z "$host" || "$host" =~ ^# ]] && continue

    echo "Retrieving staged log from: $host"
    # Pull the file; --remove-source-files deletes the remote copy only after a successful transfer.
    # stdin is redirected so nothing in the loop body can consume the host list.
    rsync -az --remove-source-files -e "ssh -p $SSH_PORT" \
        "${SSH_USER}@${host}:/var/log/staged_logs/metrics.txt" \
        "${INBOX_DIR}/${host}.txt" < /dev/null > /dev/null 2>&1 \
        || echo "Warning: could not fetch log from $host" >&2
done < "$HOST_FILE"

# Run the parser at the lowest CPU and I/O priority
nice -n 19 ionice -c 3 "${BASE_DIR}/.venv/bin/python3" "${BASE_DIR}/parse_logs.py"
Make the script executable:
bash
chmod +x ~/log_processor/fetch_and_parse.sh
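If you would rather keep the collection step in Python, the same rsync invocation can be assembled as an argument list and run with subprocess, which also makes it easy to keep the error output instead of discarding it. This is a sketch under assumptions: the function names build_rsync_command and fetch are hypothetical, and the flags simply mirror the shell script.

```python
import subprocess


def build_rsync_command(host: str, user: str, port: int, inbox_dir: str) -> list[str]:
    """Assemble the rsync pull command for one host as an argument list."""
    return [
        "rsync", "-az", "--remove-source-files",
        "-e", f"ssh -p {port}",
        f"{user}@{host}:/var/log/staged_logs/metrics.txt",
        f"{inbox_dir}/{host}.txt",
    ]


def fetch(host: str, user: str, port: int, inbox_dir: str) -> tuple[bool, str]:
    """Run the pull for one host; return (success, captured stderr)."""
    result = subprocess.run(
        build_rsync_command(host, user, port, inbox_dir),
        capture_output=True,
        text=True,
        stdin=subprocess.DEVNULL,
    )
    return result.returncode == 0, result.stderr.strip()
```

Passing the command as a list avoids shell quoting problems with unusual hostnames, and the captured stderr can be logged per host when a transfer fails.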
⏰ Part 4: Scheduling and Automation
To run the pipeline automatically, schedule the orchestration script with cron. The entry below runs it every morning at 09:30, after the daily system jobs on your remote hosts have normally finished.
Open your user crontab:
bash
crontab -e
Append this line at the bottom of the file:
text
30 9 * * * /home/youruser/log_processor/fetch_and_parse.sh > /dev/null 2>&1
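For readers less familiar with crontab syntax, the five leading fields of the schedule can be labeled as in the short sketch below. It only handles the standard five-field form and does not validate the values; the function name describe_cron is hypothetical.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict[str, str]:
    """Map the five schedule fields of a crontab entry to their names."""
    parts = expression.split()
    if len(parts) != 5:
        raise ValueError(f"expected 5 schedule fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))
```

Applied to "30 9 * * *", this labels 30 as the minute and 9 as the hour, with the remaining wildcards meaning every day of every month. If your remote cron.daily jobs finish later than that, move the hour accordingly.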
