AWS SQS
Subscribes to an Amazon SQS queue and forwards each delivered message to the walkerOS collector. Long-running streaming subscriber backed by SQS long-poll. Decoders for JSON, text, and raw payloads. Idempotent queue provisioning with optional dead-letter wiring and SNS-topic subscription. EU-central-1 default.
The source ships inside @walkeros/server-source-aws alongside the Lambda handler; install the package once and import sourceSqs for SQS.
SQS is a server source in the walkerOS flow:
Pulls messages from an SQS queue at consumer pace via long-poll. Standard pattern: SNS topic fans out to one or more SQS queues; walkerOS pulls each queue independently.
Installation
Configuration
This source uses the standard source config wrapper (consent, data, env, id, ...). For the shared fields see source configuration. Package-specific fields live under config.settings and are listed below.
Settings
| Property | Type | Description | More |
|---|---|---|---|
queueName | string | SQS queue short name (like walkeros-events). Required for both setup and runtime poll. | |
region | string | AWS region (like eu-central-1). Default: eu-central-1. | |
queueUrl | string | Optional pre-resolved queue URL. When set, init skips the GetQueueUrl lookup. | |
client | any | Pre-configured AWS SQSClient instance. Bypasses construction when supplied. | |
config | any | AWS SDK SQSClientConfig (credentials, endpoint overrides, retries). | |
decoder | 'json' | 'text' | 'raw' | Decoder for the message body. json (default) parses JSON, text forwards UTF-8, raw forwards a Buffer. | |
maxMessages | integer | SQS receive batch size. Cap 10. Default: 10. | |
waitTimeSeconds | integer | Long-poll duration in seconds. Cap 20. Default: 20. | |
visibilityTimeout | integer | Per-receive visibility timeout override. Default: queue-configured value. | |
shutdownTimeoutMs | integer | Graceful shutdown timeout in milliseconds. Default: 30000. After this window, destroy() force-closes. | |
onPushError | 'nack' | 'ack' | Behavior when forwarding to the collector throws. nack (default) skips DeleteMessage so the message redelivers; ack drops it. |
Mapping
This package does not define custom rule-level settings. For the standard rule fields (consent, condition, data, batch, name, policy) see mapping.
Lifecycle
The SQS source follows the long-running listener pattern, same shape as the Pub/Sub pull source:
init()resolves the queue URL viaGetQueueUrl, captures the queue ARN, and starts the long-poll loop as a background task.- The instance's
push()method is a deliberate no-op stub for production. The framework never calls it; the loop forwards events autonomously. destroy()stops the loop, drains in-flight ack/nacks, and force-closes aftershutdownTimeoutMs(default 30000).
Authentication
The AWS SDK v3 picks up credentials in this order:
- Environment variables:
AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,AWS_REGION. Easiest for local development. - Shared credentials file (
~/.aws/credentials) and config file (~/.aws/config). Standard CLI / SDK convention. - IAM role attached to the runtime (EC2 instance profile, ECS task role, EKS service account, Lambda execution role). Recommended for production.
- Pre-configured
SQSClient: pass an instance viasettings.clientfor full control.
The runtime identity needs sqs:ReceiveMessage, sqs:DeleteMessage, and sqs:GetQueueUrl on the target queue (and sqs:GetQueueAttributes for the ARN capture at init).
Setup
Provision the queue once per environment:
Idempotent and authoritative for declared fields. Re-running on an existing queue is a safe no-op when declared state matches; if it differs, declared state wins. Operator-managed extras (additional tags, additional policy SIDs) are left untouched.
config.setup:
false(default): no provisioning. Operator must run setup explicitly.true: provisions with safe defaults (eu-central-1, standard queue, default visibility timeout).- Object form: explicit overrides. See the schema in the Configuration block above.
Optional one-shot helpers:
setup.deadLetter: wire a dead-letter queue. Auto-creates the DLQ if it does not exist.setup.snsTopicArn: subscribe the queue to an SNS topic and apply the queue policy grantingsns:SendMessagefrom that topic.
The provisioning identity needs sqs:CreateQueue, sqs:GetQueueAttributes, sqs:GetQueueUrl, sqs:TagQueue, sqs:SetQueueAttributes, and sns:Subscribe (only when subscribing to an SNS topic).
Decoders
| Decoder | Behavior |
|---|---|
json (default) | JSON.parse(body). Throws on malformed JSON. |
text | The raw body string is forwarded as-is. |
raw | The body is wrapped in { data: { payload: body } }. |
A decoder that throws on malformed payloads triggers onPushError: nack lets visibility expire (SQS redelivers; eventually the DLQ if wired), ack deletes the message with a debug log.
Backpressure
SQS long-poll is naturally backpressured: each ReceiveMessage returns up to maxMessages (cap 10) and waits up to waitTimeSeconds (cap 20) for messages to arrive. The loop processes the batch sequentially and only issues the next ReceiveMessage after the previous batch is forwarded and acked. If the collector slows down, polling slows down with it. No tuning required for typical loads.
For higher throughput run multiple worker instances against the same queue; SQS distributes messages across consumers automatically.
Troubleshooting
QueueDoesNotExistat init: the queue name is correct but the queue is not provisioned. Runwalkeros setup source.<id>once.AccessDeniedonReceiveMessage: the runtime IAM identity lackssqs:ReceiveMessageon the queue. Grant the queue ARN explicitly.- Messages stuck in flight: a handler threw before deleting and visibility timeout has not yet expired. With
onPushError: 'nack'(default) this is intentional: SQS redelivers after the visibility window. TunevisibilityTimeoutto your handler's expected processing time. - Decoder throws on every message: the queue carries a different payload shape than declared. Switch the decoder (
text,raw) or wire a transformer that pre-processes the payload.
Next steps
- AWS SNS destination for the producer side. Standard pattern: SNS topic fans out to this SQS queue.
- GCP Pub/Sub source for the GCP analog (streaming pull subscriber).
- AWS Lambda source for invocation-triggered ingestion.