Kafka Producer Bot Example
A long-polling bot that forwards every incoming Telegram update to Apache Kafka. With
forward-only: true, local handlers are never called — the bot acts as a pure ingest
gateway. A downstream Kafka consumer service handles the actual processing.
Full source: samples/longpolling-as-producer
Architecture
Telegram
↓ long-polling
BotContextSetterFilter
↓
BotUpdatePublishingFilter → Kafka topic "easygram-updates"
↓ (forward-only: true)
STOP — local @BotController handlers never run
Use Cases
- Audit / analytics — persist every update to a data warehouse via Kafka Connect
- Fan-out — multiple consumer groups each process the same update stream differently
- Decoupling — scale the Telegram gateway and processing service independently
- Event sourcing — Kafka as the source of truth for all bot interactions
Project Setup
Maven
<dependency>
<groupId>uz.osoncode.easygram</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>0.0.7</version>
</dependency>
application.yml
easygram:
token: "${BOT_TOKEN}"
# transport: LONG_POLLING is the default
messaging:
# true → forward ONLY to broker; local @BotController handlers are skipped
# false → forward to broker AND run local handlers
forward-only: true
producer:
type: KAFKA
kafka:
topic: easygram-updates
create-if-absent: true # Auto-create topic on startup
partitions: 1
replication-factor: 1
spring:
application:
name: longpolling-as-producer
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
logging:
level:
uz.osoncode: DEBUG
org.springframework.kafka: WARN
Application Class
package uz.example.producer.longpolling;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Pure long-polling producer bot.
* All infrastructure uses framework defaults — no overrides needed.
* Updates are forwarded to Kafka; local handlers are never called when forward-only=true.
*/
@SpringBootApplication
public class LongpollingProducerApplication {
public static void main(String[] args) {
SpringApplication.run(LongpollingProducerApplication.class, args);
}
}
Optional Local Controller
You can keep a @BotController even in forward-only: true mode. It will be available when
you switch to forward-only: false for hybrid operation:
@BotController
public class EchoBotController {
@BotCommand("/start")
public String onStart(User user) {
return "Hello, " + user.getFirstName()
+ "! Updates are being forwarded to the message broker.";
}
@BotTextDefault
public String onText(@BotTextValue String text) {
return "Forwarded to broker: " + text;
}
@BotDefaultHandler
public SendMessage onUnknown(BotRequest request) {
return SendMessage.builder()
.chatId(request.getChat().getId())
.text("Update forwarded to broker.")
.build();
}
}
With forward-only: true these handlers are never invoked. Set forward-only: false to run
both the broker publish and the local handlers simultaneously.
Kafka Message Format
Each update is serialized as a JSON string using the configured ObjectMapper:
{
"update_id": 123456789,
"message": {
"message_id": 42,
"from": { "id": 12345, "first_name": "Alice", "language_code": "en" },
"chat": { "id": 12345, "type": "private" },
"date": 1718000000,
"text": "/start"
}
}
Kafka message key: update_id as a string. This enables partition ordering per update ID
when using a custom key-based partitioner.
Downstream Consumer
Use the consumer transport to process updates through the full bot pipeline in a separate application:
# consumer/application.yml
easygram:
token: "${BOT_TOKEN}"
update:
transport: KAFKA_CONSUMER
messaging:
kafka:
topic: easygram-updates
group-id: my-bot-consumer-group
spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
See the Kafka Consumer Guide for full configuration.
RabbitMQ Variant
Switch to RabbitMQ by changing two properties:
easygram:
messaging:
type: PRODUCER
producer:
type: RABBIT # was: KAFKA
rabbit:
exchange: easygram-exchange
routing-key: easygram.updates
queue: easygram-updates
create-if-absent: true
spring:
rabbitmq:
host: ${RABBITMQ_HOST:localhost}
port: 5672
username: ${RABBITMQ_USER:guest}
password: ${RABBITMQ_PASS:guest}
Running with Docker Compose
The samples/producer-bot module provides ready-to-use compose files with Bitnami KRaft
(no ZooKeeper). For a minimal custom setup:
services:
kafka:
image: bitnami/kafka:latest
environment:
KAFKA_CFG_NODE_ID: "1"
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
ports: ["9092:9092"]
bot:
image: my-kafka-producer-bot:latest
depends_on: [kafka]
environment:
easygram.token: "${BOT_TOKEN}"
easygram.messaging.forward-only: "true"
easygram.messaging.producer.type: KAFKA
easygram.messaging.kafka.topic: easygram-updates
spring.kafka.bootstrap-servers: kafka:9092
See Docker Producer Bot for the full production-ready module.
Error Handling
- If Kafka is unavailable and retries are exhausted, the exception is logged and the update is dropped. The long-polling offset still advances — the update is not retried.
- For guaranteed delivery configure a Dead-Letter Topic (DLT):
spring:
kafka:
producer:
retries: 5
See also:
- Broker Publishing — full configuration reference
- Kafka Consumer Guide — consuming updates
- Echo Bot Example — local processing without a broker