Home Contact

Event Series Intelligence: Esper & NEsper

Additional Examples

This page summarizes the additional examples that come with the distribution and that are also available in source code.

A detailed description of each example can be found in the reference documentation.

Examples can be found in the "examples" folder of the distribution. Many examples are ready-to-run and provide scripts to compile and run the example from command line. Please see the readme file in the "examples/etc" folder for instructions.

JMS Server Shell with JMX extension

The server shell is a Java Messaging Service (JMS) -based server that listens to messages on a JMS destination, and sends the received events into Esper. The example also demonstrates a Java Management Extension (JMX) MBean that allows remote dynamic statement management.

The server shell is a low-latency processor for JMS byte messages. It employs JMS listeners to process message in multiple threads, this model reduces thread context switching for many JMS providers. The server is configurable and has been tested with two JMS providers. It consists of only 10 classes and is thus easy to understand.

The server shell sample comes with a client (server shell client) that sends events into the JMS-based server, and that also creates a statement on the server remotely through a JMX MBean proxy class.

J2EE Self-Service Terminal Management

This example has been described in the On Java article titled "Esper: Event Stream Processing and Correlation".

The example is about a J2EE-based self-service terminal managing system in an airport that gets a lot of events from connected terminals.

The example code in the distribution package implements a J2EE message-driven enterprise Java bean (MDB EJB). We used an MDB as a convenient place for processing incoming events via a JMS message queue or topic.

Assets Moving Across Zones - An RFID Example with a Swing GUI

This example out of the RFID domain processes location report events. Each location report event indicates an asset id and the current zone of the asset. The example solves the problem that when a given set of assets is not moving together from zone to zone, then an alert must be fired.

This example provides a Swing-based GUI that can be run from the command line. The GUI allows drag-and-drop of three RFID tags that form one asset group from zone to zone.

AutoID RFID Reader generating XML documents

In this example an array of RFID readers sense RFID tags as pallets are coming within the range of one of the readers. A reader generates XML documents with observation information such as reader sensor ID, observation time and tags observed. A statement computes the total number of tags per reader sensor ID within the last 60 seconds.

StockTicker

The multithreaded StockTicker example comes from the stock trading domain. The example creates event patterns to filter stock tick events based on price and symbol. When a stock tick event is encountered that falls outside the lower or upper price limit, the example simply displays that stock tick event. The price range itself is dynamically created and changed. This is accomplished by an event patterns that searches for another event class, the price limit event.

MatchMaker

In the MatchMaker example every mobile user has an X and Y location, a set of properties (gender, hair color, age range) and a set of preferences (one for each property) to match. The task of the event patterns created by this example is to detect mobile users that are within proximity given a certain range, and for which the properties match preferences.

QualityOfService

This example develops code for measuring quality-of-service levels such as for a service-level agreement (SLA).

The example measures and monitors operation latency and error counts per customer and operation. When one of our operations oversteps these constraints, we want to be alerted right away. Additionally, we would like to have some monitoring in place that checks the health of our service and provides some information on how the operations are used.

DEBS 2011 Challenge - The Trivia Geeks Club game

Introduction

The Distributed Event-Based Systems 2011 conference raised an interesting challenge, the Trivia Geeks Club: A Social Game.

We implemented a solution to the challenge that is outlined here.

Problem

There are three event producers:

  • Trivia question generator which generates the trivia question and emits an event with trivia question and answer, the question (without the correct answer of course) is sent to all subscribers;
  • Player which can generate events of three type: answer, answer annulment, and request for most frequent answer, in that case the system generates an answer to this request and sends it as an event to the player;
  • The system that generates control events that trigger the changes in some scoring rules.

    The event consumers are:

  • A scoreboard manager which displays points for all players on a scoreboard, the system sends each point increase/decrease event to that consumer;
  • A Player, getting response to a most frequent answer request.

    The system functions are:

  • Get new questions;
  • Get answers; Determine if the question is still valid, and if yes match against the answer, and create score event if applicable;
  • Enable annulment of answers.

    The scoring system creates score event with points for player according to the following scoring table:

    Case Score
    Correct answer 5
    Correct answer after asking for the most frequent answer 1
    First who answered 100
    Incorrect answer -1
    Three answers incorrect without a correct answer in the middle -50
    Correct answers to 10 consecutive questions 500
    Correct answers to 10 questions within 30 minutes* during late night hours (1:00 – 5:00) 500

