Event Schema Documentation
Event Schema Documentation
Project:
{{PROJECT_NAME}}Bilko Version:{{VERSION}}0.1 Date:{{DATE}}2026-02-23 Author:{{AUTHOR}}Platform Architect Status: Draft| In Review | ApprovedReviewers:{{REVIEWERS}}Tech Lead, Security Reviewer
Document History
| Version | Date | Author | Changes |
|---|---|---|---|
| 0.1 | Initial draft |
1. Event-DrivenEvent Architecture Overview
Bilko is a modular monolith — there is no message broker (no Kafka, no RabbitMQ). Events are handled in two ways:
- Audit events — Every mutating database operation is captured by Prisma middleware and written to the
LoggedActiontable (append-only, immutable). - Domain events — Status transitions on Invoice and Expense trigger synchronous side effects (GL transactions, PDF generation, email delivery) within the same request/transaction boundary.
- Webhooks — Phase 2 feature: HTTP POST callbacks to registered URLs on Invoice status changes.
graph LR
subgraph "Publishers"Request UserService[Lifecycle"
Client["user-service"Next.js Frontend"]
OrderService[Handler["order-service"Route Handler"]
PaymentService[Service["payment-service"Service Layer"]
Engine["Accounting Engine"]
end
subgraph "MessagePersistence"
Broker"DB["PostgreSQL\n(Prisma)"]
Broker[Audit["{{Kafka / RabbitMQ / AWS SQS + SNS}}LoggedAction\n(Append-Only)"]
end
subgraph "Consumers"Side NotifService[Effects (Synchronous)"notification-service"
GL["GL Transaction\ndebit + credit"]
AnalyticsService[PDF["analytics-service"Puppeteer PDF\nR2 storage"]
SearchService[Email["search-service"]SendGrid\nemail AuditService["audit-service"delivery"]
end
UserServicesubgraph "Webhooks (Phase 2)"
WH["HTTP POST\nregistered URLs"]
end
Client --> Handler
Handler --> Service
Service --> Engine
Engine --> GL
Engine --> DB
Service --> PDF
Service --> Email
DB -->|user.*Prisma events|middleware| BrokerAudit
OrderServiceService -.->|order.*Phase events|2| BrokerWH
PaymentServicestyle -->|payment.*Audit events|fill:#dc2626,color:#fff
Brokerstyle BrokerDB -->|filtered|fill:#336791,color:#fff
NotifServicestyle BrokerWH -->|allstroke-dasharray:5 events| AnalyticsService
Broker -->|entity events| SearchService
Broker -->|all events| AuditService5
Event-driven use cases in this system:
Decoupled notifications (user.created → send welcome email)Search index updates (entity.updated → reindex)Audit trail (all mutations → audit log)Cross-service data sync (order.created → update inventory)
2. MessageAudit BrokerTrail Configuration— LoggedAction
Purpose
LoggedAction is the immutable audit trail required for financial compliance (GDPR, Serbian/Bosnian/Croatian accounting regulations). Every INSERT, UPDATE, and DELETE on audited tables is recorded.
Broker:Retention: storage. {{Apache7 Kafkayears |(regulatory RabbitMQrequirement). |After AWS7 SQS/SNSyears, |archived NATSto |cold Upstash Kafka}}Version:NEVER deleted. {{3.x}}
Hosting: {{Confluent Cloud / self-hosted / AWS MSK}}
TopicPrisma / Queue Naming ConventionSchema
model LoggedAction {{DOMAIN}
id String @id @default(uuid())
organizationId String
tableName String // Which model was mutated
userId String? // Who performed the action (null = system job)
action String // INSERT | UPDATE | DELETE
rowId String? // PK of the affected row
rowData Json? // Snapshot for DELETE actions
changedFields Json? // Only the changed fields for UPDATE actions
clientIp String? // Hashed client IP address
applicationName String @default("bilko-api")
createdAt DateTime @default(now())
organization Organization @relation(fields: [organizationId], references: [id])
@@index([organizationId, tableName, createdAt])
@@index([organizationId, userId])
}.{{ENTITY}}.{{ACTION}}
Examples:
user.user.created
order.order.status_changed
payment.invoice.generated
notification.email.sent
PatternImplementation
rules:
- Prisma
AllMiddlewarelowercase,//dot-separatedsrc/lib/prisma.ts
Domainimportprefix{ AsyncLocalStorage } from 'async_hooks' export const requestContext =servicenewnameAsyncLocalStorage<{ userId?: string organizationId?: string clientIp?: string }>(without)-service)const EntityAUDITED_MODELS =singular[noun'Invoice',
Action'InvoiceItem', 'Expense', 'Transaction', 'Contact', 'User', 'Organization', 'BankAccount', 'BankTransaction', 'Account', ] prisma.$use(async (params, next) => { const result =pastawaittensenext(params)verbif (created,updated,AUDITED_MODELS.includes(params.modeldeleted,??completed)'')
TopicAudited ConfigurationTables and Actions
|
No | Invoices are never hard-deleted | |||
|
draft |
||||
|
No | Expenses are never hard-deleted | |||
|
|||||
Contact |
Yes | Yes | Yes | Soft-delete preferred | |
User |
Yes | Yes | No | Users deactivated, not deleted | |
Organization |
Yes | Yes | No | Orgs deactivated, not deleted | |
Account |
Yes | Yes | No | Chart of accounts entries | |
BankAccount |
Yes | Yes | No | Bank account registrations | |
BankTransaction |
Yes | Yes | No | Imported bank statement rows |
LoggedAction TypeScript Interface
interface LoggedAction {
id: string // UUID
organizationId: string // Tenant scope
tableName: string // Model name (e.g. "Invoice")
userId: string | null // null = system cron job
action: 'INSERT' | 'UPDATE' | 'DELETE' | 'UPSERT'
rowId: string | null // UUID of affected row
rowData: Record<string, unknown> | null // Full snapshot on DELETE
changedFields: Record<string, unknown> | null // Only changed fields on UPDATE
clientIp: string | null // SHA-256 hashed IP — NEVER plain IP
applicationName: string // Always "bilko-api"
createdAt: string // ISO 8601 UTC
}
Example: Invoice Status Update
{
"id": "9f3a1bc2-...",
"organizationId": "org_abc123",
"tableName": "Invoice",
"userId": "usr_def456",
"action": "UPDATE",
"rowId": "inv_xyz789",
"rowData": null,
"changedFields": {
"status": "sent",
"sentAt": "2026-02-23T10:30:00.000Z",
"pdfUrl": "https://r2.bilko.io/invoices/org_abc123/INV-2026-001.pdf"
},
"clientIp": "a3f5c8d1e2b4...",
"applicationName": "bilko-api",
"createdAt": "2026-02-23T10:30:01.123Z"
}
Example: Transaction Created (Invoice Sent)
{
"id": "2e8b4f1a-...",
"organizationId": "org_abc123",
"tableName": "Transaction",
"userId": "usr_def456",
"action": "INSERT",
"rowId": "txn_ghi012",
"rowData": null,
"changedFields": null,
"clientIp": "a3f5c8d1e2b4...",
"applicationName": "bilko-api",
"createdAt": "2026-02-23T10:30:01.456Z"
}
Example: System Job (Overdue Invoice)
{
"id": "7c2d9e4b-...",
"organizationId": "org_abc123",
"tableName": "Invoice",
"userId": null,
"action": "UPDATE",
"rowId": "inv_abc001",
"rowData": null,
"changedFields": {
"status": "overdue"
},
"clientIp": null,
"applicationName": "bilko-api",
"createdAt": "2026-02-23T00:05:01.789Z"
}
3. Domain Events — Invoice Status Transitions
Invoice status transitions trigger synchronous side effects in the same request. These are not queued — they execute within the handler or are awaited before response.
Status Machine
stateDiagram-v2
[*] --> draft : POST /invoices
draft --> sent : POST /invoices/:id/send
sent --> viewed : Tracking pixel fired
sent --> paid : POST /invoices/:id/mark-paid
viewed --> paid : POST /invoices/:id/mark-paid
sent --> overdue : Cron job (dueDate < today)
viewed --> overdue : Cron job (dueDate < today)
overdue --> paid : POST /invoices/:id/mark-paid
draft --> cancelled : DELETE /invoices/:id
sent --> cancelled : POST /invoices/:id/cancel
viewed --> cancelled : POST /invoices/:id/cancel
overdue --> cancelled : POST /invoices/:id/cancel
Status Transition Event Payloads
invoice.created (draft)
Triggered: POST /api/v1/invoices
| Side Effect | Action |
|---|---|
| LoggedAction | INSERT on Invoice |
| Invoice number | Auto-generated INV-YYYY-NNN |
| GL Transaction | None |
interface InvoiceCreatedEvent {
invoiceId: string
organizationId: string
invoiceNumber: string // e.g. "INV-2026-001"
contactId: string
totalAmount: string // NUMERIC(19,4) as string
currency: string // ISO 4217 e.g. "RSD"
dueDate: string // ISO 8601 date
status: 'draft'
createdBy: string // userId
createdAt: string // ISO 8601 UTC
}
invoice.sent
Triggered: POST /api/v1/invoices/:id/send
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on Invoice (status, sentAt, pdfUrl) |
| GL Transaction | INSERT: Debit 1200 Accounts Receivable / Credit 4000 Revenue |
| Puppeteer renders → uploaded to R2 → URL saved to invoice.pdfUrl | |
| SendGrid: invoice PDF to contact.email with tracking pixel |
interface InvoiceSentEvent {
invoiceId: string
organizationId: string
invoiceNumber: string
contactId: string
contactEmail: string
totalAmount: string // NUMERIC(19,4) as string
baseAmount: string // totalAmount * exchangeRate (locked at invoiceDate)
currency: string
exchangeRate: string // Locked at invoiceDate — NEVER changes
pdfUrl: string // R2 URL: invoices/{orgId}/INV-YYYY-NNN.pdf
transactionId: string // GL transaction ID created
debitAccountId: string // 1200 Accounts Receivable
creditAccountId: string // 4000 Revenue
sentBy: string // userId
sentAt: string // ISO 8601 UTC
}
GL Transaction created:
{
"id": "txn_abc123",
"organizationId": "org_abc123",
"debitAccountId": "acc_1200",
"creditAccountId": "acc_4000",
"amount": "1250.0000",
"currency": "RSD",
"baseAmount": "10.6400",
"exchangeRate": "0.0085",
"description": "Invoice INV-2026-001 sent",
"referenceType": "INVOICE",
"referenceId": "inv_xyz789",
"date": "2026-02-23",
"locked": false,
"createdAt": "2026-02-23T10:30:01.000Z"
}
invoice.viewed
Triggered: GET /api/v1/invoices/track/:trackingToken (tracking pixel)
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on Invoice (status, viewedAt) |
| GL Transaction | None |
| None |
interface InvoiceViewedEvent {
invoiceId: string
organizationId: string
invoiceNumber: string
trackingToken: string
viewedAt: string // ISO 8601 UTC
}
invoice.paid
Triggered: PATCH /api/v1/invoices/:id/mark-paid
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on Invoice (status, paidAt, paidAmount, paymentRef) |
| GL Transaction | INSERT: Debit 1000 Bank Account / Credit 1200 Accounts Receivable |
| None (future: payment receipt) | |
| Webhook (Phase 2) | POST to registered webhook URL |
interface InvoicePaidEvent {
invoiceId: string
organizationId: string
invoiceNumber: string
paidAmount: string // NUMERIC(19,4) as string
currency: string
paymentReference: string // Bank reference number
bankAccountId: string // Which bank account received payment
transactionId: string // GL transaction ID created
debitAccountId: string // 1000 Bank Account
creditAccountId: string // 1200 Accounts Receivable
markedPaidBy: string // userId
paidAt: string // ISO 8601 UTC
}
invoice.overdue
Triggered: Cron job overdue-invoices (daily 00:05 UTC)
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on Invoice (status → overdue) |
| GL Transaction | None |
| None (future: overdue reminder) | |
| userId | null (system job) |
interface InvoiceOverdueEvent {
invoiceId: string
organizationId: string
invoiceNumber: string
dueDate: string // Date that has passed
daysPastDue: number // dueDate diff from today
markedOverdueAt: string // ISO 8601 UTC
}
invoice.cancelled
Triggered: POST /api/v1/invoices/:id/cancel
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on Invoice (status → cancelled) |
| GL Transaction | Reversal — if invoice was sent, creates offsetting transaction |
| Retained in R2 (not deleted) |
interface InvoiceCancelledEvent {
invoiceId: string
organizationId: string
invoiceNumber: string
previousStatus: 'draft' | 'sent' | 'viewed' | 'overdue'
reversalTransactionId: string | null // null if cancelled from draft
cancelledBy: string // userId
cancelledAt: string // ISO 8601 UTC
}
Reversal GL Transaction (if invoice was sent):
If previousStatus was 'sent' or 'viewed':
Debit 4000 Revenue / Credit 1200 Accounts Receivable
(reversal of the original sent transaction)
4. Domain Events — Expense Status Transitions
Status Machine
stateDiagram-v2
[*] --> pending : POST /expenses
pending --> approved : POST /expenses/:id/approve
pending --> rejected : POST /expenses/:id/reject
approved --> paid : POST /expenses/:id/mark-paid
Status Transition Event Payloads
expense.created
Triggered: POST /api/v1/expenses
| Side Effect | Action |
|---|---|
| LoggedAction | INSERT on Expense |
| GL Transaction | None |
| Receipt scan | ClamAV virus scan on attachment |
interface ExpenseCreatedEvent {
expenseId: string
organizationId: string
expenseNumber: string // EXP-YYYY-NNN
amount: string // NUMERIC(19,4) as string
currency: string
categoryAccountId: string // 5xxx Expense account
receiptUrl: string | null
submittedBy: string // userId
createdAt: string
}
expense.approved
Triggered: POST /api/v1/expenses/:id/approve
Requires role: admin or owner
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on Expense (status, approvedBy, approvedAt) |
| GL Transaction | INSERT: Debit 5xxx Expense / Credit 2000 Accounts Payable |
interface ExpenseApprovedEvent {
expenseId: string
organizationId: string
expenseNumber: string
amount: string // NUMERIC(19,4) as string
currency: string
transactionId: string // GL transaction ID
debitAccountId: string // 5xxx Expense account (category-specific)
creditAccountId: string // 2000 Accounts Payable
approvedBy: string // userId (admin or owner)
approvedAt: string
}
expense.rejected
Triggered: POST /api/v1/expenses/:id/reject
Requires role: admin or owner
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on Expense (status, rejectedBy, rejectedAt, rejectReason) |
| GL Transaction | None |
interface ExpenseRejectedEvent {
expenseId: string
organizationId: string
expenseNumber: string
rejectReason: string
rejectedBy: string
rejectedAt: string
}
expense.paid
Triggered: POST /api/v1/expenses/:id/mark-paid
Requires role: admin or owner
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on Expense (status, paidAt) |
| GL Transaction | INSERT: Debit 2000 Accounts Payable / Credit 1000 Bank Account |
interface ExpensePaidEvent {
expenseId: string
organizationId: string
expenseNumber: string
amount: string
currency: string
bankAccountId: string // Bank account that paid
transactionId: string // GL transaction ID
debitAccountId: string // 2000 Accounts Payable
creditAccountId: string // 1000 Bank Account
paidBy: string
paidAt: string
}
5. Domain Events — Banking
bank.transaction.imported
Triggered: POST /api/v1/banking/accounts/:id/import (CSV upload)
| Side Effect | Action |
|---|---|
| LoggedAction | INSERT on BankTransaction (per row) |
| ClamAV scan | Virus check on uploaded CSV |
| Auto-match | Attempt to match against open GL transactions |
interface BankTransactionImportedEvent {
organizationId: string
bankAccountId: string
importedCount: number
matchedCount: number // Auto-matched to GL transactions
unmatchedCount: number // Require manual reconciliation
importedBy: string
importedAt: string
}
bank.transaction.reconciled
Triggered: POST /api/v1/banking/transactions/:id/reconcile
| Side Effect | Action |
|---|---|
| LoggedAction | UPDATE on BankTransaction (reconciled, glTransactionId) |
| LoggedAction | UPDATE on Transaction (locked = true) |
interface BankTransactionReconciledEvent {
bankTransactionId: string
organizationId: string
glTransactionId: string // Linked GL Transaction
matchScore: number // 0-100 confidence score
matchType: 'AUTO' | 'MANUAL'
reconciledBy: string // userId (SYSTEM for auto-match)
reconciledAt: string
}
6. Webhook Events (Phase 2)
Webhooks are outbound HTTP POST callbacks to URLs registered per organization. They are not implemented in Phase 1 — documented here for future implementation.
Registration
POST /api/v1/organizations/webhooks
{
"url": "https://partner.example.com/bilko-webhook",
"secret": "whsec_...",
"events": ["invoice.sent", "invoice.paid", "invoice.cancelled"]
}
Delivery
- Method: HTTP POST
- Content-Type:
application/json - Signature header:
X-Bilko-Signature: sha256=<HMAC-SHA256(payload, secret)> - Timeout: 10 seconds
- Retries: 3× exponential backoff (1s, 4s, 16s) then DLQ
- Retry trigger: Non-2xx response or timeout
Webhook Envelope
{
"id": "wh_01HX7M2K5N3P4Q5R6S7T8V9W0",
"type": "invoice.paid",
"organizationId": "org_abc123",
"apiVersion": "2026-02-23",
"createdAt": "2026-02-23T10:30:01.000Z",
"data": {
"invoiceId": "inv_xyz789",
"invoiceNumber": "INV-2026-001",
"paidAmount": "1250.0000",
"currency": "RSD",
"paidAt": "2026-02-23T10:30:00.000Z"
}
}
Supported Webhook Events (Phase 2 Scope)
| Event Type | Trigger |
|---|---|
invoice.sent |
Invoice status → sent |
invoice.viewed |
Invoice tracking pixel fired |
invoice.paid |
Invoice marked paid |
invoice.overdue |
Invoice auto-marked overdue by cron |
invoice.cancelled |
Invoice cancelled |
expense.approved |
Expense approved |
expense.paid |
Expense marked paid |
Signature Verification (Consumer Code)
import { createHmac } from 'crypto'
function verifyWebhookSignature(
payload: string,
signature: string,
secret: string
): boolean {
const expected = createHmac('sha256', secret)
.update(payload)
.digest('hex')
return `sha256=${expected}` === signature
}
7. GL Transaction Event Mapping
Every financial state transition that creates a GL transaction follows this pattern:
| Event | Debit Account | Credit Account | Reference Type |
|---|---|---|---|
| Invoice sent | 1200 Accounts Receivable | 4000 Revenue | INVOICE |
| Invoice paid | 1000 Bank Account | 1200 Accounts Receivable | INVOICE |
| Invoice cancelled (was sent) | 4000 Revenue | 1200 Accounts Receivable | INVOICE_REVERSAL |
| Expense approved | 5xxx Expense Account | 2000 Accounts Payable | EXPENSE |
| Expense paid | 2000 Accounts Payable | 1000 Bank Account | EXPENSE |
Transaction immutability rules:
- Once a Transaction is created, it is never updated or deleted
- After reconciliation (
locked = true), even corrections require a new offsetting transaction LoggedActionrecords INSERT on every Transaction creation
8. Event Naming Conventions
| Component | Rule | Bilko Examples | |
|---|---|---|---|
| lowercase |
| | |
, , |
|||
| Entity | , , |
||
| Action | created, , , , reconciled |
||
| Full event | {domain}.{action} |
invoice.sent, expense.approved |
Do NOT use:
Present tense (user.user.create— wrong)Generic names (user.user.changed— too vague)Abbreviations (usr.usr.crtd— unreadable)
4. Event Envelope Format (CloudEvents 1.0)
{
"specversion": "1.0",
"type": "{{DOMAIN}}.{{ENTITY}}.{{ACTION}}",
"source": "{{SERVICE_NAME}}",
"id": "evt_01HX7M2K5N3P4Q5R6S7T8V9W0",
"time": "2024-01-15T10:30:00.000Z",
"datacontenttype": "application/json",
"subject": "{{optional: entity ID}}",
"data": {
"{{field}}": "{{value}}"
}
}
Envelope fields:
| | ||
| |||
| |||
| |||
| |||
| | ||
| |||
|
TypeScript interface:
interface CloudEvent<T = Record<string, unknown>> {
specversion: '1.0';
type: string;
source: string;
id: string;
time: string;
datacontenttype: 'application/json';
subject?: string;
data: T;
}
5. Per-Event Documentation
5.1 User Service Events
user.user.created
Published when a new user account is created.
| |
| |
| |
| |
Payload schema (JSON Schema):
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"required": ["userId", "email", "name", "role", "createdAt"],
"properties": {
"userId": { "type": "string", "format": "uuid" },
"email": { "type": "string", "format": "email" },
"name": { "type": "string", "minLength": 1 },
"role": { "type": "string", "enum": ["admin", "user", "viewer"] },
"createdAt": { "type": "string", "format": "date-time" }
}
}
Example event:
{
"specversion": "1.0",
"type": "user.user.created",
"source": "user-service",
"id": "evt_01HX7M2K5N3P4Q5R6S7T8V9W0",
"time": "2024-01-15T10:30:00.000Z",
"datacontenttype": "application/json",
"subject": "usr_01HX7...",
"data": {
"userId": "usr_01HX7...",
"email": "[email protected]",
"name": "Jane Doe",
"role": "user",
"createdAt": "2024-01-15T10:30:00.000Z"
}
}
user.user.updated
Published when user profile data changes.
| |
| |
| |
|
Payload schema:
{
"type": "object",
"required": ["userId", "updatedFields", "updatedAt"],
"properties": {
"userId": { "type": "string" },
"updatedFields": {
"type": "array",
"items": { "type": "string" },
"description": "List of field names that changed"
},
"before": { "type": "object", "description": "Previous values (only changed fields)" },
"after": { "type": "object", "description": "New values (only changed fields)" },
"updatedAt": { "type": "string", "format": "date-time" }
}
}
Example event:
{
"specversion": "1.0",
"type": "user.user.updated",
"source": "user-service",
"id": "evt_01HX8...",
"time": "2024-01-16T08:00:00.000Z",
"datacontenttype": "application/json",
"subject": "usr_01HX7...",
"data": {
"userId": "usr_01HX7...",
"updatedFields": ["name"],
"before": { "name": "Jane Doe" },
"after": { "name": "Jane Smith" },
"updatedAt": "2024-01-16T08:00:00.000Z"
}
}
user.user.deleted
Published when a user account is soft-deleted.
| |
| |
| |
Example event:
{
"specversion": "1.0",
"type": "user.user.deleted",
"source": "user-service",
"id": "evt_01HX9...",
"time": "2024-01-17T12:00:00.000Z",
"datacontenttype": "application/json",
"subject": "usr_01HX7...",
"data": {
"userId": "usr_01HX7...",
"deletedAt": "2024-01-17T12:00:00.000Z",
"reason": "user_requested"
}
}
5.2 Order Service Events
order.order.created
| |
| |
| |
Payload schema:
{
"type": "object",
"required": ["orderId", "userId", "items", "total", "currency", "createdAt"],
"properties": {
"orderId": { "type": "string" },
"userId": { "type": "string" },
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"productId": { "type": "string" },
"quantity": { "type": "integer" },
"unitPrice": { "type": "number" }
}
}
},
"total": { "type": "number" },
"currency": { "type": "string", "pattern": "^[A-Z]{3}$" },
"createdAt": { "type": "string", "format": "date-time" }
}
}
5.3 {{DOMAIN}} Service Events
{{domain}}.{{entity}}.{{action}}
| |
| |
| |
|
Payload schema: TODO: Define JSON Schema
Example event: TODO: Add example
6. Dead Letter Queue Handling
DLQ naming: {{topic}}.dlq — e.g., user.user.created.dlq
DLQ workflow:
Event Published
↓
Consumer processes
↓ Fails
Retry (exp. backoff: 1s, 2s, 4s, 8s, 16s — max 5 retries)
↓ All retries exhausted
Move to DLQ
↓
Alert fires: PagerDuty P3
↓
On-call investigates
↓
Option A: Fix consumer bug → replay from DLQ
Option B: Skip message (data was invalid) → log + discard
DLQ message format:
{
"originalEvent": { /* original CloudEvent */ },
"failureReason": "Consumer threw: Cannot read property 'id' of undefined",
"attemptCount": 5,
"firstFailedAt": "2024-01-15T10:30:00Z",
"lastFailedAt": "2024-01-15T10:32:00Z",
"consumerGroup": "notification-service-consumer"
}
DLQ retention: 14 days.
DLQ alert threshold: > 10 messages in DLQ within 5 minutes.
7. Event Versioning Strategy
Strategy: Backward-compatible field addition + major version in event type.
Rules:
Adding optional fields: allowed without version bumpRemoving fields: NOT allowed (use deprecation first, remove after all consumers updated)Changing field types: NOT allowed (breaking change)Adding required fields: requires version bumpMajor breaking change: new event typeuser.user.created.v2
Deprecation process:
1. Mark field as deprecated in schema docs
2. Notify all consumer teams
3. Wait 2 sprint cycles (4 weeks minimum)
4. Remove field from schema
5. Update documentation
Schema registry: {{Confluent Schema Registry | AWS Glue Schema Registry | Manual docs}}
Validation: Consumer validates incoming events against pinned schema version.
8. Event Replay Capability
Replay supported: {{Yes — Kafka log retention | No — events are ephemeral}}
Replay scenarios:
Bug in consumer → fix bug → replay affected time windowNew consumer onboarded → replay historical events to build initial stateData migration → replay events to new storage
Replay procedure:
Identify topic and time range to replayCoordinate with all consumer teams (replay may cause duplicate side effects)Ensure consumers are idempotent before replaySet consumer offset to target timestamp:kafka-consumer-groups --reset-offsets --to-datetimeRestart consumer with temporary consumer group to avoid affecting production offsetVerify replayed state is correctSwitch production consumer to new state
Retention periods by topic: See topic configuration table in Section 2.
9. Monitoring & Observability
| What to Watch | Alert Threshold | |||
|---|---|---|---|---|
→ P1 |
||||
→ P1 |
||||
| Puppeteer error |
> | |||
| SendGrid 4xx/5xx | > | → P2 | ||
| DLQ depth > 10 → P3 | |
Dashboard:Log pattern for every domain event:
logger.info('invoice.sent', {{https://monitoring.domain.com/dashboards/events}invoiceId, organizationId, transactionId, pdfUrl, durationMs: Date.now() - startTime, })Distributed tracing:All events carrytraceparentheader (OpenTelemetry W3C Trace Context).
10. Testing Event-DrivenDomain FlowsEvents
Unit TestsTest Pattern
// Test producer: verify event shape
it('should publishcreate user.createdGL eventtransaction withwhen correctinvoice schema',is async () => {
await userService.create(createUserDto);
expect(eventBus.publish).toHaveBeenCalledWith(
expect.objectContaining({
type: 'user.user.created',
source: 'user-service',
data: expect.objectContaining({
userId: expect.any(String),
email: createUserDto.email,
}),
})
);
});
// Test consumer: verify handler idempotency
it('should not send welcome email twice for duplicate event'sent', async () => {
const eventinvoice = buildUserCreatedEvent();await createDraftInvoice(orgId)
await handler.handle(event);invoicesService.sendInvoice(invoice.id, userId)
const transaction = await handler.handle(event);prisma.transaction.findFirst({
//where: duplicate{ expect(emailService.send).toHaveBeenCalledTimes(1);referenceId: invoice.id, referenceType: 'INVOICE' }
});
expect(transaction).toBeTruthy()
expect(transaction!.debitAccountId).toBe(ACCOUNTS_RECEIVABLE_ID)
expect(transaction!.creditAccountId).toBe(REVENUE_ACCOUNT_ID)
expect(transaction!.amount).toBe(invoice.totalAmount)
})
it('should write LoggedAction on invoice status change', async () => {
const invoice = await createDraftInvoice(orgId)
await invoicesService.sendInvoice(invoice.id, userId)
const auditEntry = await prisma.loggedAction.findFirst({
where: { tableName: 'Invoice', rowId: invoice.id, action: 'UPDATE' },
orderBy: { createdAt: 'desc' },
})
expect(auditEntry).toBeTruthy()
expect(auditEntry!.changedFields).toMatchObject({ status: 'sent' })
expect(auditEntry!.userId).toBe(userId)
})
Integration TestsTest: Full Invoice Lifecycle
// Use real broker in integration tests (testcontainers)
const kafka = await new KafkaContainer('confluentinc/cp-kafka:7.5.0').start();
E2E Tests
Test full event chain: API action → event published → consumer processes → side effect visible.
POST /usersapi/v1/invoices → poll201 draft created
POST /api/v1/invoices/:id/send → 200 sent
↳ Assert: LoggedAction INSERT (Invoice, status=sent)
↳ Assert: Transaction created (Debit 1200, Credit 4000)
↳ Assert: PDF URL set on invoice
↳ Assert: SendGrid sandbox received email
GET /api/v1/invoices/track/:token → 200 viewed
↳ Assert: invoice.status = 'viewed'
PATCH /api/v1/invoices/:id/mark-paid → 200 paid
↳ Assert: LoggedAction INSERT (Invoice, status=paid)
↳ Assert: Second Transaction (Debit 1000, Credit 1200)
↳ Assert: Total LoggedAction rows for welcomeinvoice email= 3 (SendGridcreated sandbox)+ →sent assert+ received within 5spaid)
ApprovalRelated Documents
| Design | |||