Skip to main content

Google Pub/Sub

Server Source code Package

Subscribes to a Google Cloud Pub/Sub topic and forwards each delivered message to the walkerOS collector. Ships two delivery models from the same package: a long-running streaming pull subscriber for container deployments, and an HTTP push webhook handler for serverless. Decoders for JSON, text, and raw payloads. Idempotent subscription provisioning with optional dead-letter wiring. Optional OIDC verification on the push handler.

The source ships inside @walkeros/server-source-gcp alongside the Cloud Functions handler; install the package once and import sourcePubSubPull or sourcePubSubPush.

Where this fits

Pub/Sub is a server source in the walkerOS flow:

Forwards each Pub/Sub message to the collector. Pull mode runs a streaming subscriber in long-lived containers; push mode receives HTTP envelopes in serverless deployments.

Installation

Loading...

Choosing pull vs push

AspectPull (sourcePubSubPull)Push (sourcePubSubPush)
Process modelLong-running streaming subscriberStateless HTTP request handler
HostingCloud Run with min-instances >= 1, GKE, GCE, on-premCloud Run scale-to-zero, Cloud Functions, Lambda
Idle costPay for always-on computeZero when idle
ThroughputHighest (gRPC streaming)Limited by HTTP overhead
LatencyLowest (warm stream, single-digit ms)Higher (HTTP per message, cold-start adds 100-500ms)
Auth directionSubscriber holds credentials, connects outboundPub/Sub holds credentials, posts inbound (OIDC verify)
Subscription typesubscription.type = "pull"subscription.type = "push" with pushConfig.pushEndpoint
Best fitLong-running collector services, high-volume streamsServerless walkerOS endpoints, hybrid setups

Both modes share the same @walkeros/server-source-gcp package, the same auth resolution, the same decoders, and the same setup command. Pick the one that matches your hosting model.

Pull subscriber

Loading...

The pull source is event-driven: init() opens the streaming subscription and forwards each delivered message to the collector via env.push. The source's push method is a deliberate no-op stub. There is no external invocation; the subscriber pushes events autonomously. destroy() closes the subscriber gracefully, honoring shutdownTimeoutMs (default 30000).

Pull settings

FieldTypeDefaultDescription
projectIdstring(required)GCP project id.
subscriptionstring(required)Subscription short name.
topicstring(optional)Topic short name. Used by setup auto-create when setup.createTopic is on.
credentialsstring | ServiceAccountCredentials(ADC fallback)Service account JSON (parsed object or JSON string).
apiEndpointstring(SDK default)Override for the emulator (e.g. localhost:8085).
decoder'json' | 'text' | 'raw''json'How the subscriber decodes message bodies before forwarding.
flowControl{ maxMessages?, maxBytes? }{ maxMessages: 100, maxBytes: 10 MB }Subscriber pull rate (in-flight cap).
ackDeadlinenumber60Per-message ack window in seconds.
shutdownTimeoutMsnumber30000Graceful drain budget on destroy().
onPushError'nack' | 'ack''nack'Behavior when forwarding to the collector throws.

Push webhook

Loading...

Pub/Sub posts each message envelope to the configured endpoint as a JSON body matching the Pub/Sub push spec. The handler decodes the envelope, base64-decodes the data field, runs the configured decoder, and forwards to the collector. Status codes:

  • 200: forwarded successfully.
  • 400: malformed envelope (missing message.data, invalid base64).
  • 401: OIDC verification failed (when enabled).
  • 500: collector push failed; Pub/Sub will retry per the subscription's retry policy.

Push settings

In addition to the shared base (projectId, credentials, apiEndpoint, decoder):

FieldTypeDefaultDescription
verifyOidcbooleanfalseWhen true, verifies the bearer token Pub/Sub attaches to push requests.
audiencestring-Required when verifyOidc is true. Must match your endpoint URL exactly.

Authentication

Three modes for the pull subscriber, evaluated in order:

  1. Application Default Credentials (ADC). Default. Works on GCP-native runtimes (Cloud Run, Cloud Functions, GKE) and locally with gcloud auth application-default login.
  2. Service account JSON. Pass settings.credentials, either parsed or as JSON string (the source JSON-parses strings). Combine with $env.NAME to inject from an environment variable.
  3. Pre-configured client. Pass an existing PubSub SDK instance as settings.client.

For the push handler, auth is reversed: Pub/Sub holds the identity and signs requests with an OIDC token. The handler verifies the token against GCP public keys when verifyOidc: true and audience is set. Off by default because misconfigured OIDC silently rejects all messages, so opt in once your endpoint is publicly reachable.

Setup

Provision the subscription once per environment:

Loading...

Idempotent. Defaults: 60-second ack deadline, project-default retention, no filter, no dead-letter policy. Drift on ackDeadlineSeconds, messageRetentionDuration, deadLetterPolicy, or filter emits WARN setup.drift {...} and never auto-mutates.

config.setup:

  • false (default): no provisioning.
  • true: provisions the subscription against an existing topic with safe defaults.
  • { ackDeadlineSeconds, messageRetentionDuration, filter, deadLetterPolicy, retryPolicy, ... }: object form for explicit overrides.

Optional one-shot helpers:

  • setup.createTopic: true: auto-create the topic if it does not exist (requires settings.topic). Useful when source and topic owners share a deployment.
  • setup.deadLetterPolicy.createDeadLetterTopic: true: auto-create the dead-letter topic referenced in deadLetterPolicy.deadLetterTopic.

The provisioning identity needs pubsub.subscriptions.create (and pubsub.topics.create if auto-creating). The runtime pull identity needs roles/pubsub.subscriber on the subscription.

Topic provisioning is owned by the Pub/Sub destination when the topic is the producer's responsibility; the source can also auto-create as a convenience for self-contained setups.

Decoders

DecoderBehavior
json (default)JSON.parse(data.toString('utf8'))
textdata.toString('utf8') (forwarded as-is)
rawThe raw Buffer is forwarded unchanged

A decoder that throws on malformed payloads triggers onPushError (pull mode): nack redelivers, ack drops with a debug log.

Emulator

Both pull and push honor PUBSUB_EMULATOR_HOST. For explicit configuration, set settings.apiEndpoint (e.g. localhost:8085).

Loading...

Lifecycle

The pull source is the canonical long-running listener pattern in walkerOS. Existing server sources (express, lambda, cloudfunction, fetch) are request-handlers; pull is fundamentally different:

  1. init() starts the streaming subscriber. It returns the Source.Instance and the subscriber runs in the background.
  2. The instance's push method is a no-op stub. The framework never calls it; the subscriber forwards events autonomously.
  3. destroy() closes the subscriber. On shutdown the source stops accepting new messages, drains in-flight ack/nacks, and force-closes after shutdownTimeoutMs.

Troubleshooting

  • NOT_FOUND on the subscription: the subscription does not exist. Run walkeros setup source.<id> once or create it via gcloud pubsub subscriptions create.
  • PERMISSION_DENIED / UNAUTHENTICATED on the pull subscriber: the runtime service account lacks roles/pubsub.subscriber on the subscription.
  • Push handler returns 401 for every message: OIDC misconfiguration. Verify settings.audience matches your endpoint URL exactly, including scheme and path.
  • Messages decoded incorrectly: switch the decoder. JSON is the default; use text for plain-text payloads, raw for binary.

Next steps

💡 Need implementation support?
elbwalker offers hands-on support: setup review, measurement planning, destination mapping, and live troubleshooting. Book a 2-hour session (€399)