Integrating MuleSoft AMC Logs with OpenObserve
Introduction
MuleSoft's Anypoint Monitoring Console (AMC) provides runtime logs for deployed Mule applications. While these logs are useful inside the Anypoint Platform, teams often want to centralize them in an observability platform for better search, dashboards, and alerting.
Overview
This guide shows how to stream MuleSoft AMC logs into OpenObserve using a small Python script. The script authenticates to Anypoint with a username/password, fetches logs from AMC, and pushes them into OpenObserve.
Steps to Integrate
Prerequisites
- A running MuleSoft CloudHub deployment (or Runtime Manager application).
- Python 3.8+ installed.
- An OpenObserve account (Cloud or Self-Hosted).
Step 1: Create the Script
- Save the following script as `mulesoft_logs_to_openobserve.py`:

```python
import datetime
import requests
import time
import os, json

# ------------------------------
# Configuration
# ------------------------------
ANYPOINT_USERNAME = "YOUR_USERNAME"
ANYPOINT_PASSWORD = "YOUR_PASSWORD"
OPENOBSERVE_ENDPOINT = "https://<your-openobserve-endpoint>/api/default/mulesoft/_json"
OPENOBSERVE_API_TOKEN = "BASE64_ENCODED_USER:PASSWORD"
ANYPOINT_ORG_NAME = "<ANYPOINT_ORG_NAME>"
ANYPOINT_ENV_NAME = "<ANYPOINT_ENV_NAME>"
ANYPOINT_APP_NAME = "<ANYPOINT_APP_NAME>"
STATE_FILE = "last_log_time.json"

# ------------------------------
# Authentication
# ------------------------------
def get_anypoint_token(username, password):
    url = "https://anypoint.mulesoft.com/accounts/login"
    headers = {"Content-Type": "application/json"}
    payload = {"username": username, "password": password}
    resp = requests.post(url, headers=headers, json=payload)
    resp.raise_for_status()
    return resp.json().get("access_token")

# ------------------------------
# Helpers to resolve Org/Env/Deployment
# ------------------------------
def get_org_id(token, org_name):
    url = "https://anypoint.mulesoft.com/accounts/api/me"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
    resp.raise_for_status()
    orgs = resp.json()["user"]["memberOfOrganizations"]
    return next(o["id"] for o in orgs if o["name"] == org_name)

def get_env_id(token, org_id, env_name):
    url = f"https://anypoint.mulesoft.com/accounts/api/organizations/{org_id}/environments"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
    resp.raise_for_status()
    envs = resp.json()["data"]
    return next(e["id"] for e in envs if e["name"].lower() == env_name.lower())

def get_deployment_id(token, org_id, env_id, app_name):
    url = f"https://anypoint.mulesoft.com/amc/application-manager/api/v2/organizations/{org_id}/environments/{env_id}/deployments"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
    resp.raise_for_status()
    deps = resp.json()["items"]
    return next(d["id"] for d in deps if d["name"] == app_name)

def get_spec_id(token, org_id, env_id, dep_id):
    url = f"https://anypoint.mulesoft.com/amc/application-manager/api/v2/organizations/{org_id}/environments/{env_id}/deployments/{dep_id}/specs"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
    resp.raise_for_status()
    specs = resp.json()
    return specs[0].get("id") or specs[0].get("version")

# ------------------------------
# Log fetch + dedup state
# ------------------------------
def load_last_time():
    if os.path.exists(STATE_FILE):
        return json.load(open(STATE_FILE)).get("last_time")
    return None

def save_last_time(ts):
    json.dump({"last_time": ts}, open(STATE_FILE, "w"))

def fetch_amc_logs(access_token, org_id, env_id, dep_id, spec_id, start_time=None):
    headers = {"Authorization": f"Bearer {access_token}"}
    all_logs = []
    offset = 0
    size = 50  # fetch 50 logs per request
    while True:
        params = {"descending": "false", "size": size, "offset": offset}
        if start_time:
            params["startTime"] = start_time
        resp = requests.get(
            f"https://anypoint.mulesoft.com/amc/application-manager/api/v2/"
            f"organizations/{org_id}/environments/{env_id}/deployments/{dep_id}/specs/{spec_id}/logs",
            headers=headers,
            params=params
        )
        resp.raise_for_status()
        resp_json = resp.json()
        # print("DEBUG log response:", resp_json)  # check what's coming back
        logs = resp_json
        if not logs:
            break
        all_logs.extend(logs)
        if len(logs) < size:
            break
        offset += size
    return all_logs

def parse_log(log):
    if isinstance(log, dict):
        return {
            "timestamp": log.get("timestamp", ""),
            "level": log.get("level", "INFO"),
            "message": log.get("message", "")
        }
    else:
        parts = log.split(" ", 2)
        return {
            "timestamp": parts[0] + " " + parts[1],
            "level": parts[2].split(" ", 1)[0],
            "message": parts[2].split(" ", 1)[1] if len(parts) >= 3 else log
        }

# ------------------------------
# Push to OpenObserve
# ------------------------------
def push_to_openobserve(logs, api_token, endpoint):
    headers = {"Authorization": f"Basic {api_token}", "Content-Type": "application/json"}
    for log_item in logs:
        log = parse_log(log_item)
        try:
            dt = datetime.datetime.strptime(log["timestamp"], "%Y-%m-%d %H:%M:%S")
            time_ns = int(dt.timestamp() * 1e9)
            time_ms = int(dt.timestamp() * 1000)
        except:
            now = time.time()
            time_ns = int(now * 1e9)
            time_ms = int(now * 1000)
        payload = {
            "timeUnixNano": time_ns,
            "severityText": log["level"],
            "name": "MuleApp",
            "body": log["message"],
            "app_name": ANYPOINT_APP_NAME
        }
        resp = requests.post(endpoint, headers=headers, json=payload)
        resp.raise_for_status()
    # return last timestamp in epoch millis
    return time_ms

# ------------------------------
# Main Flow
# ------------------------------
def main():
    print("Authenticating...")
    token = get_anypoint_token(ANYPOINT_USERNAME, ANYPOINT_PASSWORD)

    print("Resolving Org / Env / Deployment / Spec IDs...")
    org_id = get_org_id(token, ANYPOINT_ORG_NAME)
    env_id = get_env_id(token, org_id, ANYPOINT_ENV_NAME)
    dep_id = get_deployment_id(token, org_id, env_id, ANYPOINT_APP_NAME)
    spec_id = get_spec_id(token, org_id, env_id, dep_id)

    last_time = load_last_time()

    print("Fetching logs...")
    logs = fetch_amc_logs(token, org_id, env_id, dep_id, spec_id, start_time=last_time)

    if logs:
        print(f"Pushing {len(logs)} logs to OpenObserve...")
        latest_ts = push_to_openobserve(logs, OPENOBSERVE_API_TOKEN, OPENOBSERVE_ENDPOINT)
        save_last_time(latest_ts)
    else:
        print("No new logs.")

    print("Done ✅")

if __name__ == "__main__":
    main()
```
- Install dependencies. The only third-party package the script uses is `requests`; everything else comes from the standard library.
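If `requests` is not already installed, a typical install command (assuming pip for Python 3 is on your PATH) is:

```bash
pip3 install requests
```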
- Edit the script and set the following (a filled-in example appears after this list):
  - `ANYPOINT_USERNAME` / `ANYPOINT_PASSWORD` → your MuleSoft account credentials.
  - `ANYPOINT_ORG_NAME`, `ANYPOINT_ENV_NAME`, `ANYPOINT_APP_NAME` → your Anypoint organization, environment, and application names.
  - `OPENOBSERVE_ENDPOINT` → your OpenObserve `_json` ingestion URL.
  - `OPENOBSERVE_API_TOKEN` → your base64-encoded user:password.
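For reference, a filled-in configuration block might look like the sketch below. Every value here is a made-up placeholder, not a real account, organization, or endpoint:

```python
# Example configuration: all values are illustrative placeholders
ANYPOINT_USERNAME = "jane.doe@example.com"
ANYPOINT_PASSWORD = "********"
OPENOBSERVE_ENDPOINT = "https://openobserve.example.com/api/default/mulesoft/_json"
OPENOBSERVE_API_TOKEN = "dXNlcjpwYXNzd29yZA=="  # echo -n "user:password" | base64
ANYPOINT_ORG_NAME = "Acme Corp"
ANYPOINT_ENV_NAME = "Sandbox"
ANYPOINT_APP_NAME = "order-processing-api"
```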
- Run the script once to confirm logs flow end to end (see the command below).
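A minimal first run, assuming the script is in the current directory:

```bash
python3 mulesoft_logs_to_openobserve.py
```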
Step 2: Verify Logs in OpenObserve
- Go to Streams → mulesoft in OpenObserve to query the logs.
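Each record on the stream corresponds to the payload dictionary built in `push_to_openobserve`, so these are the fields you should expect to see. The values below are illustrative:

```python
# Illustrative example of one ingested record (values are made up)
{
    "timeUnixNano": 1735689600000000000,  # event time in nanoseconds
    "severityText": "INFO",               # log level
    "name": "MuleApp",
    "body": "Scheduler flow triggered",   # the log message
    "app_name": "order-processing-api"
}
```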
Step 3: Automate
Run the script every 5 minutes with cron; a sample crontab entry is shown below.
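A typical crontab entry might look like the following; the interpreter and file paths are placeholders you should adjust for your environment:

```bash
*/5 * * * * /usr/bin/python3 /opt/scripts/mulesoft_logs_to_openobserve.py >> /var/log/mulesoft_openobserve.log 2>&1
```

Note that `STATE_FILE` is a relative path, so `last_log_time.json` is written to cron's working directory; cd into the script's directory in the cron command, or make the path absolute, if you want the state file to live next to the script.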
Because the script saves the last log timestamp in `last_log_time.json`, it only pushes new logs on each run.
Tips
- CloudHub Logs: If your organization has enabled CloudHub, you can explore the CloudHub API to fetch instance logs programmatically.
- Custom Log Appenders: For more reliable log extraction, consider using Mule custom log appenders instead of polling the API. This lets you push logs directly to OpenObserve or other observability platforms with minimal risk of losing entries.
- Pagination & Limits: Always paginate (offset + size, as the script does) when fetching logs so older entries are not missed.
Troubleshooting
- No logs fetched: Verify your Anypoint credentials, the org/env/app names, and that the application is running. Use debug prints to check the resolved IDs (org, env, deployment, spec).
- Duplicate logs: Ensure `last_log_time.json` is being updated and that `start_time` is passed when fetching logs.
- Cron job not running: Check the schedule syntax (`*/5 * * * *`) and the Python path (`which python3`). Redirect output to a log file for debugging.
- Python not found: Use the full virtualenv Python path in the cron entry.
- API errors: Print API responses to debug 401/404 issues (see the snippet below).
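For example, a quick way to see why a request fails is to print the status and body before raising. This is a generic debugging sketch, not part of the script above:

```python
import requests

# Hypothetical helper: print the response when an Anypoint or OpenObserve call fails
def get_with_debug(url, headers=None, params=None):
    resp = requests.get(url, headers=headers, params=params)
    if not resp.ok:  # e.g. 401 = bad token, 404 = wrong org/env/deployment ID
        print("Request failed:", resp.status_code)
        print("Response body:", resp.text)
    resp.raise_for_status()
    return resp
```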