EventBridge Pipes

Get started with EventBridge Pipes on LocalStack

Introduction

EventBridge Pipes allows users to create point-to-point integrations between event producers and consumers with transform, filter and enrichment steps. Pipes are particularly useful for scenarios involving real-time data processing, application integration, and automated workflows, while simplifying the process of routing events between AWS services. Pipes offer a point-to-point connection from one source to one target (one-to-one). In contrast, EventBridge Event Bus offers a one-to-many integration where an event router delivers one event to zero or more destinations.

LocalStack allows you to use the Pipes APIs in your local environment to create Pipes with SQS queues and Kinesis streams as source and target. You can also filter events using EventBridge event patterns and enrich events using Lambda.
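As a minimal sketch of event filtering, the snippet below builds a `FilterCriteria` document for an SQS source and passes it to CreatePipe through `--source-parameters`. The pipe name `filtered-pipe`, the helper `create_filtered_pipe`, and the `status` field are hypothetical; the sketch assumes the message body is valid JSON and that `SOURCE_QUEUE_ARN` and `TARGET_QUEUE_ARN` are already set, as in the walkthrough further down.

```shell
# Each filter pattern is passed as a JSON-encoded string, so the inner quotes are escaped.
# Assumption: only messages whose JSON body contains "status": "active" should be forwarded.
SOURCE_PARAMETERS='{"FilterCriteria":{"Filters":[{"Pattern":"{\"body\":{\"status\":[\"active\"]}}"}]}}'

# Hypothetical helper: creates a pipe that applies the filter above.
create_filtered_pipe() {
    awslocal pipes create-pipe --name filtered-pipe \
        --source "$SOURCE_QUEUE_ARN" \
        --target "$TARGET_QUEUE_ARN" \
        --role-arn arn:aws:iam::000000000000:role/pipes-role \
        --source-parameters "$SOURCE_PARAMETERS"
}
```

Call `create_filtered_pipe` once both queues exist. Messages that do not match the pattern should not reach the target queue.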

The supported APIs are listed on our API coverage page, which provides information on the extent of Pipes' integration with LocalStack.

Getting started

This guide is designed for users new to EventBridge Pipes and assumes basic knowledge of the AWS CLI and our awslocal wrapper script.

Start your LocalStack container using your preferred method. We will demonstrate how to create a Pipe with SQS queues as source and target, then send events to the source queue and see them routed to the target queue.

Create the SQS queues

Create two SQS queues that will be used as source and target for the Pipe. Run the following commands to create the queues using the CreateQueue API:

$ awslocal sqs create-queue --queue-name source-queue
$ awslocal sqs create-queue --queue-name target-queue

You can fetch their queue ARNs using the GetQueueAttributes API:

$ SOURCE_QUEUE_ARN=$(awslocal sqs get-queue-attributes --queue-url http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/source-queue --attribute-names QueueArn --query Attributes.QueueArn --output text)
$ TARGET_QUEUE_ARN=$(awslocal sqs get-queue-attributes --queue-url http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/target-queue --attribute-names QueueArn --query Attributes.QueueArn --output text)

Create a Pipe

You can now create a Pipe using the CreatePipe API. Run the following command, specifying the ARNs of the source and target queues created earlier:

$ awslocal pipes create-pipe --name sample-pipe \
        --source "$SOURCE_QUEUE_ARN" \
        --target "$TARGET_QUEUE_ARN" \
        --role-arn arn:aws:iam::000000000000:role/pipes-role

The output should look similar to the following:

{
    "Arn": "arn:aws:pipes:us-east-1:000000000000:pipe/sample-pipe",
    "CreationTime": "2024-01-26T11:55:27.069088+05:30",
    "CurrentState": "CREATING",
    "DesiredState": "RUNNING",
    "LastModifiedTime": "2024-01-26T11:55:27.069088+05:30",
    "Name": "sample-pipe"
}

Describe the Pipe

You can use the DescribePipe API to get information about the Pipe:

$ awslocal pipes describe-pipe --name sample-pipe

The output should look similar to the following:

{
    "Arn": "arn:aws:pipes:us-east-1:000000000000:pipe/sample-pipe",
    "CreationTime": "2024-01-26T11:55:27.069088+05:30",
    "CurrentState": "RUNNING",
    "DesiredState": "RUNNING",
    "EnrichmentParameters": {},
    "LastModifiedTime": "2024-01-26T11:55:27.069088+05:30",
    "Name": "sample-pipe",
    "RoleArn": "arn:aws:iam::000000000000:role/pipes-role",
    "Source": "arn:aws:sqs:us-east-1:000000000000:source-queue",
    "SourceParameters": {
        "SqsQueueParameters": {
            "BatchSize": 10
        }
    },
    "StateReason": "USER_INITIATED",
    "Tags": {},
    "Target": "arn:aws:sqs:us-east-1:000000000000:target-queue",
    "TargetParameters": {}
}
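
The CreatePipe output above reports `CREATING`, while DescribePipe reports `RUNNING`, so a script may want to wait for the transition before sending events. The following is a minimal sketch of such a wait loop. The helper `wait_for_pipe` and its default of 10 attempts with a 1-second delay are assumptions, not part of LocalStack or the AWS CLI.

```shell
# Hypothetical helper: poll DescribePipe until CurrentState is RUNNING.
# Usage: wait_for_pipe <pipe-name> [attempts] [delay-seconds]
wait_for_pipe() {
    name=$1
    attempts=${2:-10}
    delay=${3:-1}
    state=UNKNOWN
    i=0
    while [ "$i" -lt "$attempts" ]; do
        # --query extracts only the CurrentState field from the response
        state=$(awslocal pipes describe-pipe --name "$name" \
            --query CurrentState --output text)
        if [ "$state" = "RUNNING" ]; then
            echo "$name is RUNNING"
            return 0
        fi
        i=$((i + 1))
        sleep "$delay"
    done
    echo "$name did not reach RUNNING (last state: $state)" >&2
    return 1
}
```

For example, `wait_for_pipe sample-pipe` returns a zero exit code once the pipe is running and a non-zero exit code if the attempts are exhausted.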

Send events to the source queue

You can now send events to the source queue; the Pipe routes them to the target queue. Run the following command to send an event to the source queue:

$ awslocal sqs send-message \
    --queue-url http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/source-queue \
    --message-body "message-1"

Receive events from the target queue

You can fetch the message from the target queue using the ReceiveMessage API:

$ awslocal sqs receive-message \
    --queue-url http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/target-queue

Supported sources

LocalStack supports the following sources for Pipes:

  • Amazon DynamoDB stream
  • Amazon Kinesis stream
  • Amazon SQS queue

Please create a feature request on GitHub if you need support for Amazon MQ broker, Amazon MSK stream, or Apache Kafka stream.
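
Stream sources take their own source parameters. As a hedged sketch, the snippet below creates a Kinesis stream and uses it as the source of a pipe. The stream name `source-stream`, the pipe name `kinesis-pipe`, and the helper `create_kinesis_pipe` are hypothetical, and `TARGET_QUEUE_ARN` is assumed to be set as in the Getting started section.

```shell
# StartingPosition is required for Kinesis sources (TRIM_HORIZON, LATEST, or AT_TIMESTAMP).
KINESIS_SOURCE_PARAMETERS='{"KinesisStreamParameters":{"StartingPosition":"LATEST","BatchSize":10}}'

# Hypothetical helper: creates the stream, waits for it, then creates the pipe.
create_kinesis_pipe() {
    awslocal kinesis create-stream --stream-name source-stream --shard-count 1
    awslocal kinesis wait stream-exists --stream-name source-stream
    stream_arn=$(awslocal kinesis describe-stream-summary --stream-name source-stream \
        --query StreamDescriptionSummary.StreamARN --output text)
    awslocal pipes create-pipe --name kinesis-pipe \
        --source "$stream_arn" \
        --target "$TARGET_QUEUE_ARN" \
        --role-arn arn:aws:iam::000000000000:role/pipes-role \
        --source-parameters "$KINESIS_SOURCE_PARAMETERS"
}
```

With `LATEST`, only records written after the pipe starts polling are forwarded; `TRIM_HORIZON` would start from the oldest available record instead.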

Supported enrichments

LocalStack supports the following enrichments for Pipes:

  • Lambda function

Please create a feature request on GitHub if you need support for API destination, Amazon API Gateway, or Step Functions state machine.
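
A Lambda enrichment is attached with the `--enrichment` option of CreatePipe. The sketch below assumes a Lambda function already exists and that `SOURCE_QUEUE_ARN` and `TARGET_QUEUE_ARN` are set as in the Getting started section. The pipe name `enriched-pipe`, the helper `create_enriched_pipe`, and the function name in the usage note are hypothetical.

```shell
# Hypothetical helper: creates a pipe that passes each batch of events
# through the given Lambda function before delivery to the target.
# Usage: create_enriched_pipe <lambda-function-arn>
create_enriched_pipe() {
    enrichment_arn=$1
    awslocal pipes create-pipe --name enriched-pipe \
        --source "$SOURCE_QUEUE_ARN" \
        --enrichment "$enrichment_arn" \
        --target "$TARGET_QUEUE_ARN" \
        --role-arn arn:aws:iam::000000000000:role/pipes-role
}
```

For example, `create_enriched_pipe arn:aws:lambda:us-east-1:000000000000:function:enrich-fn`. On AWS, the value returned by the enrichment function is what gets delivered to the target; whether LocalStack matches this in every detail is not confirmed here.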

Supported targets

LocalStack supports the following targets for Pipes:

  • EventBridge bus
  • Kinesis stream
  • Lambda function (SYNC or ASYNC)
  • Amazon SNS topic
  • Amazon SQS queue
  • Step Functions state machine
    • Standard workflows (ASYNC)

Please create a feature request on GitHub if you need support for API destination, API Gateway, Batch job queue, CloudWatch log group, ECS task, Firehose delivery stream, Inspector assessment template, Redshift cluster data API queries, SageMaker Pipeline, or Step Functions state machine: Express workflows (SYNC or ASYNC).

Supported log destinations

LocalStack supports the following log destinations for detailed Pipes logging:

  • CloudWatch Logs

Please create a feature request on GitHub if you need support for Firehose stream logs or Amazon S3 logs.
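
Logging is configured through the `--log-configuration` option of CreatePipe. The following sketch sends pipe logs to a CloudWatch Logs log group. The log group name `/pipes/logged-pipe`, the pipe name `logged-pipe`, and the helper `create_logged_pipe` are hypothetical, and `SOURCE_QUEUE_ARN` and `TARGET_QUEUE_ARN` are assumed to be set as in the Getting started section.

```shell
# Assumed log group ARN in the default LocalStack account and region.
LOG_GROUP_ARN="arn:aws:logs:us-east-1:000000000000:log-group:/pipes/logged-pipe"

# Level can be OFF, ERROR, INFO, or TRACE.
LOG_CONFIGURATION="{\"CloudwatchLogsLogDestination\":{\"LogGroupArn\":\"$LOG_GROUP_ARN\"},\"Level\":\"INFO\"}"

# Hypothetical helper: creates the log group, then a pipe that logs to it.
create_logged_pipe() {
    awslocal logs create-log-group --log-group-name /pipes/logged-pipe
    awslocal pipes create-pipe --name logged-pipe \
        --source "$SOURCE_QUEUE_ARN" \
        --target "$TARGET_QUEUE_ARN" \
        --role-arn arn:aws:iam::000000000000:role/pipes-role \
        --log-configuration "$LOG_CONFIGURATION"
}
```

After calling `create_logged_pipe` and sending a few messages, the log events can be inspected with `awslocal logs filter-log-events --log-group-name /pipes/logged-pipe`.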

Current limitations

The EventBridge Pipes implementation in LocalStack is currently in the preview stage and has the following limitations:

  • Lack of input transformers.
  • Lack of concurrency support (i.e., ParallelizationFactor), resulting in slower processing in high-throughput scenarios.
  • Lack of lifecycle management for pipe states (i.e., missing tests for state transitions).
  • Lack of re-sharding support when polling from Kinesis and DynamoDB streams.
  • Batch handling behavior may have parity issues (e.g., batch flushing rules by size, length, time, etc. are not implemented).