//! AWS Pricing CRUD Lambda
//!
//! This Lambda function provides centralized CRUD operations for AWS pricing data,
//! serving as the single source of truth for pricing operations across the Pathfinder
//! application. It handles both HTTP API Gateway requests and MCP (Model Context Protocol)
//! requests for pricing data stored in DynamoDB.
//!
//! # Operations
//! - Get: Retrieve pricing from cache
//! - Put: Store pricing data in cache
//! - ListCommon: Query most accessed instances for cache refresh
//! - IncrementAccess: Track instance access patterns
//!
//! # Data Flow
//! ```text
//! tool-pricing-query (MCP)   }
//! enrichment-server-pricing  } → crud-pricing → DynamoDB
//! tool-pricing-refresh       }
//! ```

mod db;
mod models;

use aws_config::BehaviorVersion;
use aws_sdk_dynamodb::Client as DynamoDbClient;
use lambda_runtime::{service_fn, Error, LambdaEvent};
use serde_json::Value;
use std::env;
use tracing::{error, info};

use crate::models::{PricingOperation, PricingRequest, PricingResponse, PricingType};

/// Process entry point.
///
/// Installs a JSON `tracing` subscriber (INFO level, no target, no timestamps)
/// and then hands control to the Lambda runtime, which invokes
/// `function_handler` for every incoming event.
///
/// # Returns
/// `Ok(())` when the runtime exits cleanly, otherwise the runtime's `Error`.
#[tokio::main]
async fn main() -> Result<(), Error> {
    // Configure the log format first, then install it as the global subscriber.
    let subscriber = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .with_target(false)
        .without_time()
        .json();
    subscriber.init();

    // Wrap the async handler in a service and run the runtime loop.
    let handler = service_fn(function_handler);
    lambda_runtime::run(handler).await
}

/// Lambda function handler
///
/// Routes incoming requests to either HTTP or MCP handlers based on request structure.
/// Determines request type by checking for API Gateway HTTP-specific fields.
///
/// The DynamoDB client is built on the first invocation and reused by later
/// invocations served by the same execution environment.
///
/// # Arguments
/// * `event` - Lambda event containing the request payload and context
///
/// # Returns
/// Returns the serialized response as JSON Value
///
/// # Errors
/// Propagates any `Error` returned by the selected HTTP or MCP handler
async fn function_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
    // Loading AWS config and constructing an SDK client is comparatively
    // expensive, so keep one client per process instead of one per request.
    static DYNAMODB_CLIENT: std::sync::OnceLock<DynamoDbClient> = std::sync::OnceLock::new();

    let (payload, _context) = event.into_parts();

    info!("Received pricing request: {:?}", payload);

    let dynamodb_client = match DYNAMODB_CLIENT.get() {
        Some(client) => client,
        None => {
            // The config must be loaded with `.await`, which cannot happen
            // inside the `get_or_init` closure, so load it first.
            let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
            DYNAMODB_CLIENT.get_or_init(|| DynamoDbClient::new(&config))
        }
    };

    // Falls back to the dev table name when TABLE_NAME is not set.
    let table_name =
        env::var("TABLE_NAME").unwrap_or_else(|_| "pathfinder-dev-pricing".to_string());

    // Determine request source by checking for HTTP-specific fields
    let is_http_v2 = payload
        .get("requestContext")
        .and_then(|rc| rc.get("http"))
        .and_then(|http| http.get("method"))
        .is_some();

    if is_http_v2 {
        // Handle HTTP request from API Gateway
        handle_http_request(payload, dynamodb_client, &table_name).await
    } else {
        // Handle MCP request
        handle_mcp_request(payload, dynamodb_client, &table_name).await
    }
}

