Broker Publishing
Easygram can forward every incoming Telegram update to a message broker (Apache Kafka or RabbitMQ) before — or instead of — running local handlers. This enables event-driven architectures where multiple downstream services consume the same Telegram updates.
Architecture
Telegram
   ↓  transport (long-polling or webhook)
BotContextSetterFilter
   ↓
BotUpdatePublishingFilter  ← messaging-producer
   ├─ KafkaBotUpdatePublisher   (messaging-kafka)
   └─ RabbitBotUpdatePublisher  (messaging-rabbit)
   ↓
forward-only: true  → STOP (local handlers skipped)
forward-only: false → continue to BotDispatcher
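The flow above can be sketched in plain Java. This is only an illustration of the publish-then-branch logic, not Easygram's actual filter: `ForwardOnlySketch`, `Publisher`, and `Dispatcher` are hypothetical stand-ins, and error handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for the publishing step; none of these types are Easygram classes. */
public class ForwardOnlySketch {

    /** Hypothetical publisher, playing the role of a broker publisher. */
    interface Publisher {
        void publish(String updateJson) throws Exception;
    }

    /** Hypothetical continuation of the chain, playing the role of BotDispatcher. */
    interface Dispatcher {
        void dispatch(String updateJson);
    }

    private final Publisher publisher;
    private final Dispatcher dispatcher;
    private final boolean forwardOnly;

    ForwardOnlySketch(Publisher publisher, Dispatcher dispatcher, boolean forwardOnly) {
        this.publisher = publisher;
        this.dispatcher = dispatcher;
        this.forwardOnly = forwardOnly;
    }

    /** Publishes first, then either stops or hands the update to the local handlers. */
    void handle(String updateJson) throws Exception {
        publisher.publish(updateJson);
        if (forwardOnly) {
            return; // forward-only: true, local handlers are skipped
        }
        dispatcher.dispatch(updateJson); // forward-only: false, continue down the chain
    }

    /** Runs one update through the sketch and reports what happened to it. */
    static String simulate(boolean forwardOnly) {
        List<String> published = new ArrayList<>();
        List<String> dispatched = new ArrayList<>();
        ForwardOnlySketch filter =
                new ForwardOnlySketch(published::add, dispatched::add, forwardOnly);
        try {
            filter.handle("{\"update_id\": 1}");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return "published=" + published.size() + " dispatched=" + dispatched.size();
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));  // published=1 dispatched=0
        System.out.println(simulate(false)); // published=1 dispatched=1
    }
}
```

In both modes the update reaches the broker; the flag only decides whether the local pipeline also sees it.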
Dependencies
spring-boot-starter includes all messaging modules. For targeted setups:
<!-- Kafka publishing -->
<dependency>
    <groupId>uz.osoncode.easygram</groupId>
    <artifactId>messaging-kafka</artifactId>
    <version>0.0.1</version>
</dependency>

<!-- RabbitMQ publishing -->
<dependency>
    <groupId>uz.osoncode.easygram</groupId>
    <artifactId>messaging-rabbit</artifactId>
    <version>0.0.1</version>
</dependency>

<!-- Smart routing: Kafka OR RabbitMQ based on a single property -->
<dependency>
    <groupId>uz.osoncode.easygram</groupId>
    <artifactId>messaging-producer</artifactId>
    <version>0.0.1</version>
</dependency>
Kafka Producer Configuration
telegram:
  bot:
    messaging:
      forward-only: true          # Skip local handlers; updates go to Kafka only
      producer:
        producer-type: kafka
        kafka:
          topic: telegram-updates
          create-if-absent: true  # Auto-create topic on startup
          partitions: 1
          replication-factor: 1
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
RabbitMQ Producer Configuration
telegram:
  bot:
    messaging:
      forward-only: true
      producer:
        producer-type: rabbit
        rabbit:
          exchange: telegram-exchange
          routing-key: telegram.updates
          queue: telegram-updates
          create-if-absent: true  # Auto-create exchange, queue, and binding
spring:
  rabbitmq:
    host: ${RABBITMQ_HOST:localhost}
    port: ${RABBITMQ_PORT:5672}
    username: ${RABBITMQ_USER:guest}
    password: ${RABBITMQ_PASS:guest}
forward-only Mode
| forward-only | Effect |
|---|---|
| true | Updates are published to the broker. The BotDispatcher (and all local @BotController handlers) is never called. The bot acts as a pure ingest gateway. |
| false | Updates are published to the broker and the local handler pipeline also runs. Useful for hybrid setups (quick local reply + broker for analytics). |
Pure ingest gateway — all processing in downstream services:
telegram:
  bot:
    messaging:
      forward-only: true
Hybrid — local quick-reply + broker for analytics / audit:
telegram:
  bot:
    messaging:
      forward-only: false
Message Format
Updates are serialized as JSON using the configured ObjectMapper. Example Kafka message:
{
  "update_id": 123456789,
  "message": {
    "message_id": 42,
    "from": { "id": 12345, "first_name": "Alice", "language_code": "en" },
    "chat": { "id": 12345, "type": "private" },
    "date": 1718000000,
    "text": "/start"
  }
}
- Kafka key: update_id as a string
- RabbitMQ routing key: configured via rabbit.routing-key
Custom BotUpdatePublisher
To publish to a different broker (Google Pub/Sub, Amazon SNS, NATS, etc.), implement BotUpdatePublisher:
public interface BotUpdatePublisher {
    void publish(BotRequest request) throws Exception;
}
Register as a bean:
@Bean
@ConditionalOnMissingBean(BotUpdatePublisher.class)
public BotUpdatePublisher pubSubPublisher(PubSubTemplate pubSub,
                                          ObjectMapper objectMapper) {
    return request -> {
        String json = objectMapper.writeValueAsString(request.getUpdate());
        pubSub.publish("telegram-updates", json).get();
    };
}
Consuming Updates from the Broker
Use the consumer transport modules to process updates through the full bot pipeline:
# Consumer application.yml
telegram:
  bot:
    transport: KAFKA_CONSUMER   # or RABBIT_CONSUMER
    messaging:
      kafka:
        topic: telegram-updates
        group-id: my-bot-consumer-group
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
See the Kafka Consumer Guide and RabbitMQ Consumer Guide for complete configuration.
Error Handling
- Kafka: Spring Kafka retries according to spring.kafka.producer.retries. On exhaustion the exception is logged and the update is dropped.
- RabbitMQ: Spring AMQP retries according to spring.rabbitmq.template.retry.
Both built-in publishers are fire-and-forget. Configure a Dead-Letter Topic/Queue for workloads where update loss is unacceptable.
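Where a broker-side dead-letter setup is not available, one application-level alternative is to wrap a publisher so that an update which still fails after a few attempts goes to a fallback instead of being dropped. The sketch below is an assumption-laden illustration, not part of Easygram: `DeadLetterSketch`, `UpdatePublisher`, and `withDeadLetter` are hypothetical names, and `UpdatePublisher<T>` only mirrors the single-method shape of BotUpdatePublisher so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical retry-then-dead-letter wrapper; not an Easygram class. */
public class DeadLetterSketch {

    /** Stand-in with the same single-method shape as BotUpdatePublisher. */
    interface UpdatePublisher<T> {
        void publish(T request) throws Exception;
    }

    /**
     * Tries the primary publisher up to maxAttempts times. If every attempt
     * fails, the request is handed to the deadLetter publisher.
     */
    static <T> UpdatePublisher<T> withDeadLetter(UpdatePublisher<T> primary,
                                                 UpdatePublisher<T> deadLetter,
                                                 int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return request -> {
            Exception last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    primary.publish(request);
                    return; // delivered, nothing else to do
                } catch (Exception e) {
                    last = e; // remember the failure and try again
                }
            }
            try {
                deadLetter.publish(request);
            } catch (Exception e) {
                e.addSuppressed(last); // keep the original cause visible
                throw e;
            }
        };
    }

    /** Simulates a primary publisher that fails a given number of times first. */
    static String simulate(int failuresBeforeSuccess) {
        List<String> delivered = new ArrayList<>();
        List<String> parked = new ArrayList<>();
        int[] calls = {0};
        UpdatePublisher<String> flaky = json -> {
            if (calls[0]++ < failuresBeforeSuccess) {
                throw new IllegalStateException("broker unavailable");
            }
            delivered.add(json);
        };
        UpdatePublisher<String> publisher = withDeadLetter(flaky, parked::add, 3);
        try {
            publisher.publish("{\"update_id\": 1}");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return "delivered=" + delivered.size()
                + " deadLettered=" + parked.size()
                + " attempts=" + calls[0];
    }

    public static void main(String[] args) {
        System.out.println(simulate(1)); // delivered=1 deadLettered=0 attempts=2
        System.out.println(simulate(5)); // delivered=0 deadLettered=1 attempts=3
    }
}
```

In a real application the fallback would write to a dead-letter topic, queue, or durable store. Note that retrying at this level on top of the client's own retries can produce duplicates, so downstream consumers should tolerate seeing the same update_id more than once.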
Full Example
See Kafka Producer Bot Example for a complete runnable sample.
See also: