From Batch Jobs to Event-Driven: How We Cut a 24-Hour Data Lag to Sub-Second


[Animated diagram: 📝 write path — events ingested · 🔍 read path — queries served · 💀 DLQ — validation failures]

The Problem

The platform we inherited was built around AWS Batch jobs. Three times a day, a job pulled roughly three million data points from three upstream APIs — device registrations, dealer assignments, alert histories — consolidated them into Snowflake, and triggered a second job to rebuild reporting views on top.

On top of that sat an AWS AppSync GraphQL API that surfaced the data to the product frontend.

The architecture did what it was designed to do. But it had two compounding problems.

24-hour data lag. If a technician registered a new device or reassigned it to a different dealer in the upstream system, that change wouldn’t appear in the product until the next batch window. For a connected product sold on the promise of real-time visibility, this was a material constraint.

Cost. The Snowflake bill alone was barely manageable. Three million rows a day, rebuilt views, cross-region egress — it added up.

Constraints

The platform was in production, serving commercial HVAC installations across multiple regions. A multi-week downtime window was off the table.

We also didn’t own the upstream systems. We could negotiate with their teams, but we couldn’t unilaterally change how they worked.

Getting Upstream Sources onto Kafka

The upstream systems already had internal event infrastructure — they emitted events for business operations like device registration, dealer assignment, and alert updates. They just weren’t publishing to Kafka.

The first piece of work was extending each source to publish to a shared Kafka cluster using a common topic naming convention:

device.{sourceType}.{action}

For example: device.infinity.dealer.setup, device.elt.dealer.assigned, device.ecobee.profile.updated.

The naming convention was deliberate. Topic-per-domain, not topic-per-service, keeps coupling loose. Every consumer subscribes to the events it cares about without any dependency on the producer’s internals.
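A convention like this is easiest to keep honest when it's enforced in code. The helper below is a hypothetical sketch — `buildTopic` and its regex are illustrative, not part of the platform — showing how the `device.{sourceType}.{action}` shape could be validated at the point where topics are declared:

```typescript
// Hypothetical helper enforcing the device.{sourceType}.{action} convention.
// Actions may themselves be dotted, e.g. "dealer.setup" or "profile.updated".
const TOPIC_PATTERN = /^device\.[a-z]+\.[a-z]+(\.[a-z]+)*$/;

export function buildTopic(sourceType: string, action: string): string {
  const topic = `device.${sourceType}.${action}`;
  if (!TOPIC_PATTERN.test(topic)) {
    throw new Error(`Topic violates naming convention: ${topic}`);
  }
  return topic;
}
```

For example, `buildTopic('infinity', 'dealer.setup')` yields `device.infinity.dealer.setup`, while anything with uppercase letters or stray characters fails fast at startup rather than creating a misnamed topic in the cluster.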

What We Built

The event contract

Every Kafka message is validated at the consumer boundary using a ConcordEvent base class:

import { ZodSchema } from 'zod';

export abstract class ConcordEvent<T> {
  static get topic(): string {
    throw new Error('Subclass must implement static getter for topic');
  }

  public data: T;

  constructor(
    private schema: ZodSchema<T>,
    private payload: unknown,
  ) {
    // parse() validates and returns the typed payload in one step,
    // so no unchecked cast of the raw payload is needed
    this.data = this.validate();
  }

  private validate(): T {
    try {
      return this.schema.parse(this.payload);
    } catch (error) {
      throw new ValidationError(error);
    }
  }
}

Each event type defines its own Zod schema and topic name:

export class InfinityDeviceWallCtrlDealerSetupEvent
  extends ConcordEvent<DeviceWallCtrlDealerSetupEventData> {

  static get topic(): string {
    return 'device.infinity.dealer.setup';
  }

  constructor(payload: unknown) {
    // `schema` is the Zod schema describing this event's payload shape
    super(schema, payload);
  }
}

If a message doesn’t match the schema, it throws immediately — no silent data corruption downstream.
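To make that fail-fast behaviour concrete, here is a runnable sketch of the contract. The inline `parse`-compatible schema object and the local `ValidationError` stand in for Zod and the platform's own classes, so the example has no external dependencies; the event and field names are illustrative:

```typescript
// Minimal stand-ins so the sketch runs without zod or the platform code.
class ValidationError extends Error {}

interface ParseSchema<T> { parse(input: unknown): T; }

abstract class ConcordEvent<T> {
  public data: T;
  constructor(schema: ParseSchema<T>, payload: unknown) {
    try {
      this.data = schema.parse(payload); // validate and type in one step
    } catch (e) {
      throw new ValidationError(String(e));
    }
  }
}

interface DealerSetupData { deviceId: string; dealerId: string; }

// Hand-rolled schema; in the real system this is a Zod schema.
const dealerSetupSchema: ParseSchema<DealerSetupData> = {
  parse(input: unknown): DealerSetupData {
    const o = input as Record<string, unknown>;
    if (typeof o?.deviceId !== 'string' || typeof o?.dealerId !== 'string') {
      throw new Error('deviceId and dealerId must be strings');
    }
    return { deviceId: o.deviceId, dealerId: o.dealerId };
  },
};

class DealerSetupEvent extends ConcordEvent<DealerSetupData> {
  static get topic(): string { return 'device.infinity.dealer.setup'; }
  constructor(payload: unknown) { super(dealerSetupSchema, payload); }
}
```

Constructing `new DealerSetupEvent({ deviceId: 'd1' })` throws a `ValidationError` before any handler logic runs — a malformed message never reaches the database.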

The consumers

Each NestJS microservice subscribes to relevant topics using @EventPattern:

@EventPattern(InfinityDeviceWallCtrlDealerSetupEvent.topic)
@TrackKafkaMetrics(InfinityDeviceWallCtrlDealerSetupEvent.topic)
public async handleInfinityDeviceWallCtrlDealerSetupEvent(
  @Payload() payload: unknown,
  @Ctx() context: KafkaContext,
) {
  try {
    const event = new InfinityDeviceWallCtrlDealerSetupEvent(payload);
    return await this.productSystemService.handleDeviceWallCtrlDealerSetup(event.data);
  } catch (error) {
    await this.errorHandler.handle(error, context);
  }
}

The @TrackKafkaMetrics decorator instruments each handler with throughput, latency, and error rate metrics per topic. Error handling is centralised — the ErrorHandlerService handles DLQ routing, logging, and alerting.
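The DLQ-routing idea can be sketched in a few lines. Everything here is an assumption for illustration — the `DlqProducer` interface, the `.dlq` topic suffix, and the message envelope are not the actual ErrorHandlerService:

```typescript
// Hypothetical sketch: route failed messages to a per-topic dead-letter topic.
interface DlqProducer {
  send(topic: string, message: { value: string }): Promise<void>;
}

class ErrorHandler {
  constructor(private readonly producer: DlqProducer) {}

  // Assumed convention: "<topic>.dlq" receives the failed payload plus context.
  async handle(error: Error, sourceTopic: string, rawPayload: unknown): Promise<void> {
    const dlqTopic = `${sourceTopic}.dlq`;
    await this.producer.send(dlqTopic, {
      value: JSON.stringify({
        error: error.message,
        payload: rawPayload,
        failedAt: new Date().toISOString(),
      }),
    });
    // In production this is also where logging and alerting hook in.
  }
}
```

Keeping this in one service means every consumer gets identical failure semantics for free, rather than fifteen teams inventing fifteen retry strategies.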

The shared SDK

Fifteen-plus services needed to query the same MongoDB collections. Rather than letting each team implement their own query logic, we built a shared concord-sdk with typed repositories:

export abstract class BaseRepository<
  TModel, TCreate, TWhere, TUpdate,
  TSelect, TInclude, TOrderBy, TGroupBy, TGroupByOutput
> {
  constructor(
    protected readonly prismaService: PrismaService,
    protected readonly collectionName: Prisma.ModelName,
  ) {}

  protected abstract getModel(): any;

  public async findById(
    id: string,
    args?: { select?: TSelect; include?: TInclude }
  ): Promise<TModel> {
    return this.getModel().findUniqueOrThrow({
      where: { id },
      ...(args || {}),
    });
  }

  public async upsert(
    data: TCreate,
    updateData: TUpdate,
    where: TWhere
  ): Promise<TModel> {
    return this.getModel().upsert({
      where,
      update: updateData,
      create: data,
    });
  }
}

Services extend BaseRepository<T> to get typed query methods for their domain. The Prisma schema becomes the source of truth for the data model across the entire platform — no inconsistent query logic across teams.
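Here is a runnable toy of the pattern under stated assumptions: the in-memory `deviceModel` stands in for a Prisma model delegate, and `DeviceRepository` is a hypothetical consumer of the SDK, not a real service:

```typescript
// Stripped-down base repository matching the SDK's shape.
abstract class BaseRepo<TModel, TCreate, TWhere, TUpdate> {
  protected abstract getModel(): any;

  async findById(id: string): Promise<TModel> {
    return this.getModel().findUniqueOrThrow({ where: { id } });
  }

  async upsert(data: TCreate, updateData: TUpdate, where: TWhere): Promise<TModel> {
    return this.getModel().upsert({ where, update: updateData, create: data });
  }
}

interface Device { id: string; dealerId: string; }

// In-memory stand-in for a Prisma model delegate.
const store = new Map<string, Device>();
const deviceModel = {
  async findUniqueOrThrow({ where }: { where: { id: string } }): Promise<Device> {
    const found = store.get(where.id);
    if (!found) throw new Error(`Device ${where.id} not found`);
    return found;
  },
  async upsert({ where, update, create }: {
    where: { id: string }; update: Partial<Device>; create: Device;
  }): Promise<Device> {
    const next = store.has(where.id)
      ? { ...store.get(where.id)!, ...update }
      : create;
    store.set(next.id, next);
    return next;
  },
};

class DeviceRepository extends BaseRepo<Device, Device, { id: string }, Partial<Device>> {
  protected getModel() { return deviceModel; }
}
```

The subclass supplies only the model; every query method, and the typing around it, comes from the base class — which is what keeps fifteen services from drifting apart.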

The serving layer

AppSync was replaced with gRPC NestJS microservices — one per domain (devices, alerts, customers, dealers) — fronted by a GraphQL gateway. The GraphQL API surface stayed familiar to the frontend team; the implementation underneath changed entirely.
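The shape of that gateway can be sketched without the frameworks. `DevicesClient` mimics a generated gRPC client interface and the resolver map follows the usual GraphQL resolver shape; both names are illustrative assumptions, not the platform's actual code:

```typescript
// Hypothetical generated-client interface for the devices gRPC service.
interface DevicesClient {
  getDevice(req: { id: string }): Promise<{ id: string; dealerId: string }>;
}

// GraphQL-style resolver map: the gateway translates GraphQL fields into
// gRPC calls on the domain service, keeping the API surface stable for
// the frontend while the backend changes underneath.
function makeResolvers(devices: DevicesClient) {
  return {
    Query: {
      device: (_parent: unknown, args: { id: string }) =>
        devices.getDevice({ id: args.id }),
    },
  };
}
```

Because the frontend only ever sees the resolver map's schema, swapping AppSync for gRPC services behind it required no client changes.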

The Migration

We didn’t cut over in one shot.

First, we migrated the full Snowflake dataset into MongoDB. Both systems ran in parallel — the old batch pipeline still writing to Snowflake, the new Kafka consumers writing to MongoDB.

Then we stood up two environments with separate URLs. The QA team validated both side-by-side — same queries, different backends — until they were satisfied the new system was producing consistent results.
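That side-by-side check can be mechanised with a small diff helper. This is an illustrative sketch, not the QA team's actual tooling, and it assumes both backends serialise row fields in the same order:

```typescript
// Compare the same query served by the old and new backends, keyed by id.
interface Row { id: string; [field: string]: unknown; }

export function diffResults(oldRows: Row[], newRows: Row[]): string[] {
  const mismatches: string[] = [];
  const byId = new Map(newRows.map((r) => [r.id, r]));
  for (const oldRow of oldRows) {
    const newRow = byId.get(oldRow.id);
    if (!newRow) {
      mismatches.push(`missing in new backend: ${oldRow.id}`);
      continue;
    }
    // Field-order-sensitive comparison; fine when both sides use one schema.
    if (JSON.stringify(oldRow) !== JSON.stringify(newRow)) {
      mismatches.push(`field mismatch for ${oldRow.id}`);
    }
  }
  return mismatches;
}
```

Running a helper like this against both URLs on every QA pass turns "are they consistent?" from a judgment call into a report of zero or more concrete discrepancies.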

Then we cut over. Hard switch, monitoring and alerts configured, hypercare sprints in place for urgent issues.

Outcome

  • Events that previously took up to 24 hours to propagate now appear in under a second
  • Infrastructure costs reduced significantly — Snowflake and batch processing overhead eliminated
  • A shared SDK enforcing consistent data access patterns across 15+ services
  • A topic naming convention that has held up as the platform has grown

What I’d Do Differently

Start the SDK earlier. There was a period where services were writing their own Prisma queries before the SDK existed, and retrofitting them onto the shared repository pattern took longer than it should have.

Also: get the Kafka topic naming convention agreed and documented before the first consumer is written. Renaming topics after services are subscribed is painful.


Interested in event-driven architecture or platform engineering? Get in touch.