Skip to main content

Event Schema Documentation

Event Schema Documentation

Project: Bilko{{PROJECT_NAME}} Version: 0.1{{VERSION}} Date: 2026-02-23{{DATE}} Author: Platform Architect{{AUTHOR}} Status: Draft | In Review | Approved Reviewers: Tech Lead, Security Reviewer{{REVIEWERS}}

Document History

Version Date Author Changes
0.1 2026-02-23{{DATE}} Platform Architect{{AUTHOR}} Initial draft

1. EventEvent-Driven Architecture Overview

Bilko is a modular monolith — there is no message broker (no Kafka, no RabbitMQ). Events are handled in two ways:

  1. Audit events — Every mutating database operation is captured by Prisma middleware and written to the LoggedAction table (append-only, immutable).
  2. Domain events — Status transitions on Invoice and Expense trigger synchronous side effects (GL transactions, PDF generation, email delivery) within the same request/transaction boundary.
  3. Webhooks — Phase 2 feature: HTTP POST callbacks to registered URLs on Invoice status changes.
graph LR
    subgraph "RequestPublishers"
        Lifecycle"
        Client[UserService["Next.js Frontend"user-service"]
        Handler[OrderService["Route Handler"order-service"]
        Service[PaymentService["Service Layer"]
        Engine["Accounting Engine"payment-service"]
    end

    subgraph "Persistence"Message DB[Broker"
        Broker["PostgreSQL\n(Prisma)"]{{Kafka Audit["LoggedAction\n(Append-Only)/ RabbitMQ / AWS SQS + SNS}}"]
    end

    subgraph "SideConsumers"
        Effects (Synchronous)NotifService["
        GL["GL Transaction\ndebit + credit"notification-service"]
        PDF[AnalyticsService["Puppeteer PDF\nR2 storage"analytics-service"]
        Email[SearchService["SendGrid\nemailsearch-service"]
        delivery"AuditService["audit-service"]
    end

    subgraph "Webhooks (Phase 2)"
        WH["HTTP POST\nregistered URLs"]
    end

    Client --> Handler
    Handler --> Service
    Service --> Engine
    Engine --> GL
    Engine --> DB
    Service --> PDF
    Service --> Email
    DBUserService -->|Prismauser.* middleware|events| AuditBroker
    ServiceOrderService -.->|Phaseorder.* 2|events| WHBroker
    stylePaymentService Audit-->|payment.* fill:#dc2626,color:#fffevents| styleBroker

    DBBroker fill:#336791,color:#fff-->|filtered| styleNotifService
    WHBroker stroke-dasharray:5-->|all 5events| AnalyticsService
    Broker -->|entity events| SearchService
    Broker -->|all events| AuditService

Event-driven use cases in this system:

  • Decoupled notifications (user.created → send welcome email)
  • Search index updates (entity.updated → reindex)
  • Audit trail (all mutations → audit log)
  • Cross-service data sync (order.created → update inventory)

2. AuditMessage TrailBroker — LoggedActionConfiguration

Purpose

LoggedAction is the immutable audit trail required for financial compliance (GDPR, Serbian/Bosnian/Croatian accounting regulations). Every INSERT, UPDATE, and DELETE on audited tables is recorded.

Retention:Broker: 7{{Apache yearsKafka (regulatory| requirement).RabbitMQ After| 7AWS years,SQS/SNS archived| toNATS cold| storage.Upstash Kafka}} NEVERVersion: deleted.{{3.x}} Hosting: {{Confluent Cloud / self-hosted / AWS MSK}}

PrismaTopic Schema/ Queue Naming Convention

model{{DOMAIN}}.{{ENTITY}}.{{ACTION}}

LoggedActionExamples:
  {user.user.created
  idorder.order.status_changed
  Stringpayment.invoice.generated
  @id @default(uuid())
  organizationId  String
  tableName       String                        // Which model was mutated
  userId          String?                       // Who performed the action (null = system job)
  action          String                        // INSERT | UPDATE | DELETE
  rowId           String?                       // PK of the affected row
  rowData         Json?                         // Snapshot for DELETE actions
  changedFields   Json?                         // Only the changed fields for UPDATE actions
  clientIp        String?                       // Hashed client IP address
  applicationName String   @default("bilko-api")
  createdAt       DateTime @default(now())

  organization    Organization @relation(fields: [organizationId], references: [id])

  @@index([organizationId, tableName, createdAt])
  @@index([organizationId, userId])
}notification.email.sent

Implementation

Pattern rules:

Prisma
    Middleware

  • All
    //lowercase, src/lib/prisma.tsdot-separated
  • import
  • Domain { AsyncLocalStorage } from 'async_hooks' export const requestContextprefix = newservice AsyncLocalStorage<{name userId?:(without string-service)
  • organizationId?: string clientIp?: string }>() const AUDITED_MODELS
  • Entity = [singular 'Invoice',noun
  • 'InvoiceItem', 'Expense', 'Transaction', 'Contact', 'User', 'Organization', 'BankAccount', 'BankTransaction', 'Account', ] prisma.$use(async (params, next) => { const result
  • Action = awaitpast next(params)tense ifverb (created, AUDITED_MODELS.includes(params.modelupdated, ??deleted, '')completed)
  • && ['create', 'update', 'delete', 'upsert'].includes(params.action) ) { const ctx = requestContext.getStore() const actionMap: Record<string, string> = { create: 'INSERT', update: 'UPDATE', delete: 'DELETE', upsert: 'UPSERT', } await prisma.loggedAction.create({ data: { organizationId: ctx?.organizationId ?? 'SYSTEM', tableName: params.model!, userId: ctx?.userId ?? null, action: actionMap[params.action], rowId: result?.id ?? params.args?.where?.id ?? null, rowData: params.action === 'delete' ? params.args.where : null, changedFields: params.action === 'update' ? params.args.data : null, clientIp: ctx?.clientIp ?? null, applicationName: 'bilko-api', }, }) } return result })

    AuditedTopic Tables and ActionsConfiguration

    TableTopic INSERTPartitions UPDATEReplication DELETERetention NotesCompaction
    Invoiceuser.user.* Yes6 Yes37 days NoInvoices are never hard-deleted
    InvoiceItemorder.order.* Yes12 Yes3 Yes30 days Only while invoice is draftNo
    Expensepayment.invoice.* Yes6 Yes390 days NoExpenses are never hard-deleted
    Transaction*.*.deleted Yes6 No3 No30 days GLLog entries are immutable once created
    ContactYesYesYesSoft-delete preferred
    UserYesYesNoUsers deactivated, not deleted
    OrganizationYesYesNoOrgs deactivated, not deleted
    AccountYesYesNoChart of accounts entries
    BankAccountYesYesNoBank account registrations
    BankTransactionYesYesNoImported bank statement rowscompaction

    LoggedAction TypeScript Interface

    interface LoggedAction {
      id: string                    // UUID
      organizationId: string        // Tenant scope
      tableName: string             // Model name (e.g. "Invoice")
      userId: string | null         // null = system cron job
      action: 'INSERT' | 'UPDATE' | 'DELETE' | 'UPSERT'
      rowId: string | null          // UUID of affected row
      rowData: Record<string, unknown> | null   // Full snapshot on DELETE
      changedFields: Record<string, unknown> | null  // Only changed fields on UPDATE
      clientIp: string | null       // SHA-256 hashed IP — NEVER plain IP
      applicationName: string       // Always "bilko-api"
      createdAt: string             // ISO 8601 UTC
    }
    

    Example: Invoice Status Update

    {
      "id": "9f3a1bc2-...",
      "organizationId": "org_abc123",
      "tableName": "Invoice",
      "userId": "usr_def456",
      "action": "UPDATE",
      "rowId": "inv_xyz789",
      "rowData": null,
      "changedFields": {
        "status": "sent",
        "sentAt": "2026-02-23T10:30:00.000Z",
        "pdfUrl": "https://r2.bilko.io/invoices/org_abc123/INV-2026-001.pdf"
      },
      "clientIp": "a3f5c8d1e2b4...",
      "applicationName": "bilko-api",
      "createdAt": "2026-02-23T10:30:01.123Z"
    }
    

    Example: Transaction Created (Invoice Sent)

    {
      "id": "2e8b4f1a-...",
      "organizationId": "org_abc123",
      "tableName": "Transaction",
      "userId": "usr_def456",
      "action": "INSERT",
      "rowId": "txn_ghi012",
      "rowData": null,
      "changedFields": null,
      "clientIp": "a3f5c8d1e2b4...",
      "applicationName": "bilko-api",
      "createdAt": "2026-02-23T10:30:01.456Z"
    }
    

    Example: System Job (Overdue Invoice)

    {
      "id": "7c2d9e4b-...",
      "organizationId": "org_abc123",
      "tableName": "Invoice",
      "userId": null,
      "action": "UPDATE",
      "rowId": "inv_abc001",
      "rowData": null,
      "changedFields": {
        "status": "overdue"
      },
      "clientIp": null,
      "applicationName": "bilko-api",
      "createdAt": "2026-02-23T00:05:01.789Z"
    }
    

    3. Domain Events — Invoice Status Transitions

    Invoice status transitions trigger synchronous side effects in the same request. These are not queued — they execute within the handler or are awaited before response.

    Status Machine

    stateDiagram-v2
        [*] --> draft : POST /invoices
        draft --> sent : POST /invoices/:id/send
        sent --> viewed : Tracking pixel fired
        sent --> paid : POST /invoices/:id/mark-paid
        viewed --> paid : POST /invoices/:id/mark-paid
        sent --> overdue : Cron job (dueDate < today)
        viewed --> overdue : Cron job (dueDate < today)
        overdue --> paid : POST /invoices/:id/mark-paid
        draft --> cancelled : DELETE /invoices/:id
        sent --> cancelled : POST /invoices/:id/cancel
        viewed --> cancelled : POST /invoices/:id/cancel
        overdue --> cancelled : POST /invoices/:id/cancel
    

    Status Transition Event Payloads

    invoice.created (draft)

    Triggered: POST /api/v1/invoices

    Side EffectAction
    LoggedActionINSERT on Invoice
    Invoice numberAuto-generated INV-YYYY-NNN
    GL TransactionNone
    interface InvoiceCreatedEvent {
      invoiceId: string
      organizationId: string
      invoiceNumber: string     // e.g. "INV-2026-001"
      contactId: string
      totalAmount: string       // NUMERIC(19,4) as string
      currency: string          // ISO 4217 e.g. "RSD"
      dueDate: string           // ISO 8601 date
      status: 'draft'
      createdBy: string         // userId
      createdAt: string         // ISO 8601 UTC
    }
    

    invoice.sent

    Triggered: POST /api/v1/invoices/:id/send

    Side EffectAction
    LoggedActionUPDATE on Invoice (status, sentAt, pdfUrl)
    GL TransactionINSERT: Debit 1200 Accounts Receivable / Credit 4000 Revenue
    PDFPuppeteer renders → uploaded to R2 → URL saved to invoice.pdfUrl
    EmailSendGrid: invoice PDF to contact.email with tracking pixel
    interface InvoiceSentEvent {
      invoiceId: string
      organizationId: string
      invoiceNumber: string
      contactId: string
      contactEmail: string
      totalAmount: string           // NUMERIC(19,4) as string
      baseAmount: string            // totalAmount * exchangeRate (locked at invoiceDate)
      currency: string
      exchangeRate: string          // Locked at invoiceDate — NEVER changes
      pdfUrl: string                // R2 URL: invoices/{orgId}/INV-YYYY-NNN.pdf
      transactionId: string         // GL transaction ID created
      debitAccountId: string        // 1200 Accounts Receivable
      creditAccountId: string       // 4000 Revenue
      sentBy: string                // userId
      sentAt: string                // ISO 8601 UTC
    }
    

    GL Transaction created:

    {
      "id": "txn_abc123",
      "organizationId": "org_abc123",
      "debitAccountId": "acc_1200",
      "creditAccountId": "acc_4000",
      "amount": "1250.0000",
      "currency": "RSD",
      "baseAmount": "10.6400",
      "exchangeRate": "0.0085",
      "description": "Invoice INV-2026-001 sent",
      "referenceType": "INVOICE",
      "referenceId": "inv_xyz789",
      "date": "2026-02-23",
      "locked": false,
      "createdAt": "2026-02-23T10:30:01.000Z"
    }
    

    invoice.viewed

    Triggered: GET /api/v1/invoices/track/:trackingToken (tracking pixel)

    Side EffectAction
    LoggedActionUPDATE on Invoice (status, viewedAt)
    GL TransactionNone
    EmailNone
    interface InvoiceViewedEvent {
      invoiceId: string
      organizationId: string
      invoiceNumber: string
      trackingToken: string
      viewedAt: string    // ISO 8601 UTC
    }
    

    invoice.paid

    Triggered: PATCH /api/v1/invoices/:id/mark-paid

    Side EffectAction
    LoggedActionUPDATE on Invoice (status, paidAt, paidAmount, paymentRef)
    GL TransactionINSERT: Debit 1000 Bank Account / Credit 1200 Accounts Receivable
    EmailNone (future: payment receipt)
    Webhook (Phase 2)POST to registered webhook URL
    interface InvoicePaidEvent {
      invoiceId: string
      organizationId: string
      invoiceNumber: string
      paidAmount: string          // NUMERIC(19,4) as string
      currency: string
      paymentReference: string    // Bank reference number
      bankAccountId: string       // Which bank account received payment
      transactionId: string       // GL transaction ID created
      debitAccountId: string      // 1000 Bank Account
      creditAccountId: string     // 1200 Accounts Receivable
      markedPaidBy: string        // userId
      paidAt: string              // ISO 8601 UTC
    }
    

    invoice.overdue

    Triggered: Cron job overdue-invoices (daily 00:05 UTC)

    Side EffectAction
    LoggedActionUPDATE on Invoice (status → overdue)
    GL TransactionNone
    EmailNone (future: overdue reminder)
    userIdnull (system job)
    interface InvoiceOverdueEvent {
      invoiceId: string
      organizationId: string
      invoiceNumber: string
      dueDate: string         // Date that has passed
      daysPastDue: number     // dueDate diff from today
      markedOverdueAt: string // ISO 8601 UTC
    }
    

    invoice.cancelled

    Triggered: POST /api/v1/invoices/:id/cancel

    Side EffectAction
    LoggedActionUPDATE on Invoice (status → cancelled)
    GL TransactionReversal — if invoice was sent, creates offsetting transaction
    PDFRetained in R2 (not deleted)
    interface InvoiceCancelledEvent {
      invoiceId: string
      organizationId: string
      invoiceNumber: string
      previousStatus: 'draft' | 'sent' | 'viewed' | 'overdue'
      reversalTransactionId: string | null  // null if cancelled from draft
      cancelledBy: string                    // userId
      cancelledAt: string                    // ISO 8601 UTC
    }
    

    Reversal GL Transaction (if invoice was sent):

    If previousStatus was 'sent' or 'viewed':
      Debit 4000 Revenue / Credit 1200 Accounts Receivable
      (reversal of the original sent transaction)
    

    4. Domain Events — Expense Status Transitions

    Status Machine

    stateDiagram-v2
        [*] --> pending : POST /expenses
        pending --> approved : POST /expenses/:id/approve
        pending --> rejected : POST /expenses/:id/reject
        approved --> paid : POST /expenses/:id/mark-paid
    

    Status Transition Event Payloads

    expense.created

    Triggered: POST /api/v1/expenses

    Side EffectAction
    LoggedActionINSERT on Expense
    GL TransactionNone
    Receipt scanClamAV virus scan on attachment
    interface ExpenseCreatedEvent {
      expenseId: string
      organizationId: string
      expenseNumber: string       // EXP-YYYY-NNN
      amount: string              // NUMERIC(19,4) as string
      currency: string
      categoryAccountId: string   // 5xxx Expense account
      receiptUrl: string | null
      submittedBy: string         // userId
      createdAt: string
    }
    

    expense.approved

    Triggered: POST /api/v1/expenses/:id/approve Requires role: admin or owner

    Side EffectAction
    LoggedActionUPDATE on Expense (status, approvedBy, approvedAt)
    GL TransactionINSERT: Debit 5xxx Expense / Credit 2000 Accounts Payable
    interface ExpenseApprovedEvent {
      expenseId: string
      organizationId: string
      expenseNumber: string
      amount: string              // NUMERIC(19,4) as string
      currency: string
      transactionId: string       // GL transaction ID
      debitAccountId: string      // 5xxx Expense account (category-specific)
      creditAccountId: string     // 2000 Accounts Payable
      approvedBy: string          // userId (admin or owner)
      approvedAt: string
    }
    

    expense.rejected

    Triggered: POST /api/v1/expenses/:id/reject Requires role: admin or owner

    Side EffectAction
    LoggedActionUPDATE on Expense (status, rejectedBy, rejectedAt, rejectReason)
    GL TransactionNone
    interface ExpenseRejectedEvent {
      expenseId: string
      organizationId: string
      expenseNumber: string
      rejectReason: string
      rejectedBy: string
      rejectedAt: string
    }
    

    expense.paid

    Triggered: POST /api/v1/expenses/:id/mark-paid Requires role: admin or owner

    Side EffectAction
    LoggedActionUPDATE on Expense (status, paidAt)
    GL TransactionINSERT: Debit 2000 Accounts Payable / Credit 1000 Bank Account
    interface ExpensePaidEvent {
      expenseId: string
      organizationId: string
      expenseNumber: string
      amount: string
      currency: string
      bankAccountId: string       // Bank account that paid
      transactionId: string       // GL transaction ID
      debitAccountId: string      // 2000 Accounts Payable
      creditAccountId: string     // 1000 Bank Account
      paidBy: string
      paidAt: string
    }
    

    5. Domain Events — Banking

    bank.transaction.imported

    Triggered: POST /api/v1/banking/accounts/:id/import (CSV upload)

    Side EffectAction
    LoggedActionINSERT on BankTransaction (per row)
    ClamAV scanVirus check on uploaded CSV
    Auto-matchAttempt to match against open GL transactions
    interface BankTransactionImportedEvent {
      organizationId: string
      bankAccountId: string
      importedCount: number
      matchedCount: number       // Auto-matched to GL transactions
      unmatchedCount: number     // Require manual reconciliation
      importedBy: string
      importedAt: string
    }
    

    bank.transaction.reconciled

    Triggered: POST /api/v1/banking/transactions/:id/reconcile

    Side EffectAction
    LoggedActionUPDATE on BankTransaction (reconciled, glTransactionId)
    LoggedActionUPDATE on Transaction (locked = true)
    interface BankTransactionReconciledEvent {
      bankTransactionId: string
      organizationId: string
      glTransactionId: string     // Linked GL Transaction
      matchScore: number          // 0-100 confidence score
      matchType: 'AUTO' | 'MANUAL'
      reconciledBy: string        // userId (SYSTEM for auto-match)
      reconciledAt: string
    }
    

    6. Webhook Events (Phase 2)

    Webhooks are outbound HTTP POST callbacks to URLs registered per organization. They are not implemented in Phase 1 — documented here for future implementation.

    Registration

    POST /api/v1/organizations/webhooks
    {
      "url": "https://partner.example.com/bilko-webhook",
      "secret": "whsec_...",
      "events": ["invoice.sent", "invoice.paid", "invoice.cancelled"]
    }
    

    Delivery

    • Method: HTTP POST
    • Content-Type: application/json
    • Signature header: X-Bilko-Signature: sha256=<HMAC-SHA256(payload, secret)>
    • Timeout: 10 seconds
    • Retries: 3× exponential backoff (1s, 4s, 16s) then DLQ
    • Retry trigger: Non-2xx response or timeout

    Webhook Envelope

    {
      "id": "wh_01HX7M2K5N3P4Q5R6S7T8V9W0",
      "type": "invoice.paid",
      "organizationId": "org_abc123",
      "apiVersion": "2026-02-23",
      "createdAt": "2026-02-23T10:30:01.000Z",
      "data": {
        "invoiceId": "inv_xyz789",
        "invoiceNumber": "INV-2026-001",
        "paidAmount": "1250.0000",
        "currency": "RSD",
        "paidAt": "2026-02-23T10:30:00.000Z"
      }
    }
    

    Supported Webhook Events (Phase 2 Scope)

    Event TypeTrigger
    invoice.sentInvoice status → sent
    invoice.viewedInvoice tracking pixel fired
    invoice.paidInvoice marked paid
    invoice.overdueInvoice auto-marked overdue by cron
    invoice.cancelledInvoice cancelled
    expense.approvedExpense approved
    expense.paidExpense marked paid

    Signature Verification (Consumer Code)

    import { createHmac } from 'crypto'
    
    function verifyWebhookSignature(
      payload: string,
      signature: string,
      secret: string
    ): boolean {
      const expected = createHmac('sha256', secret)
        .update(payload)
        .digest('hex')
      return `sha256=${expected}` === signature
    }
    

    7. GL Transaction Event Mapping

    Every financial state transition that creates a GL transaction follows this pattern:

    EventDebit AccountCredit AccountReference Type
    Invoice sent1200 Accounts Receivable4000 RevenueINVOICE
    Invoice paid1000 Bank Account1200 Accounts ReceivableINVOICE
    Invoice cancelled (was sent)4000 Revenue1200 Accounts ReceivableINVOICE_REVERSAL
    Expense approved5xxx Expense Account2000 Accounts PayableEXPENSE
    Expense paid2000 Accounts Payable1000 Bank AccountEXPENSE

    Transaction immutability rules:

    • Once a Transaction is created, it is never updated or deleted
    • After reconciliation (locked = true), even corrections require a new offsetting transaction
    • LoggedAction records INSERT on every Transaction creation

    8. Event Naming Conventions

    Component Rule Bilko Examples
    Full event type{domain}.{entity}.{action}user.user.created
    Domain lowercaseLowercase, nounmatches service prefix invoiceuser, expenseorder, bankpayment
    Entity singular,Singular noun, lowercase with underscores invoiceuser, expenseorder_item, transactioninvoice
    Action past-Past-tense verbverb, lowercase with underscores created, sentupdated, paidstatus_changed, approvedpayment_failed

    Do NOT use:

    • Present tense (user.user.create — wrong)
    • Generic names (user.user.changed — too vague)
    • Abbreviations (usr.usr.crtd — unreadable)

    4. Event Envelope Format (CloudEvents 1.0)

    {
      "specversion": "1.0",
      "type": "{{DOMAIN}}.{{ENTITY}}.{{ACTION}}",
      "source": "{{SERVICE_NAME}}",
      "id": "evt_01HX7M2K5N3P4Q5R6S7T8V9W0",
      "time": "2024-01-15T10:30:00.000Z",
      "datacontenttype": "application/json",
      "subject": "{{optional: entity ID}}",
      "data": {
        "{{field}}": "{{value}}"
      }
    }
    

    Envelope fields:

    FieldTypeRequiredDescription
    reconciledspecversionstringYesAlways "1.0"
    FulltypestringYesEvent type (see naming convention)
    sourcestringYesEmitting service name
    idstringYesUnique event ID (ULID format)
    timestringYesISO 8601 timestamp (UTC)
    datacontenttypestringYesAlways "application/json"
    subjectstringNoPrimary entity ID (for routing)
    dataobjectYesDomain-specific event payload

    TypeScript interface:

    interface CloudEvent<T = Record<string, unknown>> {
      specversion: '1.0';
      type: string;
      source: string;
      id: string;
      time: string;
      datacontenttype: 'application/json';
      subject?: string;
      data: T;
    }
    

    5. Per-Event Documentation


    5.1 User Service Events

    user.user.created

    Published when a new user account is created.

    PropertyValue
    Publisheruser-service
    Consumersnotification-service, analytics-service, audit-service
    Topicuser.user.created
    Ordering guaranteePer user ID (partitioned by subject)
    Idempotency keyid (event ID) — consumers must deduplicate
    Retry behaviorConsumer retries up to 5x before DLQ

    Payload schema (JSON Schema):

    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "required": ["userId", "email", "name", "role", "createdAt"],
      "properties": {
        "userId": { "type": "string", "format": "uuid" },
        "email": { "type": "string", "format": "email" },
        "name": { "type": "string", "minLength": 1 },
        "role": { "type": "string", "enum": ["admin", "user", "viewer"] },
        "createdAt": { "type": "string", "format": "date-time" }
      }
    }
    

    Example event:

    {
      "specversion": "1.0",
      "type": "user.user.created",
      "source": "user-service",
      "id": "evt_01HX7M2K5N3P4Q5R6S7T8V9W0",
      "time": "2024-01-15T10:30:00.000Z",
      "datacontenttype": "application/json",
      "subject": "usr_01HX7...",
      "data": {
        "userId": "usr_01HX7...",
        "email": "[email protected]",
        "name": "Jane Doe",
        "role": "user",
        "createdAt": "2024-01-15T10:30:00.000Z"
      }
    }
    

    user.user.updated

    Published when user profile data changes.

    PropertyValue
    Publisheruser-service
    Consumerssearch-service, notification-service, analytics-service
    Topicuser.user.updated
    Ordering guaranteePer user ID
    Idempotency keyid (event ID)

    Payload schema:

    {
      "type": "object",
      "required": ["userId", "updatedFields", "updatedAt"],
      "properties": {
        "userId": { "type": "string" },
        "updatedFields": {
          "type": "array",
          "items": { "type": "string" },
          "description": "List of field names that changed"
        },
        "before": { "type": "object", "description": "Previous values (only changed fields)" },
        "after": { "type": "object", "description": "New values (only changed fields)" },
        "updatedAt": { "type": "string", "format": "date-time" }
      }
    }
    

    Example event:

    {
      "specversion": "1.0",
      "type": "user.user.updated",
      "source": "user-service",
      "id": "evt_01HX8...",
      "time": "2024-01-16T08:00:00.000Z",
      "datacontenttype": "application/json",
      "subject": "usr_01HX7...",
      "data": {
        "userId": "usr_01HX7...",
        "updatedFields": ["name"],
        "before": { "name": "Jane Doe" },
        "after": { "name": "Jane Smith" },
        "updatedAt": "2024-01-16T08:00:00.000Z"
      }
    }
    

    user.user.deleted

    Published when a user account is soft-deleted.

    PropertyValue
    Publisheruser-service
    Consumersorder-service, notification-service, analytics-service
    Payload {domain} userId, deletedAt, reason }
    Ordering guaranteePer user ID

    Example event:

    {
      "specversion": "1.0",
      "type": "user.user.deleted",
      "source": "user-service",
      "id": "evt_01HX9...",
      "time": "2024-01-17T12:00:00.000Z",
      "datacontenttype": "application/json",
      "subject": "usr_01HX7...",
      "data": {action}
        "userId": "usr_01HX7...",
        "deletedAt": "2024-01-17T12:00:00.000Z",
        "reason": "user_requested"
      }
    }
    

    5.2 Order Service Events

    order.order.created

    PropertyValue
    Publisher invoice.sentorder-service
    Consumerspayment-service, expense.approvednotification-service, inventory-service
    Topicorder.order.created
    Ordering guaranteePer order ID

    Payload schema:

    {
      "type": "object",
      "required": ["orderId", "userId", "items", "total", "currency", "createdAt"],
      "properties": {
        "orderId": { "type": "string" },
        "userId": { "type": "string" },
        "items": {
          "type": "array",
          "items": {
            "type": "object",
            "properties": {
              "productId": { "type": "string" },
              "quantity": { "type": "integer" },
              "unitPrice": { "type": "number" }
            }
          }
        },
        "total": { "type": "number" },
        "currency": { "type": "string", "pattern": "^[A-Z]{3}$" },
        "createdAt": { "type": "string", "format": "date-time" }
      }
    }
    

    5.3 {{DOMAIN}} Service Events

    {{domain}}.{{entity}}.{{action}}

    PropertyValue
    Publisher{{service-name}}
    Consumers{{consumer-a, consumer-b}}
    Topic{{domain.entity.action}}
    Ordering guarantee`{{Per entity ID
    Idempotency key{{id}}

    Payload schema: TODO: Define JSON Schema

    Example event: TODO: Add example


    6. Dead Letter Queue Handling

    DLQ naming: {{topic}}.dlq — e.g., user.user.created.dlq

    DLQ workflow:

    Event Published
        ↓
    Consumer processes
        ↓ Fails
    Retry (exp. backoff: 1s, 2s, 4s, 8s, 16s — max 5 retries)
        ↓ All retries exhausted
    Move to DLQ
        ↓
    Alert fires: PagerDuty P3
        ↓
    On-call investigates
        ↓
    Option A: Fix consumer bug → replay from DLQ
    Option B: Skip message (data was invalid) → log + discard
    

    DLQ message format:

    {
      "originalEvent": { /* original CloudEvent */ },
      "failureReason": "Consumer threw: Cannot read property 'id' of undefined",
      "attemptCount": 5,
      "firstFailedAt": "2024-01-15T10:30:00Z",
      "lastFailedAt": "2024-01-15T10:32:00Z",
      "consumerGroup": "notification-service-consumer"
    }
    

    DLQ retention: 14 days. DLQ alert threshold: > 10 messages in DLQ within 5 minutes.


    7. Event Versioning Strategy

    Strategy: Backward-compatible field addition + major version in event type.

    Rules:

    1. Adding optional fields: allowed without version bump
    2. Removing fields: NOT allowed (use deprecation first, remove after all consumers updated)
    3. Changing field types: NOT allowed (breaking change)
    4. Adding required fields: requires version bump
    5. Major breaking change: new event type user.user.created.v2

    Deprecation process:

    1. Mark field as deprecated in schema docs
    2. Notify all consumer teams
    3. Wait 2 sprint cycles (4 weeks minimum)
    4. Remove field from schema
    5. Update documentation
    

    Schema registry: {{Confluent Schema Registry | AWS Glue Schema Registry | Manual docs}} Validation: Consumer validates incoming events against pinned schema version.


    8. Event Replay Capability

    Replay supported: {{Yes — Kafka log retention | No — events are ephemeral}}

    Replay scenarios:

    • Bug in consumer → fix bug → replay affected time window
    • New consumer onboarded → replay historical events to build initial state
    • Data migration → replay events to new storage

    Replay procedure:

    1. Identify topic and time range to replay
    2. Coordinate with all consumer teams (replay may cause duplicate side effects)
    3. Ensure consumers are idempotent before replay
    4. Set consumer offset to target timestamp: kafka-consumer-groups --reset-offsets --to-datetime
    5. Restart consumer with temporary consumer group to avoid affecting production offset
    6. Verify replayed state is correct
    7. Switch production consumer to new state

    Retention periods by topic: See topic configuration table in Section 2.


    9. Monitoring & Observability

    failurefailure P2
    SignalWhat to WatchMetric Alert ThresholdSeverityChannel
    LoggedActionConsumer INSERTlag failures(per topic) Prisma> middleware10,000 errormessages AnyP2 Slack → P1#alerts
    GLDLQ Transaction creation failuresdepth Accounting> engine10 errormessages / 5min AnyP3 Slack → P1#alerts
    PDF generation failuresPuppeteerProducer error rate > 3 failures1% / 5min P1 PagerDuty
    EmailConsumer deliveryerror failuresrate SendGrid> 4xx/5xx5% / 5minP2PagerDuty
    Event processing latency P99 > 5 failures / 5min → P2
    Webhook delivery failures (Phase 2)seconds Non-2xx after retries → DLQP3 DLQSlack depth > 10 → P3#alerts

    LogDashboard: pattern{{https://monitoring.domain.com/dashboards/events}} forDistributed everytracing: domainAll event:events carry traceparent header (OpenTelemetry W3C Trace Context).

    logger.info('invoice.sent', {
      invoiceId,
      organizationId,
      transactionId,
      pdfUrl,
      durationMs: Date.now() - startTime,
    })
    

    10. Testing DomainEvent-Driven EventsFlows

    Unit Test PatternTests

    // Test producer: verify event shape
    it('should createpublish GLuser.created transactionevent whenwith invoicecorrect isschema', sent'async () => {
      await userService.create(createUserDto);
    
      expect(eventBus.publish).toHaveBeenCalledWith(
        expect.objectContaining({
          type: 'user.user.created',
          source: 'user-service',
          data: expect.objectContaining({
            userId: expect.any(String),
            email: createUserDto.email,
          }),
        })
      );
    });
    
    // Test consumer: verify handler idempotency
    it('should not send welcome email twice for duplicate event', async () => {
      const invoiceevent = await createDraftInvoice(orgId)buildUserCreatedEvent();
      await invoicesService.sendInvoice(invoice.id, userId)
    
      const transaction =handler.handle(event);
      await prisma.transaction.findFirst({handler.handle(event); where:// {duplicate
      referenceId: invoice.id, referenceType: 'INVOICE' }expect(emailService.send).toHaveBeenCalledTimes(1);
    })
    
      expect(transaction).toBeTruthy()
      expect(transaction!.debitAccountId).toBe(ACCOUNTS_RECEIVABLE_ID)
      expect(transaction!.creditAccountId).toBe(REVENUE_ACCOUNT_ID)
      expect(transaction!.amount).toBe(invoice.totalAmount)
    })
    
    it('should write LoggedAction on invoice status change', async () => {
      const invoice = await createDraftInvoice(orgId)
    
      await invoicesService.sendInvoice(invoice.id, userId)
    
      const auditEntry = await prisma.loggedAction.findFirst({
        where: { tableName: 'Invoice', rowId: invoice.id, action: 'UPDATE' },
        orderBy: { createdAt: 'desc' },
      })
    
      expect(auditEntry).toBeTruthy()
      expect(auditEntry!.changedFields).toMatchObject({ status: 'sent' })
      expect(auditEntry!.userId).toBe(userId)
    });
    

    Integration Test:Tests

    Full
    // InvoiceUse Lifecyclereal broker in integration tests (testcontainers)
    const kafka = await new KafkaContainer('confluentinc/cp-kafka:7.5.0').start();
    

    E2E Tests

    Test full event chain: API action → event published → consumer processes → side effect visible.

    POST /api/v1/invoicesusers201poll draftfor createdwelcome POSTemail /api/v1/invoices/:id/send(SendGrid sandbox)200 sent
      ↳ Assert: LoggedAction INSERT (Invoice, status=sent)
      ↳ Assert: Transaction created (Debit 1200, Credit 4000)
      ↳ Assert: PDF URL set on invoice
      ↳ Assert: SendGrid sandboxassert received emailwithin GET /api/v1/invoices/track/:token → 200 viewed
      ↳ Assert: invoice.status = 'viewed'
    
    PATCH /api/v1/invoices/:id/mark-paid → 200 paid
      ↳ Assert: LoggedAction INSERT (Invoice, status=paid)
      ↳ Assert: Second Transaction (Debit 1000, Credit 1200)
      ↳ Assert: Total LoggedAction rows for invoice = 3 (created + sent + paid)5s
    

    Related DocumentsApproval