Skip to main content
Version: 0.0.5

Broker Publishing

Easygram can forward every incoming Telegram update to a message broker (Apache Kafka or RabbitMQ) before — or instead of — running local handlers. This enables event-driven architectures where multiple downstream services consume the same Telegram updates.

Architecture

Telegram
↓ transport (long-polling or webhook)
BotMdcFilter ← sets MDC: bot.update.id, bot.transport
BotContextSetterFilter ← extracts Chat + User

BotUpdatePublishingFilter ← messaging-api (producer auto-config)
KafkaBotUpdatePublisher (messaging-api, requires spring-kafka)
RabbitBotUpdatePublisher (messaging-api, requires spring-amqp)

forward-only: true → STOP (local handlers skipped)
forward-only: false → continue to BotDispatcher

Dependencies

spring-boot-starter includes messaging-api automatically. For targeted setups:

<!-- All broker functionality: Kafka + RabbitMQ publishing and consuming -->
<dependency>
<groupId>uz.osoncode.easygram</groupId>
<artifactId>messaging-api</artifactId>
<version>0.0.5</version>
</dependency>

<!-- Required for Kafka: spring-kafka is optional in messaging-api -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- Required for RabbitMQ: spring-amqp is optional in messaging-api -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Kafka Producer Configuration

easygram:
token: ${BOT_TOKEN}
messaging:
type: PRODUCER
forward-only: true # Skip local handlers; updates go to Kafka only
producer:
type: KAFKA
kafka:
topic: easygram-updates
create-if-absent: true
partitions: 1
replication-factor: 1

spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}

RabbitMQ Producer Configuration

easygram:
token: ${BOT_TOKEN}
messaging:
type: PRODUCER
forward-only: true
producer:
type: RABBIT
rabbit:
exchange: easygram-exchange
routing-key: easygram.updates
queue: easygram-updates
create-if-absent: true

spring:
rabbitmq:
host: ${RABBITMQ_HOST:localhost}
port: ${RABBITMQ_PORT:5672}
username: ${RABBITMQ_USER:guest}
password: ${RABBITMQ_PASS:guest}

forward-only Mode

forward-onlyEffect
trueUpdates are published to the broker. The BotDispatcher (and all local @BotController handlers) is never called. The bot acts as a pure ingest gateway.
falseUpdates are published to the broker and the local handler pipeline also runs. Useful for hybrid setups (quick local reply + broker for analytics).

Pure ingest gateway — all processing in downstream services:

easygram:
messaging:
forward-only: true

Hybrid — local quick-reply + broker for analytics / audit:

easygram:
messaging:
forward-only: false

Message Format

Updates are serialized as JSON using the configured ObjectMapper. Example Kafka message:

{
"update_id": 123456789,
"message": {
"message_id": 42,
"from": { "id": 12345, "first_name": "Alice", "language_code": "en" },
"chat": { "id": 12345, "type": "private" },
"date": 1718000000,
"text": "/start"
}
}
  • Kafka key: update_id as a string
  • RabbitMQ routing key: configured via rabbit.routing-key

Custom BotUpdatePublisher

To publish to a different broker (Google Pub/Sub, Amazon SNS, NATS, etc.), implement BotUpdatePublisher:

public interface BotUpdatePublisher {
void publish(BotRequest request) throws Exception;
}

Register as a bean:

@Bean
@ConditionalOnMissingBean(BotUpdatePublisher.class)
public BotUpdatePublisher pubSubPublisher(PubSubTemplate pubSub,
ObjectMapper objectMapper) {
return request -> {
String json = objectMapper.writeValueAsString(request.getUpdate());
pubSub.publish("easygram-updates", json).get();
};
}

Consuming Updates from the Broker

Use the consumer transport to process updates through the full bot pipeline. A consumer bot does not poll Telegram directly — it reads from the broker topic/queue:

# Consumer application.yml (Kafka)
easygram:
token: ${BOT_TOKEN}
messaging:
type: CONSUMER
consumer:
type: KAFKA
kafka:
topic: easygram-updates
group-id: my-bot-consumer-group

spring:
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# Consumer application.yml (RabbitMQ)
easygram:
token: ${BOT_TOKEN}
messaging:
type: CONSUMER
consumer:
type: RABBIT
rabbit:
exchange: easygram-exchange
queue: easygram-updates

spring:
rabbitmq:
host: ${RABBITMQ_HOST:localhost}

See the Kafka Consumer Guide and RabbitMQ Consumer Guide for complete configuration.

Error Handling

  • Kafka: Spring Kafka retries according to spring.kafka.producer.retries. On exhaustion the exception is logged and the update is dropped.
  • RabbitMQ: Spring AMQP retries according to spring.rabbitmq.template.retry.
warning

Both built-in publishers are fire-and-forget. Configure a Dead-Letter Topic/Queue for workloads where update loss is unacceptable.

Full Example

See Kafka Producer Bot Example for a complete runnable sample.


See also: