Version: 0.0.6

Kafka Producer Bot Example

A long-polling bot that forwards every incoming Telegram update to Apache Kafka. With forward-only: true, local handlers are never called — the bot acts as a pure ingest gateway. A downstream Kafka consumer service handles the actual processing.

Full source: samples/longpolling-as-producer

Architecture

Telegram
  ↓ long-polling
BotContextSetterFilter
  ↓
BotUpdatePublishingFilter → Kafka topic "easygram-updates"
  ↓ (forward-only: true)
STOP: local @BotController handlers never run
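
The control flow in the diagram can be sketched in plain Java. This is only an illustration of the forward-only decision, not the framework's implementation: the class ForwardOnlySketch and its process method are hypothetical names, and the publisher and local handler are modelled as simple callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the publishing step; not the framework's actual class. */
final class ForwardOnlySketch {

    /** Publishes the update, then decides whether the local handler runs. */
    static void process(String updateJson,
                        boolean forwardOnly,
                        Consumer<String> publisher,
                        Consumer<String> localHandler) {
        publisher.accept(updateJson);      // always forward to the broker
        if (forwardOnly) {
            return;                        // stop: local handlers are skipped
        }
        localHandler.accept(updateJson);   // hybrid mode: also handle locally
    }

    public static void main(String[] args) {
        List<String> published = new ArrayList<>();
        List<String> handled = new ArrayList<>();

        process("{\"update_id\":1}", true, published::add, handled::add);
        process("{\"update_id\":2}", false, published::add, handled::add);

        // Both updates were published; only the second reached the local handler.
        System.out.println("published=" + published.size() + " handled=" + handled.size());
    }
}
```

With forwardOnly set to true the method returns right after publishing, which mirrors the STOP step above; with false, the same update is both published and handled locally.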

Use Cases

  • Audit / analytics — persist every update to a data warehouse via Kafka Connect
  • Fan-out — multiple consumer groups each process the same update stream differently
  • Decoupling — scale the Telegram gateway and processing service independently
  • Event sourcing — Kafka as the source of truth for all bot interactions

Project Setup

Maven

<dependency>
<groupId>uz.osoncode.easygram</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>0.0.6</version>
</dependency>

application.yml

easygram:
  token: "${BOT_TOKEN}"
  # transport: LONG_POLLING is the default

  messaging:
    # true → forward ONLY to broker; local @BotController handlers are skipped
    # false → forward to broker AND run local handlers
    forward-only: true

    producer:
      type: KAFKA

    kafka:
      topic: easygram-updates
      create-if-absent: true # Auto-create topic on startup
      partitions: 1
      replication-factor: 1

spring:
  application:
    name: longpolling-as-producer

  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3

logging:
  level:
    uz.osoncode: DEBUG
    org.springframework.kafka: WARN

Application Class

package uz.example.producer.longpolling;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Pure long-polling producer bot.
 * All infrastructure uses framework defaults; no overrides are needed.
 * Updates are forwarded to Kafka; local handlers are never called when forward-only=true.
 */
@SpringBootApplication
public class LongpollingProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LongpollingProducerApplication.class, args);
    }
}

Optional Local Controller

You can keep a @BotController even in forward-only: true mode. It will be available when you switch to forward-only: false for hybrid operation:

@BotController
public class EchoBotController {

    @BotCommand("/start")
    public String onStart(User user) {
        return "Hello, " + user.getFirstName()
                + "! Updates are being forwarded to the message broker.";
    }

    @BotTextDefault
    public String onText(@BotTextValue String text) {
        return "Forwarded to broker: " + text;
    }

    @BotDefaultHandler
    public SendMessage onUnknown(BotRequest request) {
        return SendMessage.builder()
                .chatId(request.getChat().getId())
                .text("Update forwarded to broker.")
                .build();
    }
}

Note: With forward-only: true these handlers are never invoked. Set forward-only: false to publish each update to the broker and also run the local handlers.

Kafka Message Format

Each update is serialized as a JSON string using the configured ObjectMapper:

{
  "update_id": 123456789,
  "message": {
    "message_id": 42,
    "from": { "id": 12345, "first_name": "Alice", "language_code": "en" },
    "chat": { "id": 12345, "type": "private" },
    "date": 1718000000,
    "text": "/start"
  }
}

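Kafka message key: update_id as a string. Kafka's default partitioner hashes a non-null key to choose the partition, so records with the same key always land on the same partition. Because every update_id is unique, ordering across different updates is only guaranteed within a single partition (for example, with partitions: 1 as configured above). Per-chat or per-user ordering across several partitions would require a custom key-based partitioner.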
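
To make the key-to-partition relationship concrete, here is a minimal sketch in plain Java. It is an assumption-laden illustration: KeyPartitionSketch, keyFor, and partitionFor are hypothetical names, and Arrays.hashCode is a simplified stand-in for the murmur2 hash that Kafka's default partitioner applies to the key bytes, so the partition numbers it produces will not match a real cluster. The point is only that the mapping is deterministic.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical illustration of deterministic key-to-partition mapping. */
final class KeyPartitionSketch {

    /** Builds the record key as described above: update_id rendered as a string. */
    static String keyFor(long updateId) {
        return Long.toString(updateId);
    }

    /**
     * Maps a key to a partition index in [0, numPartitions).
     * Assumption: Arrays.hashCode stands in for Kafka's real key hash.
     */
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        String key = keyFor(123456789L);
        // The same key and partition count always yield the same partition.
        System.out.println("key=" + key + " partition=" + partitionFor(key, 3));
    }
}
```

With a single partition, every key maps to partition 0, which is why a one-partition topic preserves the overall publish order.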

Downstream Consumer

Use the consumer transport to process updates through the full bot pipeline in a separate application:

# consumer/application.yml
easygram:
  token: "${BOT_TOKEN}"
  messaging:
    type: CONSUMER
    consumer:
      type: KAFKA
    kafka:
      topic: easygram-updates
      group-id: my-bot-consumer-group

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}

See the Kafka Consumer Guide for full configuration.

RabbitMQ Variant

Switch to RabbitMQ by setting the producer type to RABBIT and replacing the kafka settings with their rabbit equivalents:

easygram:
  messaging:
    type: PRODUCER
    producer:
      type: RABBIT # was: KAFKA
    rabbit:
      exchange: easygram-exchange
      routing-key: easygram.updates
      queue: easygram-updates
      create-if-absent: true

spring:
  rabbitmq:
    host: ${RABBITMQ_HOST:localhost}
    port: 5672
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}

Running with Docker Compose

The samples/producer-bot module provides ready-to-use compose files with Bitnami KRaft (no ZooKeeper). For a minimal custom setup:

services:
  kafka:
    image: bitnami/kafka:latest
    environment:
      KAFKA_CFG_NODE_ID: "1"
      KAFKA_CFG_PROCESS_ROLES: controller,broker
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
    ports: ["9092:9092"]

  bot:
    image: my-kafka-producer-bot:latest
    depends_on: [kafka]
    environment:
      easygram.token: "${BOT_TOKEN}"
      easygram.messaging.type: PRODUCER
      easygram.messaging.forward-only: "true"
      easygram.messaging.producer.type: KAFKA
      easygram.messaging.kafka.topic: easygram-updates
      spring.kafka.bootstrap-servers: kafka:9092

See Docker Producer Bot for the full production-ready module.

Error Handling

  • If Kafka is unavailable and retries are exhausted, the exception is logged and the update is dropped. The long-polling offset still advances, so the update is not retried.
  • For guaranteed delivery, configure a Dead-Letter Topic (DLT). Raising the producer retry count also reduces the chance of a drop during a short outage:

    spring:
      kafka:
        producer:
          retries: 5
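
The difference between dropping an update and routing it to a dead-letter destination can be sketched in plain Java. This is a simplified model under stated assumptions, not the framework's or Spring Kafka's behaviour: RetryThenDeadLetterSketch is a hypothetical class, the sender is modelled as a predicate that returns true on success, and the in-memory deadLetters list stands in for a real DLT.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of "retry, then dead-letter instead of drop". */
final class RetryThenDeadLetterSketch {

    /** Payloads that could not be delivered (stand-in for a Dead-Letter Topic). */
    final List<String> deadLetters = new ArrayList<>();

    /** Total number of send attempts made so far. */
    int attempts = 0;

    /**
     * Tries to send once, then retries up to maxRetries more times.
     * Returns true if delivered; otherwise records the payload as a dead letter.
     */
    boolean send(String payload, int maxRetries, Predicate<String> sender) {
        for (int i = 0; i <= maxRetries; i++) {
            attempts++;
            if (sender.test(payload)) {
                return true;
            }
        }
        deadLetters.add(payload); // keep the update instead of dropping it
        return false;
    }

    public static void main(String[] args) {
        RetryThenDeadLetterSketch sketch = new RetryThenDeadLetterSketch();
        // Simulate a broker that is down for every attempt.
        boolean delivered = sketch.send("{\"update_id\":1}", 5, payload -> false);
        System.out.println("delivered=" + delivered
                + " attempts=" + sketch.attempts
                + " deadLetters=" + sketch.deadLetters.size());
    }
}
```

In this model, retries: 5 means one initial attempt plus five retries; once they are exhausted the payload is preserved for later replay rather than lost.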

See also: