Backend Systems

Introduction

The Backend Systems of the ESG Environment Management project form the core infrastructure that powers comprehensive ESG data management, real-time processing, and advanced analytics. Built on the robust Frappe framework, the backend provides enterprise-grade capabilities including sophisticated data processing pipelines, intelligent metric calculations, comprehensive API services, and scalable IoT integration.

1. Core Backend Architecture

Frappe Framework Foundation

The system leverages Frappe's proven architecture while extending it with specialized ESG capabilities:

Application Configuration and Hooks

# Application metadata and configuration
app_name = "esg_enviroment"
app_title = "ESG Enviroment Management"
app_publisher = "Duongtk"
app_description = "ESG ENV"
app_email = "hyperdatavn@gmail.com"
app_license = "hyperdata"

# Logo and branding configuration
app_logo_url = "/assets/esg_enviroment/images/company_logo.png"
brand_html = '<div><img src="/assets/esg_enviroment/images/company_logo.png" alt="hyperdata" /></div>'

# Module inclusion
app_include_modules = ["ESG Enviroment Management"]

# Asset inclusion
app_include_css = ["/assets/esg_enviroment/css/esg_dashboard.bundle.css"]
app_include_js = ["/assets/esg_enviroment/js/custom_block_widget.js"]

Custom Commands and Fixtures

# Register custom commands for bench CLI
commands = ["esg_enviroment.esg_enviroment_management.utils.commands"]

# Ensure workspace is registered and visible
fixtures = [
    {"dt": "Workspace", "filters": [["name", "in", ["ESG Enviroment", "Environmental Metrics"]]]},
    {"dt": "Module Def", "filters": [["name", "=", "ESG Enviroment Management"]]},
    {"dt": "Desktop Icon", "filters": [["module_name", "=", "ESG Enviroment Management"]]},
    {"dt": "Custom Role", "filters": [["name", "in", ["ESG Manager", "ESG User"]]]},
    {"dt": "Page", "filters": [["name", "in", ["esg-dashboard"]]]}
]

# API endpoints for IoT and data integration
api_endpoints = [
    {'method': 'esg_enviroment.api.receive_mqtt_data', 'endpoint': 'esg/iot/mqtt'},
    {'method': 'esg_enviroment.api.receive_bulk_data', 'endpoint': 'esg/iot/bulk'},
    {'method': 'esg_enviroment.api.get_realtime_data', 'endpoint': 'esg/iot/data'}
]

2. Advanced API Layer

Comprehensive API Services

The backend provides a sophisticated API layer with multiple endpoints for different use cases:

ESG Metric Validation and Processing

from typing import Dict

def validate_esg_metric(data: Dict) -> tuple[bool, str]:
    """Validate ESG metric data"""
    required_fields = ['metric_type', 'value', 'unit', 'reporting_period']

    # Check required fields
    missing_fields = [field for field in required_fields if field not in data]
    if missing_fields:
        return False, f"Missing required fields: {', '.join(missing_fields)}"

    # Validate metric type
    valid_metrics = ['emissions', 'energy', 'water', 'waste', 'social', 'governance']
    if data['metric_type'] not in valid_metrics:
        return False, f"Invalid metric type. Must be one of: {', '.join(valid_metrics)}"

    # Validate value
    if not isinstance(data['value'], (int, float)) or data['value'] < 0:
        return False, "Value must be a non-negative number"

    return True, ""
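To make the validation rules concrete, here is a minimal sketch of a payload that would pass all three checks. The field values are hypothetical; only the field names and check logic come from the validator above.

```python
# Hypothetical payload illustrating the fields validate_esg_metric expects.
payload = {
    "metric_type": "energy",
    "value": 1250.5,
    "unit": "kWh",
    "reporting_period": "2024-Q1",
}

# The same required-field, metric-type, and value checks the validator applies:
required_fields = ["metric_type", "value", "unit", "reporting_period"]
missing = [f for f in required_fields if f not in payload]
is_valid = (
    not missing
    and payload["metric_type"] in ["emissions", "energy", "water", "waste", "social", "governance"]
    and isinstance(payload["value"], (int, float))
    and payload["value"] >= 0
)
print(is_valid)  # True
```

Dropping any of the four fields, or submitting a negative value, would make the validator return `False` with a descriptive message instead.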

ESG Score Calculation Engine

from typing import Dict, List

def calculate_esg_score(metrics: List[Dict]) -> Dict:
    """Calculate ESG scores from metrics"""
    scores = {
        'environmental': 0,
        'social': 0,
        'governance': 0,
        'total': 0
    }

    weights = {
        'emissions': 0.4,
        'energy': 0.3,
        'water': 0.15,
        'waste': 0.15,
        'social': 1.0,
        'governance': 1.0
    }

    metric_counts = {'environmental': 0, 'social': 0, 'governance': 0}

    for metric in metrics:
        if metric['metric_type'] in ['emissions', 'energy', 'water', 'waste']:
            category = 'environmental'
        else:
            category = metric['metric_type']

        # Normalize value based on unit and type
        normalized_value = normalize_metric_value(metric)

        # Apply weight
        weighted_value = normalized_value * weights.get(metric['metric_type'], 1.0)
        scores[category] += weighted_value
        metric_counts[category] += 1

    # Calculate total score
    scores['total'] = sum([
        scores['environmental'] * 0.4,
        scores['social'] * 0.3,
        scores['governance'] * 0.3
    ])

    return scores
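The weighting arithmetic can be illustrated with a worked example. The normalized values below are assumed (the source does not define the output scale of normalize_metric_value); the weights and the 40/30/30 pillar split are taken from the engine above.

```python
# Environmental sub-metric weights, as defined in calculate_esg_score
weights = {"emissions": 0.4, "energy": 0.3, "water": 0.15, "waste": 0.15}

# Assumed normalized values on a 0-100 scale (illustrative only)
normalized = {"emissions": 80.0, "energy": 70.0, "water": 90.0, "waste": 60.0}

# Environmental pillar: weighted sum of its sub-metrics
environmental = sum(normalized[m] * weights[m] for m in normalized)  # 75.5

# Total: 40/30/30 split across the three pillars (social/governance assumed)
social, governance = 75.0, 85.0
total = environmental * 0.4 + social * 0.3 + governance * 0.3  # 78.2
```

With these inputs, emissions contribute 32.0 of the environmental pillar's 75.5 points, reflecting their 0.4 weight.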

IoT Data Integration APIs

MQTT Data Reception with Rate Limiting

@frappe.whitelist(allow_guest=True)
@rate_limit(limit=100, seconds=300)  # 100 requests per 5 minutes
def receive_mqtt_data():
    """
    API endpoint to receive MQTT data from an IoT broker

    Expected payload format:
    {
        "sensor_id": "sensor-123",
        "facility_id": "facility-xyz",
        "reading_type": "Energy",
        "reading_value": 123.45,
        "reading_unit": "kWh",
        "timestamp": "2023-01-15T12:30:45",
        "data_category": "Energy Consumption",
        "message_id": "mqtt-msg-123",
        "protocol": "MQTT",
        "metadata": {
            "quality": "good",
            "battery_level": 85,
            "signal_strength": -65
        }
    }
    """
    try:
        if not frappe.request or not frappe.request.data:
            log_api_call("receive_mqtt_data", False, "No data received")
            return {"success": False, "message": "No data received"}

        data = json.loads(frappe.request.data)

        # Validate data
        is_valid, error_message = validate_sensor_data(data)
        if not is_valid:
            log_api_call("receive_mqtt_data", False, error_message)
            return {"success": False, "message": error_message}

        # Extract optional sensor metadata
        metadata = data.get('metadata', {})

        # Create Realtime Data document with enhanced fields
        doc = frappe.get_doc({
            "doctype": "Realtime Data",
            "sensor_id": data['sensor_id'],
            "facility_id": data['facility_id'],
            "reading_type": data['reading_type'],
            "reading_value": flt(data['reading_value']),
            "reading_unit": data.get('reading_unit'),
            "timestamp": format_timestamp(data.get('timestamp')),
            "data_category": data.get('data_category'),
            "message_id": data.get('message_id'),
            "protocol": data['protocol'],
            "raw_data": frappe.as_json(data),
            "status": "Pending",
            "quality_indicator": metadata.get('quality', 'unknown'),
            "battery_level": metadata.get('battery_level'),
            "signal_strength": metadata.get('signal_strength'),
            "metadata": frappe.as_json(metadata)
        })

        doc.insert(ignore_permissions=True)

        # Process the data asynchronously if possible
        if frappe.conf.get('enable_async_mqtt_processing'):
            frappe.enqueue(
                'esg_enviroment.api.process_realtime_data',
                doc_name=doc.name,
                queue='short',
                timeout=300
            )
            status_message = "MQTT data received and queued for processing"
        else:
            # Process synchronously
            doc.status = "Valid"
            doc.process_data()
            doc.save()
            status_message = "MQTT data received and processed"

        return {
            "success": True,
            "message": status_message,
            "data_id": doc.name,
            "timestamp": doc.timestamp
        }

    except Exception as e:
        error_msg = f"Error receiving MQTT data: {str(e)}"
        frappe.logger().error(error_msg)
        return {"success": False, "message": error_msg}
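From the client side, a gateway or broker bridge would serialize a reading matching the documented payload format and POST it to the endpoint. The sketch below only builds and round-trips the JSON body (no network call); the sensor and facility identifiers are hypothetical.

```python
import json

# Hypothetical reading matching the payload format receive_mqtt_data documents
payload = {
    "sensor_id": "sensor-123",
    "facility_id": "facility-xyz",
    "reading_type": "Energy",
    "reading_value": 123.45,
    "reading_unit": "kWh",
    "timestamp": "2023-01-15T12:30:45",
    "data_category": "Energy Consumption",
    "message_id": "mqtt-msg-123",
    "protocol": "MQTT",
    "metadata": {"quality": "good", "battery_level": 85, "signal_strength": -65},
}

# Serialize for the HTTP request body; the endpoint parses it back
# with json.loads(frappe.request.data)
body = json.dumps(payload)
decoded = json.loads(body)
```

In a deployment, `body` would be POSTed to the route the hooks map to this method (`esg/iot/mqtt`), subject to the 100-requests-per-5-minutes rate limit.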

Bulk Data Processing

@frappe.whitelist(allow_guest=True)
def receive_bulk_data():
    """
    API endpoint to receive bulk data import from systems like SCADA or ERP
    """
    try:
        if frappe.request and frappe.request.data:
            data = json.loads(frappe.request.data)

            # Validate data structure
            if 'readings' not in data or not isinstance(data['readings'], list):
                return {"success": False, "message": "Missing or invalid readings array"}

            if 'facility_id' not in data:
                return {"success": False, "message": "Missing facility_id"}

            # Process each reading
            results = []

            for reading in data['readings']:
                try:
                    # Create Realtime Data document
                    doc = frappe.get_doc({
                        "doctype": "Realtime Data",
                        "sensor_id": reading['sensor_id'],
                        "facility_id": data['facility_id'],
                        "reading_type": reading['reading_type'],
                        "reading_value": flt(reading['reading_value']),
                        "reading_unit": reading.get('reading_unit'),
                        "timestamp": reading.get('timestamp', now_datetime()),
                        "data_category": reading.get('data_category'),
                        "protocol": "Batch",
                        "raw_data": frappe.as_json(reading),
                        "source_id": data.get('source_id'),
                        "status": "Pending"
                    })

                    doc.insert(ignore_permissions=True)

                    # Process the data
                    doc.status = "Valid"
                    doc.process_data()
                    doc.save()

                    results.append({
                        "success": True,
                        "message": "Data processed",
                        "data_id": doc.name,
                        "sensor_id": reading['sensor_id']
                    })

                except Exception as e:
                    results.append({
                        "success": False,
                        "message": str(e),
                        "reading": reading
                    })

            return {
                "success": True,
                "message": f"Processed {len(results)} readings",
                "results": results
            }

        return {"success": False, "message": "No data received"}

    except Exception as e:
        frappe.logger().error(f"Error receiving bulk data: {str(e)}")
        return {"success": False, "message": str(e)}
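A bulk payload differs from a single MQTT message in that the facility is stated once at the top level and each entry in `readings` carries only per-sensor fields. The sketch below shows the expected shape with hypothetical identifiers, plus the two structural checks the endpoint performs.

```python
# Hypothetical bulk payload: one facility, many readings, as receive_bulk_data expects
bulk = {
    "facility_id": "facility-xyz",
    "source_id": "scada-01",
    "readings": [
        {"sensor_id": "s-1", "reading_type": "Energy", "reading_value": 410.2, "reading_unit": "kWh"},
        {"sensor_id": "s-2", "reading_type": "Water", "reading_value": 12.7, "reading_unit": "m3"},
    ],
}

# The endpoint's structural checks before any reading is processed:
ok = "facility_id" in bulk and isinstance(bulk.get("readings"), list)
```

Because each reading is processed in its own try/except, one malformed entry produces a per-reading failure record in `results` without aborting the rest of the batch.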

3. Document Processing Engine

ESG Report Generation System

The backend includes a sophisticated report generation system that handles complex ESG reporting requirements:

Intelligent Metric Aggregation

class ESGReport(Document):
    def fetch_metric_values(self):
        """Fetch and populate metric values from ESG Metric Value records"""
        if not self.metrics:
            self.populate_standard_metrics()

        # Get the year from the report period
        year = getdate(self.report_start_date).year

        # Get period details
        period_doc = frappe.get_doc("ESG Period", self.esg_period)
        period_type = period_doc.period_type

        # Update metrics with values
        for i, metric in enumerate(self.metrics):
            # Get value based on ESG Period
            current_period_value = self.get_metric_value_by_period(
                metric.metric_definition,
                self.esg_period
            )

            # Process based on period type
            if period_type == PERIOD_TYPE_YEARLY:
                self._handle_yearly_period(metric, current_period_value, year)
            elif period_type == PERIOD_TYPE_QUARTERLY:
                self._handle_quarterly_period(metric, current_period_value, period_doc, year)
            elif period_type == PERIOD_TYPE_MONTHLY:
                self._handle_monthly_period(metric, current_period_value, period_doc, year)

            # Get previous year's value for comparison
            self._set_previous_year_value(metric, period_type, year)

            # Update in the document
            self.metrics[i] = metric

Period-Based Data Handling

def _handle_quarterly_period(self, metric, current_period_value, period_doc, year):
    """Handle metrics for quarterly period types"""
    # Determine which quarter this is based on the start date
    period_month = getdate(period_doc.start_date).month

    # Assign the value to the appropriate quarter based on the month
    if 1 <= period_month <= 3:  # Q1
        metric.q1_value = current_period_value
    elif 4 <= period_month <= 6:  # Q2
        metric.q2_value = current_period_value
    elif 7 <= period_month <= 9:  # Q3
        metric.q3_value = current_period_value
    elif 10 <= period_month <= 12:  # Q4
        metric.q4_value = current_period_value

    # Find other quarterly periods for this year
    other_quarters = self._get_other_quarterly_data(
        metric.metric_definition,
        year,
        exclude_period=self.esg_period
    )

    # Set values for other quarters
    for quarter in other_quarters:
        quarter_month = getdate(quarter.get("start_date")).month
        q_value = quarter.get("value")

        # Assign to the correct quarter based on the month
        if 1 <= quarter_month <= 3 and metric.q1_value is None:
            metric.q1_value = q_value
        elif 4 <= quarter_month <= 6 and metric.q2_value is None:
            metric.q2_value = q_value
        elif 7 <= quarter_month <= 9 and metric.q3_value is None:
            metric.q3_value = q_value
        elif 10 <= quarter_month <= 12 and metric.q4_value is None:
            metric.q4_value = q_value

    # Calculate annual value
    self._calculate_annual_value(metric)
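The month-to-quarter branching above reduces to simple integer arithmetic. A compact equivalent is sketched below; `quarter_field_for_month` is a hypothetical helper, not part of the source, but it returns the same field names the method assigns to.

```python
def quarter_field_for_month(month: int) -> str:
    """Map a period start month (1-12) to its quarter value field.

    Hypothetical helper mirroring the if/elif chain in
    _handle_quarterly_period: months 1-3 -> q1_value, 4-6 -> q2_value, etc.
    """
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    return f"q{(month - 1) // 3 + 1}_value"
```

For example, a quarterly period starting in August (month 8) maps to `q3_value`, matching the 7-9 branch in the original code.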

ESG Metric Value Processing

Advanced Calculation Engine

class ESGMetricValue(Document):
    def execute_calculation_formula(self):
        """Execute the calculation formula from the metric definition to calculate the value"""
        if not self.metric_definition:
            return

        try:
            # Get the metric definition
            metric_def = frappe.get_doc("ESG Metric Definition", self.metric_definition)

            # If there's no calculation formula, do nothing
            if not metric_def.calculation_formula or not metric_def.calculation_formula.strip():
                return

            # Collect data needed for the calculation
            data = self.collect_calculation_data()

            # Create a namespace for executing the formula
            namespace = {"frappe": frappe}

            # Execute the formula code
            exec(metric_def.calculation_formula, namespace)

            # Check if the calculate function exists
            if "calculate" not in namespace or not callable(namespace["calculate"]):
                frappe.log_error(
                    f"Calculation formula for {metric_def.metric_code} does not define a calculate function",
                    "ESG Metric Calculation Error"
                )
                return

            # Execute the calculation function
            result = namespace["calculate"](data)

            # Update the value field if the calculation was successful
            if result is not None:
                # Convert to float and round to 3 decimal places for consistency
                self.value = round(float(result), 3)
                frappe.msgprint(f"Value calculated successfully: {self.value}", indicator="green")

        except Exception as e:
            frappe.log_error(
                f"Error executing calculation for {self.metric_definition}: {str(e)}",
                "ESG Metric Calculation Error"
            )
            frappe.msgprint(
                f"Error calculating value: {str(e)}",
                indicator="red", alert=True
            )
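The engine expects each stored `calculation_formula` to be Python source that defines a `calculate(data)` function. The sketch below runs one such formula the same way the engine does (exec into a namespace, call `calculate`, round the result). The data keys `total_emissions` and `production_output` are hypothetical examples, not fields guaranteed by the source.

```python
# A minimal calculation_formula as the engine expects it: the stored
# source must define calculate(data). Field names here are hypothetical.
formula_source = """
def calculate(data):
    # Emissions intensity = total emissions / production output
    if not data.get("production_output"):
        return None  # avoid division by zero; engine skips None results
    return data["total_emissions"] / data["production_output"]
"""

# Mirror the engine: exec the source, then call the resulting function
namespace = {}
exec(formula_source, namespace)
result = namespace["calculate"]({"total_emissions": 1500.0, "production_output": 500.0})

# The engine rounds successful results to 3 decimal places
value = round(float(result), 3)
```

Since the formula is executed with `exec`, only trusted users (e.g. the ESG Manager role) should be allowed to edit ESG Metric Definition records.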

Category-Specific Data Collection

def collect_calculation_data(self):
    """Collect all the data needed for the calculation"""
    period_details = self.get_period_details()

    data = {
        # Basic context information
        "metric_code": self.metric_definition,
        "esg_period": self.esg_period,
        "period_type": period_details.get("period_type"),
        "period_label": period_details.get("period_label"),
        "year": period_details.get("year"),
        "start_date": period_details.get("start_date"),
        "end_date": period_details.get("end_date"),
        "date": self.value_date,
        "organization": self.organization,
        "facility": self.facility,
        "location": self.location,
        "current_value": flt(self.value) if self.value else 0
    }

    # Add data based on metric category
    metric_def = frappe.get_doc("ESG Metric Definition", self.metric_definition)

    # Get category-specific data
    if metric_def.category == "Environmental":
        self.add_environmental_data(data, metric_def.subcategory)
    elif metric_def.category == "Social":
        self.add_social_data(data, metric_def.subcategory)
    elif metric_def.category == "Governance":
        self.add_governance_data(data, metric_def.subcategory)

    # Add previous period data
    self.add_previous_period_data(data)

    return data

4. Real-time Data Processing

IoT Data Processing Pipeline

class RealtimeData(Document):
    def validate(self):
        """Validate the Realtime Data document before saving"""
        if not self.timestamp:
            self.timestamp = now_datetime()

        self.validate_reading_value()
        self.check_data_quality()

    def validate_reading_value(self):
        """Ensure reading value is valid based on reading type"""
        # Check for negative values where not appropriate
        if self.reading_value < 0 and self.reading_type in ["Energy", "Water", "Emissions", "Waste"]:
            frappe.msgprint(f"Warning: Negative value ({self.reading_value}) detected for {self.reading_type}")

        # Check for extreme values
        reading_limits = {
            "Energy": 1000000,     # 1M kWh
            "Water": 1000000,      # 1M m³
            "Emissions": 1000000   # 1M kg
        }

        if self.reading_type in reading_limits and self.reading_value > reading_limits[self.reading_type]:
            frappe.msgprint(f"Warning: Extreme value ({self.reading_value}) detected for {self.reading_type}")

    def process_data(self):
        """Process the data and create appropriate ESG records"""
        if self.processed:
            return

        try:
            # Based on reading type, create or update ESG records
            if self.reading_type == "Energy" and self.data_category == "Energy Consumption":
                self.create_energy_consumption()
            elif self.reading_type == "Water" and self.data_category == "Water Consumption":
                self.create_water_consumption()
            elif self.reading_type == "Emissions" and self.data_category == "GHG Emissions":
                self.create_ghg_emission()

            # Mark as processed
            self.processed = 1
            self.db_update()

        except Exception as e:
            frappe.logger().error(f"Error processing real-time data {self.name}: {str(e)}")

Automated ESG Record Creation

def create_energy_consumption(self):
    """Create or update Energy Consumption record"""
    # Convert timestamp to date
    if isinstance(self.timestamp, datetime.datetime):
        date = self.timestamp.date()
    else:
        date = datetime.datetime.strptime(self.timestamp, '%Y-%m-%d %H:%M:%S.%f').date()

    # Check if energy consumption record already exists for this facility and date
    existing = frappe.db.get_value(
        "Energy Consumption",
        {"facility_id": self.facility_id, "reporting_period": date},
        "name"
    )

    if existing:
        # Update existing record
        doc = frappe.get_doc("Energy Consumption", existing)
        if self.reading_unit == "kWh":
            doc.electricity_kwh = flt(doc.electricity_kwh) + flt(self.reading_value)
        doc.source_id = self.source_id
        doc.save()
    else:
        # Create new record
        doc = frappe.get_doc({
            "doctype": "Energy Consumption",
            "facility_id": self.facility_id,
            "reporting_period": date,
            "electricity_kwh": self.reading_value if self.reading_unit == "kWh" else 0,
            "source_id": self.source_id
        })
        doc.insert()
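The aggregation key is the calendar date, so the timestamp-to-date conversion matters. The sketch below shows both branches of that conversion with a sample timestamp (the value is illustrative; the microsecond format string is the one used above).

```python
import datetime

# String timestamps are parsed with the microsecond-precision format
ts = "2023-01-15 12:30:45.123456"
date = datetime.datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f").date()

# datetime objects are reduced with .date() directly
dt = datetime.datetime(2023, 1, 15, 12, 30, 45)
assert dt.date() == date  # both readings aggregate into the same daily record
```

Every kWh reading from a facility on the same date therefore accumulates into one Energy Consumption record rather than creating a row per sensor message.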

5. Performance and Scalability

Asynchronous Processing

# Process the data asynchronously if possible (excerpt from receive_mqtt_data)
if frappe.conf.get('enable_async_mqtt_processing'):
    frappe.enqueue(
        'esg_enviroment.api.process_realtime_data',
        doc_name=doc.name,
        queue='short',
        timeout=300
    )
    status_message = "MQTT data received and queued for processing"
else:
    # Process synchronously
    doc.status = "Valid"
    doc.process_data()
    doc.save()
    status_message = "MQTT data received and processed"

def process_realtime_data(doc_name: str) -> None:
    """Process realtime data asynchronously"""
    doc = None
    try:
        doc = frappe.get_doc("Realtime Data", doc_name)
        doc.status = "Valid"
        doc.process_data()
        doc.save()
    except Exception as e:
        frappe.logger().error(f"Error processing realtime data {doc_name}: {str(e)}")
        if doc:
            doc.status = "Error"
            doc.error_message = str(e)
            doc.save()

Caching and Optimization

  • Query Optimization: Efficient database queries with proper indexing
  • Result Caching: Cached calculation results for frequently accessed metrics
  • Connection Pooling: Optimized database connection management
  • Background Jobs: Heavy processing moved to background queues
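As one framework-agnostic illustration of result caching, a pure aggregation helper can be memoized so repeated requests for the same metric and year skip the expensive computation. A Frappe deployment would more likely use its own cache layer; the helper name and values below are hypothetical.

```python
from functools import lru_cache

@lru_cache(maxsize=256)
def cached_annual_total(metric_code: str, year: int) -> float:
    """Hypothetical expensive aggregation; in the real system this would
    query ESG Metric Value records for the given metric and year."""
    quarterly_values = (10.0, 20.0, 30.0, 40.0)  # stand-in data
    return sum(quarterly_values)

first = cached_annual_total("GHG-1", 2024)   # computed
second = cached_annual_total("GHG-1", 2024)  # served from cache
```

Cache invalidation matters here: memoized results must be cleared (e.g. `cached_annual_total.cache_clear()`) whenever the underlying metric values change.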

6. Security and Compliance

API Security

  • Rate Limiting: Built-in rate limiting for all API endpoints
  • Authentication: Multiple authentication methods (API keys, OAuth2)
  • Input Validation: Comprehensive input validation and sanitization
  • Error Handling: Secure error handling without information leakage

Data Security

  • Audit Logging: Comprehensive audit trails for all data changes
  • Access Control: Role-based access control for sensitive data
  • Data Encryption: Support for data encryption at rest and in transit
  • Compliance: Built-in compliance features for various regulations

Key Features Summary

API Capabilities

  • Multi-Protocol Support: MQTT, HTTP/HTTPS, WebSocket, and batch processing
  • Rate Limiting: Built-in protection against API abuse
  • Comprehensive Validation: Multi-layer data validation and quality checks
  • Asynchronous Processing: Background job processing for heavy operations

Data Processing

  • Real-time Processing: Immediate processing of IoT sensor data
  • Intelligent Aggregation: Smart data aggregation based on metric types
  • Formula Engine: Flexible calculation engine for custom metrics
  • Quality Management: Automated data quality assessment and scoring

Performance Features

  • Scalable Architecture: Designed for high-volume data processing
  • Caching Strategy: Multi-level caching for optimal performance
  • Background Jobs: Asynchronous processing for resource-intensive tasks
  • Database Optimization: Efficient queries and indexing strategies

Conclusion

The Backend Systems of the ESG Environment Management project provide a robust, scalable, and secure foundation for comprehensive ESG data management. The combination of sophisticated data processing, real-time capabilities, advanced analytics, and enterprise-grade security makes it suitable for organizations of all sizes looking to enhance their sustainability management capabilities.