API & SSE Service
Overview
The API service is a hybrid NestJS application that acts as both an HTTP server (REST endpoints) and a Kafka consumer. It consumes processed comments from Kafka, serves the comments stored in PostgreSQL through a REST API, and broadcasts new comments in real time using Server-Sent Events (SSE).
Purpose
- Provide REST endpoints for dashboard data fetching
- Stream real-time comment updates via Server-Sent Events
- Consume processed-comments from Kafka
- Calculate aggregate statistics (for REST endpoint)
- Bridge between backend processing and frontend UI
Architecture
┌──────────────────────┐
│    Kafka Consumer    │
│ (processed-comments) │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│    Kafka Consumer    │
│       Service        │
└──────────┬───────────┘
           │
     ┌─────┴──────┐
     │            │
     ▼            ▼
┌─────────┐  ┌─────────┐
│Database │  │   SSE   │
│ Service │  │ Service │
└─────┬───┘  └────┬────┘
      │           │
      │           ▼
      │      ┌────────────┐
      │      │  Subject   │
      │      │   (RxJS)   │
      │      └────┬───────┘
      │           │
      ▼           ▼
┌───────────────────────┐
│    HTTP Endpoints     │
│  - GET /api/comments  │
│  - GET /api/statistics│
│  - GET /api/sse/...   │
│  - GET /health        │
└───────────────────────┘
Key Components
1. Kafka Consumer Service
Location: api/src/kafka-consumer.service.ts
Purpose: Listens to processed-comments topic and streams to SSE clients
@Controller()
export class KafkaConsumerService {
  constructor(private readonly sseService: SSEService) {}

  @EventPattern(KAFKA_TOPICS.PROCESSED_COMMENTS)
  async handleProcessedComment(data: ProcessedComment) {
    // Comment is already saved by consumer service
    // API only needs to broadcast via SSE for real-time updates
    this.sseService.emitComment(data)
  }
}
- Receives processed comments for real-time streaming
- Comments already persisted by consumer service
- Decouples API from consumer processing
- Multiple API instances can stream same topic
- Event-driven updates without database polling
Note: The API does not re-save comments to the database, because the consumer service has already persisted them. The API's role here is purely to stream them over SSE for real-time dashboard updates.
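Because the handler forwards whatever arrives on the topic straight to SSE clients, a runtime check on the payload can be useful. The following is a minimal sketch, not the service's actual code: the `ProcessedComment` fields are inferred from the example payloads in this document, and `isProcessedComment` is a hypothetical helper.

```typescript
// Assumed shape, inferred from the example payloads in this document.
type Tag = 'positive' | 'negative' | 'neutral' | 'unrelated'

interface ProcessedComment {
  commentId: string
  text: string
  tag: Tag
  source: string
  processedAt: string // ISO 8601 timestamp
  retryCount: number
}

const TAGS: readonly string[] = ['positive', 'negative', 'neutral', 'unrelated']

// Hypothetical helper: narrows an unknown Kafka payload to ProcessedComment.
function isProcessedComment(value: unknown): value is ProcessedComment {
  if (typeof value !== 'object' || value === null) return false
  const { commentId, text, tag, source, processedAt, retryCount } =
    value as Record<string, unknown>
  return (
    typeof commentId === 'string' &&
    typeof text === 'string' &&
    typeof tag === 'string' &&
    TAGS.indexOf(tag) !== -1 &&
    typeof source === 'string' &&
    typeof processedAt === 'string' &&
    !isNaN(Date.parse(processedAt)) &&
    typeof retryCount === 'number'
  )
}
```

A handler could call `isProcessedComment(data)` before `emitComment(data)` and log or drop anything that fails the check; whether to drop or forward invalid messages is a design choice this sketch leaves open.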
2. Comments Service
Location: api/src/comments.service.ts
@Injectable()
export class CommentsService {
  async getComments(options: QueryOptions): Promise<PaginatedResponse> {
    const { page = 1, pageSize = 20, tag, search } = options
    const query = this.commentRepository
      .createQueryBuilder('comment')
      .orderBy('comment.processedAt', 'DESC')
    // Filter by tag if provided
    if (tag) {
      query.andWhere('comment.tag = :tag', { tag })
    }
    // Filter by search text if provided
    if (search) {
      query.andWhere('comment.text ILIKE :search', {
        search: `%${search}%`
      })
    }
    // Pagination
    const skip = (page - 1) * pageSize
    query.skip(skip).take(pageSize)
    const [data, total] = await query.getManyAndCount()
    return {
      data,
      total,
      page,
      pageSize,
      totalPages: Math.ceil(total / pageSize)
    }
  }

  async getStatistics(): Promise<CommentStatistics> {
    const all = await this.commentRepository.find()
    // Calculate totals by tag
    const byTag = all.reduce((acc, comment) => {
      acc[comment.tag] = (acc[comment.tag] || 0) + 1
      return acc
    }, {
      positive: 0,
      negative: 0,
      neutral: 0,
      unrelated: 0
    })
    // Last hour count
    const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000)
    const lastHour = await this.commentRepository
      .createQueryBuilder('comment')
      .where('comment.processedAt >= :oneHourAgo', { oneHourAgo })
      .getCount()
    return {
      total: all.length,
      byTag,
      lastHour
    }
  }
}
3. SSE Service
Location: api/src/sse.service.ts
Purpose: Manages Server-Sent Events broadcasting to connected clients
Implementation using RxJS:
@Injectable()
export class SSEService {
  private readonly eventSubject = new Subject<MessageEvent>()

  getEventStream(): Observable<MessageEvent> {
    return this.eventSubject.asObservable()
  }

  emitComment(comment: ProcessedComment): void {
    this.eventSubject.next({
      type: 'comment',
      data: JSON.stringify(comment)
    })
  }
}
- Multiple clients can subscribe
- Broadcast to all connections
- Automatic cleanup when connections close
- Reactive programming pattern
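To illustrate the fan-out behaviour listed above without pulling in RxJS, here is a dependency-free stand-in for the `Subject`. `Broadcaster` and `SseMessage` are hypothetical names; the real service uses RxJS as shown, and this sketch only mirrors the subscribe, emit, and unsubscribe semantics.

```typescript
// Stand-in for the MessageEvent shape used by SSEService (assumed fields).
interface SseMessage {
  type: string
  data: string
}

type Listener = (message: SseMessage) => void

// Hypothetical minimal broadcaster mirroring what the RxJS Subject provides here.
class Broadcaster {
  private listeners: Listener[] = []

  // Registers a client and returns a function that removes it again,
  // analogous to unsubscribing when the HTTP connection closes.
  subscribe(listener: Listener): () => void {
    this.listeners.push(listener)
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener)
    }
  }

  // Delivers one comment to every currently subscribed client.
  emitComment(comment: object): void {
    const message: SseMessage = { type: 'comment', data: JSON.stringify(comment) }
    // Iterate over a snapshot so unsubscribing during delivery is safe.
    for (const listener of this.listeners.slice()) {
      listener(message)
    }
  }

  get connectionCount(): number {
    return this.listeners.length
  }
}

// Usage: two clients subscribe, one disconnects, later events reach only the other.
const broadcaster = new Broadcaster()
const receivedA: string[] = []
const receivedB: string[] = []
const disconnectA = broadcaster.subscribe((m) => receivedA.push(m.data))
broadcaster.subscribe((m) => receivedB.push(m.data))

broadcaster.emitComment({ commentId: 'abc-123', tag: 'positive' })
disconnectA()
broadcaster.emitComment({ commentId: 'def-456', tag: 'negative' })
```

As with a plain `Subject`, a client that subscribes late does not receive events emitted before it connected, which is why the dashboard still needs the REST endpoint for its initial data.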
4. HTTP Controllers
Comments Controller
Location: api/src/comments.controller.ts
Endpoint: GET /api/comments
Query parameters:
- page (default: 1)
- pageSize (default: 20)
- tag (optional: positive | negative | neutral | unrelated)
- search (optional: text search)
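As a sketch of how these parameters might be normalised before they reach the service, the function below applies the documented defaults and computes the row offset used for pagination. `parseCommentQuery` and its fallback behaviour for invalid input are assumptions for illustration, not part of the documented API.

```typescript
type Tag = 'positive' | 'negative' | 'neutral' | 'unrelated'

interface CommentQuery {
  page: number
  pageSize: number
  skip: number // rows to skip: (page - 1) * pageSize
  tag?: Tag
  search?: string
}

const TAGS: Tag[] = ['positive', 'negative', 'neutral', 'unrelated']

// Parses a positive integer, falling back to the default for missing or invalid input.
function positiveInt(raw: string | undefined, fallback: number): number {
  const n = Number(raw)
  const valid = raw !== undefined && isFinite(n) && n >= 1 && Math.floor(n) === n
  return valid ? n : fallback
}

// Hypothetical helper: turns raw query-string values into typed options.
function parseCommentQuery(raw: Record<string, string | undefined>): CommentQuery {
  const page = positiveInt(raw.page, 1)
  const pageSize = positiveInt(raw.pageSize, 20)
  const query: CommentQuery = { page, pageSize, skip: (page - 1) * pageSize }

  // Unknown tags are ignored rather than rejected (an assumption of this sketch).
  const tag = TAGS.filter((t) => t === raw.tag)[0]
  if (tag !== undefined) query.tag = tag

  const search = raw.search
  if (search !== undefined && search.trim() !== '') query.search = search.trim()
  return query
}
```

For the request `?page=2&pageSize=10&tag=positive&search=food`, this yields `page` 2, `pageSize` 10 and a `skip` of 10, so the query starts at the eleventh matching row.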
GET /api/comments?page=2&pageSize=10&tag=positive&search=food
{
  "data": [
    {
      "id": 123,
      "commentId": "abc-123",
      "text": "Amazing food!",
      "tag": "positive",
      "source": "instagram",
      "processedAt": "2026-03-26T10:30:00Z",
      "retryCount": 0
    }
  ],
  "total": 156,
  "page": 2,
  "pageSize": 10,
  "totalPages": 16
}
Statistics Controller
Location: api/src/statistics.controller.ts
Endpoint: GET /api/statistics
{
  "total": 1523,
  "byTag": {
    "positive": 456,
    "negative": 123,
    "neutral": 789,
    "unrelated": 155
  },
  "lastHour": 234
}
SSE Controller
Location: api/src/sse.controller.ts
Endpoint: GET /api/sse/comments
Response: Event stream (text/event-stream)
event: comment
data: {"commentId":"abc-123","text":"Great!","tag":"positive",...}

event: comment
data: {"commentId":"def-456","text":"Terrible!","tag":"negative",...}

@Controller('api/sse')
export class SSEController {
  constructor(private readonly sseService: SSEService) {}

  // @Sse makes Nest respond with text/event-stream and stream each emitted value,
  // so the handler returns the Observable instead of writing to the response itself
  @Sse('comments')
  streamComments(): Observable<MessageEvent> {
    return this.sseService.getEventStream()
  }
}
- event: comment - Comment event type
- data: <json> - JSON payload
- Empty line - Message separator
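The three-part frame described above can be produced with a few lines of string handling. This is a minimal sketch of the wire format only; `formatSseEvent` is a hypothetical helper, and in practice the framework or an SSE library usually writes these frames for you.

```typescript
// Serialises one event into the text/event-stream wire format:
// an "event:" line, one "data:" line per line of payload, then a blank line.
function formatSseEvent(type: string, data: string): string {
  // Multi-line payloads must be sent as several "data:" lines.
  const dataLines = data.split('\n').map((line) => 'data: ' + line)
  return ['event: ' + type].concat(dataLines).join('\n') + '\n\n'
}

// Usage: a comment serialised the same way SSEService.emitComment does.
const frame = formatSseEvent(
  'comment',
  JSON.stringify({ commentId: 'abc-123', tag: 'positive' })
)
// frame === 'event: comment\ndata: {"commentId":"abc-123","tag":"positive"}\n\n'
```

The trailing blank line is what tells the client that the event is complete; without it the browser keeps buffering and never dispatches the `comment` event.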
5. Health Endpoint
Endpoint: GET /health
{
  "status": "ok",
  "timestamp": "2026-03-26T10:30:00Z",
  "uptime": 3600,
  "commentsProcessed": 1523,
  "activeConnections": 5,
  "kafkaConnected": true
}
Behavior Details
SSE Connection Flow
Client (Dashboard) side:
const eventSource = new EventSource('http://localhost:3001/api/sse/comments')
eventSource.addEventListener('comment', (event) => {
  const comment = JSON.parse(event.data)
  // Validate with schema and insert into TanStack DB collection
})
eventSource.onerror = () => {
  // Browser automatically reconnects
}
Server (API) side:
// When comment processed:
this.sseService.emitComment(comment)
// → Sent to all connected clients
Message Flow
Complete flow from Kafka to Dashboard:
1. Consumer service persists the comment (INSERT INTO comments ...) and publishes it to the processed-comments topic: {"commentId":"abc","text":"Great!","tag":"positive",...}
2. API's Kafka consumer receives the message
3. API emits an SSE event:
   event: comment
   data: {"commentId":"abc",...}
4. All connected browsers receive the event
5. Dashboard updates the UI in real time
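The last part of this flow depends on the client splitting the stream back into individual events. The browser's `EventSource` does this internally; the sketch below shows the idea for raw stream text such as `curl -N` output. `parseSseFrames` is a hypothetical helper that handles only the `event:` and `data:` fields used in this document, and it assumes the text it receives contains whole frames.

```typescript
interface SseFrame {
  event: string
  data: string
}

// Splits raw text/event-stream text into frames. Frames are separated by a
// blank line; multiple data lines within a frame are joined with "\n".
function parseSseFrames(raw: string): SseFrame[] {
  const frames: SseFrame[] = []
  for (const block of raw.split(/\r?\n\r?\n/)) {
    let event = 'message' // default event type when no "event:" line is present
    const data: string[] = []
    for (const line of block.split(/\r?\n/)) {
      if (line.indexOf('event:') === 0) {
        event = line.slice(6).trim()
      } else if (line.indexOf('data:') === 0) {
        // Drop the single optional space after the colon.
        data.push(line.slice(5).replace(/^ /, ''))
      }
    }
    // Blocks without any data line (for example the trailing empty block) are skipped.
    if (data.length > 0) frames.push({ event, data: data.join('\n') })
  }
  return frames
}

// Usage: two comment events as they would appear on the wire.
const rawStream =
  'event: comment\ndata: {"commentId":"abc-123","tag":"positive"}\n\n' +
  'event: comment\ndata: {"commentId":"def-456","tag":"negative"}\n\n'
const comments = parseSseFrames(rawStream).map((f) => JSON.parse(f.data))
```

A production client would also need to buffer partial frames that arrive split across network chunks, which this sketch deliberately leaves out.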
CORS Configuration
Enabled for dashboard origin:
app.enableCors({
  origin: 'http://localhost:3000',
  credentials: true
})
- Dashboard runs on different port (3000)
- API runs on port 3001
- Browser requires CORS headers
Hybrid Application
Bootstrap: api/src/main.ts
async function bootstrap() {
  const app = await NestFactory.create(AppModule)

  // Kafka consumer for processed-comments; registered and started
  // before the HTTP server begins accepting requests
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: [KAFKA_BROKER]
      },
      consumer: {
        groupId: 'api-service-consumer'
      }
    }
  })
  await app.startAllMicroservices()

  // HTTP server for REST + SSE
  app.enableCors()
  await app.listen(3001)
}
bootstrap()
- Single service handles both concerns
- Shared database connection
- Simpler deployment
- Direct coordination between Kafka and HTTP
Performance Characteristics
REST Endpoints:
- /api/comments: 20-50ms (database query)
- /api/statistics: 10-30ms (cached or aggregated)
SSE:
- Connection overhead: < 10ms
- Event delivery: < 5ms
- Concurrent connections: 100+ supported
Kafka Consumer:
- Lag: Typically 0-10 messages
- Processing: 5-20ms per message
Monitoring
Logs:
[API] Kafka consumer connected to processed-comments
[API] Received processed comment: abc-123
[API] Saved comment to database: abc-123
[API] Emitted SSE event: comment
[API] New SSE connection from ::ffff:127.0.0.1
[API] SSE connection closed
Metrics:
- Total comments in database
- SSE connections active
- Kafka consumer lag
- REST endpoint latencies
- Events emitted per second
Development
Run locally:
cd api
pnpm dev
Environment variables:
API_PORT=3001
API_CORS_ORIGIN=http://localhost:3000
KAFKA_BROKER=localhost:9092
POSTGRES_HOST=localhost
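One way to consume these variables is a small typed loader that validates them once at startup. This is a sketch under assumptions: `loadApiConfig` and `ApiConfig` are hypothetical names, and using the values listed above as defaults is a choice made for this example rather than documented behaviour.

```typescript
interface ApiConfig {
  port: number
  corsOrigin: string
  kafkaBroker: string
  postgresHost: string
}

// Hypothetical loader: reads the variables listed above and falls back to the
// listed values when a variable is unset (the defaults are an assumption).
function loadApiConfig(env: Record<string, string | undefined>): ApiConfig {
  const port = Number(env.API_PORT ?? '3001')
  if (!(port >= 1 && port <= 65535 && Math.floor(port) === port)) {
    throw new Error('API_PORT must be an integer between 1 and 65535, got: ' + env.API_PORT)
  }
  return {
    port,
    corsOrigin: env.API_CORS_ORIGIN ?? 'http://localhost:3000',
    kafkaBroker: env.KAFKA_BROKER ?? 'localhost:9092',
    postgresHost: env.POSTGRES_HOST ?? 'localhost'
  }
}
```

In a Node process this would typically be called as `loadApiConfig(process.env)`, with the result passed to `app.listen`, `app.enableCors` and the Kafka client options so that a bad value fails fast instead of surfacing as a connection error later.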
Testing
Test REST Endpoints
# Get comments
curl http://localhost:3001/api/comments
# Get with filters
curl "http://localhost:3001/api/comments?tag=positive&page=1&pageSize=5"
# Get statistics
curl http://localhost:3001/api/statistics
# Health check
curl http://localhost:3001/health
Test SSE
# Stream events (use curl or browser)
curl -N http://localhost:3001/api/sse/comments
# Should see events as they arrive:
# event: comment
# data: {...}
Verify Kafka Integration
# Publish test message to processed-comments
docker exec -it kafka-broker kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic processed-comments
# Type JSON:
{"commentId":"test","text":"Test","tag":"positive","source":"twitter","processedAt":"2026-03-26T10:00:00Z","retryCount":0}
# Check API logs - should see "Received processed comment: test"
# Check SSE stream - should emit event
Next Steps
- Frontend - How dashboard consumes the API
- Consumer Service - How comments are processed
- Getting Started - Full data flow