Google Pub/Sub
Subscribes to a Google Cloud Pub/Sub topic and forwards each delivered message to the walkerOS collector. Ships two delivery models from the same package: a long-running streaming pull subscriber for container deployments, and an HTTP push webhook handler for serverless. Decoders for JSON, text, and raw payloads. Idempotent subscription provisioning with optional dead-letter wiring. Optional OIDC verification on the push handler.
The source ships inside @walkeros/server-source-gcp alongside the Cloud Functions handler; install the package once and import sourcePubSubPull or sourcePubSubPush.
Pub/Sub is a server source in the walkerOS flow:
Forwards each Pub/Sub message to the collector. Pull mode runs a streaming subscriber in long-lived containers; push mode receives HTTP envelopes in serverless deployments.
Installation
Choosing pull vs push
| Aspect | Pull (sourcePubSubPull) | Push (sourcePubSubPush) |
|---|---|---|
| Process model | Long-running streaming subscriber | Stateless HTTP request handler |
| Hosting | Cloud Run with min-instances >= 1, GKE, GCE, on-prem | Cloud Run scale-to-zero, Cloud Functions, Lambda |
| Idle cost | Pay for always-on compute | Zero when idle |
| Throughput | Highest (gRPC streaming) | Limited by HTTP overhead |
| Latency | Lowest (warm stream, single-digit ms) | Higher (HTTP per message, cold-start adds 100-500ms) |
| Auth direction | Subscriber holds credentials, connects outbound | Pub/Sub holds credentials, posts inbound (OIDC verify) |
| Subscription type | subscription.type = "pull" | subscription.type = "push" with pushConfig.pushEndpoint |
| Best fit | Long-running collector services, high-volume streams | Serverless walkerOS endpoints, hybrid setups |
Both modes share the same @walkeros/server-source-gcp package, the same auth resolution, the same decoders, and the same setup command. Pick the one that matches your hosting model.
Pull subscriber
The pull source is event-driven: init() opens the streaming subscription and forwards each delivered message to the collector via env.push. The source's push method is a deliberate no-op stub. There is no external invocation; the subscriber pushes events autonomously. destroy() closes the subscriber gracefully, honoring shutdownTimeoutMs (default 30000).
Pull settings
| Field | Type | Default | Description |
|---|---|---|---|
projectId | string | (required) | GCP project id. |
subscription | string | (required) | Subscription short name. |
topic | string | (optional) | Topic short name. Used by setup auto-create when setup.createTopic is on. |
credentials | string | ServiceAccountCredentials | (ADC fallback) | Service account JSON (parsed object or JSON string). |
apiEndpoint | string | (SDK default) | Override for the emulator (e.g. localhost:8085). |
decoder | 'json' | 'text' | 'raw' | 'json' | How the subscriber decodes message bodies before forwarding. |
flowControl | { maxMessages?, maxBytes? } | { maxMessages: 100, maxBytes: 10 MB } | Subscriber pull rate (in-flight cap). |
ackDeadline | number | 60 | Per-message ack window in seconds. |
shutdownTimeoutMs | number | 30000 | Graceful drain budget on destroy(). |
onPushError | 'nack' | 'ack' | 'nack' | Behavior when forwarding to the collector throws. |
Push webhook
Pub/Sub posts each message envelope to the configured endpoint as a JSON body matching the Pub/Sub push spec. The handler decodes the envelope, base64-decodes the data field, runs the configured decoder, and forwards to the collector. Status codes:
200: forwarded successfully.400: malformed envelope (missingmessage.data, invalid base64).401: OIDC verification failed (when enabled).500: collector push failed; Pub/Sub will retry per the subscription's retry policy.
Push settings
In addition to the shared base (projectId, credentials, apiEndpoint, decoder):
| Field | Type | Default | Description |
|---|---|---|---|
verifyOidc | boolean | false | When true, verifies the bearer token Pub/Sub attaches to push requests. |
audience | string | - | Required when verifyOidc is true. Must match your endpoint URL exactly. |
Authentication
Three modes for the pull subscriber, evaluated in order:
- Application Default Credentials (ADC). Default. Works on GCP-native runtimes (Cloud Run, Cloud Functions, GKE) and locally with
gcloud auth application-default login. - Service account JSON. Pass
settings.credentials, either parsed or as JSON string (the source JSON-parses strings). Combine with$env.NAMEto inject from an environment variable. - Pre-configured client. Pass an existing
PubSubSDK instance assettings.client.
For the push handler, auth is reversed: Pub/Sub holds the identity and signs requests with an OIDC token. The handler verifies the token against GCP public keys when verifyOidc: true and audience is set. Off by default because misconfigured OIDC silently rejects all messages, so opt in once your endpoint is publicly reachable.
Setup
Provision the subscription once per environment:
Idempotent. Defaults: 60-second ack deadline, project-default retention, no filter, no dead-letter policy. Drift on ackDeadlineSeconds, messageRetentionDuration, deadLetterPolicy, or filter emits WARN setup.drift {...} and never auto-mutates.
config.setup:
false(default): no provisioning.true: provisions the subscription against an existing topic with safe defaults.{ ackDeadlineSeconds, messageRetentionDuration, filter, deadLetterPolicy, retryPolicy, ... }: object form for explicit overrides.
Optional one-shot helpers:
setup.createTopic: true: auto-create the topic if it does not exist (requiressettings.topic). Useful when source and topic owners share a deployment.setup.deadLetterPolicy.createDeadLetterTopic: true: auto-create the dead-letter topic referenced indeadLetterPolicy.deadLetterTopic.
The provisioning identity needs pubsub.subscriptions.create (and pubsub.topics.create if auto-creating). The runtime pull identity needs roles/pubsub.subscriber on the subscription.
Topic provisioning is owned by the Pub/Sub destination when the topic is the producer's responsibility; the source can also auto-create as a convenience for self-contained setups.
Decoders
| Decoder | Behavior |
|---|---|
json (default) | JSON.parse(data.toString('utf8')) |
text | data.toString('utf8') (forwarded as-is) |
raw | The raw Buffer is forwarded unchanged |
A decoder that throws on malformed payloads triggers onPushError (pull mode): nack redelivers, ack drops with a debug log.
Emulator
Both pull and push honor PUBSUB_EMULATOR_HOST. For explicit configuration, set settings.apiEndpoint (e.g. localhost:8085).
Lifecycle
The pull source is the canonical long-running listener pattern in walkerOS. Existing server sources (express, lambda, cloudfunction, fetch) are request-handlers; pull is fundamentally different:
init()starts the streaming subscriber. It returns theSource.Instanceand the subscriber runs in the background.- The instance's
pushmethod is a no-op stub. The framework never calls it; the subscriber forwards events autonomously. destroy()closes the subscriber. On shutdown the source stops accepting new messages, drains in-flight ack/nacks, and force-closes aftershutdownTimeoutMs.
Troubleshooting
NOT_FOUNDon the subscription: the subscription does not exist. Runwalkeros setup source.<id>once or create it viagcloud pubsub subscriptions create.PERMISSION_DENIED/UNAUTHENTICATEDon the pull subscriber: the runtime service account lacksroles/pubsub.subscriberon the subscription.- Push handler returns 401 for every message: OIDC misconfiguration. Verify
settings.audiencematches your endpoint URL exactly, including scheme and path. - Messages decoded incorrectly: switch the decoder. JSON is the default; use
textfor plain-text payloads,rawfor binary.
Next steps
- Pub/Sub destination for publishing events to a Pub/Sub topic
- GCP source overview for the Cloud Functions HTTP handler