
Consumer Service

Overview

The Consumer service is the core processing engine that receives raw comments from Kafka, performs deduplication, calls sentiment analysis, stores results, and handles failures with retry logic.

Purpose

  • Process raw comments from raw-comments topic
  • Eliminate duplicates using Redis and LRU caching
  • Enrich comments with sentiment tags via gRPC
  • Store processed comments in PostgreSQL
  • Publish enriched data to processed-comments topic
  • Handle failures gracefully with retry and dead-letter queue

Architecture

┌─────────────────────────┐
│  Kafka: raw-comments    │
└───────────┬─────────────┘
            ▼
┌─────────────────────────┐
│  Deduplication Check    │
│  (Redis + LRU)          │
└───────────┬─────────────┘
            ▼
┌─────────────────────────┐
│  Sentiment gRPC Call    │
│  (with cache)           │
└───────────┬─────────────┘
            ▼
┌─────────────────────────┐
│  Save to PostgreSQL     │
└───────────┬─────────────┘
            ▼
┌─────────────────────────┐
│  Publish: processed-    │
│  comments               │
└─────────────────────────┘

            ▼ (on failure)
┌─────────────────────────┐
│  Retry Queue (5 max)    │
│  → Dead Letter Queue    │
└─────────────────────────┘

Key Components

1. Comment Deduplication (Redis)

Purpose: Prevents reprocessing the same comment multiple times

Location: consumer/src/redis.service.ts

Implementation:
async isCommentProcessed(commentId: string): Promise<boolean> {
  const key = `processed:${commentId}`
  const exists = await this.client.exists(key)
  return exists === 1
}
 
async markCommentAsProcessed(commentId: string): Promise<void> {
  const key = `processed:${commentId}`
  // Env vars are strings, so parse before passing to setex
  const ttl = parseInt(process.env.REDIS_TTL ?? '10800', 10)  // 3 hours
  await this.client.setex(key, ttl, '1')
}
Configuration:
REDIS_TTL=10800  # 3 hours (10,800 seconds)
How it works:
  • Uses commentId (not text) as deduplication key
  • Key format: processed:{commentId}
  • If comment ID exists in Redis → skip processing
  • If not found → mark as processed and continue
  • 3-hour TTL balances memory usage vs duplicate window
Why commentId instead of text hash:
  • Same comment may be submitted multiple times (5% in producer)
  • Text hash would allow duplicate comments with different IDs
  • commentId is the source system's unique identifier

1.5. Sentiment Result Caching (LRU)

Purpose: Avoids redundant sentiment analysis for identical text

Location: consumer/src/cache.service.ts

Implementation:
hashText(text: string): string {
  return createHash('sha256')
    .update(text.toLowerCase().trim())
    .digest('hex')
}
 
getCachedSentiment(textHash: string): { tag: string, timestamp: number } | undefined {
  return this.textHashCache.get(textHash)
}
 
setCachedSentiment(textHash: string, tag: string): void {
  this.textHashCache.set(textHash, { tag, timestamp: Date.now() })
}
Configuration:
const textHashCache = new LRUCache({
  max: 100,  // CONSUMER_CACHE_SIZE
  ttl: 1000 * 60 * 60  // 1 hour
})
How it works:
  • Hash comment text (SHA256 of lowercase, trimmed)
  • Check cache for sentiment result
  • If hit → return cached tag, skip gRPC call
  • If miss → call sentiment service, cache result
Why separate from deduplication:
  • Different comments can have identical text
  • Caching sentiment results saves gRPC calls
  • LRU evicts least-recently-used entries when full
Processing Flow:
1. Check Redis: Is this commentId processed? 
   └─ Yes → Skip (duplicate comment)
   └─ No → Continue to step 2
 
2. Hash text, check LRU cache: Do we have sentiment for this text?
   └─ Yes → Use cached tag, skip gRPC
   └─ No → Call sentiment service, cache result
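The two-tier check above can be sketched end to end. This is an illustrative model, not the project's actual code: the in-memory Maps stand in for Redis and the LRU cache, and `analyzeSentiment` simulates the gRPC call.

```typescript
import { createHash } from 'crypto'

type Tag = 'positive' | 'negative' | 'neutral' | 'unrelated'

const processedIds = new Map<string, string>()   // stands in for Redis dedup keys
const sentimentCache = new Map<string, Tag>()    // stands in for the LRU sentiment cache
let grpcCalls = 0                                // counts simulated gRPC calls

// Simulated sentiment service (the real call goes over gRPC)
async function analyzeSentiment(text: string): Promise<Tag> {
  grpcCalls++
  return text.toLowerCase().includes('great') ? 'positive' : 'neutral'
}

// Same normalization as hashText: SHA256 of lowercase, trimmed text
const hashText = (text: string) =>
  createHash('sha256').update(text.toLowerCase().trim()).digest('hex')

async function process(commentId: string, text: string): Promise<Tag | 'skipped'> {
  // Step 1: dedup by commentId (Redis in the real service)
  if (processedIds.has(`processed:${commentId}`)) return 'skipped'
  processedIds.set(`processed:${commentId}`, '1')

  // Step 2: sentiment cache by text hash (LRU in the real service)
  const hash = hashText(text)
  let tag = sentimentCache.get(hash)
  if (!tag) {
    tag = await analyzeSentiment(text)
    sentimentCache.set(hash, tag)
  }
  return tag
}
```

With this model, a repeated commentId is skipped entirely, while two distinct comments with identical text cost only one gRPC call between them.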

2. Sentiment Client Service

Location: consumer/src/sentiment-client.service.ts

Auto-registration on startup:
async onModuleInit() {
  try {
    const { consumerId } = await this.grpcService.RegisterConsumer({
      serviceName: 'consumer-service'
    })
    
    this.consumerId = consumerId
    this.logger.log(`Registered with sentiment service: ${consumerId}`)
  } catch (error) {
    this.logger.warn('Failed to register, will use unauthenticated mode')
    this.consumerId = 'unregistered'
  }
}
Sentiment Analysis Call:
async analyzeSentiment(text: string): Promise<SentimentTag> {
  const response = await this.grpcService.AnalyzeSentiment({
    consumerId: this.consumerId,
    text: text,
    timestamp: new Date().toISOString()
  })
  
  return response.tag  // positive | negative | neutral | unrelated
}
Rate Limits:
  • Registered (has consumerId): 100 requests/second
  • Unregistered: 10 requests/second

2.5. Consumer Registration

Purpose: Authenticate with sentiment service for higher rate limits

Location: consumer/src/sentiment-client.service.ts

Registration Flow: handled by the onModuleInit hook shown in the Sentiment Client Service section above.
What happens:
  • On service startup, consumer registers with sentiment service
  • Receives unique consumerId (UUID format)
  • Includes consumerId in all gRPC sentiment analysis requests
  • Falls back to "unregistered" if registration fails
Rate Limits:
  • Registered consumers: 100 requests/second
  • Unregistered: 10 requests/second
Database tracking:
  • consumerId is stored with each processed comment
  • Enables tracking which consumer instance processed each comment
  • Useful for debugging and monitoring distributed consumer instances

3. Retry Mechanism

Configuration:
CONSUMER_MAX_RETRIES=5
CONSUMER_RETRY_DELAY=1000  # Base delay in ms
Exponential Backoff:
Initial attempt: Immediate (no delay)
Retry attempt 1: 1s delay (1000ms * 2^0)
Retry attempt 2: 2s delay (1000ms * 2^1)
Retry attempt 3: 4s delay (1000ms * 2^2)
Retry attempt 4: 8s delay (1000ms * 2^3)
Retry attempt 5: 16s delay (1000ms * 2^4)
After max retries: → Dead Letter Queue

Formula: baseDelay * 2^(attempt - 1) where baseDelay = 1000ms
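The formula can be expressed as a small helper; this snippet is illustrative and not taken from the codebase:

```typescript
// Implements baseDelay * 2^(attempt - 1):
// attempt 1 → 1000ms, attempt 2 → 2000ms, ..., attempt 5 → 16000ms.
function retryDelay(attempt: number, baseDelay = 1000): number {
  return baseDelay * Math.pow(2, attempt - 1)
}

// Full schedule for CONSUMER_MAX_RETRIES=5
const schedule = [1, 2, 3, 4, 5].map(a => retryDelay(a))
// [1000, 2000, 4000, 8000, 16000]
```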

Implementation:
// Promise-based delay helper
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))

async processWithRetry(comment: RawComment, retryCount = 0): Promise<void> {
  try {
    const tag = await this.sentimentClient.analyzeSentiment(comment.text)
    await this.saveToDatabase({ ...comment, tag, retryCount })
    await this.publishProcessed({ ...comment, tag })
  } catch (error) {
    if (retryCount < MAX_RETRIES) {
      const delay = RETRY_DELAY * Math.pow(2, retryCount)  // 1s, 2s, 4s, 8s, 16s
      await sleep(delay)
      return this.processWithRetry(comment, retryCount + 1)
    } else {
      await this.publishToDeadLetterQueue(comment, error)
    }
  }
}
Why exponential backoff:
  • Gives service time to recover
  • Prevents overwhelming failed service
  • Industry standard pattern

4. Database Storage

Entity: consumer/src/entities/processed-comment.entity.ts

@Entity('processed_comments')
export class ProcessedComment {
  @PrimaryGeneratedColumn()
  id: number
 
  @Column({ unique: true })
  commentId: string
 
  @Column()
  text: string
 
  @Column({ type: 'varchar', length: 64 })
  textHash: string  // SHA256 hash for sentiment caching
 
  @Column({ type: 'enum', enum: ['positive', 'negative', 'neutral', 'unrelated'] })
  tag: string
 
  @Column()
  source: string
 
  @Column({ type: 'timestamp' })
  processedAt: Date
 
  @Column()
  consumerId: string  // Which consumer instance processed it
 
  @Column({ default: 0 })
  retryCount: number
}

Indexes: commentId (unique), textHash, tag, processedAt

Key fields:
  • textHash: SHA256 hash used for sentiment result caching
  • consumerId: Tracks which consumer instance processed the comment
  • retryCount: Number of retry attempts before successful processing
Why these indexes:
  • commentId: Fast duplicate checks and ensures no reprocessing
  • textHash: Quick sentiment cache lookups
  • tag: Dashboard filters by sentiment
  • processedAt: "Last hour" statistics and time-based queries
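Under TypeORM (which the entity above already uses), those indexes would typically be declared with decorators. A sketch of the placement, not verbatim project code:

```typescript
import { Entity, Index, PrimaryGeneratedColumn, Column } from 'typeorm'

@Entity('processed_comments')
@Index(['textHash'])      // sentiment cache lookups
@Index(['tag'])           // dashboard sentiment filters
@Index(['processedAt'])   // time-based queries
export class ProcessedComment {
  @PrimaryGeneratedColumn()
  id: number

  @Column({ unique: true })  // the unique constraint doubles as the commentId index
  commentId: string

  // ...remaining columns as shown above
}
```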

5. Kafka Topics

Consumes from:
  • raw-comments - New comments from producer
  • retry-queue - Failed comments to retry
Publishes to:
  • processed-comments - Successfully processed comments
  • dead-letter-queue - Failed after max retries

Behavior Details

Message Processing Flow

Step-by-step:
  1. Receive message from raw-comments

    {
      "commentId": "abc-123",
      "text": "Great food!",
      "source": "twitter",
      "timestamp": "2026-03-26T10:30:00Z"
    }
  2. Check for duplicate / cached sentiment
    • Check Redis for processed:abc-123; if found, log "Duplicate detected" and skip
    • Otherwise, hash the text: sha256("great food!") → 3a2f1b...
    • Check the LRU cache for that hash; on a hit, reuse the cached tag and skip the gRPC call
  3. Call Sentiment service
    const tag = await sentimentClient.analyzeSentiment("Great food!")
    // Returns: "positive"
  4. Save to database
    INSERT INTO processed_comments ("commentId", text, tag, source, "processedAt", "retryCount")
    VALUES ('abc-123', 'Great food!', 'positive', 'twitter', NOW(), 0)
  5. Publish to processed-comments
    {
      "commentId": "abc-123",
      "text": "Great food!",
      "tag": "positive",
      "source": "twitter",
      "processedAt": "2026-03-26T10:30:05Z",
      "retryCount": 0
    }
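Before any of the steps above run, the raw Kafka payload has to be parsed and validated. A minimal sketch, using the field names from the message shown in step 1 (the helper name is hypothetical):

```typescript
interface RawComment {
  commentId: string
  text: string
  source: string
  timestamp: string
}

// Parse a raw-comments payload, rejecting messages with missing or empty fields.
function parseRawComment(payload: string): RawComment {
  const data = JSON.parse(payload)
  for (const field of ['commentId', 'text', 'source', 'timestamp'] as const) {
    if (typeof data[field] !== 'string' || data[field].length === 0) {
      throw new Error(`Invalid raw comment: missing ${field}`)
    }
  }
  return data as RawComment
}
```

A malformed message fails fast here rather than consuming a retry cycle.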

Failure Handling

Scenario 1: Sentiment service timeout
Attempt 1: gRPC call fails
Wait 1s
Attempt 2: gRPC call fails
Wait 2s
Attempt 3: gRPC call fails
Wait 4s
Attempt 4: gRPC call fails
Wait 8s
Attempt 5: gRPC call fails
→ Publish to dead-letter-queue
Scenario 2: Database connection lost
Attempt 1: Database save fails
Wait 1s
Attempt 2: Database save succeeds
→ Continue to publish processed-comments
Dead Letter Queue message:
{
  "originalMessage": { /* raw comment */ },
  "error": "gRPC call timeout after 5s",
  "retryCount": 5,
  "failedAt": "2026-03-26T10:35:00Z"
}
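The DLQ envelope above can be built with a small helper; this is an illustrative sketch, not the project's actual function:

```typescript
interface DeadLetterMessage {
  originalMessage: unknown
  error: string
  retryCount: number
  failedAt: string
}

// Wrap a failed message with failure metadata before publishing to dead-letter-queue.
function buildDeadLetter(original: unknown, error: Error, retryCount: number): DeadLetterMessage {
  return {
    originalMessage: original,
    error: error.message,
    retryCount,
    failedAt: new Date().toISOString(),
  }
}
```

Keeping the original message intact means DLQ entries can later be replayed through raw-comments once the underlying failure is fixed.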

Standalone Mode

Bootstrap: consumer/src/main.ts

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: [KAFKA_BROKER]
      },
      consumer: {
        groupId: 'restaurant-comments-consumer'
      }
    }
  })
  
  await app.listen()
  // No HTTP server
}
Why microservice mode:
  • Pure Kafka consumer
  • No HTTP endpoints needed
  • Lightweight

Performance Characteristics

Processing Rate:
  • Without duplicates: ~50-100 comments/second
  • With duplicates: ~200+ comments/second (cached lookups)
Latency breakdown:
  • Deduplication: 1-5ms (LRU) or 10-20ms (Redis)
  • gRPC call: 50-200ms (depends on text length)
  • Database save: 10-30ms
  • Kafka publish: 5-10ms

Total: roughly 70-260ms per comment

Monitoring

Logs to watch:
[Consumer] Processing comment: abc-123
[Consumer] Duplicate detected: def-456 (skipped)
[Consumer] Sentiment analysis: positive (took 125ms)
[Consumer] Saved to database: abc-123
[Consumer] Published to processed-comments: abc-123
[Consumer] Retry attempt 3/5: ghi-789
[Consumer] Moved to DLQ: jkl-012 (max retries exceeded)
Metrics:
  • Processed count
  • Duplicate count
  • Retry count
  • DLQ count
  • Average processing time
  • Cache hit rate
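A minimal in-process counter for the metrics above might look like this. It is a sketch; a real deployment would more likely export these to Prometheus or a similar backend:

```typescript
// In-memory counters for the metrics listed above.
class ConsumerMetrics {
  processed = 0
  duplicates = 0
  retries = 0
  deadLettered = 0
  cacheHits = 0
  cacheLookups = 0
  private totalMs = 0

  recordProcessed(durationMs: number): void {
    this.processed++
    this.totalMs += durationMs
  }

  recordCacheLookup(hit: boolean): void {
    this.cacheLookups++
    if (hit) this.cacheHits++
  }

  get avgProcessingMs(): number {
    return this.processed === 0 ? 0 : this.totalMs / this.processed
  }

  get cacheHitRate(): number {
    return this.cacheLookups === 0 ? 0 : this.cacheHits / this.cacheLookups
  }
}
```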

Development

Run locally:
cd consumer
pnpm dev
Run migrations:
cd consumer
pnpm run migration:run
Create new migration:
cd consumer
pnpm run migration:create AddIndexToComments

Testing

Manual Test - Send Comment

# Publish to raw-comments
docker exec -it kafka-broker kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic raw-comments
 
# Then type JSON:
{"commentId":"test-123","text":"Test comment","source":"twitter","timestamp":"2026-03-26T10:00:00Z"}

Check Processing

# View consumer logs
docker logs -f kafka-consumer
 
# Query database
docker exec kafka-postgres psql -U postgres -d restaurant_comments \
  -c 'SELECT * FROM processed_comments ORDER BY "processedAt" DESC LIMIT 5'
 
# Check processed-comments topic
docker exec -it kafka-broker kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic processed-comments \
  --from-beginning

Next Steps