Solution

The solution in EPL is below. No user-defined function or other solution-specific Java code is needed.

/**
* Input Events
*/
create schema TriviaQuestion (questionId string, question string, questionTime long, answer string);
create schema PlayerAnswer (playerId string, questionId string, answer string, clientAnswerTime long);
create schema PlayerFARequest (playerId string, questionId string);
create schema PlayerAnnulment (playerId string, questionId string, annulTime long);
create schema UpdateScore (questionId string);
create schema ChangeRule(ruleId string, points int);

/**
* Outgoing events.
*/
create schema PlayerFAResponse (playerId string, questionId string, answerFA string);
@Name('Outgoing-PlayerFAResponse') select * from PlayerFAResponse;

/**
* Scoring variables to handle rule changes.
*/
create variable int NUM_PTS_FIRST_WHO_ANSWERED = 100;

/**
* Handle rule change
*/ 
on ChangeRule(ruleId = "1") set NUM_PTS_FIRST_WHO_ANSWERED = points;

/**
*  Named windows to retain information on questions, players and answers.
*/
@Name('Score window')
create window PlayerScoreWindow.std:unique(playerId) as (playerId string, score int);

@Name('Player-answer window')
create window PlayerAnswerWindow.win:time(5 minutes) as (playerId string, questionId string, questionTime long, answer string, clientAnswerTime long, hasReceivedFA boolean);

@Name('Question-fastest-correct-answer window')
create window PlayerFastestAnswerWindow.std:unique(questionId) as (questionId string, playerId string);

@Name('Question window')
create window QuestionWindow.win:time(1 hour) as TriviaQuestion;

@Name('Player answer history window')
create window PlayerAnswerHistoryWindow.win:keepall() as (playerId string, questionId string, correct boolean, bonusEligible boolean, questionTime long);

/**
* Reacting to Question.
*/
@Name('Save New Question')
on TriviaQuestion tq merge QuestionWindow qt
where tq.questionId = qt.questionId
when not matched then insert select *;

/**
* Reacting to Answer.
*/
@Name('Keep the answer')
on PlayerAnswer pa merge PlayerAnswerWindow paw
where pa.playerId = paw.playerId and pa.questionId = paw.questionId
when not matched then insert select questionId, (select questionTime from QuestionWindow where questionId = pa.questionId) as questionTime, playerId, answer, clientAnswerTime, false as hasReceivedFA
when matched and paw.answer is null then update set paw.answer = pa.answer, paw.clientAnswerTime = pa.clientAnswerTime;

/**
* Reacting to Request-FA.
*/
@Name('Keep the fact that a user requested frequent-answers (FA)')
on PlayerFARequest prfa merge PlayerAnswerWindow paw
where prfa.playerId = paw.playerId and prfa.questionId = paw.questionId
when not matched then insert select questionId, (select questionTime from QuestionWindow where questionId = prfa.questionId) as questionTime, playerId, null as answer, 0L as clientAnswerTime, true as hasReceivedFA
when matched then update set hasReceivedFA = true;

@Name('Return most frequent answer to player')
on PlayerFARequest prfa insert into PlayerFAResponse select playerId, questionId, PlayerAnswerWindow(questionId = prfa.questionId).mostFrequent(a=>answer) as answerFA;

/**
* Reacting to Annulment.
*/
@Name('Annul a players answer')
on PlayerAnnulment pa update PlayerAnswerWindow paw set answer = null, clientAnswerTime = null
where pa.playerId = paw.playerId and pa.questionId = paw.questionId and annulTime <= (QuestionWindow(questionId = pa.questionId).firstof().questionTime + 30000);

/**
* Score computation based on time passing.
*/
@Name('Score computation time-based trigger')
insert into TriggerScore
select q.questionId as questionId, q.answer as correctAnswer, q.questionTime as questionTime
from pattern [every q=TriviaQuestion->UpdateScore(questionId = q.questionId)];

@Name('Compute player that responsed first')
@Priority(1)
insert into PlayerFastestAnswerWindow
select questionId, PlayerAnswerWindow(questionId = ts.questionId).where(x => answer = correctAnswer).minBy(x => clientAnswerTime).playerId as playerId
from TriggerScore ts;

