Home Contact

Event Series Intelligence: Esper & NEsper

Transaction Case Study

The Problem

The use case involves tracking three components of a transaction. Each component comes to the engine as an event with the following fields:

  • Transaction ID
  • Time stamp

In addition, we have the following extra fields.

In event A:

  • Customer ID

In event C:

  • Supplier ID (the ID of the supplier that the order was filled through)

We need to take in events A, B and C and produce a single, combined event with the following fields:

  • Transaction ID
  • Customer ID
  • Time stamp from event A
  • Time stamp from event B
  • Time stamp from event C

What we are doing here is matching the transaction IDs on each event, to form an aggregate event. If all these events were in a relational database, this could be done as a simple SQL join except that with 10,000 events per second, you will need some serious database hardware to do it.

Real-Time Summary Data

Further, we need to produce the following:

  • Min,Max,Average total latency from the events (difference in time between A and C) over the past 30 minutes.
  • Min,Max,Average latency grouped by (a) customer ID and (b) supplier ID. In other words, metrics on the the latency of the orders coming from each customer and going to each supplier.

Find Missing Events

We need to detect a transaction that did not make it through all three events. In other words, a transaction with events A or B, but not C. Note that, in this case, what we care about is event C. The lack of events A or B could indicate a failure in the event transport and should be ignored. Although the lack of an event C could also be a transport failure, it merits looking into.

Solution

Event Definitions

Events in Esper are represented via regular Java classes that expose JavaBean-style "getter" methods for access to event properties. Since all components of a transaction have a transaction id and timestamp field, we create a base event class with the common fields as below.

public class TxnEventBase {
    private String transactionId;
    private long timestamp;

    public TxnEventBase(String transactionId, long timestamp) {
            this.transactionId = transactionId;
            this.timestamp = timestamp;
    }

    public String getTransactionId() {
            return transactionId;
    }

    public long getTimestamp() {
            return timestamp;
    }
}

Event A in the 3-component transaction can now simply extend the TxnEventBase class and add the customer id field.

public class TxnEventA extends TxnEventBase {
    private String customerId;

    public TxnEventA(String transactionId, long timestamp, String customerId) {
        super(transactionId, timestamp);
        this.customerId = customerId;
    }

    public String getCustomerId() {
        return customerId;
    }
}

Detecting Combined Events

Now that we have defined the event classes, we can compose an EPL statement to detect combined events in which each component of the transaction is present. We restrict the event matching to the events that arrived within the last 30 minutes.

insert into CombinedEvent(transactionId, customerId, supplierId, latencyAC, latencyBC, latencyAB)
select C.transactionId, customerId, supplierId, 
       C.timestamp - A.timestamp, C.timestamp - B.timestamp, B.timestamp - A.timestamp 
  from TxnEventA.win:time(30 minutes) A,
       TxnEventB.win:time(30 minutes) B,
       TxnEventC.win:time(30 minutes) C 
 where A.transactionId = B.transactionId and B.transactionId = C.transactionId

This statement uses the insert into syntax to generate a CombinedEvent event stream. The next section uses that stream to derive realtime data on latencies between events.

Compiling Realtime Summary Data

We can now use the CombinedEvent stream to derive realtime summary data.

To derive the minimum, maximum and average total latency from the events (difference in time between A and C) over the past 30 minutes we can use the EPL below.

select min(latencyAC) as minLatencyAC, max(latencyAC) as maxLatencyAC, avg(latencyAC) as avgLatencyAC
  from CombinedEvent.win:time(30 minutes)

We can derive this data per customer and supplier id by specifying a group by clause.

select customerId, min(latencyAC) as minLatencyAC, max(latencyAC) as maxLatencyAC, avg(latencyAC) as avgLatencyAC
  from CombinedEvent.win:time(30 minutes)
 group by customerId

Finding Missing Events

An outer join allows us to detect a transaction that did not make it through all three events (see solution patterns for further ways to detect missing events).

select rstream * 
  from TxnEventA.win:time(30 minutes) A 
       full outer join TxnEventC.win:time(60 minutes) C on A.transactionId = C.transactionId
       full outer join TxnEventB.win:time(30 minutes) B on B.transactionId = C.transactionId
 where C.transactionId is null

When TxnEventA or TxnEventB events leave their respective time windows consisting of the last 30 minutes of events, Esper filters out rows in which no EventC row was found. The "rstream" keyword selects the remove stream, meaning events that leave the window (by default and without the rstream keyword a listener receives only insert stream events).

Using the API

Setting up the Engine

The Java code snippet below shows how an engine instance can be obtained. The Configuration is used to make the event classes known to the engine. This is optional and makes the EPL statements easier to read. Esper also supports fully-qualified Java class names in EPL statements.

Configuration configuration = new Configuration();
configuration.addEventTypeAlias("TxnEventA", TxnEventA.class.getName());
configuration.addEventTypeAlias("TxnEventB", TxnEventB.class.getName());
configuration.addEventTypeAlias("TxnEventC", TxnEventC.class.getName());

EPServiceProvider epService = EPServiceProviderManager.getProvider("TransactionExample", configuration);

Creating a Statement

The administrative interface on the engine creates statements:

String stmtSupplier = 
  "select supplierId, min(latencyAC) as minLatency, max(latencyAC) as maxLatency, avg(latencyAC) as avgLatency" +
  "from CombinedEvent.win:time(30 minutes) " +
  "group by supplierId";

EPStatement bySupplierStatement = epService.getEPAdministrator().createEPL(stmtSupplier);
bySupplierStatement.addListener(myListener);

Attaching Listeners

Listeners receive events published by statements and can query the received events. The listener interface receives event entering window(s) as new events (aka. istream or insert stream) and events as they are leaving window(s) as old events (aka. rstream or remove stream).

public class RealtimeSummaryTotalsListener implements UpdateListener {

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {

        EventBean event = newEvents[0];
        log.debug(" Totals minAC=" + event.get("minLatencyAC") +
                  " maxAC=" + event.get("maxLatencyAC") +
                  " avgAC=" + event.get("avgLatencyAC"));
    }
}

Sending Events

Finally, events are sent for processing by the engine via the runtime interface:

TxnEventA a = new TxnEventA("txnid1", 1, "customerId1");

epService.getEPRuntime().sendEvent(event);      

Running the Example

The complete example code can be found in the "examples" folder of the distribution in package com.espertech.esper.example.transaction. The example contains a transaction simulator that can be invoked from the command line. The readme file in the "examples/etc" folder contains build instructions and command line parameters to run the transaction simulator from the command line. You do not need to perform a build of the Esper engine to run the example, the Esper jar file has been included in the distribution.

Please consult the examples section in the reference documentation for more information.