Collector
The collector is the central processing engine of walkerOS that receives events from sources, enriches them with additional data, applies consent rules, and routes them to destinations. It acts as the intelligent middleware between event capture and event delivery.
What it does
The Collector transforms raw events into enriched, compliant data streams by:
- Event processing - Validates, normalizes, and enriches incoming events
- Consent management - Applies privacy rules and user consent preferences
- Data enrichment - Adds session data, user context, and custom properties
- Destination routing - Sends processed events to configured analytics platforms
Key features
- Compatibility - Works in both web browsers and server environments
- Privacy-first - Built-in consent management and data protection
- Event validation - Ensures data quality and consistency
- Flexible routing - Send events to multiple destinations simultaneously
- Delivery status - Built-in per-source and per-destination delivery tracking
Role in architecture
In the walkerOS data flow, the collector sits between sources and destinations:
Sources capture events and send them to the collector, which processes and routes them to your chosen destinations like Google Analytics, custom APIs, or data warehouses.
Installation
Basic setup
Settings
| Property | Type | Description | More |
|---|---|---|---|
| run | boolean | Whether to run collector automatically on initialization | |
| globalsStatic | WalkerOS.Properties | Static global properties that persist across collector runs | |
| sessionStatic | Collector.SessionStatic | Static session data that persists across collector runs | |
| logger | Logger.Config | Logger configuration (level, handler) | |
| queueMax | number | Maximum events retained in collector.queue (late-registration replay). FIFO drop on overflow. Default 1000. | |
| consent | WalkerOS.Consent | Initial consent state | |
| user | any | Initial user data | |
| globals | WalkerOS.Properties | Initial global properties | |
| sources | Source.InitSources | Source configurations | |
| destinations | Destination.InitDestinations | Destination configurations | |
| transformers | Transformer.Configs | Transformer configurations | |
| stores | Store.Configs | Store configurations | |
| custom | WalkerOS.Properties | Initial custom implementation-specific properties | |
| hooks | Collector.Hooks | Pipeline observation hooks | |
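As a rough illustration of how a few of these settings fit together, the sketch below builds a plain settings object. The interface is a local stand-in rather than the real walkerOS types, and the consent keys, global property, and user id are hypothetical values chosen for the example.

```typescript
// Local stand-in for a subset of the documented settings.
// The real types (WalkerOS.Consent, WalkerOS.Properties, ...) live in walkerOS.
interface CollectorSettingsSketch {
  run?: boolean;
  queueMax?: number;
  consent?: Record<string, boolean>;
  globals?: Record<string, unknown>;
  user?: Record<string, unknown>;
}

const settings: CollectorSettingsSketch = {
  run: true, // run the collector automatically on initialization
  queueMax: 1000, // documented default cap for collector.queue
  consent: { functional: true, marketing: false }, // hypothetical consent keys
  globals: { environment: 'production' }, // hypothetical global property
  user: { id: 'user-123' }, // hypothetical user data
};
```

Properties left out of the object fall back to the collector's defaults.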
Advanced setup
Event transformation
The collector works with mapping to transform events as they flow through the system. Mapping is configured at the destination level and controls how walkerOS events are converted to vendor-specific formats.
For example, transforming a product add event into GA4's add_to_cart:
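The idea can be modeled with a small lookup keyed by entity and action. This is a simplified sketch of the renaming step only, not the walkerOS mapping API: the Rule and Mapping types and the vendorEventName helper are assumptions made for illustration.

```typescript
// Simplified model: rules are looked up by entity, then by action.
type Rule = { name: string };
type Mapping = Record<string, Record<string, Rule>>;

const mapping: Mapping = {
  product: { add: { name: 'add_to_cart' } },
};

// Hypothetical helper: resolve the vendor-specific name for an
// "entity action" event, falling back to the original name.
function vendorEventName(eventName: string, rules: Mapping): string {
  const [entity = '', action = ''] = eventName.split(' ');
  return rules[entity]?.[action]?.name ?? eventName;
}
```

With this table, a product add event resolves to add_to_cart, while events without a rule keep their original name.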
Status
The collector tracks delivery metrics on collector.status, giving you real-time
visibility into event flow without external monitoring:
What failed counts
collector.status.failed is the single counter for any walkerOS-internal
pipeline failure. Contributing sites:
- a destination's push threw or returned an error,
- an exception escaped the inner pipeline of collector.push or collector.command (any uncaught error inside the boundary),
- mapping outer-wrap failures (an internal throw inside processMappingValue),
- source startup failures (code() threw, init() threw, queued-on flush threw),
- transformer init failures,
- destination init failures.
User-supplied callback throws are visibility-only: they log at error
level but do NOT increment status.failed. This keeps the counter a
clean pipeline-health signal. Sites in this group:
- on subscriptions (destination.on, source.on, consent rules, ready, run, session, generic),
- mapping condition, fn, and validate callbacks.
In both cases the collector logs at error level:
A source whose init() throws stays with config.init === false
instead of being marked initialized. Operators reading
source.config.init see the source visibly stuck, not falsely healthy.
Alarm on the ratio of status.failed to status.in. The log message text
plus structured fields are enough to filter destination vs boundary
failures downstream.
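A minimal sketch of such a ratio check follows. It assumes only the two counters named above; the StatusCounters type, the helper names, and the 1% default threshold are illustrative choices, not part of walkerOS.

```typescript
// Only the two counters used for the ratio; the real status object has more fields.
interface StatusCounters {
  in: number;
  failed: number;
}

// Ratio of failed to received, guarding against division by zero.
function failureRatio(status: StatusCounters): number {
  return status.in === 0 ? 0 : status.failed / status.in;
}

// Hypothetical alarm rule: fire when the ratio exceeds the threshold.
function shouldAlarm(status: StatusCounters, threshold = 0.01): boolean {
  return failureRatio(status) > threshold;
}
```

A monitoring job could call shouldAlarm with a periodic copy of the counters and tune the threshold to the traffic volume.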
PII note. Boundary error logs include the full failing event payload so operators can reproduce. If your event payloads carry sensitive data, configure redaction at the logger layer; do not parse and rewrite at every call site.
Fatal errors
Throw FatalError (exported from @walkeros/core) for invariant
violations or operator-initiated aborts that must crash the host process.
Standard Error is absorbed by the boundary catch, logged, and counted.
FatalError bypasses the catch and propagates, so a supervisor (CLI
runner, Express server, container orchestrator) can terminate cleanly.
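The difference between the two error kinds can be sketched as follows. The FatalError class here is a local stand-in for the one exported from @walkeros/core, and the boundary function and status object are simplified assumptions about the behavior described above, not the collector's actual implementation.

```typescript
// Local stand-in: the real FatalError is exported from @walkeros/core.
class FatalError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'FatalError';
    // Keep instanceof working when compiled to older targets.
    Object.setPrototypeOf(this, FatalError.prototype);
  }
}

const status = { failed: 0 };

// Simplified boundary: absorb and count standard errors, rethrow fatal ones.
function boundary(step: () => void): void {
  try {
    step();
  } catch (error) {
    if (error instanceof FatalError) throw error; // propagates to the supervisor
    status.failed += 1; // absorbed and counted (logging omitted in this sketch)
  }
}
```

A supervisor wrapping the collector would catch the rethrown FatalError at its own top level and shut down from there.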
Queue sizes and DLQ sizes can be read directly from destination instances, or from the status snapshot:
Buffer bounds
The collector keeps three internal buffers per process. Each is size-bounded
with a configurable cap; on overflow the oldest entries are evicted (FIFO),
the corresponding dropped counter is incremented, and a warning is logged
once per overflow window.
| Buffer | Purpose | Default cap | Config |
|---|---|---|---|
| collector.queue | Replay buffer for late-registered destinations | 1000 | Collector.Config.queueMax |
| destination.queuePush | Per-destination consent-denied buffer | 1000 | Destination.Config.queueMax |
| destination.dlq | Per-destination dead-letter queue of failed pushes | 100 | Destination.Config.dlqMax |
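The eviction behavior shared by these buffers can be sketched with a small bounded queue. The BoundedBuffer class is a hypothetical illustration, not the collector's internal data structure, and it leaves out the once-per-window warning log.

```typescript
// Hypothetical size-bounded FIFO buffer with a dropped counter.
class BoundedBuffer<T> {
  private items: T[] = [];
  private readonly max: number;
  dropped = 0;

  constructor(max: number) {
    this.max = max;
  }

  // Append an item; evict the oldest entries while over the cap.
  push(item: T): void {
    this.items.push(item);
    while (this.items.length > this.max) {
      this.items.shift();
      this.dropped += 1;
    }
  }

  // Copy of the current contents, oldest first.
  snapshot(): T[] {
    return [...this.items];
  }
}
```

Pushing three items into a buffer capped at two keeps the two newest entries and leaves dropped at 1.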
Each step has its own setting, and the collector cap does not cascade to destinations. Set the collector cap on the collector, and per-destination caps on each destination:
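A configuration along these lines might look like the sketch below. The queueMax and dlqMax names come from the table above; the destination name api, the nesting under config, and the omission of the destination's own code are assumptions made to keep the example short.

```typescript
// Sketch of independent caps; the values are arbitrary examples.
const flowConfig = {
  queueMax: 5000, // Collector.Config.queueMax: cap for collector.queue
  destinations: {
    // "api" is a hypothetical destination name.
    api: {
      config: {
        queueMax: 500, // Destination.Config.queueMax: cap for destination.queuePush
        dlqMax: 50, // Destination.Config.dlqMax: cap for destination.dlq
      },
    },
  },
};
```

Changing the collector's queueMax here leaves the destination's own caps untouched.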
Operators alarm on the dropped counters to detect sustained overflow.
Counters live on collector.status.dropped, keyed by stepId (build the
key with stepId() from @walkeros/core):
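One way to detect sustained overflow is to compare two readings of the dropped counters taken some time apart. The sketch below assumes the counters form a plain map from key to number; the growingDrops helper is hypothetical, and real keys should be built with stepId() rather than written by hand.

```typescript
// Assumed shape: one numeric counter per step key.
type DroppedCounters = Record<string, number>;

// Hypothetical helper: keys whose dropped counter grew between two readings.
function growingDrops(
  previous: DroppedCounters,
  current: DroppedCounters,
): string[] {
  return Object.keys(current).filter(
    (key) => (current[key] ?? 0) > (previous[key] ?? 0),
  );
}
```

A key that keeps appearing across consecutive readings points to a buffer that is overflowing continuously rather than in a single burst.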
See also
- Operating modes: integrated vs bundled approaches
- CLI documentation: configure with JSON and build with CLI
- Sources: available event capture sources
- Destinations: available event delivery destinations