AWS SQS

Subscribes to an Amazon SQS queue and forwards each delivered message to the walkerOS collector. The source runs as a long-running streaming subscriber backed by SQS long-polling. It ships decoders for JSON, text, and raw payloads, and supports idempotent queue provisioning with optional dead-letter wiring and SNS topic subscription. The default region is eu-central-1.

The source ships inside @walkeros/server-source-aws alongside the Lambda handler; install the package once and import sourceSqs for SQS.

Where this fits

SQS is a server source in the walkerOS flow. It pulls messages from an SQS queue at consumer pace via long-poll. The standard pattern: an SNS topic fans out to one or more SQS queues, and walkerOS pulls each queue independently.

Installation

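Install the @walkeros/server-source-aws package with your package manager (for example, npm install @walkeros/server-source-aws), then import sourceSqs from it.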

Configuration

This source uses the standard source config wrapper (consent, data, env, id, ...). For the shared fields see source configuration. Package-specific fields live under config.settings and are listed below.

Settings

| Property | Type | Description |
| --- | --- | --- |
| queueName* | string | SQS queue short name (like walkeros-events). Required for both setup and runtime poll. |
| region | string | AWS region (like eu-central-1). Default: eu-central-1. |
| queueUrl | string | Optional pre-resolved queue URL. When set, init skips the GetQueueUrl lookup. |
| client | any | Pre-configured AWS SQSClient instance. Bypasses construction when supplied. |
| config | any | AWS SDK SQSClientConfig (credentials, endpoint overrides, retries). |
| decoder | 'json' \| 'text' \| 'raw' | Decoder for the message body. json (default) parses JSON, text forwards UTF-8, raw forwards a Buffer. |
| maxMessages | integer | SQS receive batch size. Cap 10. Default: 10. |
| waitTimeSeconds | integer | Long-poll duration in seconds. Cap 20. Default: 20. |
| visibilityTimeout | integer | Per-receive visibility timeout override. Default: queue-configured value. |
| shutdownTimeoutMs | integer | Graceful shutdown timeout in milliseconds. Default: 30000. After this window, destroy() force-closes. |
| onPushError | 'nack' \| 'ack' | Behavior when forwarding to the collector throws. nack (default) skips DeleteMessage so the message redelivers; ack drops it. |

* Required fields
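
To make the defaults and caps in the table concrete, here is a minimal sketch. The `SqsSettings` type and the `withDefaults` helper are hypothetical names introduced for illustration; they are not the package's exported API, and whether the package clamps or rejects out-of-range values is an assumption.

```typescript
// Hypothetical shape mirroring the settings table; not the package's exported type.
type Decoder = "json" | "text" | "raw";
type OnPushError = "nack" | "ack";

interface SqsSettings {
  queueName: string;
  region?: string;
  queueUrl?: string;
  decoder?: Decoder;
  maxMessages?: number;
  waitTimeSeconds?: number;
  visibilityTimeout?: number;
  shutdownTimeoutMs?: number;
  onPushError?: OnPushError;
}

// Illustrative helper: fills in the documented defaults and clamps to the documented caps.
// Clamping (rather than rejecting) out-of-range values is an assumption of this sketch.
function withDefaults(s: SqsSettings) {
  return {
    ...s,
    region: s.region ?? "eu-central-1",
    decoder: s.decoder ?? ("json" as Decoder),
    maxMessages: Math.min(Math.max(s.maxMessages ?? 10, 1), 10),
    waitTimeSeconds: Math.min(Math.max(s.waitTimeSeconds ?? 20, 0), 20),
    shutdownTimeoutMs: s.shutdownTimeoutMs ?? 30000,
    onPushError: s.onPushError ?? ("nack" as OnPushError),
  };
}

// Only queueName is required; the oversized batch size is clamped to the cap of 10.
const settings = withDefaults({ queueName: "walkeros-events", maxMessages: 25 });
```

In a real flow these values would sit under config.settings of the source.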

Mapping

This package does not define custom rule-level settings. For the standard rule fields (consent, condition, data, batch, name, policy) see mapping.

Lifecycle

The SQS source follows the long-running listener pattern, same shape as the Pub/Sub pull source:

  1. init() resolves the queue URL via GetQueueUrl, captures the queue ARN, and starts the long-poll loop as a background task.
  2. The instance's push() method is a deliberate no-op stub for production. The framework never calls it; the loop forwards events autonomously.
  3. destroy() stops the loop, drains in-flight ack/nacks, and force-closes after shutdownTimeoutMs (default 30000).
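
The three steps above can be sketched without any AWS dependency. This is a simplified illustration of the listener pattern, not the package's implementation: `startListener`, `receive`, and `forward` are hypothetical names, and `receive` stands in for the SQS long-poll call.

```typescript
// Simplified sketch of the long-running listener pattern; all names are illustrative.
type Receive = () => Promise<string[]>;
type Forward = (body: string) => Promise<void>;

function startListener(receive: Receive, forward: Forward, shutdownTimeoutMs = 30000) {
  let running = true;

  // init(): the loop starts as a background task; the caller does not await it.
  const loop = (async () => {
    while (running) {
      const batch = await receive();
      for (const body of batch) await forward(body);
    }
  })();

  return {
    // push() is a no-op stub: the loop forwards events on its own.
    push: async (): Promise<void> => {},

    // destroy(): stop the loop, wait for in-flight work, give up after the timeout.
    destroy: (): Promise<"drained" | "forced"> => {
      running = false;
      return new Promise<"drained" | "forced">((resolve) => {
        const timer = setTimeout(() => resolve("forced"), shutdownTimeoutMs);
        loop.then(() => {
          clearTimeout(timer);
          resolve("drained");
        });
      });
    },
  };
}
```

Because the loop only checks the `running` flag between receives, a destroy() call issued during a long-poll waits for that poll to return, which is why a shutdown timeout is useful as a backstop.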

Authentication

The AWS SDK v3 picks up credentials in this order:

  1. Environment variables: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION. Easiest for local development.
  2. Shared credentials file (~/.aws/credentials) and config file (~/.aws/config). Standard CLI / SDK convention.
  3. IAM role attached to the runtime (EC2 instance profile, ECS task role, EKS service account, Lambda execution role). Recommended for production.
  4. Pre-configured SQSClient: pass an instance via settings.client for full control. This bypasses the SDK lookup described in steps 1 to 3.

The runtime identity needs sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueUrl on the target queue (and sqs:GetQueueAttributes for the ARN capture at init).
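
As an illustration of the permissions listed above, the sketch below builds a least-privilege IAM policy document for the runtime identity. The `runtimePolicy` helper is a hypothetical name, and the account ID in the ARN is a placeholder.

```typescript
// Illustrative only: builds a policy document scoped to a single queue ARN.
function runtimePolicy(queueArn: string) {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueUrl",
          "sqs:GetQueueAttributes", // needed for the ARN capture at init
        ],
        Resource: queueArn,
      },
    ],
  };
}

// 123456789012 is a placeholder account ID.
const policy = runtimePolicy("arn:aws:sqs:eu-central-1:123456789012:walkeros-events");
const policyJson = JSON.stringify(policy, null, 2);
```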

Setup

Provision the queue once per environment:

    walkeros setup source.<id>

Setup is idempotent and authoritative for declared fields. Re-running it on an existing queue is a safe no-op when the declared state matches; if the state differs, the declared state wins. Operator-managed extras (additional tags, additional policy SIDs) are left untouched.
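
The "declared state wins, extras stay untouched" rule can be pictured as a diff over declared keys only. The sketch below applies it to queue tags; `diffDeclared` is a hypothetical helper written for illustration, not the package's actual reconciliation code.

```typescript
type Tags = Record<string, string>;

// Returns only the declared entries that differ from the existing state.
// Keys that exist on the queue but are not declared are never inspected or removed.
function diffDeclared(existing: Tags, declared: Tags): Tags {
  const changes: Tags = {};
  for (const [key, value] of Object.entries(declared)) {
    if (existing[key] !== value) changes[key] = value;
  }
  return changes;
}

const existingTags: Tags = { env: "prod", owner: "ops" };

// Declared state matches: nothing to write, so the re-run is a no-op.
const matching = diffDeclared(existingTags, { env: "prod" });

// Declared state differs: only the declared key is rewritten; "owner" is left alone.
const drifted = diffDeclared(existingTags, { env: "staging" });
```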

config.setup:

  • false (default): no provisioning. Operator must run setup explicitly.
  • true: provisions with safe defaults (eu-central-1, standard queue, default visibility timeout).
  • Object form: explicit overrides. See the schema in the Configuration block above.

Optional one-shot helpers:

  • setup.deadLetter: wire a dead-letter queue. Auto-creates the DLQ if it does not exist.
  • setup.snsTopicArn: subscribe the queue to an SNS topic and apply the queue policy that allows that topic to deliver messages (sqs:SendMessage granted to the SNS service, restricted to the topic ARN).

The provisioning identity needs sqs:CreateQueue, sqs:GetQueueAttributes, sqs:GetQueueUrl, sqs:TagQueue, sqs:SetQueueAttributes, and sns:Subscribe (only when subscribing to an SNS topic).
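
For orientation, the standard AWS pattern for letting an SNS topic deliver to an SQS queue is a queue policy that grants sqs:SendMessage to the SNS service principal, conditioned on the topic ARN. The sketch below builds such a document; the `snsDeliveryPolicy` helper and the `AllowSnsTopic` SID are hypothetical, and the exact statement the package writes may differ.

```typescript
// Illustrative queue policy for SNS-to-SQS delivery; helper name and Sid are assumptions.
function snsDeliveryPolicy(queueArn: string, topicArn: string) {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Sid: "AllowSnsTopic",
        Effect: "Allow",
        Principal: { Service: "sns.amazonaws.com" },
        Action: "sqs:SendMessage",
        Resource: queueArn,
        // Restricts delivery to the one topic the queue is subscribed to.
        Condition: { ArnEquals: { "aws:SourceArn": topicArn } },
      },
    ],
  };
}

// 123456789012 is a placeholder account ID; the topic name is made up.
const queuePolicy = snsDeliveryPolicy(
  "arn:aws:sqs:eu-central-1:123456789012:walkeros-events",
  "arn:aws:sns:eu-central-1:123456789012:walkeros-topic",
);
```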

Decoders

| Decoder | Behavior |
| --- | --- |
| json (default) | JSON.parse(body). Throws on malformed JSON. |
| text | The raw body string is forwarded as-is. |
| raw | The body is wrapped in { data: { payload: body } }. |

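When a decoder throws on a malformed payload, onPushError decides the outcome. With nack, the message is not deleted, so its visibility timeout expires and SQS redelivers it (eventually routing it to the DLQ if one is wired). With ack, the message is deleted and a debug log is written.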
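
The decoder table and the error rule can be sketched as two small functions. This is an illustration rather than the package's code: `decode` and `shouldDelete` are hypothetical names, and the raw branch follows the table above by wrapping the body.

```typescript
type Decoder = "json" | "text" | "raw";
type OnPushError = "nack" | "ack";

// Mirrors the decoder table: json parses, text passes through, raw wraps the body.
function decode(body: string, decoder: Decoder): unknown {
  switch (decoder) {
    case "json":
      return JSON.parse(body); // throws SyntaxError on malformed JSON
    case "text":
      return body;
    case "raw":
      return { data: { payload: body } };
  }
}

// Returns true when the message should be deleted from the queue.
function shouldDelete(body: string, decoder: Decoder, onPushError: OnPushError): boolean {
  try {
    decode(body, decoder);
    // In a real source, forwarding to the collector would happen here before deletion.
    return true;
  } catch {
    // nack: keep the message so SQS redelivers it; ack: drop it.
    return onPushError === "ack";
  }
}
```

With the default nack, a queue that consistently carries malformed JSON keeps redelivering the same messages until a dead-letter queue takes them, which is why wiring a DLQ is worth considering.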

Backpressure

SQS long-poll is naturally backpressured: each ReceiveMessage returns up to maxMessages (cap 10) and waits up to waitTimeSeconds (cap 20) for messages to arrive. The loop processes the batch sequentially and only issues the next ReceiveMessage after the previous batch is forwarded and acked. If the collector slows down, polling slows down with it. No tuning required for typical loads.

For higher throughput run multiple worker instances against the same queue; SQS distributes messages across consumers automatically.
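
The ordering guarantee behind this backpressure can be shown with a bounded loop. This is a sketch under assumptions: `pollRounds` and its callbacks are hypothetical stand-ins for ReceiveMessage, the collector push, and DeleteMessage, and the fixed number of rounds exists only to keep the example finite.

```typescript
type Message = { id: string; body: string };

// Processes each batch sequentially; the next receive starts only after the
// previous batch has been forwarded and acked.
async function pollRounds(
  receive: (max: number) => Promise<Message[]>,
  forward: (m: Message) => Promise<void>,
  ack: (m: Message) => Promise<void>,
  rounds: number,
  maxMessages = 10,
): Promise<void> {
  for (let i = 0; i < rounds; i++) {
    const batch = await receive(Math.min(maxMessages, 10)); // cap of 10 per receive
    for (const message of batch) {
      await forward(message); // a slow collector delays everything after this line
      await ack(message);
    }
  }
}
```

Since nothing is prefetched, at most one batch is in flight per worker, so adding workers raises throughput without changing the per-worker behavior.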

Troubleshooting

  • QueueDoesNotExist at init: the queue name is correct but the queue is not provisioned. Run walkeros setup source.<id> once.
  • AccessDenied on ReceiveMessage: the runtime IAM identity lacks sqs:ReceiveMessage on the queue. Grant the queue ARN explicitly.
  • Messages stuck in flight: a handler threw before deleting and visibility timeout has not yet expired. With onPushError: 'nack' (default) this is intentional: SQS redelivers after the visibility window. Tune visibilityTimeout to your handler's expected processing time.
  • Decoder throws on every message: the queue carries a different payload shape than declared. Switch the decoder (text, raw) or wire a transformer that pre-processes the payload.
