2026-03-23 17:34:51 +05:30

538 lines
18 KiB
Python

import frappe
from frappe import _
from frappe.utils import nowdate
import json
def _ok(payload, code=200):
frappe.response.status_code = code
frappe.response.message = payload
def _err(msg, code=500):
frappe.response.status_code = code
frappe.response.message = {"error": msg}
@frappe.whitelist(allow_guest=True)
def get_number_cards():
"""
Returns counts for Number Cards:
- total_assets
- work_orders_open
- work_orders_in_progress
- work_orders_completed
"""
try:
total_assets = frappe.db.count("Asset")
work_orders_open = frappe.db.count("Work Order", {"status": ["in", ["Not Started", "Open", "Pending"]]})
work_orders_in_progress = frappe.db.count("Work Order", {"status": ["in", ["In Process", "In Progress", "Started"]]})
work_orders_completed = frappe.db.count("Work Order", {"status": ["in", ["Completed", "Closed", "Finished"]]})
_ok({
"total_assets": total_assets,
"work_orders_open": work_orders_open,
"work_orders_in_progress": work_orders_in_progress,
"work_orders_completed": work_orders_completed,
})
except Exception as e:
frappe.log_error(frappe.get_traceback(), "get_number_cards")
_err(str(e))
@frappe.whitelist(allow_guest=True)
def list_dashboard_charts(search=None, public_only=True, limit=50):
"""
List available Dashboard Chart docs and their y-axis rows.
"""
try:
filters = {}
if str(public_only) in ("1", "true", "True"): # tolerate string flags
filters["is_public"] = 1
charts = frappe.get_all(
"Dashboard Chart",
filters=filters,
fields=[
"name",
"chart_name",
"type",
"is_public",
"chart_type",
"report_name",
"use_report_chart",
"x_field",
"time_interval",
"timespan",
"custom_options",
],
limit=int(limit or 50),
order_by="modified desc",
)
for c in charts:
y_rows = frappe.get_all(
"Dashboard Chart Field",
filters={"parenttype": "Dashboard Chart", "parent": c["name"]},
fields=["y_field", "color"],
order_by="idx asc",
)
c["y_axes"] = y_rows
_ok({"charts": charts})
except Exception as e:
frappe.log_error(frappe.get_traceback(), "list_dashboard_charts")
_err(str(e))
@frappe.whitelist(allow_guest=True)
def get_dashboard_chart_data(chart_name, report_filters=None):
"""
Return chart-ready JSON for any Dashboard Chart (Report-based or Custom).
Supports:
- Report-based charts (with or without use_report_chart)
- Custom/Document-type based charts
- Multi-series (stacked) bar charts
"""
try:
if isinstance(report_filters, str):
report_filters = json.loads(report_filters or "{}")
report_filters = report_filters or {}
# Try to find chart by name first, then by chart_name
chart = None
if frappe.db.exists("Dashboard Chart", chart_name):
chart = frappe.get_doc("Dashboard Chart", chart_name)
else:
# Search by chart_name field
chart_doc_name = frappe.db.get_value("Dashboard Chart", {"chart_name": chart_name}, "name")
if chart_doc_name:
chart = frappe.get_doc("Dashboard Chart", chart_doc_name)
else:
# Try case-insensitive search
chart_doc_name = frappe.db.sql("""
SELECT name FROM `tabDashboard Chart`
WHERE LOWER(chart_name) = LOWER(%s) OR LOWER(name) = LOWER(%s)
LIMIT 1
""", (chart_name, chart_name), as_dict=True)
if chart_doc_name:
chart = frappe.get_doc("Dashboard Chart", chart_doc_name[0].name)
if not chart:
_err(f"Dashboard Chart '{chart_name}' not found. Use list_dashboard_charts API to see available charts.")
return
# Get y-axes configuration
y_axes = frappe.get_all(
"Dashboard Chart Field",
filters={"parenttype": "Dashboard Chart", "parent": chart.name},
fields=["y_field", "color"],
order_by="idx asc",
)
# PRIORITY 1: Handle Report-based charts (check report_name first)
if chart.report_name:
return _process_report_chart(chart, y_axes, report_filters)
# PRIORITY 2: Handle Custom/Document-type charts (only if NO report_name)
if chart.chart_type == "Custom" or (chart.document_type and chart.based_on):
return _process_custom_chart(chart, y_axes)
# PRIORITY 3: Handle Group By / Count / Sum charts
if chart.chart_type in ("Group By", "Count", "Sum"):
return _process_group_by_chart(chart)
_err(f"Unsupported chart type: {chart.chart_type} for chart: {chart.name}")
except Exception as e:
frappe.log_error(frappe.get_traceback(), "get_dashboard_chart_data")
_err(str(e))
def _process_report_chart(chart, y_axes, report_filters):
"""
Process a Report-based Dashboard Chart.
Runs the report and extracts data based on x_field and y_axes configuration.
"""
try:
# Merge chart's default filters with provided filters
chart_filters = _parse_custom_options(chart.filters_json) if chart.filters_json else {}
merged_filters = {**chart_filters, **report_filters}
# Run the report
run = frappe.get_attr("frappe.desk.query_report.run")
report_result = run(chart.report_name, filters=merged_filters)
rows = report_result.get("result", []) or []
# Filter out total rows
data_rows = [r for r in rows if not (isinstance(r, dict) and r.get("is_total_row"))]
# Get x-axis labels
x_key = chart.x_field
labels = []
for r in data_rows:
if isinstance(r, dict):
val = r.get(x_key)
if val is not None:
labels.append(str(val))
# Build datasets from y-axes
datasets = []
# Color palette for multi-series charts
default_colors = [
"#6366F1", # Indigo
"#10B981", # Green
"#3B82F6", # Blue
"#F59E0B", # Amber
"#EC4899", # Pink
"#8B5CF6", # Purple
"#06B6D4", # Cyan
"#EF4444", # Red
]
for idx, y in enumerate(y_axes):
y_field = y.get("y_field")
series_name = y_field # Use field name as series name
values = []
for r in data_rows:
if isinstance(r, dict):
val = r.get(y_field)
try:
values.append(float(val) if val is not None else 0)
except (ValueError, TypeError):
values.append(0)
# Use provided color or default from palette
color = y.get("color") or default_colors[idx % len(default_colors)]
datasets.append({
"name": series_name,
"values": values,
"color": color
})
chart_type = (chart.type or "Bar").title()
custom_options = _parse_custom_options(chart.custom_options)
# Handle Pie charts (only use first dataset)
if chart_type.lower() == "pie":
ds = datasets[0] if datasets else {"name": "value", "values": []}
_ok({
"labels": labels,
"datasets": [ds],
"type": "Pie",
"options": custom_options,
"source": {"report": chart.report_name},
})
return
# Return data for Bar/Line charts (supports multi-series/stacked)
_ok({
"labels": labels,
"datasets": datasets,
"type": chart_type,
"options": custom_options,
"source": {"report": chart.report_name},
# Include result for frontend transformation if needed
"result": data_rows if len(y_axes) > 1 else None,
})
except Exception as e:
frappe.log_error(frappe.get_traceback(), f"Report Chart Error: {chart.name}")
_err(f"Error processing report chart: {str(e)}")
def _process_custom_chart(chart, y_axes):
"""
Process a Custom/Document-type based Dashboard Chart.
Queries the source doctype directly based on configuration.
"""
try:
source = chart.document_type
based_on = chart.based_on
value_based_on = chart.value_based_on or "name"
if not source or not based_on:
_err(f"Custom chart '{chart.name}' missing document_type or based_on configuration")
return
# Build aggregation query
if chart.type and chart.type.lower() == "pie":
# Pie chart: Group by based_on field and count
data = frappe.db.sql(f"""
SELECT `{based_on}` as label, COUNT(`{value_based_on}`) as value
FROM `tab{source}`
GROUP BY `{based_on}`
ORDER BY value DESC
""", as_dict=True)
labels = [str(d.get("label") or "Unknown") for d in data]
values = [float(d.get("value") or 0) for d in data]
_ok({
"labels": labels,
"datasets": [{"name": "count", "values": values}],
"type": "Pie",
"options": _parse_custom_options(chart.custom_options),
"source": {"doctype": source},
})
else:
# Bar chart: Group by based_on
data = frappe.db.sql(f"""
SELECT `{based_on}` as label, COUNT(`{value_based_on}`) as value
FROM `tab{source}`
GROUP BY `{based_on}`
ORDER BY value DESC
LIMIT 20
""", as_dict=True)
labels = [str(d.get("label") or "Unknown") for d in data]
values = [float(d.get("value") or 0) for d in data]
_ok({
"labels": labels,
"datasets": [{"name": "count", "values": values, "color": "#4F46E5"}],
"type": "Bar",
"options": _parse_custom_options(chart.custom_options),
"source": {"doctype": source},
})
except Exception as e:
frappe.log_error(frappe.get_traceback(), f"Custom Chart Error: {chart.name}")
_err(f"Error processing custom chart: {str(e)}")
def _process_group_by_chart(chart):
"""
Process a Group By / Count / Sum type Dashboard Chart.
These charts aggregate data from a doctype without using a report.
"""
try:
source = chart.document_type
if not source:
_err(f"Group By chart '{chart.name}' missing document_type")
return
group_by_field = chart.group_by_based_on or chart.based_on
aggregate_function = chart.chart_type # Count, Sum, etc.
value_field = chart.aggregate_function_based_on or "name"
if not group_by_field:
_err(f"Group By chart '{chart.name}' missing group_by field")
return
# Build query based on aggregate function
if aggregate_function == "Count":
query = f"""
SELECT `{group_by_field}` as label, COUNT(*) as value
FROM `tab{source}`
WHERE `{group_by_field}` IS NOT NULL
GROUP BY `{group_by_field}`
ORDER BY value DESC
LIMIT 20
"""
elif aggregate_function == "Sum":
query = f"""
SELECT `{group_by_field}` as label, SUM(`{value_field}`) as value
FROM `tab{source}`
WHERE `{group_by_field}` IS NOT NULL
GROUP BY `{group_by_field}`
ORDER BY value DESC
LIMIT 20
"""
else:
# Default to count
query = f"""
SELECT `{group_by_field}` as label, COUNT(*) as value
FROM `tab{source}`
WHERE `{group_by_field}` IS NOT NULL
GROUP BY `{group_by_field}`
ORDER BY value DESC
LIMIT 20
"""
data = frappe.db.sql(query, as_dict=True)
labels = [str(d.get("label") or "Unknown") for d in data]
values = [float(d.get("value") or 0) for d in data]
chart_type = (chart.type or "Bar").title()
_ok({
"labels": labels,
"datasets": [{"name": aggregate_function.lower(), "values": values, "color": "#6366F1"}],
"type": chart_type,
"options": _parse_custom_options(chart.custom_options),
"source": {"doctype": source},
})
except Exception as e:
frappe.log_error(frappe.get_traceback(), f"Group By Chart Error: {chart.name}")
_err(f"Error processing group by chart: {str(e)}")
def _parse_custom_options(raw):
"""Parse custom_options JSON string to dict."""
if not raw:
return {}
try:
if isinstance(raw, dict):
return raw
return json.loads(raw)
except Exception:
return {}
@frappe.whitelist(allow_guest=True)
def get_repair_cost_by_item(year=None):
"""
Example specialized endpoint for 'Repair Cost' report style chart
(X: item_code, Y: amount, Filter: Year)
"""
try:
year = int(year or frappe.utils.getdate(nowdate()).year)
rows = frappe.db.sql(
"""
SELECT item_code, SUM(amount) as amount
FROM `tabWork Order` wo
WHERE YEAR(wo.posting_date) = %(year)s
GROUP BY item_code
ORDER BY amount DESC
""",
{"year": year},
as_dict=True,
)
labels = [r.item_code or "Unknown" for r in rows]
values = [float(r.amount or 0) for r in rows]
_ok({
"labels": labels,
"datasets": [{"name": f"Repair Cost {year}", "values": values}],
"type": "Bar",
"options": {},
})
except Exception as e:
frappe.log_error(frappe.get_traceback(), "get_repair_cost_by_item")
_err(str(e))
@frappe.whitelist(allow_guest=True)
def get_technician_working_hours(filters=None):
"""
Dedicated endpoint for Technician Working Hours chart.
Returns hours worked by each technician.
"""
try:
if isinstance(filters, str):
filters = json.loads(filters or "{}")
filters = filters or {}
# Run the report
run = frappe.get_attr("frappe.desk.query_report.run")
report_result = run("Technicians working Hours", filters=filters)
rows = report_result.get("result", []) or []
# Filter out total rows
data_rows = [r for r in rows if isinstance(r, dict) and not r.get("is_total_row")]
labels = [str(r.get("technician_name") or "Unknown") for r in data_rows]
values = [float(r.get("total_hours") or 0) for r in data_rows]
_ok({
"labels": labels,
"datasets": [{
"name": "total_hours",
"values": values,
"color": "#6366F1"
}],
"type": "Bar",
"options": {"barOptions": {"stacked": False}},
"result": data_rows,
})
except Exception as e:
frappe.log_error(frappe.get_traceback(), "get_technician_working_hours")
_err(str(e))
@frappe.whitelist(allow_guest=True)
def get_technician_work_summary(filters=None):
"""
Dedicated endpoint for Technician Work Order Summary chart.
Returns work order counts (total, completed, in_progress, open) by technician.
"""
try:
if isinstance(filters, str):
filters = json.loads(filters or "{}")
filters = filters or {}
# Run the report
run = frappe.get_attr("frappe.desk.query_report.run")
report_result = run("Technician Work Order Summary", filters=filters)
rows = report_result.get("result", []) or []
# Filter out total rows
data_rows = [r for r in rows if isinstance(r, dict) and not r.get("is_total_row")]
labels = [str(r.get("assigned_technician") or "Unknown") for r in data_rows]
# Build multi-series datasets
datasets = [
{
"name": "total",
"values": [float(r.get("total") or 0) for r in data_rows],
"color": "#6366F1" # Indigo
},
{
"name": "completed",
"values": [float(r.get("completed") or 0) for r in data_rows],
"color": "#10B981" # Green
},
{
"name": "in_progress",
"values": [float(r.get("in_progress") or 0) for r in data_rows],
"color": "#3B82F6" # Blue
},
{
"name": "open",
"values": [float(r.get("open") or 0) for r in data_rows],
"color": "#F59E0B" # Amber
}
]
# Filter out datasets with all zeros
datasets = [ds for ds in datasets if any(v > 0 for v in ds["values"])]
_ok({
"labels": labels,
"datasets": datasets,
"type": "Bar",
"options": {"barOptions": {"stacked": True}},
"result": data_rows,
})
except Exception as e:
frappe.log_error(frappe.get_traceback(), "get_technician_work_summary")
_err(str(e))
@frappe.whitelist(allow_guest=True)
def debug_list_charts():
"""
Debug endpoint to list all Dashboard Chart names.
Use this to verify exact chart names in the database.
"""
try:
charts = frappe.db.sql("""
SELECT name, chart_name, chart_type, report_name, is_public, type
FROM `tabDashboard Chart`
ORDER BY modified DESC
""", as_dict=True)
_ok({
"total": len(charts),
"charts": charts
})
except Exception as e:
frappe.log_error(frappe.get_traceback(), "debug_list_charts")
_err(str(e))