Ingesting Information from S3 by Utilizing BatchPutMessage, AWS Lambda, and Amazon Kinesis


AWS IoT Analytics is a completely managed service that you should utilize to question and generate insights about your IoT knowledge. You would possibly need to use AWS IoT Analytics on knowledge that isn’t being despatched to the AWS IoT Core message dealer. Through the use of the AWS IoT Analytics BatchPutMessage API, you’ll be able to ingest knowledge instantly into AWS IoT Analytics from different knowledge sources. This weblog submit demonstrates the right way to use the BatchPutMessage API to add knowledge saved in Amazon S3 to AWS IoT Analytics. We’ll first stroll by way of some easy command-line examples. Then we’ll see the right way to use AWS Lambda and Amazon Kinesis to ingest knowledge recordsdata in an S3 bucket.

To comply with alongside, you’ll want to put in the AWS CLI. Notice that you could be want to put in the AWS CLI by way of pip3 as an alternative of pip to set up an up-to-date consumer that helps iotanalytics. Additionally, the steps on this submit had been written utilizing bash on macOS. In the event you use a special command-line interface, equivalent to Home windows PowerShell, you’ll want to regulate the instructions accordingly.

Earlier than we start, listed here are some vital AWS IoT Analytics ideas:
Channels ingest knowledge, again it up, and publish it to a number of pipelines.
Pipelines ingest knowledge from a channel and can help you course of the information by way of actions earlier than storing it in a knowledge retailer.
Information shops retailer knowledge. They’re scalable and queryable.
Datasets retrieve knowledge from a datastore. They’re the results of some SQL question run in opposition to the information retailer.

Let’s stroll by way of a easy instance that demonstrates these ideas in motion. We’ll create a channel, pipeline, knowledge retailer, and dataset. Then we’ll ship knowledge to AWS IoT Analytics by way of BatchPutMessage and question for that knowledge in our dataset.

Arrange AWS IoT Analytics

First, we’ll create the information retailer and channel.

aws iotanalytics create-datastore --datastore-name bpm_blog_datastore
aws iotanalytics create-channel --channel-name bpm_blog_channel

To create the pipeline, we’ll specify the pipeline configuration in a JSON file and cross the file to the create-pipeline command.

Our pipeline will likely be quite simple as a result of we’re not processing the information in any means. We’re simply ingesting the information from a channel and passing it to a knowledge retailer. (That is the “Sink” exercise.) Save this JSON to a file named pipeline_config.json.


Now cross pipeline_config.json to create-pipeline.

aws iotanalytics create-pipeline --cli-input-json file://pipeline_config.json

Ship BatchPutMessage

Now we’ll use the CLI to ship our BatchPutMessage request. On this instance, we’ll specify some temperature knowledge. Save the next to a file named batchPutMessage.json. It comprises the 2 issues a BatchPutMessage request requires: the identify of the channel the place we’re sending messages and a number of messages. A message comprises the information we’re importing and an ID that identifies the message. The messageId should be distinctive relative to the opposite messages within the BatchPutMessage request. The “batch” in BatchPutMessage is the power to ship a number of messages at a time, as much as 1,000 complete messages per second per account.

         "payload":"{"temp": 10}"
         "payload":"{"temp": 50}"

Ship the BatchPutMessage request.

aws iotanalytics batch-put-message --cli-input-json file://batchPutMessage.json

If the command is profitable, the CLI will return the next response:

"batchPutMessageErrorEntries": []

Question knowledge

We are able to now question the information again from our knowledge retailer. First, we’ll create a dataset that represents the output of a “choose temp from bpm_blog_datastore” question. Save the next JSON to a file named dataset_config.json.

            "sqlQuery":"select temp from bpm_blog_datastore"

Now cross the JSON file as enter to the create-dataset command.

aws iotanalytics create-dataset --cli-input-json file://dataset_config.json

Creating the dataset is not going to execute our question. We have to run create-dataset-content.