/// Handles HTTP requests from API Gateway
///
/// Processes HTTP GET requests for pricing data, extracting the instance type
/// from path parameters and returning formatted HTTP responses.
///
/// The lookup always uses region `us-east-1`, `PricingType::Retail` and no
/// AWS account id.
///
/// # Arguments
/// * `payload` - API Gateway HTTP request payload
/// * `dynamodb_client` - DynamoDB client for data access
/// * `table_name` - Name of the DynamoDB pricing table
///
/// # Returns
/// Returns HTTP response with status code, headers, and body:
/// 200 with the pricing record, 404 when no record exists, or 500 when the
/// DynamoDB lookup fails.
///
/// # Errors
/// Returns Error if the instanceType path parameter is missing or a response
/// body cannot be serialized. A failed DynamoDB lookup is NOT an `Err`; it is
/// reported to the caller as a 500 response.
async fn handle_http_request(
    payload: Value,
    dynamodb_client: &DynamoDbClient,
    table_name: &str,
) -> Result<Value, Error> {
    // Extract path parameters
    let instance_type = payload
        .get("pathParameters")
        .and_then(|p| p.get("instanceType"))
        .and_then(|it| it.as_str())
        .ok_or("Missing instanceType in path")?;

    info!("HTTP GET /pricing/{}", instance_type);

    // Fetch pricing for the instance type (use defaults for HTTP requests)
    let result = db::get_pricing(
        dynamodb_client,
        table_name,
        instance_type,
        "us-east-1",
        &PricingType::Retail,
        None,
    )
    .await;

    let response = match result {
        Ok(Some(pricing)) => build_http_response(200, serde_json::to_string(&pricing)?),
        Ok(None) => {
            info!("Pricing not found for: {}", instance_type);
            let body = serde_json::to_string(&serde_json::json!({
                "error": "Pricing not found"
            }))?;
            build_http_response(404, body)
        }
        Err(e) => {
            error!("Failed to fetch pricing: {}", e);
            let body = serde_json::to_string(&serde_json::json!({
                "error": e
            }))?;
            build_http_response(500, body)
        }
    };

    Ok(response)
}

/// Builds the API Gateway response envelope around an already-serialized
/// JSON `body` string, tagging it with `Content-Type: application/json`.
fn build_http_response(status_code: u16, body: String) -> Value {
    serde_json::json!({
        "statusCode": status_code,
        "headers": {
            "Content-Type": "application/json"
        },
        "body": body
    })
}

/// Handles MCP (Model Context Protocol) requests
///
/// Processes structured MCP requests containing pricing operations,
/// routes to appropriate handler, and returns structured responses.
///
/// # Arguments
/// * `payload` - MCP request payload containing operation details
/// * `dynamodb_client` - DynamoDB client for data access
/// * `table_name` - Name of the DynamoDB pricing table
///
/// # Returns
/// Returns structured PricingResponse serialized as JSON Value. A payload that
/// does not deserialize into a `PricingRequest` yields a 400 error response;
/// a failed operation yields a 500 error response.
///
/// # Errors
/// Returns Error only if the `PricingResponse` itself cannot be serialized.
/// Parse failures and operation failures are reported inside the response
/// payload rather than as an `Err`.
async fn handle_mcp_request(
    payload: Value,
    dynamodb_client: &DynamoDbClient,
    table_name: &str,
) -> Result<Value, Error> {
    // Parse MCP request
    let request: PricingRequest = match serde_json::from_value(payload) {
        Ok(req) => req,
        Err(e) => {
            error!("Invalid request: {}", e);
            let response = PricingResponse::error(400, &format!("Invalid request: {}", e));
            return Ok(serde_json::to_value(response)?);
        }
    };

    // Route operation
    let result = handle_operation(request.operation, dynamodb_client, table_name).await;

    let response = match result {
        Ok(data) => PricingResponse::success(data),
        Err(e) => {
            error!("Operation failed: {}", e);
            PricingResponse::error(500, &e)
        }
    };

    Ok(serde_json::to_value(response)?)
}

