
API & SSE Service

Overview

The API service is a hybrid NestJS application that acts as both an HTTP server (REST endpoints) and a Kafka consumer. It consumes processed comments from Kafka and broadcasts them to the dashboard in real time using Server-Sent Events (SSE). It also serves, via a REST API, the comments that the consumer service has already stored in PostgreSQL.

Purpose

  • Provide REST endpoints for dashboard data fetching
  • Stream real-time comment updates via Server-Sent Events
  • Consume processed-comments from Kafka
  • Calculate aggregate statistics (for REST endpoint)
  • Bridge between backend processing and frontend UI

Architecture

┌──────────────────────┐
│  Kafka Consumer      │
│  (processed-comments)│
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│  Kafka Consumer      │
│  Service             │
└──────────┬───────────┘
           │
     ┌─────┴──────┐
     │            │
     ▼            ▼
┌─────────┐  ┌─────────┐
│Database │  │   SSE   │
│ Service │  │ Service │
└─────┬───┘  └────┬────┘
      │           │
      │           ▼
      │    ┌────────────┐
      │    │ Subject    │
      │    │ (RxJS)     │
      │    └────┬───────┘
      │         │
      ▼         ▼
┌────────────────────────┐
│  HTTP Endpoints        │
│  - GET /api/comments   │
│  - GET /api/statistics │
│  - GET /api/sse/...    │
│  - GET /health         │
└────────────────────────┘

Key Components

1. Kafka Consumer Service

Location: api/src/kafka-consumer.service.ts

Purpose: Listens to processed-comments topic and streams to SSE clients

Implementation:
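import { Controller } from '@nestjs/common'
import { EventPattern, Payload } from '@nestjs/microservices'
import { SSEService } from './sse.service'
// KAFKA_TOPICS and ProcessedComment come from the project's shared
// constants/types (import path not shown in this document)
@Controller()
export class KafkaConsumerService {
  constructor(private readonly sseService: SSEService) {}

  @EventPattern(KAFKA_TOPICS.PROCESSED_COMMENTS)
  async handleProcessedComment(@Payload() data: ProcessedComment) {
    // The comment is already saved by the consumer service;
    // the API only broadcasts it via SSE for real-time updates
    this.sseService.emitComment(data)
  }
}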
Why Kafka consumer in API:
  • Receives processed comments for real-time streaming
  • Comments already persisted by consumer service
  • Decouples API from consumer processing
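  • Multiple API instances can consume the same topic (but instances sharing one groupId split the partitions between them, so each instance receives only part of the stream)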
  • Event-driven updates without database polling

Note: The API does not re-save comments to the database; the consumer service has already persisted them. On the Kafka side, the API's only role is to stream them via SSE for real-time dashboard updates.
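The handler above forwards whatever arrives on the topic straight to SSE clients. A minimal sketch of a dependency-free type guard that could run first is shown below. It assumes ProcessedComment has exactly the fields seen in this document's example payloads (commentId, text, tag, source, processedAt, retryCount); the guard name isProcessedComment is hypothetical.

```typescript
// Hypothetical shape, inferred from the example payloads in this document.
type Tag = 'positive' | 'negative' | 'neutral' | 'unrelated'

interface ProcessedComment {
  commentId: string
  text: string
  tag: Tag
  source: string
  processedAt: string // ISO-8601 timestamp
  retryCount: number
}

const TAGS: readonly string[] = ['positive', 'negative', 'neutral', 'unrelated']

// Narrow an untrusted Kafka payload to ProcessedComment before broadcasting it.
function isProcessedComment(value: unknown): value is ProcessedComment {
  if (typeof value !== 'object' || value === null) return false
  const { commentId, text, tag, source, processedAt, retryCount } =
    value as Record<string, unknown>
  return (
    typeof commentId === 'string' &&
    typeof text === 'string' &&
    typeof tag === 'string' &&
    TAGS.includes(tag) &&
    typeof source === 'string' &&
    typeof processedAt === 'string' &&
    !Number.isNaN(Date.parse(processedAt)) && // reject unparseable timestamps
    typeof retryCount === 'number'
  )
}
```

Inside handleProcessedComment, the guard would run before emitComment, so a malformed message is dropped (and ideally logged) instead of reaching every connected dashboard.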

2. Comments Service

Location: api/src/comments.service.ts

Query operations:
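import { Injectable } from '@nestjs/common'
import { InjectRepository } from '@nestjs/typeorm'
import { Repository } from 'typeorm'
// CommentEntity (hypothetical name for the TypeORM comment entity), QueryOptions,
// PaginatedResponse and CommentStatistics are project types; paths not shown here
@Injectable()
export class CommentsService {
  constructor(
    @InjectRepository(CommentEntity)
    private readonly commentRepository: Repository<CommentEntity>
  ) {}
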
  async getComments(options: QueryOptions): Promise<PaginatedResponse> {
    const { page = 1, pageSize = 20, tag, search } = options
    
    const query = this.commentRepository
      .createQueryBuilder('comment')
      .orderBy('comment.processedAt', 'DESC')
    
    // Filter by tag if provided
    if (tag) {
      query.andWhere('comment.tag = :tag', { tag })
    }
    
    // Filter by search text if provided
    if (search) {
      query.andWhere('comment.text ILIKE :search', { 
        search: `%${search}%` 
      })
    }
    
    // Pagination
    const skip = (page - 1) * pageSize
    query.skip(skip).take(pageSize)
    
    const [data, total] = await query.getManyAndCount()
    
    return {
      data,
      total,
      page,
      pageSize,
      totalPages: Math.ceil(total / pageSize)
    }
  }
 
  async getStatistics(): Promise<CommentStatistics> {
    const all = await this.commentRepository.find()
    
    // Calculate totals by tag
    const byTag = all.reduce((acc, comment) => {
      acc[comment.tag] = (acc[comment.tag] || 0) + 1
      return acc
    }, {
      positive: 0,
      negative: 0,
      neutral: 0,
      unrelated: 0
    })
    
    // Last hour count
    const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000)
    const lastHour = await this.commentRepository
      .createQueryBuilder('comment')
      .where('comment.processedAt >= :oneHourAgo', { oneHourAgo })
      .getCount()
    
    return {
      total: all.length,
      byTag,
      lastHour
    }
  }
}
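getStatistics loads every row with find() only to count tags, so its cost grows with the table. A sketch of one alternative, assuming the counting is pushed into PostgreSQL with a GROUP BY (for example via TypeORM's `createQueryBuilder('comment').select('comment.tag', 'tag').addSelect('COUNT(*)', 'count').groupBy('comment.tag').getRawMany()`), leaves only the folding of rows into byTag to TypeScript. The helper name toByTag is hypothetical.

```typescript
type Tag = 'positive' | 'negative' | 'neutral' | 'unrelated'

// One row per tag, as returned by a GROUP BY query such as:
//   SELECT tag, COUNT(*) AS count FROM comments GROUP BY tag
// node-postgres returns COUNT(*) (a bigint) as a string, hence count: string.
interface TagCountRow {
  tag: string
  count: string
}

function toByTag(rows: TagCountRow[]): Record<Tag, number> {
  // Start every known tag at zero so tags with no rows still appear.
  const byTag: Record<Tag, number> = { positive: 0, negative: 0, neutral: 0, unrelated: 0 }
  for (const row of rows) {
    // Own-property check, so names like "toString" are not treated as tags;
    // rows with unknown tags are ignored.
    if (Object.prototype.hasOwnProperty.call(byTag, row.tag)) {
      byTag[row.tag as Tag] = Number(row.count)
    }
  }
  return byTag
}
```

The total and lastHour figures could likewise come from COUNT queries (the lastHour query above already uses getCount()), so no full rows need to be loaded.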

3. SSE Service

Location: api/src/sse.service.ts

Purpose: Manages Server-Sent Events broadcasting to connected clients

Implementation using RxJS:
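import { Injectable, MessageEvent } from '@nestjs/common'
import { Observable, Subject } from 'rxjs'
// ProcessedComment is a project type (import path not shown here)
@Injectable()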
export class SSEService {
  private readonly eventSubject = new Subject<MessageEvent>()
 
  getEventStream(): Observable<MessageEvent> {
    return this.eventSubject.asObservable()
  }
 
  emitComment(comment: ProcessedComment): void {
    this.eventSubject.next({
      type: 'comment',
      data: JSON.stringify(comment)
    })
  }
}
Why RxJS Subject:
  • Multiple clients can subscribe
  • Broadcast to all connections
  • Automatic cleanup when connections close
  • Reactive programming pattern
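To make the properties listed above concrete, here is a dependency-free stand-in for the Subject used in SSEService. It is an illustration only, not a replacement for RxJS: every current subscriber receives each value, the returned function removes a subscriber (like unsubscribe()), and nothing is buffered, so late subscribers never see earlier events. The class name Broadcaster is hypothetical.

```typescript
type Listener<T> = (value: T) => void

class Broadcaster<T> {
  // Wrap each listener in its own object so the same function can subscribe twice,
  // as it can with an RxJS Subject.
  private readonly entries = new Set<{ listener: Listener<T> }>()

  // Register a listener; the returned function removes it again.
  subscribe(listener: Listener<T>): () => void {
    const entry = { listener }
    this.entries.add(entry)
    return () => {
      this.entries.delete(entry)
    }
  }

  // Deliver a value to every current listener. Nothing is stored, so
  // listeners added later never receive it (same as a plain Subject).
  next(value: T): void {
    for (const entry of Array.from(this.entries)) entry.listener(value)
  }

  // Number of active listeners, e.g. the basis for an activeConnections figure.
  get size(): number {
    return this.entries.size
  }
}
```

The missing replay is worth keeping in mind: a dashboard that is disconnected when emitComment runs will not receive that comment over SSE.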

4. HTTP Controllers

Comments Controller

Location: api/src/comments.controller.ts

Endpoint: GET /api/comments

Query Parameters:
  • page (default: 1)
  • pageSize (default: 20)
  • tag (optional: positive | negative | neutral | unrelated)
  • search (optional: text search)
Example request:
GET /api/comments?page=2&pageSize=10&tag=positive&search=food
Response:
{
  "data": [
    {
      "id": 123,
      "commentId": "abc-123",
      "text": "Amazing food!",
      "tag": "positive",
      "source": "instagram",
      "processedAt": "2026-03-26T10:30:00Z",
      "retryCount": 0
    }
  ],
  "total": 156,
  "page": 2,
  "pageSize": 10,
  "totalPages": 16
}
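Query-string values reach the controller as strings, while getComments does arithmetic on page and pageSize and passes pageSize to take(). A minimal sketch of normalizing them first, using the documented defaults (page 1, pageSize 20); the cap of 100 and the name parsePagination are hypothetical and not part of the original service.

```typescript
interface Pagination {
  page: number
  pageSize: number
}

const DEFAULT_PAGE = 1
const DEFAULT_PAGE_SIZE = 20
const MAX_PAGE_SIZE = 100 // hypothetical cap, not from the original service

// Turn raw query-string values into positive integers for skip/take.
function parsePagination(rawPage?: string, rawPageSize?: string): Pagination {
  const toPositiveInt = (raw: string | undefined, fallback: number): number => {
    const n = Number(raw) // Number(undefined) is NaN, Number('') is 0
    return Number.isInteger(n) && n >= 1 ? n : fallback
  }
  const page = toPositiveInt(rawPage, DEFAULT_PAGE)
  const pageSize = Math.min(toPositiveInt(rawPageSize, DEFAULT_PAGE_SIZE), MAX_PAGE_SIZE)
  return { page, pageSize }
}
```

For the example request above, parsePagination('2', '10') gives page 2 and pageSize 10, so the service skips 10 rows. In a Nest controller, similar defaults could instead be enforced per parameter with `@Query('page', new DefaultValuePipe(1), ParseIntPipe) page: number`.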

Statistics Controller

Location: api/src/statistics.controller.ts

Endpoint: GET /api/statistics

Response:
{
  "total": 1523,
  "byTag": {
    "positive": 456,
    "negative": 123,
    "neutral": 789,
    "unrelated": 155
  },
  "lastHour": 234
}

SSE Controller

Location: api/src/sse.controller.ts

Endpoint: GET /api/sse/comments

Response: Event stream (text/event-stream)

event: comment
data: {"commentId":"abc-123","text":"Great!","tag":"positive",...}
 
event: comment
data: {"commentId":"def-456","text":"Terrible!","tag":"negative",...}
Implementation:
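import { Controller, MessageEvent, Sse } from '@nestjs/common'
import { Observable } from 'rxjs'
import { SSEService } from './sse.service'

@Controller('api/sse')
export class SSEController {
  constructor(private readonly sseService: SSEService) {}

  // @Sse() writes the text/event-stream, no-cache and keep-alive headers
  // itself and unsubscribes from the stream when the client disconnects.
  // (Injecting @Res() instead switches Nest to manual response mode,
  // and a returned Observable would never be written to the client.)
  @Sse('comments')
  streamComments(): Observable<MessageEvent> {
    return this.sseService.getEventStream()
  }
}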
SSE Format:
  • event: comment - Comment event type
  • data: <json> - JSON payload
  • Empty line - Message separator
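NestJS serializes MessageEvent objects into this format when a handler is decorated with @Sse(), and it may also emit fields such as id:. The sketch below only illustrates the framing, which helps when reading raw curl output; the function name formatSseMessage is hypothetical.

```typescript
interface SseMessage {
  type?: string // written as the "event:" field
  id?: string
  data: string
}

// Serialize one message into the text/event-stream wire format.
function formatSseMessage(message: SseMessage): string {
  let out = ''
  if (message.type) out += `event: ${message.type}\n`
  if (message.id) out += `id: ${message.id}\n`
  // A payload containing line breaks must be split across several data:
  // lines; the browser joins them back together with "\n".
  for (const line of message.data.split(/\r\n|\r|\n/)) {
    out += `data: ${line}\n`
  }
  // The empty line terminates the message and makes the client dispatch it.
  return out + '\n'
}
```

Because emitComment sends JSON.stringify output, which contains no raw line breaks, each comment arrives as exactly one event: line and one data: line.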

5. Health Endpoint

Endpoint: GET /health

Response:
{
  "status": "ok",
  "timestamp": "2026-03-26T10:30:00Z",
  "uptime": 3600,
  "commentsProcessed": 1523,
  "activeConnections": 5,
  "kafkaConnected": true
}
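The document shows the health response but not how it is assembled. A minimal sketch, assuming the counters (comments processed, active SSE connections, Kafka connection state) are tracked elsewhere and passed in; all names here are hypothetical. Uptime is reported in whole seconds, matching the example value of 3600. In a real handler it could also come from Node's process.uptime(), which returns seconds as a float.

```typescript
interface HealthInputs {
  startedAt: Date // when the process booted (hypothetical input)
  now: Date
  commentsProcessed: number
  activeConnections: number
  kafkaConnected: boolean
}

interface HealthResponse {
  status: 'ok'
  timestamp: string
  uptime: number // seconds
  commentsProcessed: number
  activeConnections: number
  kafkaConnected: boolean
}

function buildHealth(input: HealthInputs): HealthResponse {
  return {
    status: 'ok',
    // toISOString always includes milliseconds, e.g. "...10:30:00.000Z"
    timestamp: input.now.toISOString(),
    uptime: Math.floor((input.now.getTime() - input.startedAt.getTime()) / 1000),
    commentsProcessed: input.commentsProcessed,
    activeConnections: input.activeConnections,
    kafkaConnected: input.kafkaConnected,
  }
}
```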

Behavior Details

SSE Connection Flow

Client (Dashboard) side:
const eventSource = new EventSource('http://localhost:3001/api/sse/comments')
 
eventSource.addEventListener('comment', (event) => {
  const comment = JSON.parse(event.data)
  // Validate with schema and insert into TanStack DB collection
})
 
eventSource.onerror = () => {
  // Browser automatically reconnects
}
Server side:
// When comment processed:
this.sseService.emitComment(comment)
// → Sent to all connected clients
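The browser reconnects on its own, but the server's Subject does not replay events, so comments emitted while a dashboard was disconnected are never sent to it. One possible approach, which is an assumption and not described by this service, is to refetch the first page of GET /api/comments after a reconnect and merge the result with what is already shown, keyed by commentId. The framework-free helper below (mergeComments, a hypothetical name) sketches that merge.

```typescript
interface CommentRow {
  commentId: string
  processedAt: string // ISO-8601
  [key: string]: unknown
}

// Merge cached comments with refetched or live ones: one entry per commentId,
// newest first (the same order GET /api/comments uses).
function mergeComments(existing: CommentRow[], incoming: CommentRow[]): CommentRow[] {
  const byId = new Map<string, CommentRow>()
  for (const c of existing) byId.set(c.commentId, c)
  // Incoming entries win, so a refetched row replaces a stale cached one.
  for (const c of incoming) byId.set(c.commentId, c)
  return Array.from(byId.values()).sort(
    (a, b) => Date.parse(b.processedAt) - Date.parse(a.processedAt),
  )
}
```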

Message Flow

Complete flow from Kafka to Dashboard:
  1. The consumer service saves the comment to PostgreSQL and publishes it to the processed-comments topic:

    {"commentId":"abc","text":"Great!","tag":"positive",...}
  2. API's Kafka consumer receives the message

  3. API emits an SSE event (it does not write to the database):

    event: comment
    data: {"commentId":"abc",...}
  4. All connected browsers receive the event

  5. Dashboard updates the UI in real time

CORS Configuration

Enabled for dashboard origin:
app.enableCors({
  origin: 'http://localhost:3000',
  credentials: true
})
Why CORS:
  • Dashboard runs on different port (3000)
  • API runs on port 3001
  • Browser requires CORS headers

Hybrid Application

Bootstrap: api/src/main.ts

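import { NestFactory } from '@nestjs/core'
import { MicroserviceOptions, Transport } from '@nestjs/microservices'
import { AppModule } from './app.module' // assumed conventional module path

const KAFKA_BROKER = process.env.KAFKA_BROKER ?? 'localhost:9092'

async function bootstrap() {
  const app = await NestFactory.create(AppModule)

  // Allow only the dashboard origin (see CORS Configuration)
  app.enableCors({
    origin: process.env.API_CORS_ORIGIN ?? 'http://localhost:3000',
    credentials: true
  })

  // Kafka consumer for processed-comments
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: [KAFKA_BROKER]
      },
      consumer: {
        // Instances sharing this groupId split the topic's partitions
        groupId: 'api-service-consumer'
      }
    }
  })

  // Start the Kafka consumer, then the HTTP server for REST + SSE
  await app.startAllMicroservices()
  await app.listen(process.env.API_PORT ?? 3001)
}

bootstrap()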
Why hybrid:
  • Single service handles both concerns
  • Shared database connection
  • Simpler deployment
  • Direct coordination between Kafka and HTTP
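Kafka delivers each message to only one consumer within a group. If several API replicas all use 'api-service-consumer', each replica broadcasts only the comments from its own partitions, and a dashboard connected to one replica misses the rest. If every replica should broadcast every comment, each needs its own consumer group. A minimal sketch, assuming hostnames are stable and unique per replica (as with Kubernetes pod names); the helper name instanceGroupId is hypothetical.

```typescript
import { hostname } from 'node:os'

// Build a consumer group id that is unique per API instance, so that every
// instance receives every message on processed-comments.
function instanceGroupId(base: string, host: string = hostname()): string {
  // Cosmetic: normalize the hostname so group ids are easy to read in logs.
  const safeHost = host.toLowerCase().replace(/[^a-z0-9.-]/g, '-')
  return `${base}-${safeHost}`
}
```

In main.ts this would replace the fixed value, e.g. `groupId: instanceGroupId('api-service-consumer')`. The trade-off is one consumer group per replica, so groups left behind by replaced replicas linger until the broker's offset retention removes them.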

Performance Characteristics

REST Endpoints:
  • /api/comments: 20-50ms (database query)
  • /api/statistics: 10-30ms (the current getStatistics loads every row, so this grows with table size)
SSE:
  • Connection overhead: < 10ms
  • Event delivery: < 5ms
  • Concurrent connections: 100+ supported
Kafka consumption:
  • Lag: Typically 0-10 messages
  • Processing: 5-20ms per message

Monitoring

Logs:
[API] Kafka consumer connected to processed-comments
[API] Received processed comment: abc-123
[API] Emitted SSE event: comment
[API] New SSE connection from ::ffff:127.0.0.1
[API] SSE connection closed
Metrics:
  • Total comments in database
  • SSE connections active
  • Kafka consumer lag
  • REST endpoint latencies
  • Events emitted per second
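The "events emitted per second" metric can be derived from a sliding time window. A minimal sketch, assuming the caller records a timestamp each time emitComment runs; timestamps are passed in as milliseconds so the logic is deterministic. The class name RateCounter is hypothetical, and a metrics library would normally provide this.

```typescript
// Counts events in a sliding time window, e.g. SSE events emitted per second.
class RateCounter {
  private timestamps: number[] = []
  private readonly windowMs: number

  constructor(windowMs: number = 1000) {
    this.windowMs = windowMs
  }

  record(nowMs: number): void {
    this.timestamps.push(nowMs)
    this.evict(nowMs)
  }

  // Events per second over the window (nowMs - windowMs, nowMs].
  perSecond(nowMs: number): number {
    this.evict(nowMs)
    return (this.timestamps.length * 1000) / this.windowMs
  }

  // Drop timestamps that have fallen out of the window.
  private evict(nowMs: number): void {
    const cutoff = nowMs - this.windowMs
    this.timestamps = this.timestamps.filter(t => t > cutoff)
  }
}
```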

Development

Run locally:
cd api
pnpm dev
Environment variables:
API_PORT=3001
API_CORS_ORIGIN=http://localhost:3000
KAFKA_BROKER=localhost:9092
POSTGRES_HOST=localhost
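A sketch of reading these variables in one place, treating the values listed above as local-development defaults (an assumption; the original does not say whether they are defaults or required). The function loadConfig is hypothetical; it fails fast on a malformed port rather than guessing.

```typescript
interface ApiConfig {
  port: number
  corsOrigin: string
  kafkaBroker: string
  postgresHost: string
}

// Read the API's environment variables, falling back to the local-dev values.
function loadConfig(env: Record<string, string | undefined>): ApiConfig {
  const port = Number(env.API_PORT ?? '3001')
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid API_PORT: ${env.API_PORT}`)
  }
  return {
    port,
    corsOrigin: env.API_CORS_ORIGIN ?? 'http://localhost:3000',
    kafkaBroker: env.KAFKA_BROKER ?? 'localhost:9092',
    postgresHost: env.POSTGRES_HOST ?? 'localhost',
  }
}
```

Called as loadConfig(process.env) at startup, the result could feed app.listen, app.enableCors and the Kafka client options, so a typo in API_PORT stops the service immediately instead of surfacing later.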

Testing

Test REST Endpoints

# Get comments
curl http://localhost:3001/api/comments
 
# Get with filters
curl "http://localhost:3001/api/comments?tag=positive&page=1&pageSize=5"
 
# Get statistics
curl http://localhost:3001/api/statistics
 
# Health check
curl http://localhost:3001/health

Test SSE

# Stream events (use curl or browser)
curl -N http://localhost:3001/api/sse/comments
 
# Should see events as they arrive:
# event: comment
# data: {...}

Verify Kafka Integration

# Publish test message to processed-comments
docker exec -it kafka-broker kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic processed-comments
 
# Type JSON:
{"commentId":"test","text":"Test","tag":"positive","source":"twitter","processedAt":"2026-03-26T10:00:00Z","retryCount":0}
 
# Check API logs - should see "Received processed comment: test"
# Check SSE stream - should emit event

Next Steps