
Apache Kafka

Server Source code Package Beta

Server-side event streaming to Apache Kafka via the kafkajs client. Each event is serialized as JSON, keyed for partition-friendly ordering, and produced to a configurable topic. Supports SASL/SSL authentication (Confluent Cloud, AWS MSK, SCRAM), configurable compression (gzip, snappy, lz4, zstd), per-rule topic and key overrides, and graceful shutdown via destroy().

Where this fits

Kafka is a server destination in the walkerOS flow. It receives events server-side from the collector, serializes them as JSON with a partition-friendly message key, and produces them to a Kafka topic for downstream stream processing (Flink, Spark, ksqlDB, Kafka Connect, consumers).

Installation


Configuration

This destination uses the standard destination config wrapper (consent, data, env, id, ...). For the shared fields see destination configuration. Package-specific fields live under config.settings and are listed below.

Settings

| Property | Type | Description |
| --- | --- | --- |
| kafka* | kafka | Kafka connection and producer settings. |
| kafka.brokers* | Array<string> | Kafka broker addresses (host:port). At least one required. |
| kafka.clientId | string | Kafka client ID. Default: walkeros. |
| kafka.ssl | boolean \| object | TLS configuration. Set true for default TLS, or provide a tls.ConnectionOptions object for mTLS. |
| kafka.sasl | sasl | SASL authentication config. Required for Confluent Cloud, AWS MSK with IAM, etc. |
| kafka.sasl.mechanism* | | SASL authentication mechanism. |
| kafka.sasl.username | string | Username for plain/scram mechanisms. |
| kafka.sasl.password | string | Password for plain/scram mechanisms. |
| kafka.sasl.accessKeyId | string | AWS access key ID for IAM auth (mechanism: aws). |
| kafka.sasl.secretAccessKey | string | AWS secret access key for IAM auth (mechanism: aws). |
| kafka.sasl.sessionToken | string | AWS session token for temporary credentials (mechanism: aws). |
| kafka.sasl.authorizationIdentity | string | AWS authorization identity (mechanism: aws). |
| kafka.connectionTimeout | integer | Connection timeout in ms. Default: 1000. |
| kafka.requestTimeout | integer | Request timeout in ms. Default: 30000. |
| kafka.topic* | string | Target Kafka topic name. |
| kafka.acks | integer | Acknowledgement level. -1 = all replicas, 0 = fire-and-forget, 1 = leader only. Default: -1. |
| kafka.timeout | integer | Broker response timeout in ms. Default: 30000. |
| kafka.compression | | Message compression codec. Default: gzip. Snappy/LZ4/ZSTD require additional npm packages. |
| kafka.idempotent | boolean | Enable idempotent producer for exactly-once delivery. Default: false. |
| kafka.allowAutoTopicCreation | boolean | Allow auto-creation of topics on the broker. Default: false. |
| kafka.key | string | Mapping value path for message key derivation (e.g. user.id, data.userId). Default: entity_action. |
| kafka.headers | Record<string, string> | Static headers added to every message. |
| kafka.retry | retry | Retry configuration for transient failures. |
| kafka.retry.maxRetryTime | integer | Max total retry wait in ms. Default: 30000. |
| kafka.retry.initialRetryTime | integer | First retry delay in ms. Default: 300. |
| kafka.retry.retries | integer | Max retry count. Default: 5. |

* Required fields
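
To illustrate how these settings nest, a minimal config sketch might look like the following. The broker addresses, topic name, and header value are placeholders, and the local KafkaSettings type is a simplified stand-in written for this example, not the package's exported type. The sketch also assumes that retry and the other fields sit directly under settings.kafka.

```typescript
// Simplified, hypothetical type for illustration only; the package ships its own types.
interface KafkaSettings {
  brokers: string[];
  topic: string;
  clientId?: string;
  acks?: -1 | 0 | 1;
  compression?: 'gzip' | 'snappy' | 'lz4' | 'zstd';
  key?: string;
  headers?: Record<string, string>;
  retry?: { retries?: number; initialRetryTime?: number; maxRetryTime?: number };
}

const kafkaDestinationConfig: { settings: { kafka: KafkaSettings } } = {
  settings: {
    kafka: {
      // Placeholder broker addresses in host:port form.
      brokers: ['broker-1.example.com:9092', 'broker-2.example.com:9092'],
      topic: 'walkeros-events', // placeholder topic name
      clientId: 'walkeros',
      acks: -1, // wait for all replicas
      compression: 'gzip',
      key: 'user.id', // mapping value path used for the message key
      headers: { 'x-source': 'walkeros' }, // placeholder static header
      retry: { retries: 5, initialRetryTime: 300, maxRetryTime: 30000 },
    },
  },
};
```

Apart from key, headers, and the placeholders, the values shown repeat the documented defaults, so in practice only brokers and topic would need to be set.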

Mapping

Per-event rules under config.mapping. For the standard rule fields (consent, condition, data, batch, name, policy) see mapping.

| Property | Type | Description |
| --- | --- | --- |
| key | string | Override message key mapping path for this rule (e.g. data.id). Takes precedence over settings.kafka.key. |
| topic | string | Override Kafka topic for this rule. Takes precedence over settings.kafka.topic. |

Examples

Default event

An event is produced to the configured Kafka topic with the full JSON body and entity_action as the message key.


Key from user id

A settings.kafka.key path resolves the message key from the event, here using user.id for per-user partitioning.
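
As a rough illustration of what resolving a key path means, the sketch below walks a dotted path through an event object. The getByPath helper and the sample event shape are hypothetical and written for this example; walkerOS mapping values may support more than plain dotted paths.

```typescript
type JsonObject = { [segment: string]: unknown };

// Walk a dotted path such as "user.id"; yields undefined if any segment is missing.
function getByPath(source: unknown, path: string): unknown {
  return path.split('.').reduce<unknown>((current, segment) => {
    if (current !== null && typeof current === 'object') {
      return (current as JsonObject)[segment];
    }
    return undefined;
  }, source);
}

// Hypothetical sample event; only the fields needed for the example are shown.
const sampleOrderEvent = {
  name: 'order complete',
  user: { id: 'u-123' },
  data: { total: 42 },
};

const resolvedKey = getByPath(sampleOrderEvent, 'user.id');
const missingKey = getByPath(sampleOrderEvent, 'user.session.id');
```

Here resolvedKey is the string u-123, so all events from that user would share a message key and land on the same partition. missingKey is undefined because the path does not exist on the event.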


Mapped payload

A data mapping transforms the event payload before producing it as the Kafka message value.


Renamed event

A mapping renames the event which also changes the default Kafka message key used for partitioning.


Topic override

A mapping rule overrides the destination topic so specific events are routed to a dedicated stream.


The destination creates a single long-lived kafkajs producer during init() and calls producer.connect() before accepting events. On flow hot-swap or server shutdown, destroy() calls producer.disconnect() to flush in-flight messages and close TCP connections.
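
The lifecycle described above can be sketched as follows. This is a structural illustration, not the package's implementation: ProducerLike is a local interface covering only the connect() and disconnect() methods of a kafkajs-style producer, and createLifecycle is a hypothetical helper name.

```typescript
// Minimal subset of a kafkajs-style producer used by this sketch.
interface ProducerLike {
  connect(): Promise<void>;
  disconnect(): Promise<void>;
}

// Hypothetical helper: connect once on init, disconnect once on destroy.
function createLifecycle(producer: ProducerLike) {
  let connected = false;
  return {
    async init(): Promise<void> {
      await producer.connect();
      connected = true;
    },
    async destroy(): Promise<void> {
      if (!connected) return; // nothing to close, so repeated calls are harmless
      await producer.disconnect();
      connected = false;
    },
  };
}
```

Guarding destroy() with a connected flag is a design choice of this sketch. It keeps shutdown safe when both a hot-swap and a process signal trigger it.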

Message format

Events are serialized as JSON and produced with the following structure:

  • topic, from settings.kafka.topic (or mapping.settings.topic override)
  • key, resolved from the settings.kafka.key mapping path (or the mapping.settings.key override); defaults to the event name with spaces replaced by _ (e.g. page_view, order_complete) for partition-based ordering
  • value, mapped payload (when data.map is configured) or the full walkerOS event, JSON.stringify()-ed
  • headers, content-type: application/json plus any static settings.kafka.headers
  • timestamp, event timestamp as string (ms since epoch)
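
The structure listed above can be sketched as a small function. The names toKafkaMessage and MinimalEvent are hypothetical, the event shape is reduced to the fields the example needs, and the header merge order (static headers applied after content-type) is an assumption.

```typescript
// Assumed minimal event shape; real walkerOS events carry more fields.
interface MinimalEvent {
  name: string;
  timestamp: number;
  [field: string]: unknown;
}

interface KafkaMessageSketch {
  key: string;
  value: string;
  headers: Record<string, string>;
  timestamp: string;
}

function toKafkaMessage(
  event: MinimalEvent,
  staticHeaders: Record<string, string> = {},
  resolvedKey?: string,
): KafkaMessageSketch {
  return {
    // Default key: event name with spaces replaced by underscores.
    key: resolvedKey ?? event.name.replace(/ /g, '_'),
    value: JSON.stringify(event),
    // Assumption: static headers are merged after the content-type header.
    headers: { 'content-type': 'application/json', ...staticHeaders },
    // Milliseconds since epoch, as a string.
    timestamp: String(event.timestamp),
  };
}

const pageViewMessage = toKafkaMessage({
  name: 'page view',
  timestamp: 1700000000000,
  data: { title: 'Home' },
});
```

For this input the key is page_view and the timestamp is the string 1700000000000.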

Use mapping.settings.topic to route specific events to dedicated topics (e.g. orders to orders-stream, identities to identity-stream). Use mapping.settings.key to set a key path per rule (e.g. data.order_id for order events).
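
The precedence between destination-level settings and per-rule overrides can be illustrated as below. resolveTarget and both interfaces are hypothetical names for this sketch. The topic names orders-stream and walkeros-events are examples only.

```typescript
interface RuleOverrides {
  topic?: string;
  key?: string;
}

interface KafkaDefaults {
  topic: string;
  key?: string;
}

// A rule-level value wins; otherwise fall back to the destination-level setting.
function resolveTarget(
  defaults: KafkaDefaults,
  rule: RuleOverrides = {},
): { topic: string; key: string | undefined } {
  return {
    topic: rule.topic ?? defaults.topic,
    key: rule.key ?? defaults.key,
  };
}

const destinationDefaults: KafkaDefaults = { topic: 'walkeros-events' };

// Order events are routed to a dedicated topic and keyed by order id.
const orderTarget = resolveTarget(destinationDefaults, {
  topic: 'orders-stream',
  key: 'data.order_id',
});

// Events without a matching override keep the destination defaults.
const defaultTarget = resolveTarget(destinationDefaults);
```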

Setup

No safe defaults

Kafka topic creation requires explicit numPartitions and replicationFactor. There is no universally correct default for either: replicationFactor must be less than or equal to broker count, and numPartitions is a function of expected throughput and consumer parallelism. The boolean form setup: true is rejected with an error listing the required fields. Only the object form is valid.

Provision a topic once per environment with the CLI's walkeros setup destination.<id> command.

Output: setup: ok destination.kafka plus a JSON line reporting { topicCreated, schemaRegistered }. The command is idempotent and safe to re-run. Drift on numPartitions, replicationFactor, or configEntries is logged as WARN setup.drift {...}; the broker is never mutated automatically.

config.setup:

  • false (default): no provisioning. Operator must run setup explicitly.
  • true: rejected at runtime with an actionable error. There are no safe defaults for partition count or replication factor.
  • { numPartitions, replicationFactor, ... }: object form is the only valid form. See the Setup interface in the package for full options.
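
The three forms above could be validated along these lines. This is a sketch of the described behavior, not the package's code: normalizeSetup, SetupOptionsSketch, and the error wording are hypothetical.

```typescript
interface SetupOptionsSketch {
  numPartitions: number;
  replicationFactor: number;
  topic?: string;
}

// Returns null when no provisioning is requested, throws on invalid forms.
function normalizeSetup(
  setup: boolean | Partial<SetupOptionsSketch> | undefined,
): SetupOptionsSketch | null {
  if (setup === undefined || setup === false) return null; // default: no provisioning
  if (setup === true) {
    throw new Error('setup: true is not supported; set numPartitions and replicationFactor');
  }
  const { numPartitions, replicationFactor } = setup;
  if (typeof numPartitions !== 'number' || typeof replicationFactor !== 'number') {
    throw new Error('setup requires numeric numPartitions and replicationFactor');
  }
  return { ...setup, numPartitions, replicationFactor };
}
```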

Required fields

| Field | Type | Notes |
| --- | --- | --- |
| numPartitions | number | Required at runtime. No safe default. |
| replicationFactor | number | Required at runtime. Must be <= broker count. |
| topic | string | Falls back to settings.kafka.topic when omitted. |

Optional fields

| Field | Type | Notes |
| --- | --- | --- |
| configEntries | object | Topic-level config, e.g. { "retention.ms": "604800000" }. |
| schemaRegistry | object | Confluent Schema Registry binding (see below). |
| validateOnly | boolean | kafkajs broker-side dry-run. No topic is created. |

Example
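
A setup object using the fields from the tables above might look like this sketch. The partition count, replication factor, and topic name are placeholder values chosen for illustration. They are not recommendations.

```typescript
const kafkaSetupExample = {
  setup: {
    numPartitions: 6, // placeholder; size for expected throughput and consumer parallelism
    replicationFactor: 3, // placeholder; must be <= broker count
    topic: 'walkeros-events', // optional; falls back to settings.kafka.topic
    configEntries: { 'retention.ms': '604800000' }, // 7 days, expressed in ms
    validateOnly: false, // set true for a broker-side dry-run
  },
};
```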


Schema Registry (optional)


The schema is registered via the Confluent Schema Registry REST API. The optional compatibility level is set on the subject after registration.
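
For orientation, the two REST calls involved can be sketched as request descriptors. The endpoints and content type follow the Confluent Schema Registry REST API. The buildRegistryRequests helper, the base URL, the subject name, and the choice of a JSON schema type are assumptions made for this example and do not reflect the destination's actual option names.

```typescript
interface RegistryRequest {
  method: 'POST' | 'PUT';
  url: string;
  headers: Record<string, string>;
  body: string;
}

// Builds the register call and, if requested, the per-subject compatibility call.
function buildRegistryRequests(
  baseUrl: string,
  subject: string,
  schema: object,
  compatibility?: string,
): RegistryRequest[] {
  const headers = { 'Content-Type': 'application/vnd.schemaregistry.v1+json' };
  const encodedSubject = encodeURIComponent(subject);
  const requests: RegistryRequest[] = [
    {
      method: 'POST',
      url: `${baseUrl}/subjects/${encodedSubject}/versions`,
      headers,
      // The registry expects the schema itself as a JSON-encoded string.
      body: JSON.stringify({ schemaType: 'JSON', schema: JSON.stringify(schema) }),
    },
  ];
  if (compatibility) {
    // Set on the subject after registration.
    requests.push({
      method: 'PUT',
      url: `${baseUrl}/config/${encodedSubject}`,
      headers,
      body: JSON.stringify({ compatibility }),
    });
  }
  return requests;
}

const registryRequests = buildRegistryRequests(
  'https://registry.example.com', // placeholder URL
  'walkeros-events-value', // placeholder subject
  { type: 'object' },
  'BACKWARD',
);
```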

Runtime error when the topic is missing

When setup was not run and the topic does not exist on the cluster, push() catches the kafkajs UNKNOWN_TOPIC_OR_PARTITION error and logs an actionable message pointing the operator at walkeros setup destination.<id>. Run setup with explicit numPartitions and replicationFactor to provision the topic.
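
A handler in the spirit of this behavior might look like the sketch below. It assumes the error object exposes the protocol error name on a type property, as kafkajs protocol errors do. pushSafely, isMissingTopic, and the log wording are hypothetical.

```typescript
interface ProtocolErrorLike {
  type?: string;
}

function isMissingTopic(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as ProtocolErrorLike).type === 'UNKNOWN_TOPIC_OR_PARTITION'
  );
}

// Wraps a send call; logs a hint for missing topics and rethrows everything else.
async function pushSafely(
  send: () => Promise<void>,
  destinationId: string,
  log: (message: string) => void,
): Promise<void> {
  try {
    await send();
  } catch (error) {
    if (!isMissingTopic(error)) throw error;
    log(
      `Kafka topic not found. Run "walkeros setup destination.${destinationId}" ` +
        'with explicit numPartitions and replicationFactor.',
    );
  }
}
```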

Authentication

Confluent Cloud (SASL/PLAIN)
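
A minimal sketch of the Kafka settings for Confluent Cloud, assuming the usual setup where the API key and secret act as the SASL/PLAIN username and password over TLS. All values in angle brackets and the topic name are placeholders.

```typescript
const confluentCloudKafka = {
  brokers: ['<bootstrap-server>:9092'], // placeholder bootstrap server
  ssl: true, // Confluent Cloud requires TLS
  sasl: {
    mechanism: 'plain',
    username: '<api-key>', // placeholder; load from a secret store in practice
    password: '<api-secret>', // placeholder
  },
  topic: 'walkeros-events', // placeholder topic name
};
```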


AWS MSK (IAM)
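
A minimal sketch of the Kafka settings for IAM authentication, using the aws mechanism and the credential fields from the Settings table. All values in angle brackets and the topic name are placeholders, and enabling TLS alongside IAM is an assumption of this example.

```typescript
const mskIamKafka = {
  brokers: ['<msk-broker-host>:<port>'], // placeholder broker address
  ssl: true, // assumption: IAM listeners are reached over TLS
  sasl: {
    mechanism: 'aws',
    authorizationIdentity: '<authorization-identity>', // placeholder
    accessKeyId: '<access-key-id>', // placeholder
    secretAccessKey: '<secret-access-key>', // placeholder
    sessionToken: '<session-token>', // only needed for temporary credentials
  },
  topic: 'walkeros-events', // placeholder topic name
};
```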
