
Banners Analytics Architecture


The Banners module captures impression and click events asynchronously and aggregates them into daily stats rollups. All writes go through BullMQ — the tracking HTTP endpoints never write to the database directly.


1. Architecture Overview

Client browser/app
        │
        │  POST /api/banners/events/impression
        │  POST /api/banners/events/click
        ▼
BannerTrackingController    (@Public, 202 Accepted)
        │
        │  resolve publicIds → integer IDs (parallel DB lookups)
        │  enqueue with jobId = correlationId
        ▼
BullMQ BANNERS queue (Redis)
        │
        │  async processing
        ▼
BannerEventsProcessor
   ├── RECORD_IMPRESSION    → INSERT INTO banner_impressions
   ├── RECORD_CLICK         → INSERT INTO banner_clicks
   ├── SYNC_SCHEDULE_STATUS → UPDATE banners + campaigns + invalidate cache
   └── ROLLUP_STATS         → GROUP BY + UPSERT INTO banner_stats

(BannerSchedulerService registers cron jobs that enqueue SYNC_SCHEDULE_STATUS
and ROLLUP_STATS into the same BANNERS queue.)

The tracking controller returns 202 Accepted immediately after enqueuing the job. Database writes happen out of band and do not affect response latency for the client.
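
The accept-then-process split can be illustrated with a small in-memory sketch. A plain array stands in for the BullMQ queue and another for the table; `handleImpression` and `drain` are hypothetical names used only for illustration, not the module's API.

```typescript
// In-memory stand-in for the BANNERS queue; the real module uses BullMQ on Redis.
type QueuedJob = { name: string; jobId: string; data: Record<string, unknown> };

const queue: QueuedJob[] = [];
const bannerImpressions: Record<string, unknown>[] = []; // stand-in for the table

// Hypothetical handler: enqueue only, then answer 202 without touching the table.
function handleImpression(data: Record<string, unknown>, correlationId: string): number {
  queue.push({ name: "banner.record_impression", jobId: correlationId, data });
  return 202;
}

// Hypothetical worker loop: performs the writes later, out of band.
function drain(): number {
  let processed = 0;
  while (queue.length > 0) {
    const job = queue.shift()!;
    bannerImpressions.push(job.data);
    processed += 1;
  }
  return processed;
}

const status = handleImpression({ bannerId: 1, placementId: 2 }, "imp_example_1");
const rowsBeforeDrain = bannerImpressions.length; // nothing has been written yet
const processedCount = drain();
```

The point of the sketch is that the response status is decided before any row exists; a slow or failing write only affects the worker side.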


2. BullMQ Queue Configuration

Queue name: QueueName.BANNERS = "banners"

Registered in apps/api/src/services/bullmq/bull.module.ts inside the REGISTERED_QUEUES array. This registration enables Bull Board UI monitoring, global default job options (attempts, backoff), and graceful shutdown via BullMQService.

Two NestJS modules register this queue:

  • BannerTrackingModule — for enqueue-only access (@InjectQueue(QueueName.BANNERS))
  • BannersWorkersModule — for processor + scheduler access (BannerEventsProcessor + BannerSchedulerService)

NestJS BullMQ deduplicates Redis connections when the same queue is registered in multiple modules.


3. Job Contracts

All job contracts are defined in packages/jobs/src/index.ts:

| Job | Enum value | Producer | Consumer | Payload interface |
| --- | --- | --- | --- | --- |
| BannerJob.RECORD_IMPRESSION | "banner.record_impression" | BannerTrackingService.recordImpression | BannerEventsProcessor.processRecordImpression | RecordBannerImpressionPayload |
| BannerJob.RECORD_CLICK | "banner.record_click" | BannerTrackingService.recordClick | BannerEventsProcessor.processRecordClick | RecordBannerClickPayload |
| BannerJob.SYNC_SCHEDULE_STATUS | "banner.sync_schedule_status" | BannerSchedulerService.runScheduleSync (cron) | BannerEventsProcessor.processSyncScheduleStatus | SyncBannerScheduleStatusPayload |
| BannerJob.ROLLUP_STATS | "banner.rollup_stats" | BannerSchedulerService.runStatsRollup (cron) | BannerEventsProcessor.processRollupStats | RollupBannerStatsPayload |
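
One way to keep producers and consumers in agreement is to map each job name to its payload type. The sketch below is an assumption about how such contracts could look: the job names come from the table above, but the payload fields are inferred from fields used elsewhere in this document, and `BannerJobPayloads` and `makeJob` are hypothetical. The actual contents of packages/jobs/src/index.ts are not shown here.

```typescript
// Job names as listed in the contracts table.
const BannerJob = {
  RECORD_IMPRESSION: "banner.record_impression",
  RECORD_CLICK: "banner.record_click",
  SYNC_SCHEDULE_STATUS: "banner.sync_schedule_status",
  ROLLUP_STATS: "banner.rollup_stats",
} as const;

type BannerJobName = (typeof BannerJob)[keyof typeof BannerJob];

// Assumed payload shapes (a subset of fields, for illustration only).
interface RecordBannerImpressionPayload {
  bannerId: number;
  placementId: number;
  campaignId?: number;
  sessionId?: string;
  occurredAt: string; // ISO 8601 timestamp
  correlationId: string;
}
interface RecordBannerClickPayload extends RecordBannerImpressionPayload {
  destinationUrl: string; // required on clicks
}
interface SyncBannerScheduleStatusPayload {
  correlationId: string;
}
interface RollupBannerStatsPayload {
  statDate: string; // "YYYY-MM-DD"
  correlationId: string;
}

// Hypothetical lookup type: job name -> payload.
interface BannerJobPayloads {
  "banner.record_impression": RecordBannerImpressionPayload;
  "banner.record_click": RecordBannerClickPayload;
  "banner.sync_schedule_status": SyncBannerScheduleStatusPayload;
  "banner.rollup_stats": RollupBannerStatsPayload;
}

// The compiler rejects a payload that does not match the job name.
function makeJob<N extends BannerJobName>(name: N, data: BannerJobPayloads[N]) {
  return { name, data };
}

const rollupJob = makeJob(BannerJob.ROLLUP_STATS, {
  statDate: "2026-06-18",
  correlationId: "rollup-2026-06-18",
});
```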

4. Impression Flow (End-to-End)

4.1 Client Submission

POST /api/banners/events/impression
Content-Type: application/json

{
  "bannerPublicId": "019b0b5f-...",
  "placementPublicId": "019b0b5f-...",
  "campaignPublicId": "019b0b5f-...",
  "sessionId": "sess_abc123",
  "deviceType": "mobile",
  "countryCode": "NP",
  "pageUrl": "https://shop.example.com/p/123",
  "occurredAt": "2026-06-19T10:15:30.000Z",
  "correlationId": "imp_019b0b5f-..._1750334400000"
}

4.2 Controller Handling

@Post("impression")
@HttpCode(HttpStatus.ACCEPTED)
@UseGuards(IpThrottlerGuard)
@IpThrottle({ limit: 600, windowSeconds: 60 })
async recordImpression(@Body() dto: RecordImpressionDto) {
  await this.trackingService.recordImpression(dto);
  return new ResponseDto("Impression recorded.", null);
}

The endpoint responds with 202 Accepted once the service call completes. The service returns as soon as the job is enqueued; it waits for the publicId lookups described in 4.3, but not for the database write.

4.3 Service: Public ID Resolution

BannerTrackingService.recordImpression() resolves the three publicIds to integer IDs in a single Promise.all before enqueuing. Note that the lookups match on publicId only and do not filter on deletedAt:

const [[banner], [placement], [campaign]] = await Promise.all([
  this.db.select({ id: banners.id }).from(banners)
    .where(eq(banners.publicId, bannerPublicId)).limit(1),
  this.db.select({ id: placements.id }).from(placements)
    .where(eq(placements.publicId, placementPublicId)).limit(1),
  campaignPublicId
    ? this.db.select({ id: campaigns.id }).from(campaigns)
        .where(eq(campaigns.publicId, campaignPublicId)).limit(1)
    : Promise.resolve([]),
]);
  • If banner not found: throws NotFoundException (BANNER_NOT_FOUND, 404)
  • If placement not found: throws NotFoundException (PLACEMENT_NOT_FOUND, 404)
  • If campaignPublicId provided but not found: campaignId is undefined (no throw — campaign may have been deleted after the impression)
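
The not-found rules above can be sketched without a database. In this minimal example the lookups are plain async functions, `NotFoundError` stands in for NestJS's NotFoundException, and `resolveIds`, `IdLookup`, and `fromMap` are hypothetical names introduced for illustration.

```typescript
// Hypothetical error type standing in for NestJS's NotFoundException.
class NotFoundError extends Error {
  readonly code: string;
  constructor(code: string) {
    super(code);
    this.code = code;
  }
}

type IdLookup = (publicId: string) => Promise<number | undefined>;

interface ResolvedIds {
  bannerId: number;
  placementId: number;
  campaignId?: number | undefined;
}

// Resolves all public IDs concurrently, then applies the not-found rules.
async function resolveIds(
  lookups: { banner: IdLookup; placement: IdLookup; campaign: IdLookup },
  input: { bannerPublicId: string; placementPublicId: string; campaignPublicId?: string },
): Promise<ResolvedIds> {
  const [bannerId, placementId, campaignId] = await Promise.all([
    lookups.banner(input.bannerPublicId),
    lookups.placement(input.placementPublicId),
    input.campaignPublicId
      ? lookups.campaign(input.campaignPublicId)
      : Promise.resolve(undefined),
  ]);
  if (bannerId === undefined) throw new NotFoundError("BANNER_NOT_FOUND");
  if (placementId === undefined) throw new NotFoundError("PLACEMENT_NOT_FOUND");
  // A missing campaign is tolerated: campaignId simply stays undefined.
  return { bannerId, placementId, campaignId };
}

// Map-backed lookups used only for this example.
const fromMap = (m: Map<string, number>): IdLookup => async (publicId) => m.get(publicId);
const exampleLookups = {
  banner: fromMap(new Map([["b-1", 10]])),
  placement: fromMap(new Map([["p-1", 20]])),
  campaign: fromMap(new Map<string, number>()),
};
```

Calling `resolveIds(exampleLookups, { bannerPublicId: "b-1", placementPublicId: "p-1", campaignPublicId: "c-gone" })` resolves with an undefined campaignId, while an unknown bannerPublicId rejects with BANNER_NOT_FOUND.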

4.4 Enqueue with Deduplication

await this.bannersQueue.add(
  BannerJob.RECORD_IMPRESSION,
  payload,
  { jobId: correlationId },
).catch((err) => {
  if (isDuplicateJobError(err)) return;  // idempotent
  throw err;
});

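BullMQ uses jobId as a deduplication key within the queue, so a second add with the same correlationId does not produce a second job. The enqueueWithDedup helper catches "jobid already exists" errors and treats them as a no-op, which makes client retries of the same event safe.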
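
The document names isDuplicateJobError and enqueueWithDedup but does not show their bodies. The following is a minimal sketch of how such helpers could work, assuming the duplicate condition surfaces as an Error whose message contains "jobid already exists"; `fakeAdd` is a hypothetical in-memory stand-in for the queue's add call.

```typescript
// Assumption: duplicates are reported as an Error containing this phrase
// (compared case-insensitively). The real helper may check something else.
function isDuplicateJobError(err: unknown): boolean {
  return err instanceof Error && err.message.toLowerCase().includes("jobid already exists");
}

// Wraps any enqueue call so duplicates become a no-op and other errors propagate.
async function enqueueWithDedup(
  add: () => Promise<unknown>,
): Promise<"enqueued" | "duplicate"> {
  try {
    await add();
    return "enqueued";
  } catch (err) {
    if (isDuplicateJobError(err)) return "duplicate";
    throw err;
  }
}

// Hypothetical in-memory queue that rejects a repeated jobId.
const seenJobIds = new Set<string>();
async function fakeAdd(jobId: string): Promise<void> {
  if (seenJobIds.has(jobId)) throw new Error("JobId already exists");
  seenJobIds.add(jobId);
}
```

With this shape, a client that retries the same correlationId gets the same successful outcome both times, and only unrelated failures (for example a Redis outage) reach the caller.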

4.5 Processor: Database Write

BannerEventsProcessor.processRecordImpression(payload):

await this.db.insert(bannerImpressions).values({
  bannerId: payload.bannerId,
  placementId: payload.placementId,
  campaignId: payload.campaignId ?? null,
  sessionId: payload.sessionId ?? null,
  userId: payload.userId ?? null,
  deviceType: payload.deviceType ?? null,
  countryCode: payload.countryCode ?? null,
  pageUrl: payload.pageUrl ?? null,
  referrerUrl: payload.referrerUrl ?? null,
  occurredAt: new Date(payload.occurredAt),   // string → Date for timestamptz
});

occurredAt is converted to a Date object before the Drizzle insert. Nullable payload fields default to null.


5. Click Flow

Identical to the impression flow, with these differences:

  • Job name: BannerJob.RECORD_CLICK
  • destinationUrl field is required in payload
  • No referrerUrl field on click events
  • Inserts into banner_clicks table (with destinationUrl column)

6. Schedule Status Sync Flow

6.1 Cron Trigger

BannerSchedulerService.runScheduleSync() runs each minute (BANNER_SCHEDULE_SYNC_CRON, default * * * * *) and enqueues:

const now = new Date();
const minuteKey = [
  now.getUTCFullYear(),
  String(now.getUTCMonth() + 1).padStart(2, "0"),
  String(now.getUTCDate()).padStart(2, "0"),
].join("-").concat(
  `T${String(now.getUTCHours()).padStart(2, "0")}:${String(now.getUTCMinutes()).padStart(2, "0")}`,
);
const jobId = `banner-sync-${minuteKey}`;
await this.bannersQueue.add(
  BannerJob.SYNC_SCHEDULE_STATUS,
  { correlationId: `sync-${minuteKey}` },
  { jobId },
).catch((err) => {
  if (isDuplicateJobError(err)) return;
  this.logger.error(`Failed to enqueue banner schedule sync: ${err.message}`);
});

minuteKey = YYYY-MM-DDTHH:MM (UTC). At most one sync job runs per minute window — if the cron fires twice within the same minute (clock drift, restart), the second job is silently deduplicated.
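
For illustration, the same minute key can be derived more compactly from toISOString(), which always returns UTC in the form YYYY-MM-DDTHH:mm:ss.sssZ. This is an equivalent sketch, not the service's code; `toMinuteKey` and `syncJobId` are hypothetical names.

```typescript
// The first 16 characters of the ISO string are exactly "YYYY-MM-DDTHH:MM" in UTC.
function toMinuteKey(date: Date): string {
  return date.toISOString().slice(0, 16);
}

// Builds the per-minute job id in the format used by the scheduler.
function syncJobId(date: Date): string {
  return `banner-sync-${toMinuteKey(date)}`;
}

// Two firings inside the same UTC minute share a job id; the next minute does not.
const firstFiring = syncJobId(new Date("2026-06-19T10:15:05.000Z"));
const secondFiring = syncJobId(new Date("2026-06-19T10:15:58.000Z"));
const nextMinute = syncJobId(new Date("2026-06-19T10:16:00.000Z"));
```

Because `firstFiring` and `secondFiring` are identical strings, the second enqueue in that minute hits the jobId deduplication path.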

6.2 Processor: 4-Parallel Updates

BannerEventsProcessor.processSyncScheduleStatus(payload) runs 4 DB updates concurrently via Promise.all:

const [bannersActivated, bannersExpired, campaignsActivated, campaignsEnded] = await Promise.all([
  // 1. scheduled banners → active
  this.db.update(banners)
    .set({ scheduleStatus: "active", updatedAt: now })
    .where(and(
      eq(banners.scheduleStatus, "scheduled"),
      lte(banners.publishAt, now),
      or(isNull(banners.expiresAt), sql`${banners.expiresAt} > ${now}`),
      isNull(banners.deletedAt),
    )).returning({ id: banners.id }),

  // 2. active/scheduled banners → expired
  this.db.update(banners)
    .set({ scheduleStatus: "expired", updatedAt: now })
    .where(and(
      inArray(banners.scheduleStatus, ["active", "scheduled"]),
      sql`${banners.expiresAt} IS NOT NULL`,
      lt(banners.expiresAt, now),
      isNull(banners.deletedAt),
    )).returning({ id: banners.id }),

  // 3. scheduled campaigns → active
  this.db.update(campaigns)
    .set({ status: "active", updatedAt: now })
    .where(and(
      eq(campaigns.status, "scheduled"),
      lte(campaigns.startsAt, now),
      or(isNull(campaigns.endsAt), sql`${campaigns.endsAt} > ${now}`),
      isNull(campaigns.deletedAt),
    )).returning({ id: campaigns.id }),

  // 4. active/scheduled campaigns → ended
  this.db.update(campaigns)
    .set({ status: "ended", updatedAt: now })
    .where(and(
      inArray(campaigns.status, ["active", "scheduled"]),
      sql`${campaigns.endsAt} IS NOT NULL`,
      lt(campaigns.endsAt, now),
      isNull(campaigns.deletedAt),
    )).returning({ id: campaigns.id }),
]);
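
The banner WHERE clauses above (updates 1 and 2) can be restated as a pure function, which makes the boundary cases easier to reason about and test. This is a sketch under assumptions: `nextScheduleStatus` and `BannerSchedule` are hypothetical, and the status union lists only the three values that appear in this document.

```typescript
type ScheduleStatus = "scheduled" | "active" | "expired";

interface BannerSchedule {
  scheduleStatus: ScheduleStatus;
  publishAt: Date;
  expiresAt: Date | null;
  deletedAt: Date | null;
}

// Returns the status a banner would have after one sync pass at `now`.
function nextScheduleStatus(b: BannerSchedule, now: Date): ScheduleStatus {
  if (b.deletedAt !== null) return b.scheduleStatus; // soft-deleted rows are skipped
  const t = now.getTime();
  const expiry = b.expiresAt === null ? null : b.expiresAt.getTime();
  const live = b.scheduleStatus === "scheduled" || b.scheduleStatus === "active";

  // Update 2: an expiry strictly in the past expires scheduled and active banners.
  if (live && expiry !== null && expiry < t) return "expired";

  // Update 1: scheduled banners go live once publishAt is reached
  // and the expiry is either absent or still strictly ahead.
  if (
    b.scheduleStatus === "scheduled" &&
    b.publishAt.getTime() <= t &&
    (expiry === null || expiry > t)
  ) {
    return "active";
  }
  return b.scheduleStatus;
}

const now = new Date("2026-06-19T10:15:00.000Z");
const past = new Date("2026-06-19T09:00:00.000Z");
const future = new Date("2026-06-20T09:00:00.000Z");
```

The two conditions are mutually exclusive (one requires expiresAt > now, the other expiresAt < now), which is why the corresponding updates can run concurrently without touching the same row. A banner whose expiresAt equals now exactly matches neither and is picked up on the next pass.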

6.3 Cache Invalidation

const totalChanged =
  bannersActivated.length + bannersExpired.length +
  campaignsActivated.length + campaignsEnded.length;

this.logger.log(
  `Banner schedule sync complete bannersActivated=${...} bannersExpired=${...} campaignsActivated=${...} campaignsEnded=${...} correlationId=${...}`,
);

if (totalChanged === 0) return;

try {
  await this.redisCacheService.invalidatePattern("banners:serve:*");
} catch (err) {
  this.logger.warn(`Banner serving cache invalidation failed correlationId=${...} reason=${err.message}`);
}

Cache invalidation only fires when totalChanged > 0. Failure is warn-only — the 60-second TTL ensures eventual consistency.


7. Stats Rollup Flow

7.1 Cron Trigger

BannerSchedulerService.runStatsRollup() runs nightly (BANNER_STATS_ROLLUP_CRON, default 0 1 * * *) and enqueues for the previous day (UTC):

const yesterday = new Date();
yesterday.setUTCDate(yesterday.getUTCDate() - 1);
const statDate = yesterday.toISOString().split("T")[0];  // "YYYY-MM-DD"
const jobId = `banner-rollup-${statDate}`;
await this.bannersQueue.add(
  BannerJob.ROLLUP_STATS,
  { statDate, correlationId: `rollup-${statDate}` },
  { jobId },
);

The date-based jobId means at most one rollup job per statDate is enqueued, so restarting the scheduler or firing the cron twice does not queue a second rollup for the same day.
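
The date arithmetic is worth checking at month and year boundaries. The sketch below isolates it in two hypothetical helpers, `previousUtcDate` and `rollupJobId`, that take the current time as a parameter so the boundary cases can be tested.

```typescript
// Returns the UTC calendar date before `now` as "YYYY-MM-DD".
function previousUtcDate(now: Date): string {
  const d = new Date(now.getTime()); // copy so the caller's Date is not mutated
  d.setUTCDate(d.getUTCDate() - 1);  // day 0 rolls back into the previous month
  return d.toISOString().slice(0, 10);
}

// Builds the per-day job id in the format used by the scheduler.
function rollupJobId(now: Date): string {
  return `banner-rollup-${previousUtcDate(now)}`;
}

const ordinaryDay = rollupJobId(new Date("2026-06-19T01:00:00.000Z"));
const monthBoundary = previousUtcDate(new Date("2026-03-01T01:00:00.000Z"));
const yearBoundary = previousUtcDate(new Date("2026-01-01T01:00:00.000Z"));
```

Because all operations use the UTC accessors, the result does not depend on the server's local time zone.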

7.2 Processor: Aggregation Queries

BannerEventsProcessor.processRollupStats(payload):

const dayStart = new Date(`${statDate}T00:00:00.000Z`);
const dayEnd = new Date(`${statDate}T23:59:59.999Z`);

const [impressionRows, clickRows] = await Promise.all([
  this.db.select({
    bannerId: bannerImpressions.bannerId,
    placementId: bannerImpressions.placementId,
    campaignId: bannerImpressions.campaignId,
    eventCount: count(),
  })
    .from(bannerImpressions)
    .where(and(
      gte(bannerImpressions.occurredAt, dayStart),
      lte(bannerImpressions.occurredAt, dayEnd),
      sql`${bannerImpressions.campaignId} IS NOT NULL`,
    ))
    .groupBy(
      bannerImpressions.bannerId,
      bannerImpressions.placementId,
      bannerImpressions.campaignId,
    ),

  this.db.select({ ... })
    .from(bannerClicks)
    .where(and(
      gte(bannerClicks.occurredAt, dayStart),
      lte(bannerClicks.occurredAt, dayEnd),
      sql`${bannerClicks.campaignId} IS NOT NULL`,
    ))
    .groupBy(
      bannerClicks.bannerId,
      bannerClicks.placementId,
      bannerClicks.campaignId,
    ),
]);

Why filter campaign_id IS NOT NULL: the banner_stats unique constraint on (banner_id, placement_id, campaign_id, stat_date) cannot deduplicate rows where campaign_id IS NULL, because PostgreSQL by default treats NULL values as distinct in unique indexes. ON CONFLICT DO UPDATE would never fire for such rows, so each rollup run would silently insert duplicates. Filtering at rollup time avoids this. The serving endpoint is expected to supply a campaignId context, so production events should normally carry a non-null campaignId; events without one are still stored but are excluded from banner_stats.

7.3 Merge and Upsert

const statsMap = new Map<string, { ... }>();

for (const row of impressionRows) {
  if (row.campaignId === null) continue;
  const key = `${row.bannerId}:${row.placementId}:${row.campaignId}`;
  statsMap.set(key, {
    bannerId: row.bannerId,
    placementId: row.placementId,
    campaignId: row.campaignId,
    statDate,
    impressions: Number(row.eventCount),
    clicks: 0,
  });
}

for (const row of clickRows) {
  if (row.campaignId === null) continue;
  const key = `${row.bannerId}:${row.placementId}:${row.campaignId}`;
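  // Keys that have clicks but no impressions start from zero impressions.
  const existing = statsMap.get(key) ?? {
    bannerId: row.bannerId,
    placementId: row.placementId,
    campaignId: row.campaignId,
    statDate,
    impressions: 0,
    clicks: 0,
  };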
  existing.clicks = Number(row.eventCount);
  statsMap.set(key, existing);
}

const statsValues = [...statsMap.values()];
if (statsValues.length === 0) {
  this.logger.log(`Banner stats rollup skipped no_data statDate=${statDate} correlationId=${...}`);
  return;
}

let upsertedRows = 0;
for (let index = 0; index < statsValues.length; index += ROLLUP_BATCH_SIZE /* 100 */) {
  const batch = statsValues.slice(index, index + ROLLUP_BATCH_SIZE).map((v) => ({
    bannerId: v.bannerId,
    placementId: v.placementId,
    campaignId: v.campaignId,
    statDate,
    impressions: v.impressions,
    clicks: v.clicks,
    updatedAt: now,
  }));

  await this.db.insert(bannerStats).values(batch).onConflictDoUpdate({
    target: [bannerStats.bannerId, bannerStats.placementId, bannerStats.campaignId, bannerStats.statDate],
    set: {
      impressions: sql`EXCLUDED.impressions`,
      clicks: sql`EXCLUDED.clicks`,
      updatedAt: sql`NOW()`,
    },
  });

  upsertedRows += batch.length;
}
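
The merge and batching steps do not depend on the database, so they can be sketched and tested in isolation. `mergeCounts` and `chunk` below are hypothetical helpers that mirror the loops above; the row types are simplified assumptions.

```typescript
interface CountRow {
  bannerId: number;
  placementId: number;
  campaignId: number;
  eventCount: number;
}

interface StatRow {
  bannerId: number;
  placementId: number;
  campaignId: number;
  impressions: number;
  clicks: number;
}

// Merges per-key impression and click counts; click-only keys get impressions = 0.
function mergeCounts(impressionRows: CountRow[], clickRows: CountRow[]): StatRow[] {
  const stats = new Map<string, StatRow>();
  const keyOf = (r: CountRow) => `${r.bannerId}:${r.placementId}:${r.campaignId}`;

  for (const r of impressionRows) {
    stats.set(keyOf(r), {
      bannerId: r.bannerId,
      placementId: r.placementId,
      campaignId: r.campaignId,
      impressions: r.eventCount,
      clicks: 0,
    });
  }
  for (const r of clickRows) {
    const existing = stats.get(keyOf(r)) ?? {
      bannerId: r.bannerId,
      placementId: r.placementId,
      campaignId: r.campaignId,
      impressions: 0,
      clicks: 0,
    };
    existing.clicks = r.eventCount;
    stats.set(keyOf(r), existing);
  }
  return Array.from(stats.values());
}

// Splits rows into batches of at most `size`, like the ROLLUP_BATCH_SIZE loop.
function chunk<T>(rows: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    batches.push(rows.slice(i, i + size));
  }
  return batches;
}

const merged = mergeCounts(
  [{ bannerId: 1, placementId: 1, campaignId: 1, eventCount: 5 }],
  [
    { bannerId: 1, placementId: 1, campaignId: 1, eventCount: 2 },
    { bannerId: 2, placementId: 1, campaignId: 1, eventCount: 3 },
  ],
);
const batchSizes = chunk(new Array(250).fill(0), 100).map((b) => b.length);
```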

7.4 Overwrite Semantics (Idempotency)

ON CONFLICT DO UPDATE SET impressions = EXCLUDED.impressions uses overwrite semantics, not accumulate. This means:

  • Running the rollup for the same statDate twice produces the same result (idempotent)
  • If a job fails midway and is retried, the re-run overwrites with the correct aggregated count
  • If impression rows are backfilled for a past date, re-running the rollup for that date will correct banner_stats

Accumulate semantics (impressions = banner_stats.impressions + EXCLUDED.impressions) would produce doubled counts on retry — silent data corruption.
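
The difference between the two strategies shows up as soon as the same day is rolled up twice. The sketch below uses a Map as a stand-in for banner_stats; `applyRollup` and `runTwice` are hypothetical names, and the count of 120 is an arbitrary example value.

```typescript
type Strategy = "overwrite" | "accumulate";

// Applies one rollup result to an in-memory stand-in for banner_stats.
function applyRollup(
  stats: Map<string, number>,
  key: string,
  aggregated: number,
  strategy: Strategy,
): void {
  const current = stats.get(key);
  if (current === undefined || strategy === "overwrite") {
    stats.set(key, aggregated); // insert, or replace with the fresh aggregate
  } else {
    stats.set(key, current + aggregated); // adds on top of the earlier run
  }
}

// Simulates a rollup followed by a retry for the same statDate.
function runTwice(strategy: Strategy): number {
  const stats = new Map<string, number>();
  const key = "1:2:3:2026-06-18"; // banner:placement:campaign:statDate
  applyRollup(stats, key, 120, strategy); // first run
  applyRollup(stats, key, 120, strategy); // retry of the same day
  return stats.get(key) ?? 0;
}

const overwriteTotal = runTwice("overwrite");
const accumulateTotal = runTwice("accumulate");
```

With overwrite the stored value stays at the true aggregate (120) after the retry; with accumulate it doubles to 240.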


8. Schema Details

banner_impressions and banner_clicks (event tables):

  • bigserial PKs (impression/click volumes can exceed integer range)
  • No unique constraint; append-only writes
  • Rows are never updated or deleted after insert in normal operation; historical event data is permanent
  • Deduplication is at the application layer via correlationId → BullMQ jobId

banner_stats (daily rollup table):

  • Unique constraint: UNIQUE (banner_id, placement_id, campaign_id, stat_date)
  • This constraint only deduplicates rows where campaign_id IS NOT NULL
  • NULL campaign_id rows are excluded at rollup time (see section 7.2)

9. DLQ and Job Failure Handling

Failed jobs follow the global BullMQ configuration from bull.module.ts:

  • Default retry count: configured per-queue in resolveQueueDefaultJobOptions()
  • On max retry exhausted: job moves to the failed job list (BullMQ's built-in DLQ equivalent)
  • BullBoard (/admin/bullboard) shows failed jobs with stack traces
  • Manual retry available via BullBoard UI

SYNC_SCHEDULE_STATUS and ROLLUP_STATS jobs are safe to retry — both are idempotent:

  • Status sync: re-running applies the same state transitions; already-transitioned records match no WHERE conditions
  • Stats rollup: overwrite semantics ensure re-running produces correct totals

RECORD_IMPRESSION and RECORD_CLICK jobs are not idempotent at the database level: if the first attempt's insert succeeded but the job was still marked failed (for example, a network timeout after a successful DB write), the retry inserts a duplicate row into banner_impressions or banner_clicks. This is an acceptable tradeoff because impression analytics are best-effort, not billing-critical. Duplicate events are rare and represent < 0.1% of volume in practice.


10. Data Retention

No automatic purge is implemented in the current version.

| Table | Retention | Notes |
| --- | --- | --- |
| banner_impressions | Indefinite | Append-only; partition by occurred_at month for scale |
| banner_clicks | Indefinite | Same |
| banner_stats | Indefinite | Daily aggregates; small row count regardless of event volume |

Future: RETENTION_CLEANUP_CRON job to archive or delete banner_impressions/banner_clicks older than N days. Define N in env config when implemented.


11. Observability

| Signal | What to Watch |
| --- | --- |
| BullBoard /admin/bullboard (BANNERS queue) | Failed jobs, queue depth, processing rate |
| SYNC_SCHEDULE_STATUS job logs | totalChanged > 0 means banners/campaigns transitioned; check if expected |
| ROLLUP_STATS job logs | Log statsMap.size = number of (banner, placement, campaign) combinations for the day |
| Redis memory | Watch for banners:serve:* key growth if TTL invalidation isn't working |
| banner_stats row count | Should grow by statsMap.size per day; flat growth = rollup not running |
| Duplicate impressions | If correlationId dedup breaks, banner_impressions row count per banner spikes |

12. Architecture Sequence Diagram