@Name('Score computation trigger for each player that submitted an answer')
insert into TriggerPlayerScore
select paw.*, correctAnswer
from TriggerScore ts unidirectional, PlayerAnswerWindow paw
where ts.questionId = paw.questionId and paw.answer is not null;

@Name('Insert into player history')
@Priority(1)
on TriggerPlayerScore as tps
merge PlayerAnswerHistoryWindow pahw where tps.playerId = pahw.playerId and tps.questionId = pahw.questionId
when not matched then insert select playerId, questionId, (correctAnswer = answer) as correct, true as bonusEligible, questionTime;

// The scoring system creates score event with points for player according to the following scoring table:
// Correct answer       5
// Correct answer after asking for the most frequent answer     1
// First who answered   100
// Incorrect answer     -1
// Three answers incorrect without a correct answer in the middle       -50
// Correct answers to 10 consecutive questions*         500
// Correct answers to 10 questions within 30 minutes* during late night hours (1:00 – 5:00)     500
// *: each correct answer is counted towards a single bonus of the same type and cannot be counted twice.
// **: If there are several players that are tied in one of the "best" categories, each of them receives the bonus of 1000 points.
 
@Name('Compute score for a player')
@Priority(0)
expression last9EligibleAnswers {
  p => PlayerAnswerHistoryWindow(PlayerAnswerHistoryWindow.playerId = p.playerId).takeLast(9).where(pah => correct and bonusEligible)
}
expression last9QuestionIds {
  p => QuestionWindow(QuestionWindow.questionTime < p.questionTime).takeLast(9).selectFrom(q => q.questionId)
}
expression nowAt1AM {
  current_timestamp.withTime(1, 0, 0, 0)
}
expression nowAt5AM {
  current_timestamp.withTime(5, 0, 0, 0)
}
expression computeScore {
  tps => case
  when answer = correctAnswer and hasReceivedFA
    then new { points = 1, removeEligible = false }
  when answer = correctAnswer and last9EligibleAnswers(tps).countof() = 9 and last9EligibleAnswers(tps).selectFrom(a => a.questionId).sequenceEqual(last9QuestionIds(tps))
    then new { points = 500, removeEligible = true }
  when answer = correctAnswer and last9EligibleAnswers(tps).countof() = 9
        and last9EligibleAnswers(tps).allOf(a => a.questionTime between nowAt1AM() and nowAt5AM())
        and tps.questionTime.minus(30 min) >= last9EligibleAnswers(tps).min(a => a.questionTime)
    then new { points = 500, removeEligible = true }
  when answer = correctAnswer and exists(select * from PlayerFastestAnswerWindow as pfaw where pfaw.playerId = tps.playerId and pfaw.questionId = tps.questionId)
    then new { points = 5 + NUM_PTS_FIRST_WHO_ANSWERED, removeEligible = false }
  when answer = correctAnswer
    then new { points = 5, removeEligible = false }
  when answer != correctAnswer and PlayerAnswerHistoryWindow(PlayerAnswerHistoryWindow.playerId = tps.playerId).takeLast(2).where(x => x.correct = false).countof() = 2
    then new { points = -51, removeEligible = false }
  when answer != correctAnswer
    then new { points = -1, removeEligible = false }
  else new { points = 0, removeEligible = false }
  end
}
insert into PlayerScoreUpdate select playerId, questionId, computeScore(tps) as scoredPoints from TriggerPlayerScore tps;

@Name('Merge score to total player score')
on PlayerScoreUpdate as psu
merge PlayerScoreWindow as psw where psu.playerId = psw.playerId
when not matched then insert select playerId, scoredPoints.points as score
when matched then update set psw.score = psw.score + scoredPoints.points;

@Name('Mark players question answer as ineligible for a bonus')
on PlayerScoreUpdate(scoredPoints.removeEligible) psu update PlayerAnswerHistoryWindow pahw set bonusEligible = false where pahw.playerId = psu.playerId and pahw.questionId = psu.questionId;

Running the Example

The complete example code can be found in the "examples" folder of the distribution under "trivia". The "etc" folder contains a requirements document, test specifications and the EPL solution file. The "src" folder contains simulation code, parsing of the test specification and unit test code.