/// Routes pricing operations to appropriate handlers
///
/// Dispatches each operation type (Get, Put, ListCommon, IncrementAccess) to its
/// specific handler function and returns the result as JSON.
/// /// # Arguments /// * `operation` - The pricing operation to execute /// * `dynamodb_client` - DynamoDB client for data access /// * `table_name` - Name of the DynamoDB pricing table /// /// # Returns /// Returns operation result serialized as JSON Value /// /// # Errors /// Returns error message string if operation fails async fn handle_operation( operation: PricingOperation, dynamodb_client: &DynamoDbClient, table_name: &str, ) -> Result { match operation { PricingOperation::Get { instance_type, region, pricing_type, aws_account_id, fetch_if_missing: _, } => { info!("Operation: Get (type={:?})", pricing_type); // Check cache let cached = db::get_pricing( dynamodb_client, table_name, &instance_type, ®ion, &pricing_type, aws_account_id.as_deref(), ) .await?; match cached { Some(mut pricing) => { // Cache hit - increment access count asynchronously let client = dynamodb_client.clone(); let table = table_name.to_string(); let inst = instance_type.clone(); let reg = region.clone(); let pt = pricing_type.clone(); let acc_id = aws_account_id.clone(); tokio::spawn(async move { let _ = db::increment_access_count( &client, &table, &inst, ®, &pt, acc_id.as_deref(), ) .await; }); pricing.access_count += 1; // Show incremented count in response Ok(serde_json::json!({ "pricing": pricing, "cacheStatus": "hit" })) } None => { Err("Pricing not found in cache".to_string()) } } } PricingOperation::Put { instance_type, region, pricing_type, pricing_data, } => { info!("Operation: Put ({} in {})", instance_type, region); db::put_pricing(dynamodb_client, table_name, &pricing_data).await?; Ok(serde_json::json!({ "success": true, "instanceType": instance_type, "region": region, "pricingType": pricing_type })) } PricingOperation::ListCommon { limit, min_access_count, } => { info!("Operation: ListCommon (limit={}, min_access={})", limit, min_access_count.unwrap_or(0)); let common = db::query_most_accessed(dynamodb_client, table_name, limit, min_access_count).await?; 
Ok(serde_json::json!({ "instances": common, "count": common.len() })) } PricingOperation::IncrementAccess { instance_type, region, } => { info!("Operation: IncrementAccess ({} in {})", instance_type, region); // Default to retail for access counting db::increment_access_count( dynamodb_client, table_name, &instance_type, ®ion, &PricingType::Retail, None, ) .await?; Ok(serde_json::json!({ "success": true, "instanceType": instance_type, "region": region })) } } } #[cfg(test)] mod tests { use super::*; use serde_json::json; #[test] fn test_http_request_detection() { // Test that HTTP v2 request is detected correctly let http_payload = json!({ "requestContext": { "http": { "method": "GET", "path": "/pricing/m6g.xlarge" } }, "pathParameters": { "instanceType": "m6g.xlarge" } }); let is_http = http_payload .get("requestContext") .and_then(|rc| rc.get("http")) .and_then(|http| http.get("method")) .is_some(); assert!(is_http); } #[test] fn test_mcp_request_detection() { // Test that MCP request is detected correctly (no HTTP fields) let mcp_payload = json!({ "operation": { "type": "get", "instanceType": "m6g.xlarge", "region": "us-east-1", "pricingType": "retail" } }); let is_http = mcp_payload .get("requestContext") .and_then(|rc| rc.get("http")) .and_then(|http| http.get("method")) .is_some(); assert!(!is_http); } #[test] fn test_path_parameter_extraction() { let payload = json!({ "pathParameters": { "instanceType": "t3.medium" } }); let instance_type = payload .get("pathParameters") .and_then(|p| p.get("instanceType")) .and_then(|it| it.as_str()); assert_eq!(instance_type, Some("t3.medium")); } #[test] fn test_missing_path_parameter() { let payload = json!({ "pathParameters": {} }); let instance_type = payload .get("pathParameters") .and_then(|p| p.get("instanceType")) .and_then(|it| it.as_str()); assert_eq!(instance_type, None); } #[test] fn test_mcp_request_parsing_valid() { let payload = json!({ "operation": { "type": "get", "instanceType": "m6g.xlarge", "region": 
"us-east-1", "pricingType": "retail" } }); let result: Result = serde_json::from_value(payload); assert!(result.is_ok()); } #[test] fn test_mcp_request_parsing_invalid() { let payload = json!({ "operation": { "type": "invalid_operation" } }); let result: Result = serde_json::from_value(payload); assert!(result.is_err()); } #[test] fn test_operation_type_extraction() { let get_op = json!({ "operation": { "type": "get", "instanceType": "m6g.xlarge", "region": "us-east-1", "pricingType": "retail" } }); let request: PricingRequest = serde_json::from_value(get_op).unwrap(); assert!(matches!(request.operation, PricingOperation::Get { .. })); let list_op = json!({ "operation": { "type": "listCommon", "limit": 100 } }); let request: PricingRequest = serde_json::from_value(list_op).unwrap(); assert!(matches!(request.operation, PricingOperation::ListCommon { .. })); let inc_op = json!({ "operation": { "type": "incrementAccess", "instanceType": "t3.medium", "region": "us-east-1" } }); let request: PricingRequest = serde_json::from_value(inc_op).unwrap(); assert!(matches!(request.operation, PricingOperation::IncrementAccess { .. })); } }