AWS Firehose sending data to Elasticsearch as a string instead of a JSON object

72 Views Asked by At

I am building a log ingestion service using AWS Kinesis Data Firehose and Elasticsearch. I am sending logs to Firehose from a Node.js API. This is the structure of the log I am sending:

{
"level": "error",
"message": "Failed to connect to DB",
"resourceId": "server-1234",
"timestamp": "2023-09-15T08:00:00Z",
"traceId": "abc-xyz-123",
"spanId": "span-456",
"commit": "5e5342f",
"metadata": {
    "parentResourceId": "server-0987"
}

}

Before sending to Firehose, I use JSON.stringify to convert this log object to a string, because Firehose expects a string. However, in my Firehose delivery stream I am using a Lambda function to transform it back into a JSON object. This is the Lambda function I am using:

exports.handler = async (event) => {
const output = event.records.map((record) => {
    // Decoding the base64 data
    const payload = Buffer.from(record.data, 'base64').toString('utf8');
    let parsedData;

    try {
        parsedData = JSON.parse(payload);
    } catch (error) {
        console.error('Error parsing JSON:', error);
        return {
            recordId: record.recordId,
            result: 'ProcessingFailed',
            data: record.data,
        };
    }

    // Checking if 'message' exists and is a string that can be parsed as JSON
    if (typeof parsedData.message === 'string') {
        try {
            const messageData = JSON.parse(parsedData.message);
            parsedData = { ...parsedData, ...messageData };
            delete parsedData.message; 
        } catch (error) {
            console.error('Error parsing message JSON:', error);
            
        }
    }
    console.log(parsedData)
    // Re-encode the transformed data back to base64
    const outputData = Buffer.from(JSON.stringify(parsedData)).toString('base64');
    return {
        recordId: record.recordId,
        result: 'Ok',
        data: outputData,
    };
});

return { records: output };

};

However, even after doing all this, the data is getting stored in my Elasticsearch index like this: [screenshot]

Why is it being stored as a string? If I send the log directly to Elasticsearch without Firehose in between, it gets stored as a proper JSON object. Can I not send a JSON object through a Firehose stream, or am I missing some configuration?

This is my index mapping:

{
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date"
      },
      "aws": {
        "properties": {
          "firehose": {
            "properties": {
              "arn": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              },
              "parameters": {
                "properties": {
                  "es_datastream_name": {
                    "type": "text",
                    "fields": {
                      "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                      }
                    }
                  }
                }
              },
              "request_id": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              }
            }
          },
          "kinesis": {
            "properties": {
              "name": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              },
              "type": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              }
            }
          }
        }
      },
      "cloud": {
        "properties": {
          "account": {
            "properties": {
              "id": {
                "type": "text",
                "fields": {
                  "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                  }
                }
              }
            }
          },
          "provider": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "region": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          }
        }
      },
      "commit": {
        "type": "keyword"
      },
      "level": {
        "type": "keyword"
      },
      "message": {
        "type": "text",
        "fielddata": true
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "parentResourceId": {
            "type": "keyword"
          }
        }
      },
      "resourceId": {
        "type": "keyword"
      },
      "spanId": {
        "type": "keyword"
      },
      "timestamp": {
        "type": "date"
      },
      "traceId": {
        "type": "keyword"
      }
    }
  }
}
0

There are 0 best solutions below