Using Amazon Bedrock Knowledge Base Application Logs for Notifications


Introduction

In the earlier blog post Building a Data Ingestion Solution for Amazon Bedrock Knowledge Bases, we developed a data ingestion solution whose job completion notifications relied on a status pull mechanism, which wasn’t as efficient as it could be. Since then, we examined Knowledge Bases logging, which publishes ingestion job log events to CloudWatch Logs and opens up the opportunity for a better design: a status push mechanism based on subscription filters. In this blog post, we will update the original solution with the new design.

Updated Design Overview

The overall design of the updated solution is depicted in the following diagram:

Updated solution architecture

The updated solution works as follows:

  1. Logging is configured for the Bedrock Knowledge Base to deliver logs to CloudWatch Logs. A subscription filter is created in the associated log group to filter ingestion job status change events that correspond to an end state and send log events to a Lambda function.

  2. A Lambda function, triggered by an EventBridge schedule rule, periodically starts an ingestion (a.k.a. sync) job for each specified knowledge base and data source. Note that the SQS queue is removed as it is no longer necessary.

  3. Another Lambda function serves as the destination for the subscription filter. For each event message extracted from the log events, the function uses the job ID information to get details about the ingestion job. A notification is sent to one of the two SNS topics depending on whether the job succeeded or failed.

Updating the Components

As the SQS queue is not required, the only change to the Lambda function that starts the ingestion job is a minor cleanup. The updated Lambda function code is as follows:

import boto3
import json
from botocore.exceptions import ClientError

bedrock_agent = boto3.client('bedrock-agent')
ssm = boto3.client('ssm')


def lambda_handler(event, context):
    try:
        # Retrieve the JSON config from Parameter Store
        response = ssm.get_parameter(Name='/start-kb-ingestion-jobs/config-json')
        config_json = response['Parameter']['Value']
        config = json.loads(config_json)

        for record in config:
            knowledge_base_id = record.get('knowledge_base_id')
            for data_source_id in record.get('data_source_ids'):
                # Start the ingestion job
                print(f'Starting ingestion job for data source {data_source_id} of knowledge base {knowledge_base_id}')
                response = bedrock_agent.start_ingestion_job(
                    knowledgeBaseId=knowledge_base_id,
                    dataSourceId=data_source_id
                )
        return {
            'statusCode': 200,
            'body': 'Success'
        }
    except ClientError as e:
        return {
            'statusCode': 500,
            'body': f'Client error: {str(e)}'
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': f'Unexpected error: {str(e)}'
        }
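
For reference, based on how the function parses the configuration, the /start-kb-ingestion-jobs/config-json parameter is expected to hold a JSON array of knowledge base and data source IDs along these lines (the IDs shown here are placeholders):

[
    {
        "knowledge_base_id": "<knowledge_base_id>",
        "data_source_ids": [
            "<data_source_id_1>",
            "<data_source_id_2>"
        ]
    }
]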

Meanwhile, updating the component that checks ingestion job statuses is slightly more complex. First, we need to update the check-kb-ingestion-job-statuses Lambda function to be a subscription filter target. As described in the Log group-level subscription filters page of the CloudWatch Logs user guide, the log data received by the function is compressed, Base64-encoded, and batched. I was able to easily find this StackOverflow question, whose first answer has exactly the code we need.
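
In essence, the handler needs to Base64-decode and gunzip the payload before parsing it as JSON. Here is a minimal sketch of that step (the full function later in this post uses the same logic):

import base64
import gzip
import json


def extract_log_events(event):
    # The 'awslogs' payload is gzip-compressed, Base64-encoded JSON
    compressed = base64.b64decode(event['awslogs']['data'])
    payload = json.loads(gzip.decompress(compressed))
    return payload['logEvents']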

Next, we need to know what a relevant log event looks like. The examples of knowledge base logs in the AWS documentation provide the general format for an ingestion job event; however, it is preferable to look at an actual log event. Here is one that captures the completion of a successful job:

{
    "event_timestamp": 1740895462316,
    "event": {
        "ingestion_job_id": "W0V45LVZY6",
        "data_source_id": "ATUWOVZJOD",
        "ingestion_job_status": "COMPLETE",
        "knowledge_base_arn": "arn:aws:bedrock:us-east-1::knowledge-base/R1K1UIZKKQ",
        "resource_statistics": {
            "number_of_resources_updated": 366,
            "number_of_resources_ingested": 0,
            "number_of_resources_deleted": 0,
            "number_of_resources_with_metadata_updated": 0,
            "number_of_resources_failed": 15
        }
    },
    "event_version": "1.0",
    "event_type": "StartIngestionJob.StatusChanged",
    "level": "INFO"
}

The Lambda function must extract the following details from the log event:

  1. The knowledge base ID, which can be extracted from the event.knowledge_base_arn field, specifically the portion after the final /.

  2. The data source ID, which is the value of the event.data_source_id field.

  3. The ingestion job ID, which is the value of the event.ingestion_job_id field.

Although we can extract all of the information required for a notification, the log event does not contain the detailed content ingestion failures that we get from the response of the GetIngestionJob API action. Calling the API is slightly less efficient, but we will still do so for completeness. The resulting Lambda function should look like this:

import base64
import boto3
import gzip
import json
from botocore.exceptions import ClientError

bedrock_agent = boto3.client('bedrock-agent')
ssm = boto3.client('ssm')
sns = boto3.client('sns')


def get_ssm_parameter(name):
    response = ssm.get_parameter(Name=name, WithDecryption=True)
    return response['Parameter']['Value']


def get_ingestion_job(knowledge_base_id, data_source_id, ingestion_job_id):
    response = bedrock_agent.get_ingestion_job(
        knowledgeBaseId=knowledge_base_id,
        dataSourceId=data_source_id,
        ingestionJobId=ingestion_job_id
    )
    return response['ingestionJob']


def lambda_handler(event, context):
    try:
        success_sns_topic_arn = get_ssm_parameter('/check-kb-ingestion-job-statuses/success-sns-topic-arn')
        failure_sns_topic_arn = get_ssm_parameter('/check-kb-ingestion-job-statuses/failure-sns-topic-arn')

        encoded_zipped_data = event['awslogs']['data']
        zipped_data = base64.b64decode(encoded_zipped_data)
        data = json.loads(gzip.decompress(zipped_data))
        log_events = data['logEvents']
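        # Subscription deliveries are batched; process each log event in the payload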
        for log_event in log_events:
            message = json.loads(log_event['message'])
            knowledge_base_arn = message['event']['knowledge_base_arn']
            knowledge_base_id = knowledge_base_arn.split('/')[-1]
            data_source_id = message['event']['data_source_id']
            ingestion_job_id = message['event']['ingestion_job_id']

            print(
                f'Checking ingestion job status for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id}')
            ingestion_job = get_ingestion_job(knowledge_base_id, data_source_id, ingestion_job_id)
            print(
                f'Ingestion job summary: \n\n{json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)}')
            job_status = ingestion_job['status']
            if job_status == 'COMPLETE':
                sns.publish(
                    TopicArn=success_sns_topic_arn,
                    Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} Completed',
                    Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
                )
            elif job_status == 'FAILED':
                sns.publish(
                    TopicArn=failure_sns_topic_arn,
                    Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} FAILED',
                    Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
                )
            elif job_status == 'STOPPED':
                sns.publish(
                    TopicArn=failure_sns_topic_arn,
                    Subject=f'Ingestion job for knowledge base {knowledge_base_id} data source {data_source_id} job {ingestion_job_id} STOPPED',
                    Message=json.dumps(ingestion_job, indent=2, sort_keys=True, default=str)
                )
        return {
            'statusCode': 200,
            'body': 'Success'
        }
    except ClientError as e:
        return {
            'statusCode': 500,
            'body': f'Client error: {str(e)}'
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': f'Unexpected error: {str(e)}'
        }

Lastly, we need to create a subscription filter in the log group that acts as the log delivery destination of the knowledge base. Since we are only interested in log events for ingestion job completion, we need to define an appropriate subscription filter pattern. There are two fields which we need for this purpose:

  1. The event_type field with the value StartIngestionJob.StatusChanged.

  2. The event.ingestion_job_status field with a value matching one of COMPLETE, FAILED, or CRAWLING_COMPLETED, as described in the data ingestion job log example.

Based on some testing, a CRAWLING_COMPLETED event does not indicate a full completion of an ingestion job. A COMPLETE (and presumably FAILED) event is always sent upon job completion. So we can use COMPLETE and FAILED for the filter. Furthermore, stopping a job does not generate an event and there is no status value for it. This seems like a miss on AWS’ part, so I’ll open an AWS support case for it. For now, we will still add STOPPED to the filter for the sake of completeness.

Referring to the subscription filter pattern syntax for JSON log events, we can define our compound expression that checks for the event type and ingestion job status as follows:

{$.event_type = "StartIngestionJob.StatusChanged" && ($.event.ingestion_job_status = "COMPLETE" || $.event.ingestion_job_status = "FAILED" || $.event.ingestion_job_status = "STOPPED")}

We can first test the pattern in the AWS Management Console's subscription filter creation dialog without creating the filter. Later, we will implement it using Terraform. Here is a screenshot of what the dialog looks like:

Testing pattern in the subscription filter creation dialog

In this example, the subscription filter is being created in the knowledge base’s log group, as is evident from its standard naming pattern. Knowledge base logs are written to the log stream bedrock/knowledgebaselogs, so we need to select that stream. Using the Test pattern button, we can see one filtered entry in the test results among 50 log events. The log events were generated from a single ingestion job, and the other events are either resource change events or unrelated status change events.
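
If you prefer to verify the pattern outside the console, you can also run it against recent log events programmatically. Here is a minimal sketch using boto3 (the log group name is a placeholder for the one your knowledge base writes to):

import json

import boto3

logs = boto3.client('logs')

FILTER_PATTERN = (
    '{$.event_type = "StartIngestionJob.StatusChanged" && '
    '($.event.ingestion_job_status = "COMPLETE" || '
    '$.event.ingestion_job_status = "FAILED" || '
    '$.event.ingestion_job_status = "STOPPED")}'
)

# Placeholder log group name
response = logs.filter_log_events(
    logGroupName='/aws/vendedlogs/bedrock/knowledge-base/APPLICATION_LOGS/<knowledge_base_id>',
    filterPattern=FILTER_PATTERN,
    limit=50
)
for log_event in response['events']:
    print(json.loads(log_event['message'])['event']['ingestion_job_status'])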

Updating the Terraform Configuration

The following changes are required to the original solution’s Terraform configuration to support the new design:

  • Remove the SQS queue, the associated IAM permissions, and the SSM parameters.

  • Update the Lambda permission for the check-kb-ingestion-job-statuses function to allow invocation from CloudWatch Logs via the log group to which the Bedrock knowledge base writes its application logs (a sketch of this permission follows this list).
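
For reference, here is a minimal sketch of what that permission might look like. The resource names match those referenced by the subscription filter resource below; the data source lookup of the log group and the statement ID are assumptions:

data "aws_cloudwatch_log_group" "kb_app" {
  name = var.kb_app_log_group_name
}

resource "aws_lambda_permission" "check_kb_ingestion_job_statuses" {
  statement_id  = "AllowExecutionFromCloudWatchLogs"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.check_kb_ingestion_job_statuses.function_name
  principal     = "logs.amazonaws.com"
  source_arn    = "${data.aws_cloudwatch_log_group.kb_app.arn}:*"
}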

Lastly, we need a new resource for the subscription filter as follows:

resource "aws_cloudwatch_log_subscription_filter" "check_kb_ingestion_job_statuses" {
  name            = "check-kb-ingestion-job-statuses"
  log_group_name  = var.kb_app_log_group_name
  filter_pattern  = "{$.event_type = \"StartIngestionJob.StatusChanged\" && ($.event.ingestion_job_status = \"COMPLETE\" || $.event.ingestion_job_status = \"FAILED\" || $.event.ingestion_job_status = \"STOPPED\")}"
  destination_arn = aws_lambda_function.check_kb_ingestion_job_statuses.arn
  depends_on      = [aws_lambda_permission.check_kb_ingestion_job_statuses]
}

Note that the log group name is provided as a variable. It should follow the default format used by AWS, which is /aws/vendedlogs/bedrock/knowledge-base/APPLICATION_LOGS/<knowledge_base_id>, where <knowledge_base_id> is the Bedrock knowledge base ID.
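
For example, the corresponding entry in terraform.tfvars might look like this (substitute your own knowledge base ID):

kb_app_log_group_name = "/aws/vendedlogs/bedrock/knowledge-base/APPLICATION_LOGS/<knowledge_base_id>"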

Deploying and Testing the Solution

✅ You can find the complete Terraform configuration and source code in the 5_kb_data_ingestion_via_logs directory in this GitHub repository.

To deploy and test the solution, you need a knowledge base with at least one data source that has content to ingest either in an S3 bucket or a crawlable website. You can set this up in the Bedrock console using the vector database quick start options. Alternatively, deploy a sample knowledge base using the Terraform configuration from my blog post How To Manage an Amazon Bedrock Knowledge Base Using Terraform. This configuration is also available in the same GitHub repository under the 2_knowledge_base directory.

Additionally, you must change the knowledge base’s logging configuration to deliver application logs to CloudWatch Logs. You can enable it either manually by following the AWS documentation or by using the Terraform configuration from my previous blog post Enabling Logging for Amazon Bedrock Knowledge Bases using Terraform. This configuration is also available in the same GitHub repository under the 4_kb_logging directory.

With the prerequisites in place, deploy the solution as follows:

  1. From the root of the cloned GitHub repository, navigate to 5_kb_data_ingestion_via_logs.

  2. Copy terraform.tfvars.example as terraform.tfvars and update the variables to match your configuration. By default, the start-kb-ingestion-jobs Lambda function runs daily at 0:00 UTC.

  3. Configure your AWS credentials.

  4. Run terraform init and terraform apply -var-file terraform.tfvars.

Once deployed, test the solution by subscribing your email address to the check-kb-ingestion-job-statuses-success and check-kb-ingestion-job-statuses-failure SNS topics so that you can receive email notifications. Confirm the subscriptions using the links in the verification emails.

Adding an email subscription to the SNS topics

Next, manually invoke the start-kb-ingestion-jobs Lambda function in the Lambda console.

Invoking the start-kb-ingestion-jobs Lambda function manually
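
If you prefer not to use the console, here is a minimal sketch of triggering the same function with boto3 (assuming your AWS credentials and region are configured and the function name matches the one deployed by the Terraform configuration):

import boto3

lambda_client = boto3.client('lambda')

# Invoke the function asynchronously to kick off the ingestion jobs
lambda_client.invoke(
    FunctionName='start-kb-ingestion-jobs',
    InvocationType='Event'
)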

As the ingestion jobs run and complete, logs are written to CloudWatch Logs and pass through the subscription filter. The matching status change events are sent to the Lambda function, which publishes the notifications that ultimately arrive in your inbox. Here's an example:

Success email notification

Once you've verified that the solution works, remove the test email subscriptions and replace them with ones that better fit your needs. If you don't plan to keep the knowledge base, delete it along with the vector store (for example, the OpenSearch Serverless index) to avoid unnecessary costs.

Summary

In this blog post, we improved the original Bedrock Knowledge Bases data ingestion solution with push-based notifications built on CloudWatch Logs features. This is likely more efficient than the scheduled pull-based mechanism and lets us deliver status change events to a Lambda function through a subscription filter.

That being said, the ideal solution would be an EventBridge rule to react to native ingestion job events from Bedrock. The Bedrock service unfortunately does not publish such events today, but I’ve made a feature request via an AWS support case. Hopefully this will be supported soon and we can evolve our data ingestion solution further.

I hope you find this blog post helpful and engaging. Please feel free to check out my other blog posts in the Avangards Blog. Take care and happy learning!