aws iotanalytics create-dataset-content --dataset-name bpm_blog_dataset

Fetch the question consequence with the get-dataset-content command. If the standing is “CREATING,” the question has not completed executing. Wait a second and take a look at once more.

aws iotanalytics get-dataset-content --dataset-name bpm_blog_dataset --version-id '$LATEST'

After the question has been executed, the response will comprise a hyperlink. Visiting that hyperlink in our browser will obtain the results of our question.

   "standing": {
      "state": "SUCCEEDED"

For the needs of this weblog submit, we have now generated and uploaded some knowledge, in .csv format, to the aws-iot-blog-assets bucket. The information is split into 5 folders, every with 20 recordsdata. The next is a JSON illustration of 1 datapoint in example_data_part_2.csv.

   "timestamp":"2018-04-18 19:04:35"

Launch knowledge ingestion template

We’ll ingest the information saved within the S3 bucket into AWS IoT Analytics through the use of two Lambda features and a Kinesis stream. One Lambda operate, “the launcher”, will iterate by way of our bucket and add every key to the stream. For every key ingested by the stream, a replica of the second Lambda operate will likely be invoked. That second Lambda operate, “the employee”, will obtain the information positioned at that S3 key and ship BatchPutMessage requests containing the information. If it encounters an error whereas doing so, will probably be invoked once more.

Deployment packages comprise the code for the Lambda features. We use deployment packages as a result of they permit us to add dependencies together with the the code. The operate definitions are displayed under.

Launcher Lambda:

import boto3
import json

from ratelimiter import RateLimiter
from break up import chop


def lambda_handler(occasion, context):
    bucket = occasion["bucket"]
    channel_name = occasion["channel_name"]
    stream_name = occasion["stream_name"]

    s3_client = boto3.consumer("s3")
    kinesis_client = boto3.consumer("kinesis")    
    total_jobs = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    page_iterator = paginator.paginate(Bucket=bucket, Prefix="IngestingDatafromS3byUsingBatchPutMessageAWSLambdaAmazonKinesis/knowledge/")
    for web page in page_iterator:
        jobs = [{"key": object["Key"], "channel_name": channel_name, "bucket": bucket}
            for object in web page["Contents"]]
        for request_jobs in chop(MAX_RECORDS_PER_REQUEST, jobs):
            information = [{"Data": json.dumps(request_job), "PartitionKey": request_job["key"]} for request_job in request_jobs]
            put_records(kinesis_client, stream_name, information)
        total_jobs += len(jobs)
    return "{} keys despatched into {}".format(total_jobs, stream_name)

# 1 kinesis shard can ingest at most 1000 information per second
# we ratelimit to make sure we don't go over that price
@RateLimiter(max_calls= MAX_REQUESTS_PER_SECOND, interval=1)
def put_records(kinesis_client, stream_name, information):
    kinesis_client.put_records(StreamName=stream_name, Data=information)

Employee Lambda:

import base64
# as of 5/11/18, the model of boto3 utilized by lambda doesn't help iotanalytics
# so we included the latest model of boto3 within the deployment bundle
import boto3
import csv
import json

from io import StringIO
from ratelimiter import RateLimiter
from break up import chop


def lambda_handler(occasion, context):
    # we are going to solely recieve 1 occasion as a result of the set off BatchSize is 1 (set by way of the CloudFormation template)
    document = occasion["Records"][0]
    job_input = json.hundreds(base64.b64decode(document["kinesis"]["data"]))
    key = job_input["key"]
    bucket = job_input["bucket"]
    channel_name = job_input["channel_name"]
    print("Job Enter - Key: {} Bucket: {} Channel Title: {}".format(key, bucket, channel_name))

    s3_client = boto3.consumer("s3")
    file_contents = s3_client.get_object(Bucket=bucket, Key=key)["Body"].learn().decode("utf-8") 
    serialized_rows = serialize_rows(file_contents)
    messages = generate_messages(serialized_rows)

    num_requests = 0
    iot_analytics_client = boto3.consumer("iotanalytics")
    for messages_batch in chop(MESSAGES_PER_REQUEST, messages):
        send_batch_put_message(iot_analytics_client, channel_name, listing(messages_batch))
        num_requests += 1
    return "{} batchPutMessage requests despatched for {}".format(num_requests, key)

# batchPutMessage can obtain at most 1000 messages per second per account
# so we ratelimit to make sure we don't ship greater than that
# if you happen to allowed for concurrent employee invocations then you definately would wish to
# divide this worth by the max variety of concurrent staff
@RateLimiter(max_calls= MAX_REQUESTS_PER_SECOND, interval=1)
def send_batch_put_message(iot_analytics_client, channel_name, messages_batch):
    iot_analytics_client.batch_put_message(channelName=channel_name, messages=messages_batch)
def serialize_rows(file_contents):
    reader = csv.DictReader(StringIO(file_contents))
    return (row for row in reader)
def generate_messages(serialized_rows):
    for messageId, row in enumerate(serialized_rows):
        yield {"payload": json.dumps(row), "messageId": str(messageId)}

The next Launch Stack button goes to an AWS CloudFormation template that describes the Lambda features and Kinesis stream. It additionally describes IAM insurance policies and roles that allow the Lambda features to do the next:

  • Learn and listing objects from S3 buckets.
  • Ship knowledge into the Kinesis stream.
  • Be triggered by knowledge ingestion into the Kinesis stream.
  • Ship BatchPutMessage requests.
  • Retailer logs.

Simply click on Launch Stack under to launch the template. The stack will likely be deployed to the us-east-1 area. You do not want to specify values for the choices offered. As a substitute, select Subsequent 3 times. Then choose the I acknowledge that AWS CloudFormation would possibly create IAM sources examine field and click on Create. You might need to refresh to see the brand new AWS CloudFormation stack.

When the providers have been fully arrange, the standing of the stack will change to CREATE_COMPLETE. Choose the stack after which select the Outputs tab. Notice the names of the launcher Lambda operate and Kinesis stream.

Invoke the launcher Lambda operate with a payload that specifies the bucket it’ll iterate by way of, the identify of the Kinesis stream it’ll ship the keys to, and the AWS IoT Analytics channel the information will likely be despatched to.
Save the payload to file referred to as lambdaPayload.json.


Invoke the launcher Lambda operate.

aws lambda invoke --function-name EXAMPLE_FUNCTION_NAME --payload file://lambdaPayload.json --region us-east-1 --cli-binary-format raw-in-base64-out lambdaOutput.txt

You should utilize the AWS Lambda console to observe the state of the Lambda features. Click on the Launcher operate after which select the Monitoring tab. From there, you’ll be able to, for instance, see a graph of the variety of invocations over time. You can too view hyperlinks to logs for every operate. You’ll know the information ingestion course of is full when the the employee has stopped being invoked. For the information utilized by this weblog, the method could take about quarter-hour.

Validating knowledge

To validate the information ingested by AWS IoT Analytics, we will create datasets with queries we all know the proper reply to. For instance, within the this dataset, we all know there have been two areas the place knowledge was collected and 56,570 complete information. We are able to create a dataset that queries for these values.

Save the next to a file named validation_dataset_config.json.

            "sqlQuery":"SELECT count(DISTINCT location), count(DISTINCT rowid) from bpm_blog_datastore"

Execute the next instructions to confirm that they report the anticipated results of 2 and 56570.

aws iotanalytics create-dataset --cli-input-json file://validation_dataset_config.json
aws iotanalytics create-dataset-content --dataset-name bpm_blog_validation_dataset
aws iotanalytics get-dataset-content --dataset-name bpm_blog_validation_dataset --version-id '$LATEST'

We are able to additionally question for the information at a particular row. Save the next to validation_dataset_config2.json

            "sqlQuery":"select * from bpm_blog_datastore where rowid='575'"

Then execute these instructions.

aws iotanalytics create-dataset --cli-input-json file://validation_dataset_config2.json
aws iotanalytics create-dataset-content --dataset-name bpm_blog_validation_dataset2
aws iotanalytics get-dataset-content --dataset-name bpm_blog_validation_dataset2 --version-id '$LATEST'

The consequence ought to correspond to the row with the chosen rowid from the 1/example_data_part_2.csv excerpt proven right here.

Notes about this method and its options

The method described on this submit just isn’t idempotent. That’s, if you happen to run both Lambda operate with the identical enter a number of occasions, your knowledge retailer is not going to be in the identical finish state every time. A number of BatchPutMessage requests could be despatched for every row within the .csv file. As a result of knowledge shops don’t impose uniqueness constraints on keys, a number of copies of the information for every key could be saved within the knowledge retailer. Idempotency is related even when you don’t intend to rerun a Lambda a number of occasions with the identical enter as a result of it’s doable for the Lambda operate to fail and be invoked once more.

Nevertheless, writing duplicate knowledge to our knowledge retailer is okay so long as we filter it out once we create our dataset. We are able to simply specify that we wish distinct outcomes and embody the rowid key as one of many chosen objects. In consequence, every row from every .csv file could be included solely as soon as. For instance, a question counting complete information would seem like this:


You may cut back duplicate key processing by storing the already processed keys in a database and checking the database earlier than processing a key. That may end in much less house utilization for the information shops, sooner dataset creation, and presumably sooner runtime.

This method will run one employee Lambda operate at a time. You may enhance the processing velocity by permitting the employee Lambda features to run concurrently. To do this you would wish to extend the variety of shards utilized by the Kinesis stream as a result of you’ll be able to solely invoke one Lambda operate at a time per shard. You may enhance the variety of shards by modifying the ShardCount worth outlined within the AWS CloudFormation template. You’d additionally want to extend the variety of most allowed concurrent invocations. It’s set by the ReservedConcurrentExecutions worth within the AWS CloudFormation template. Lastly, you would wish to divide the MAX_REQUESTS_PER_SECOND worth within the employee Lambda operate by the worth you assigned to ReservedConcurrentExecutions.

To launch an altered model of the AWS CloudFormation template, you would wish to obtain it, make your changes, go to the CloudFormation Console, click on Create Stack, click on Select File, and specify your native copy of the template. To vary one of many AWS Lambda features, you would wish to add a Deployment bundle containing your required code to a public Amazon S3 folder. You’d then want to alter the S3Bucket and S3Key values within the template to level to that deployment bundle.

Lambda features execute for at most 5 minutes. If that weren’t sufficient time to iterate by way of the entire keys within the bucket, then this method wouldn’t add knowledge for the unvisited keys. Nevertheless, you might run the launcher code on an area machine. You may additionally invoke the launcher Lambda operate a number of occasions concurrently on completely different folders within the bucket. These invocations could possibly be created by a 3rd Lambda operate. Alternatively, you might restart the launcher Lambda operate by way of AWS Step Capabilities till it had iterated by way of the entire keys.

Lastly, Lambda features have a disk capability of 512 MB and at most 3 GB of reminiscence, so this method is not going to work to be used instances that require processing giant recordsdata.

In the event you can’t work inside these limitations, then it’s best to use AWS Glue, an ETL service that runs in a managed setting. You would wish to edit the script it generated to have it ship BatchPutMessage requests.


AWS IoT Analytics lets you enrich and question IoT knowledge. Through the use of the BatchPutMessage API, you’ll be able to ingest IoT knowledge into AWS IoT Analytics with out first ingesting the information into AWS IoT Core. The template offered on this weblog submit submits BatchPutMessage requests for knowledge saved in S3 through the use of two AWS Lambda features and an Amazon Kinesis stream. You’ll be able to validate the ingested knowledge by querying the information by way of dataset creation. Please learn the AWS IoT Analytics Consumer Information or different weblog posts for extra details about AWS IoT Analytics.