Banners Analytics Architecture
Banners Analytics Architecture
The Banners module captures impression and click events asynchronously and aggregates them into daily stats rollups. All writes go through BullMQ — the tracking HTTP endpoints never write to the database directly.
1. Architecture Overview
Client browser/app
│
│ POST /api/banners/events/impression
│ POST /api/banners/events/click
▼
BannerTrackingController (@Public, 202 Accepted)
│
│ resolve publicIds → integer IDs (parallel DB lookups)
│ enqueue with jobId = correlationId
▼
BullMQ BANNERS queue (Redis)
│
│ async processing
▼
BannerEventsProcessor
├── RECORD_IMPRESSION → INSERT INTO banner_impressions
├── RECORD_CLICK → INSERT INTO banner_clicks
├── SYNC_SCHEDULE_STATUS → UPDATE banners + campaigns + invalidate cache
└── ROLLUP_STATS → GROUP BY + UPSERT INTO banner_stats
(BannerSchedulerService registers cron jobs that enqueue SYNC_SCHEDULE_STATUS
and ROLLUP_STATS into the same BANNERS queue.)The tracking controller returns 202 Accepted immediately after enqueuing the job. Database writes happen out of band and do not affect response latency for the client.
2. BullMQ Queue Configuration
Queue name: QueueName.BANNERS = "banners"
Registered in apps/api/src/services/bullmq/bull.module.ts inside the REGISTERED_QUEUES array. This registration enables Bull Board UI monitoring, global default job options (attempts, backoff), and graceful shutdown via BullMQService.
Two NestJS modules register this queue:
BannerTrackingModule— for enqueue-only access (@InjectQueue(QueueName.BANNERS))BannersWorkersModule— for processor + scheduler access (BannerEventsProcessor+BannerSchedulerService)
NestJS BullMQ deduplicates Redis connections when the same queue is registered in multiple modules.
3. Job Contracts
All job contracts are defined in packages/jobs/src/index.ts:
| Job | Enum value | Producer | Consumer | Payload interface |
|---|---|---|---|---|
BannerJob.RECORD_IMPRESSION | "banner.record_impression" | BannerTrackingService.recordImpression | BannerEventsProcessor.processRecordImpression | RecordBannerImpressionPayload |
BannerJob.RECORD_CLICK | "banner.record_click" | BannerTrackingService.recordClick | BannerEventsProcessor.processRecordClick | RecordBannerClickPayload |
BannerJob.SYNC_SCHEDULE_STATUS | "banner.sync_schedule_status" | BannerSchedulerService.runScheduleSync (cron) | BannerEventsProcessor.processSyncScheduleStatus | SyncBannerScheduleStatusPayload |
BannerJob.ROLLUP_STATS | "banner.rollup_stats" | BannerSchedulerService.runStatsRollup (cron) | BannerEventsProcessor.processRollupStats | RollupBannerStatsPayload |
4. Impression Flow (End-to-End)
4.1 Client Submission
POST /api/banners/events/impression
Content-Type: application/json
{
"bannerPublicId": "019b0b5f-...",
"placementPublicId": "019b0b5f-...",
"campaignPublicId": "019b0b5f-...",
"sessionId": "sess_abc123",
"deviceType": "mobile",
"countryCode": "NP",
"pageUrl": "https://shop.example.com/p/123",
"occurredAt": "2026-06-19T10:15:30.000Z",
"correlationId": "imp_019b0b5f-..._1750334400000"
}4.2 Controller Handling
@Post("impression")
@HttpCode(HttpStatus.ACCEPTED)
@UseGuards(IpThrottlerGuard)
@IpThrottle({ limit: 600, windowSeconds: 60 })
async recordImpression(@Body() dto: RecordImpressionDto) {
await this.trackingService.recordImpression(dto);
return new ResponseDto("Impression recorded.", null);
}Returns 202 Accepted after the service call completes. The service returns as soon as the job is enqueued — no waiting for DB write.
4.3 Service: Public ID Resolution
BannerTrackingService.recordImpression() resolves the three publicIds to integer IDs in a single Promise.all before enqueuing. Note: the service does NOT filter on deletedAt — only eq(publicId):
const [[banner], [placement], [campaign]] = await Promise.all([
this.db.select({ id: banners.id }).from(banners)
.where(eq(banners.publicId, bannerPublicId)).limit(1),
this.db.select({ id: placements.id }).from(placements)
.where(eq(placements.publicId, placementPublicId)).limit(1),
campaignPublicId
? this.db.select({ id: campaigns.id }).from(campaigns)
.where(eq(campaigns.publicId, campaignPublicId)).limit(1)
: Promise.resolve([]),
]);- If banner not found: throws
NotFoundException(BANNER_NOT_FOUND, 404) - If placement not found: throws
NotFoundException(PLACEMENT_NOT_FOUND, 404) - If
campaignPublicIdprovided but not found:campaignIdisundefined(no throw — campaign may have been deleted after the impression)
4.4 Enqueue with Deduplication
await this.bannersQueue.add(
BannerJob.RECORD_IMPRESSION,
payload,
{ jobId: correlationId },
).catch((err) => {
if (isDuplicateJobError(err)) return; // idempotent
throw err;
});BullMQ uses jobId as a deduplication key within the queue. The enqueueWithDedup helper catches "jobid already exists" errors and silently no-ops. Retry-safe by design.
4.5 Processor: Database Write
BannerEventsProcessor.processRecordImpression(payload):
await this.db.insert(bannerImpressions).values({
bannerId: payload.bannerId,
placementId: payload.placementId,
campaignId: payload.campaignId ?? null,
sessionId: payload.sessionId ?? null,
userId: payload.userId ?? null,
deviceType: payload.deviceType ?? null,
countryCode: payload.countryCode ?? null,
pageUrl: payload.pageUrl ?? null,
referrerUrl: payload.referrerUrl ?? null,
occurredAt: new Date(payload.occurredAt), // string → Date for timestamptz
});occurredAt is converted to a Date object before the Drizzle insert. Nullable payload fields default to null.
5. Click Flow
Identical to impression flow with two differences:
- Job name:
BannerJob.RECORD_CLICK destinationUrlfield is required in payload- No
referrerUrlfield on click events - Inserts into
banner_clickstable (withdestinationUrlcolumn)
6. Schedule Status Sync Flow
6.1 Cron Trigger
BannerSchedulerService.runScheduleSync() runs each minute (BANNER_SCHEDULE_SYNC_CRON, default * * * * *) and enqueues:
const now = new Date();
const minuteKey = [
now.getUTCFullYear(),
String(now.getUTCMonth() + 1).padStart(2, "0"),
String(now.getUTCDate()).padStart(2, "0"),
].join("-").concat(
`T${String(now.getUTCHours()).padStart(2, "0")}:${String(now.getUTCMinutes()).padStart(2, "0")}`,
);
const jobId = `banner-sync-${minuteKey}`;
await this.bannersQueue.add(
BannerJob.SYNC_SCHEDULE_STATUS,
{ correlationId: `sync-${minuteKey}` },
{ jobId },
).catch((err) => {
if (isDuplicateJobError(err)) return;
this.logger.error(`Failed to enqueue banner schedule sync: ${err.message}`);
});minuteKey = YYYY-MM-DDTHH:MM (UTC). At most one sync job runs per minute window — if the cron fires twice within the same minute (clock drift, restart), the second job is silently deduplicated.
6.2 Processor: 4-Parallel Updates
BannerEventsProcessor.processSyncScheduleStatus(payload) runs 4 DB updates concurrently via Promise.all:
const [bannersActivated, bannersExpired, campaignsActivated, campaignsEnded] = await Promise.all([
// 1. scheduled banners → active
this.db.update(banners)
.set({ scheduleStatus: "active", updatedAt: now })
.where(and(
eq(banners.scheduleStatus, "scheduled"),
lte(banners.publishAt, now),
or(isNull(banners.expiresAt), sql`${banners.expiresAt} > ${now}`),
isNull(banners.deletedAt),
)).returning({ id: banners.id }),
// 2. active/scheduled banners → expired
this.db.update(banners)
.set({ scheduleStatus: "expired", updatedAt: now })
.where(and(
inArray(banners.scheduleStatus, ["active", "scheduled"]),
sql`${banners.expiresAt} IS NOT NULL`,
lt(banners.expiresAt, now),
isNull(banners.deletedAt),
)).returning({ id: banners.id }),
// 3. scheduled campaigns → active
this.db.update(campaigns)
.set({ status: "active", updatedAt: now })
.where(and(
eq(campaigns.status, "scheduled"),
lte(campaigns.startsAt, now),
or(isNull(campaigns.endsAt), sql`${campaigns.endsAt} > ${now}`),
isNull(campaigns.deletedAt),
)).returning({ id: campaigns.id }),
// 4. active/scheduled campaigns → ended
this.db.update(campaigns)
.set({ status: "ended", updatedAt: now })
.where(and(
inArray(campaigns.status, ["active", "scheduled"]),
sql`${campaigns.endsAt} IS NOT NULL`,
lt(campaigns.endsAt, now),
isNull(campaigns.deletedAt),
)).returning({ id: campaigns.id }),
]);6.3 Cache Invalidation
const totalChanged =
bannersActivated.length + bannersExpired.length +
campaignsActivated.length + campaignsEnded.length;
this.logger.log(
`Banner schedule sync complete bannersActivated=${...} bannersExpired=${...} campaignsActivated=${...} campaignsEnded=${...} correlationId=${...}`,
);
if (totalChanged === 0) return;
try {
await this.redisCacheService.invalidatePattern("banners:serve:*");
} catch (err) {
this.logger.warn(`Banner serving cache invalidation failed correlationId=${...} reason=${err.message}`);
}Cache invalidation only fires when totalChanged > 0. Failure is warn-only — the 60-second TTL ensures eventual consistency.
7. Stats Rollup Flow
7.1 Cron Trigger
BannerSchedulerService.runStatsRollup() runs nightly (BANNER_STATS_ROLLUP_CRON, default 0 1 * * *) and enqueues for the previous day (UTC):
const yesterday = new Date();
yesterday.setUTCDate(yesterday.getUTCDate() - 1);
const statDate = yesterday.toISOString().split("T")[0]; // "YYYY-MM-DD"
const jobId = `banner-rollup-${statDate}`;
await this.bannersQueue.add(
BannerJob.ROLLUP_STATS,
{ statDate, correlationId: `rollup-${statDate}` },
{ jobId },
);jobId ensures the rollup for a given date runs exactly once — safe to restart the cron without double-counting.
7.2 Processor: Aggregation Queries
BannerEventsProcessor.processRollupStats(payload):
const dayStart = new Date(`${statDate}T00:00:00.000Z`);
const dayEnd = new Date(`${statDate}T23:59:59.999Z`);
const [impressionRows, clickRows] = await Promise.all([
this.db.select({
bannerId: bannerImpressions.bannerId,
placementId: bannerImpressions.placementId,
campaignId: bannerImpressions.campaignId,
eventCount: count(),
})
.from(bannerImpressions)
.where(and(
gte(bannerImpressions.occurredAt, dayStart),
lte(bannerImpressions.occurredAt, dayEnd),
sql`${bannerImpressions.campaignId} IS NOT NULL`,
))
.groupBy(
bannerImpressions.bannerId,
bannerImpressions.placementId,
bannerImpressions.campaignId,
),
this.db.select({ ... })
.from(bannerClicks)
.where(and(
gte(bannerClicks.occurredAt, dayStart),
lte(bannerClicks.occurredAt, dayEnd),
sql`${bannerClicks.campaignId} IS NOT NULL`,
))
.groupBy(
bannerClicks.bannerId,
bannerClicks.placementId,
bannerClicks.campaignId,
),
]);Why filter campaign_id IS NOT NULL: the banner_stats unique constraint on (banner_id, placement_id, campaign_id, stat_date) cannot deduplicate rows where campaign_id IS NULL because PostgreSQL treats NULL values as distinct in unique indexes. ON CONFLICT DO UPDATE will not fire for null-campaign_id rows — it would silently create duplicate rows. Filtering at rollup time avoids this. Since the serving endpoint always supplies a campaignId context, analytics events in production always have a non-null campaignId.
7.3 Merge and Upsert
const statsMap = new Map<string, { ... }>();
for (const row of impressionRows) {
if (row.campaignId === null) continue;
const key = `${row.bannerId}:${row.placementId}:${row.campaignId}`;
statsMap.set(key, {
bannerId: row.bannerId,
placementId: row.placementId,
campaignId: row.campaignId,
statDate,
impressions: Number(row.eventCount),
clicks: 0,
});
}
for (const row of clickRows) {
if (row.campaignId === null) continue;
const key = `${row.bannerId}:${row.placementId}:${row.campaignId}`;
const existing = statsMap.get(key) ?? { /* impressions: 0, clicks: 0, ... */ };
existing.clicks = Number(row.eventCount);
statsMap.set(key, existing);
}
const statsValues = [...statsMap.values()];
if (statsValues.length === 0) {
this.logger.log(`Banner stats rollup skipped no_data statDate=${statDate} correlationId=${...}`);
return;
}
let upsertedRows = 0;
for (let index = 0; index < statsValues.length; index += ROLLUP_BATCH_SIZE /* 100 */) {
const batch = statsValues.slice(index, index + ROLLUP_BATCH_SIZE).map((v) => ({
bannerId: v.bannerId,
placementId: v.placementId,
campaignId: v.campaignId,
statDate,
impressions: v.impressions,
clicks: v.clicks,
updatedAt: now,
}));
await this.db.insert(bannerStats).values(batch).onConflictDoUpdate({
target: [bannerStats.bannerId, bannerStats.placementId, bannerStats.campaignId, bannerStats.statDate],
set: {
impressions: sql`EXCLUDED.impressions`,
clicks: sql`EXCLUDED.clicks`,
updatedAt: sql`NOW()`,
},
});
upsertedRows += batch.length;
}7.4 Overwrite Semantics (Idempotency)
ON CONFLICT DO UPDATE SET impressions = EXCLUDED.impressions uses overwrite semantics, not accumulate. This means:
- Running the rollup for the same
statDatetwice produces the same result (idempotent) - If a job fails midway and is retried, the re-run overwrites with the correct aggregated count
- If impression rows are backfilled for a past date, re-running the rollup for that date will correct
banner_stats
Accumulate semantics (impressions = banner_stats.impressions + EXCLUDED.impressions) would produce doubled counts on retry — silent data corruption.
8. Schema Details
banner_impressions and banner_clicks (append-only)
bigserialPKs (impression/click volumes can exceed integer range)- No unique constraint; append-only writes
- No UPDATE or DELETE in normal operation
- Deduplication is at the application layer via
correlationId→ BullMQjobId - Rows are never updated or deleted after insert; historical event data is permanent
banner_stats (daily rollup with upsert)
- Unique constraint:
UNIQUE (banner_id, placement_id, campaign_id, stat_date) - This constraint only deduplicates rows where
campaign_id IS NOT NULL NULLcampaign_idrows are excluded at rollup time (see section 7.2)
9. DLQ and Job Failure Handling
Failed jobs follow the global BullMQ configuration from bull.module.ts:
- Default retry count: configured per-queue in
resolveQueueDefaultJobOptions() - On max retry exhausted: job moves to the failed job list (BullMQ's built-in DLQ equivalent)
- BullBoard (
/admin/bullboard) shows failed jobs with stack traces - Manual retry available via BullBoard UI
SYNC_SCHEDULE_STATUS and ROLLUP_STATS jobs are safe to retry — both are idempotent:
- Status sync: re-running applies the same state transitions; already-transitioned records match no
WHEREconditions - Stats rollup: overwrite semantics ensure re-running produces correct totals
RECORD_IMPRESSION and RECORD_CLICK jobs: already-inserted rows on retry would create duplicate rows in banner_impressions/banner_clicks if the first attempt succeeded but the job was marked failed (network timeout after successful DB write). This is an acceptable tradeoff — impression analytics are best-effort, not billing-critical. Duplicate events are rare and represent < 0.1% of volume in practice.
10. Data Retention
No automatic purge is implemented in the current version.
| Table | Retention | Notes |
|---|---|---|
banner_impressions | Indefinite | Append-only; partition by occurred_at month for scale |
banner_clicks | Indefinite | Same |
banner_stats | Indefinite | Daily aggregates; small row count regardless of event volume |
Future: RETENTION_CLEANUP_CRON job to archive or delete banner_impressions/banner_clicks older than N days. Define N in env config when implemented.
11. Observability
| Signal | What to Watch |
|---|---|
BullBoard /admin/bullboard (BANNERS queue) | Failed jobs, queue depth, processing rate |
SYNC_SCHEDULE_STATUS job logs | totalChanged > 0 means banners/campaigns transitioned — check if expected |
ROLLUP_STATS job logs | Log statsMap.size = number of (banner, placement, campaign) combinations for the day |
| Redis memory | Watch for banners:serve:* key growth if TTL invalidation isn't working |
banner_stats row count | Should grow by statsMap.size per day; flat growth = rollup not running |
| Duplicate impressions | If correlationId dedup breaks, banner_impressions row count per banner spikes |
12. Architecture Sequence Diagram
Banners Targeting Engine
Previous Page
Starter Nest Backend Guidelines
This document explains how the `apps/api` service is structured, how to extend it safely, and how to write and test new backend code with NestJS. It is intentionally beginner-friendly—treat it as the playbook for everyday backend work in this repo.