Retrieval Augmented Generation (RAG) enhances AI responses by combining the generative AI model’s capabilities with information from external data sources, rather than relying solely on the model’s built-in knowledge. In this post, we showcase the custom data connector capability in Amazon Bedrock Knowledge Bases that makes it straightforward to build RAG workflows with custom input data. Through this capability, Amazon Bedrock Knowledge Bases supports the ingestion of streaming data, which means developers can add, update, or delete data in their knowledge base through direct API calls.
Consider examples such as clickstream data, credit card transactions, Internet of Things (IoT) sensor data, log analytics, and commodity prices, where both current data and historical trends are needed to make an informed decision. Previously, to feed such critical data inputs, you had to first stage them in a supported data source and then either initiate or schedule a data sync job. Depending on the quality and quantity of the data, the time to complete this process varied. With custom data connectors, you can quickly ingest specific documents from custom data sources without requiring a full sync, and ingest streaming data without the need for intermediary storage. By avoiding time-consuming full syncs and storage steps, you gain faster access to data, reduced latency, and improved application performance.
With streaming ingestion using custom connectors, Amazon Bedrock Knowledge Bases processes such streaming data without using an intermediary data source, making it available almost immediately. This feature chunks and converts input data into embeddings using your chosen Amazon Bedrock model and stores everything in the backend vector database. This automation applies to both newly created and existing databases, streamlining your workflow so you can focus on building AI applications without worrying about orchestrating data chunking, embeddings generation, or vector store provisioning and indexing. Additionally, this feature provides the ability to ingest specific documents from custom data sources, all while reducing latency and alleviating operational costs for intermediary storage.
Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies such as Anthropic, Cohere, Meta, Stability AI, and Amazon through a single API, along with a broad set of capabilities you need to build generative AI applications with security, privacy, and responsible AI. Using Amazon Bedrock, you can experiment with and evaluate top FMs for your use case, privately customize them with your data using techniques such as fine-tuning and RAG, and build agents that execute tasks using your enterprise systems and data sources.
Amazon Bedrock Knowledge Bases allows organizations to build fully managed RAG pipelines by augmenting prompts with contextual information from private data sources to deliver more relevant, accurate, and customized responses. With Amazon Bedrock Knowledge Bases, you can build applications that are enriched by the context received from querying a knowledge base. It enables a faster time to product release by abstracting away the heavy lifting of building pipelines and providing you an out-of-the-box RAG solution, reducing the build time for your application.
Amazon Bedrock Knowledge Bases supports custom connectors and the ingestion of streaming data, which means you can add, update, or delete data in your knowledge base through direct API calls.
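As a sketch of what these direct API calls look like, the following shows one way to build the request for removing previously ingested documents from a custom data source. The helper function name and the example IDs are illustrative assumptions; the request shape follows the `DeleteKnowledgeBaseDocuments` operation in the boto3 `bedrock-agent` client.

```python
# Sketch: build the parameters for deleting documents that were ingested
# through a custom data source. The helper name and example IDs are
# illustrative; the request shape follows the bedrock-agent
# DeleteKnowledgeBaseDocuments API.
def build_delete_request(kb_id, ds_id, document_ids):
    return {
        'knowledgeBaseId': kb_id,
        'dataSourceId': ds_id,
        'documentIdentifiers': [
            # Documents ingested through a custom connector are addressed
            # by the custom document identifier used at ingestion time
            {'custom': {'id': doc_id}, 'dataSourceType': 'CUSTOM'}
            for doc_id in document_ids
        ]
    }

request = build_delete_request('KB123EXAMPLE', 'DS123EXAMPLE', ['doc-uuid-1'])
# The request could then be passed to the boto3 client, for example:
# bedrock_agent_client.delete_knowledge_base_documents(**request)
```

Updates work the same way: re-ingesting a document with the same custom document identifier replaces its content in the knowledge base.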
For this post, we implement a RAG architecture with Amazon Bedrock Knowledge Bases using a custom connector and topics built with Amazon Managed Streaming for Apache Kafka (Amazon MSK), for a user who may be interested in understanding stock price trends. Amazon MSK is a streaming data service that manages Apache Kafka infrastructure and operations, making it straightforward to run Apache Kafka applications on Amazon Web Services (AWS). The solution enables real-time analysis of streaming stock price data through vector embeddings and large language models (LLMs).
The following architecture diagram has two components:
Preprocessing streaming data workflow, noted with letters at the top of the diagram:
Runtime execution during user queries, noted with numerals at the bottom of the diagram:
The implementation follows these high-level steps:
To build a generative AI stock analysis tool with Amazon Bedrock Knowledge Bases custom connector, use instructions in the following sections.
To try this architecture, deploy the AWS CloudFormation template from this GitHub repository in your AWS account. This template deploys the following components:
The precreated MSK cluster has the required brokers deployed and ready for use. The next step is to use a SageMaker Studio terminal instance to connect to the MSK cluster and create the test stream topic. In this step, follow the detailed instructions in Create a topic in the Amazon MSK cluster. The following are the general steps involved:
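Topic creation itself can be sketched as follows. This is a sketch assuming the kafka-python admin client is used from the SageMaker Studio terminal; the topic name matches the one used later in this post, while the partition and replication counts are illustrative assumptions.

```python
# Sketch: describe the test topic to create on the MSK cluster. The topic
# name matches the one used later in this post; partition and replication
# counts are illustrative assumptions.
topic_config = {
    'name': 'streamtopic',
    'num_partitions': 1,
    'replication_factor': 2,
}

# With the kafka-python library, the topic could then be created like this:
# from kafka.admin import KafkaAdminClient, NewTopic
# admin = KafkaAdminClient(bootstrap_servers='<BootstrapBrokerString>')
# admin.create_topics([NewTopic(**topic_config)])
```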
To create a knowledge base in Amazon Bedrock, follow these steps:
Enter BedrockStreamIngestKnowledgeBase as the Knowledge Base name.
Enter BedrockStreamIngestKBCustomDS as the Data source name.
Now, using API calls, you configure the consumer Lambda function so it gets triggered as soon as the input Apache Kafka topic receives data.
response = lambda_client.update_function_configuration(
    FunctionName=<Consumer Lambda Function Name>,
    Environment={
        'Variables': {
            'KBID': <Knowledge Base ID>,
            'DSID': <Data Source ID>
        }
    }
)
response = lambda_client.create_event_source_mapping(
    EventSourceArn=<MSK Cluster's ARN>,
    FunctionName=<Consumer Lambda Function Name>,
    StartingPosition='LATEST',
    Enabled=True,
    Topics=['streamtopic']
)
The Apache Kafka consumer Lambda function reads data from the Apache Kafka topic, decodes it, extracts stock price information, and ingests it into the Amazon Bedrock knowledge base using the custom connector.
import base64
import json
import os
import uuid
from datetime import datetime

import boto3

bedrock_agent_client = boto3.client('bedrock-agent')

# Knowledge base and data source IDs passed in through the Lambda
# environment variables configured earlier
kb_id = os.environ['KBID']
ds_id = os.environ['DSID']

def decode_payload(event_data):
    # Kafka record values arrive base64-encoded; decode them to a JSON payload
    agg_data_bytes = base64.b64decode(event_data)
    decoded_data = agg_data_bytes.decode(encoding="utf-8")
    event_payload = json.loads(decoded_data)
    return event_payload

def lambda_handler(event, context):
    records = event['records']['streamtopic-0']
    for rec in records:
        # Each record has a separate eventID, value, and so on
        event_payload = decode_payload(rec['value'])
        ticker = event_payload['ticker']
        price = event_payload['price']
        timestamp = event_payload['timestamp']
        myuuid = uuid.uuid4()
        payload_ts = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
        payload_string = "At " + payload_ts + " the price of " + ticker + " is " + str(price) + "."
        # Ingest the record into the knowledge base as an inline text document
        response = bedrock_agent_client.ingest_knowledge_base_documents(
            knowledgeBaseId=kb_id,
            dataSourceId=ds_id,
            documents=[
                {
                    'content': {
                        'custom': {
                            'customDocumentIdentifier': {
                                'id': str(myuuid)
                            },
                            'inlineContent': {
                                'textContent': {
                                    'data': payload_string
                                },
                                'type': 'TEXT'
                            },
                            'sourceType': 'IN_LINE'
                        },
                        'dataSourceType': 'CUSTOM'
                    }
                }
            ]
        )
Now that the required setup is done, you trigger the workflow by ingesting test data into your Apache Kafka topic hosted on the MSK cluster. For best results, repeat this section, changing the .csv input file to show stock price increases or decreases.
| ticker | price |
| ------ | ----- |
| OOOO | $44.50 |
| ZVZZT | $3,413.23 |
| ZNTRX | $22.34 |
| ZNRXX | $208.76 |
| NTEST | $0.45 |
| ZBZX | $36.23 |
| ZEXIT | $942.34 |
| ZIEXT | $870.23 |
| ZTEST | $23.75 |
| ZVV | $2,802.86 |
| ZXIET | $63.00 |
| ZAZZT | $18.86 |
| ZBZZT | $998.26 |
| ZCZZT | $72.34 |
| ZVZZC | $90.32 |
| ZWZZT | $698.24 |
| ZXZZT | $932.32 |
import json

from pykafka import KafkaClient

def put_to_topic(kafka_host, topic_name, ticker, amount, timestamp):
    # Connect to the MSK cluster and look up the target topic
    client = KafkaClient(hosts=kafka_host)
    topic = client.topics[topic_name]
    payload = {
        'ticker': ticker,
        'price': amount,
        'timestamp': timestamp
    }
    ret_status = True
    # Serialize the payload to JSON and publish it to the topic
    data = json.dumps(payload)
    encoded_message = data.encode("utf-8")
    print(f'Sending ticker data: {ticker}...')
    with topic.get_sync_producer() as producer:
        result = producer.produce(encoded_message)
    return ret_status
import time
from datetime import datetime

import pandas as pd

df = pd.read_csv('TestData.csv')
start_test_time = time.time()
print(datetime.utcfromtimestamp(start_test_time).strftime('%Y-%m-%d %H:%M:%S'))
df = df.reset_index()
# Publish each row of the test data to the Kafka topic
for index, row in df.iterrows():
    put_to_topic(BootstrapBrokerString, KafkaTopic, row['ticker'], row['price'], time.time())
end_test_time = time.time()
print(datetime.utcfromtimestamp(end_test_time).strftime('%Y-%m-%d %H:%M:%S'))
If the data ingestion and subsequent processing are successful, navigate to the Amazon Bedrock Knowledge Bases data source page to check the uploaded information.
Within the Amazon Bedrock Knowledge Bases console, you have access to query the ingested data immediately, as shown in the following screenshot.
To do that, select an Amazon Bedrock FM that you have access to. For this post, we chose Amazon Nova Lite 1.0, as shown in the following screenshot.
With the model selected, the question, "How is ZVZZT trending?", yields results based on the ingested data. Note how Amazon Bedrock Knowledge Bases shows how it derived the answer, even pointing to the granular data element from its source.
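The same question can also be asked programmatically. The following sketch builds a RetrieveAndGenerate request for the boto3 `bedrock-agent-runtime` client; the knowledge base ID and model ARN are placeholder assumptions for illustration.

```python
# Sketch: build a RetrieveAndGenerate request for the bedrock-agent-runtime
# client. The knowledge base ID and model ARN are placeholder assumptions.
def build_query_request(kb_id, model_arn, question):
    return {
        'input': {'text': question},
        'retrieveAndGenerateConfiguration': {
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id,
                'modelArn': model_arn
            }
        }
    }

request = build_query_request(
    'KB123EXAMPLE',
    'arn:aws:bedrock:us-east-1::foundation-model/amazon.nova-lite-v1:0',
    'How is ZVZZT trending?'
)
# The request could then be passed to the runtime client, for example:
# response = bedrock_agent_runtime_client.retrieve_and_generate(**request)
# print(response['output']['text'])
```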
To avoid incurring ongoing charges, delete and clean up the resources created for this post.
In this post, we showed you how Amazon Bedrock Knowledge Bases supports custom connectors and the ingestion of streaming data, through which developers can add, update, or delete data in their knowledge base through direct API calls. Amazon Bedrock Knowledge Bases offers fully managed, end-to-end RAG workflows to create highly accurate, low-latency, secure, and custom generative AI applications by incorporating contextual information from your company’s data sources. With this capability, you can quickly ingest specific documents from custom data sources without requiring a full sync, and ingest streaming data without the need for intermediary storage.
Send feedback to AWS re:Post for Amazon Bedrock or through your usual AWS contacts, and engage with the generative AI builder community at community.aws.
Prabhakar Chandrasekaran is a Senior Technical Account Manager with AWS Enterprise Support. Prabhakar enjoys helping customers build cutting-edge AI/ML solutions on the cloud. He also works with enterprise customers providing proactive guidance and operational assistance, helping them improve the value of their solutions when using AWS. Prabhakar holds eight AWS and seven other professional certifications. With over 22 years of professional experience, Prabhakar was a data engineer and a program leader in the financial services space prior to joining AWS.