docs and c

Lennart Weller 6 years ago
11 changed files with 2260 additions and 16 deletions

+ 1670
- 0
File diff suppressed because it is too large

+ 10
- 0
doc/literature.bib

@@ -0,0 +1,10 @@
title={The {DEBS} 2013 {G}rand {C}hallenge},
author={Christopher Mutschler and Holger Ziekow and Zbigniew Jerzak},
booktitle={DEBS 2013: Proceedings of the 7th ACM International Conference on Distributed Event-Based Systems},
address = {Arlington, TX, USA},
month = {July},

+ 57
- 0
doc/paper.tex

@@ -0,0 +1,57 @@

\usepackage[ruled,lined,linesnumbered, noend]{algorithm2e}

\widowpenalty = 10000


\conferenceinfo{Event Processing}{'14, Application \& Middleware Systems (I13), TUM}

\title{Event Processing 2014: Course Project}

% Change according to your choice
\subtitle{Query B, F in case study 1 \& Query F in case study 2}

Lennart Weller\\
\email{} \\


In this course project I implemented two of the queries
given in the project description: queries B and F.
Both queries are implemented in a plain C program; query F
was additionally implemented with Apache Incubator Storm.





+ 16
- 0
doc/sections/attribution.tex

@@ -0,0 +1,16 @@
\textit{Properly fill in Table~\ref{tbl:attribution} and hand in a printout of this document at the oral exam.}
\hline Firstname Lastname & Attribution & Signature \\
\hline Lennart Weller& The complete implementation& \\
\caption{Attribution table.}

+ 18
- 0
doc/sections/conclusion.tex

@@ -0,0 +1,18 @@
The evaluation of both implementations confirmed the expected result: a
specialized implementation in a high-performance language is considerably
faster than a system built on a general-purpose framework (about a factor of
three for Query F on System 1).
Further performance improvements to the C implementation are possible, but the
largest gain would come from moving the data source to a faster medium. Another
improvement would be to change the source file to a binary format, which avoids
the slow process of parsing the integers byte by byte. This would also improve
the performance of the Storm implementation, as it suffers from the same
limitation.
In terms of modification and extensibility the Storm system is preferable:
adding a new processor only requires adding a new Bolt, whereas in the custom
implementation the topology of calls has to be changed considerably to allow
more, or parallel, processing steps.
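
The binary source format suggested above could look roughly like the following sketch. The type \texttt{record\_t} and the functions \texttt{write\_records} and \texttt{read\_records} are hypothetical and not part of the submission; the sketch writes the struct as it lies in memory, so the resulting file is only readable on a machine with the same byte order and struct layout.

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical fixed-size event record; a subset of the CSV fields. */
typedef struct {
    uint8_t  sid;      /* sensor id */
    uint64_t ts;       /* timestamp in picoseconds */
    int32_t  x, y, z;  /* position in mm */
} record_t;

/* Write n records in host byte order and layout; returns records written. */
size_t write_records(FILE *f, const record_t *r, size_t n) {
    return fwrite(r, sizeof *r, n, f);
}

/* Read up to max records without any text parsing; returns records read. */
size_t read_records(FILE *f, record_t *out, size_t max) {
    return fread(out, sizeof *out, max, f);
}
```

Reading then becomes a single \texttt{fread} per batch of events instead of a per-character integer conversion; whether this outweighs the I/O cost on a given disk would have to be measured.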

+ 74
- 0
doc/sections/evaluation.tex

@@ -0,0 +1,74 @@

The event processing system was evaluated on two separate machines with
different performance specifications; the results are recorded separately in
the tables below.

The processing time was measured with the Unix \texttt{time} command. For
this setup to work reliably, the Storm implementation had to be changed to exit
after the last line of the file was read, as there is no way for a spout to
know when the processing of the events is finished, or for a bolt to know when
it has processed the last event. The start-up time of the Storm instance and
distributors also had to be subtracted from the final result to provide a fair
comparison.

The C implementation makes this unnecessary, as the program exits by design
after the last line is read.

The specifications of System 1 are as follows:\\
CPU: Intel Core i5-3570 CPU @ 4.40GHz\\
Bogomips: 9200\\
RAM: 16GB DDR3\\
HDD: 130\,MB/s read, 90\,MB/s write

The specifications of System 2 are as follows:\\
CPU: Intel Core i7-4600U CPU @ 2.10GHz\\
Bogomips: 5300\\
SSD: 540\,MB/s read, 510\,MB/s write

Tables~\ref{tbl:resultsHDD} and~\ref{tbl:resultsSSD} support the
prediction made in Section~\ref{sec:system} that the performance of the
overall query processing depends heavily on I/O and less on the performance of
the rest of the system.

The queries themselves do not require much computational power, so the system
mostly waits for more input data. The same would of course hold in a real-time
event-based system: even the slower Storm/Java implementation processes the
60 minutes of real-time data in less than 10 seconds on a personal computer.

\hline \textsc{Query} & \textsc{System} & \textsc{time in ms} \\
Query b & Implementation in C& 3040 ms \\
Query f & Implementation in C& 3110 ms \\
Query f & Apache Storm & 9480 ms \\
\caption{Total processing time for each query on System 1}

\hline \textsc{Query} & \textsc{System} & \textsc{time in ms} \\
Query b & Implementation in C& 1560 ms \\
Query f & Implementation in C& 1610 ms \\
\caption{Total processing time for each query on System 2}

+ 76
- 0
doc/sections/queries.tex

@@ -0,0 +1,76 @@
\section{Query Implementation}
\subsection{Case study 1}
\subsubsection{Query B}
\textit{The query requires the system to output the total distance covered
by each player since the beginning of the match. The output interval is set
to 30 seconds event-time.}
For this query I implemented sampling over a moving window to reduce the
inaccuracy introduced by the high frequency of incoming events. The sampling
count can be set via a variable before compilation. The default is 100, so with
the 200\,Hz sensor rate stated in the data source description, two events per
second and sensor are used for the distance calculation.
The distance calculation is done in a two-dimensional plane, using only the X
and Y coordinates of the last and the current position. This avoids further
inaccuracy due to leg movement. The system also compensates for the doubled
distance that results from each player carrying two sensors by dividing by two
for the output, instead of keeping track of only one leg per player.
The system also keeps track of the current event timestamp, which is used to
output the current travel distance every 30 seconds of event time, as required
by the query definition.
When the program is run on the recorded file this is of little practical
importance, as at any time during the run there are more lines in the
\texttt{stdout} buffer than the terminal can process.
\subsubsection{Query F}
\textit{The query requires the system to output an event line every time
one of the keepers' hands gets close to the ball and therefore clears the shot.}
This query works differently from the Query B implementation and therefore
needs no sampling; it only records state changes. The current position of the
ball is always kept at the full sensor rate of 2000\,Hz.
For the goalkeepers only the positions of their hands are updated, as the query
does not require the system to keep track of the leg positions.
For every event from a goalkeeper hand sensor, the system checks whether the
current ball position is within a defined distance of the hand. A
three-dimensional distance calculation is used here to provide better
accuracy. If a near contact is found, the counter for cleared balls is
increased, the result is written to \texttt{stdout}, and the current state is
recorded to avoid further events for the same contact.
Subsequent events check the distance from the goalkeeper's hand to the current
position of the ball. If it rises above a threshold, the state is changed again
and the process is reset. This happens, for example, when the goalkeeper drops
the ball for a kick or throws it away. The threshold for both ball contact and
loss is 30 centimeters from the hand. This value is based on the averaged
minimal distance over the whole dataset.
Unlike the Query B implementation, which waits for the event timer to roll
over, the output is written immediately.
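
The contact and release logic described above amounts to a small two-state machine. The following sketch illustrates it under simplifying assumptions: one state per goalkeeper, positions in millimetres, and a threshold of 300\,mm taken from the 30 centimeters stated in the text. The names \texttt{pos\_t}, \texttt{keeper\_state\_t}, \texttt{dist\_sq} and \texttt{keeper\_update} are hypothetical. It compares squared distances, which is a substitution for the square-root distance described in the text and gives the same decision.

```c
#include <stdint.h>

#define CONTACT_MM 300  /* assumed threshold: 30 cm, expressed in mm */

typedef struct { int32_t x, y, z; } pos_t;  /* hypothetical 3D position in mm */

typedef struct {
    int      in_contact;  /* 1 while the ball counts as held by this keeper */
    unsigned cleared;     /* number of contacts reported so far */
} keeper_state_t;

/* Squared 3D distance; 64-bit arithmetic avoids overflow on field-sized values. */
static int64_t dist_sq(pos_t a, pos_t b) {
    int64_t dx = (int64_t)a.x - b.x;
    int64_t dy = (int64_t)a.y - b.y;
    int64_t dz = (int64_t)a.z - b.z;
    return dx * dx + dy * dy + dz * dz;
}

/* Process one hand event against the latest ball position.
 * Returns 1 exactly once per contact, 0 otherwise. */
int keeper_update(keeper_state_t *k, pos_t hand, pos_t ball) {
    int close_enough = dist_sq(hand, ball) <= (int64_t)CONTACT_MM * CONTACT_MM;
    if (!k->in_contact && close_enough) {  /* new contact: count and report */
        k->in_contact = 1;
        ++k->cleared;
        return 1;
    }
    if (k->in_contact && !close_enough)    /* ball released: reset the state */
        k->in_contact = 0;
    return 0;
}
```

The caller would print the event line whenever \texttt{keeper\_update} returns 1; repeated hand events during the same contact return 0 and therefore produce no further output.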
\subsection{Case study 2}
\subsubsection{Query F}
\textit{The query requires the system to output an event line every time
one of the keepers' hands gets close to the ball and therefore clears the shot.}
The query works in a similar way to the C implementation, on which it is based.
Position updates are handled in the same way, with high-frequency updates for
the ball and lower-frequency updates for the goalkeeper hands.
The checks also work in the same way as in the C implementation. There are,
however, implementation differences caused by the framework and by the
multi-threading of Storm.
The integer which keeps track of the cleared balls is an
\texttt{AtomicInteger} to avoid issues with multi-threading, as the class
provides atomic updates of the integer, so that writes from instances
processing events concurrently are not lost.
An additional change is the way the event is broadcast. Due to the event
processor setup, instead of writing the state directly to \texttt{stdout}
the ``cleared ball'' event is passed to the output collector provided
by the Storm Bolt. Further work on the event is done outside of the processing

+ 57
- 0
doc/sections/system.tex

@@ -0,0 +1,57 @@
\section{System Overview}
\subsection{Case study 1}
The implementation was done in plain C99, requires no library dependencies, and
is highly optimized for performance, which results in extremely low total
processing times. The CSV file is parsed line by line with the \texttt{getline}
function, after which a tokenizer splits the CSV entry. As the
standard libc tokenizer \texttt{strtok} is too general and too slow for this
project, a custom, more specific implementation is used. Another bottleneck was
the slow string-to-integer conversion function \texttt{atoi} provided by libc,
which was also replaced by a faster function specific to the given dataset.
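
As an illustration of the idea, tokenizing and converting can be combined into a single pass over each field. The function \texttt{next\_int\_field} below is a hypothetical sketch and not the function used in the submission; it assumes well-formed input consisting of optionally signed decimal integers separated by commas and performs no overflow or format checking.

```c
#include <stdint.h>

/* Parse one optionally signed decimal integer starting at *cursor and
 * advance *cursor past the field and a following comma, if present.
 * Parsing stops at the first non-digit, e.g. ',' or the trailing '\n'. */
int64_t next_int_field(const char **cursor) {
    const char *p = *cursor;
    int64_t num = 0;
    int neg = 0;

    if (*p == '-') {            /* optional sign */
        neg = 1;
        ++p;
    }
    while (*p >= '0' && *p <= '9') {
        num = num * 10 + (*p - '0');
        ++p;
    }
    if (*p == ',') ++p;         /* skip the field separator */
    *cursor = p;
    return neg ? -num : num;
}
```

Unlike \texttt{strtok}, this does not modify the line buffer and handles exactly one delimiter, which is all the dataset needs.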
The application is structured so that it can easily be parallelized with
pthreads or a similar threading implementation. A test implementation with
OpenMP was done to see whether it would improve the runtime; it did not and is
therefore not part of the final submission. In general, the runtime of the
application depends heavily on the read speed of the underlying hardware and
kernel and not on the software itself.
The tokenized line, converted into integers, is stored in a data structure and
passed to the function that handles the processing. Again, this could
technically be parallelized, but doing so provides no significant performance
increase. Before the information is processed, all data not relevant to the
query is dropped. The processing function is selected at compile time and
executes either Query B or Query F. Query B uses a sampling window of a
specified length to avoid inaccuracy caused by the high frequency of position
updates, while Query F uses different mechanisms to keep the output realistic.
Both queries then write to \texttt{stdout} with simple format strings for
updates as required by the query definition. Query B writes the current
information every 30 seconds event-time and Query F writes in case a new event
\subsection{Case study 2}
For the ESP system I chose the Apache Storm real-time computation framework.
The implementation is done in Java and consists of three main components, which
all run on a local node of the Storm system. Distribution of the workload among
different nodes is not enabled in this project.
The input source for the system is called CSVFileSpout. This class works in the
same manner as the \texttt{read\_data} function in the C implementation: it
reads the source file line by line and extracts the integers of the event.
These are likewise put into a data structure and sent to the next handler.
No performance optimization was done on this class.
The class which receives the events from the CSVFileSpout is the
GoalKeeperClearsBolt. This class contains the main work of the system and
processes the information into the form required by the query definition. All
data not concerning the query is dropped immediately, which speeds up the
processing considerably. The processing is done in the same way as in the C
implementation in case study 1. The aggregated data is passed on to the next
handler defined in the topology.
The output class which receives the data from the processing bolt handles only
simple string format output to the console and does no processing on the

+ 0
- 16
pom.xml

@@ -14,20 +14,4 @@

+ 11
- 0
src/main/c/Makefile

@@ -0,0 +1,11 @@
all: query_b query_f

query_b:
	gcc --std=c99 -o query_b footballchallenge.c -lm -O3 -DQUERYB

query_f:
	gcc --std=c99 -o query_f footballchallenge.c -lm -O3

clean:
	rm query_b
	rm query_f

+ 271
- 0
src/main/c/footballchallenge.c

@@ -0,0 +1,271 @@
Real-time data available at
Compiler: gcc --std=c99 -o footballchallenge footballchallenge.c -lm -O3

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <inttypes.h>
#include <math.h>

typedef struct {
uint8_t sid;
uint64_t ts;
//position from (0,0) (mid-field); signed, as in_field() compares against negative bounds
int32_t x;
int32_t y;
int32_t z;

uint32_t v;
uint32_t a;
uint16_t vx;
uint16_t vy;
uint16_t vz;
uint16_t ax;
uint16_t ay;
uint16_t az;
} __attribute__ ((packed)) sensor_t;

enum {

enum {

enum {

#define START_TIME 10753295594424116
#define END_TIME 14879639146403495
static const uint8_t ball_ids[] = {4,8,10,12};
static const uint8_t players[TEAM__COUNT][PLAYER__COUNT][SENSOR__COUNT] = {

#ifdef QUERYB
#define SAMPLE_SIZE 100
#define PICO_STEP 1000000000000

double travel_distance[TEAM__COUNT][PLAYER__COUNT] = {0};
uint8_t sample_counter[106 + 1] = {0};
void print_distance(uint64_t t);

void *read_data(void *b);
void *handle_event(void *b);
uint8_t in_field(sensor_t *s);
uint8_t get_team(uint8_t sid);
uint8_t get_player(uint8_t sid);

int64_t min_atoi(const char *);
double dst_circle(sensor_t *s1, sensor_t *s2);
double dst_sphere(sensor_t *s1, sensor_t *s2);
int64_t min_strtok(char *line, char **save);

sensor_t sensor_list[106 + 1];

int main(int argc, char **argv) {
if(argc < 2) return 1;
char *file_path = malloc(strlen(argv[1]) + 1); // +1 for the terminating '\0' written by strcpy
strcpy(file_path, argv[1]);

#ifdef QUERYB
void print_distance(uint64_t time) {
for(uint8_t t = 0; t <= TEAM_B; ++t) {
for(uint8_t p = 0; p < PLAYER__COUNT; ++p) {
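// ts is uint64_t, so print it with PRIu64; / 2000 halves the two-sensor sum and scales by 1000
printf("%" PRIu64 ",%d,%.2lf\n", time, players[t][p][0], travel_distance[t][p] / 2000);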

void *read_data(void *b) {
char *file_path = (char *)b;
FILE *f = fopen(file_path, "r");

char *line = NULL, *save;
ssize_t read;
size_t len = 0;
sensor_t *s = calloc (1, sizeof *s);
while ((read = getline(&line, &len, f)) != -1) {
s->sid = min_strtok(line, &save);
s->ts = min_strtok(NULL, &save);
if(START_TIME > s->ts || s->ts > END_TIME) continue;
s->x = min_strtok(NULL, &save);
s->y = min_strtok(NULL, &save);
s->z = min_strtok(NULL, &save);
s->v = min_strtok(NULL, &save);
s->a = min_strtok(NULL, &save);
s->vx = min_strtok(NULL, &save);
s->vy = min_strtok(NULL, &save);
s->vz = min_strtok(NULL, &save);
s->ax = min_strtok(NULL, &save);
s->ay = min_strtok(NULL, &save);
s->az = min_strtok(NULL, &save);
//memcpy(&sensor_list[s->sid], s, sizeof *s);


return NULL;

double dst_circle(sensor_t *s1, sensor_t *s2) {
return s1 && s2? sqrt(pow(s1->x - s2->x, 2) + pow(s1->y - s2->y, 2)): 0;

double dst_sphere(sensor_t *s1, sensor_t *s2) {
return s1 && s2? sqrt(pow(s1->x - s2->x, 2) + pow(s1->y - s2->y, 2) + pow(s1->z - s2->z, 2)): 0;

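int64_t min_atoi(const char *str) {
    int64_t num = 0;
    uint8_t neg = 0;
    if (*str == '-') {
        neg = 1;
        ++str; // skip the sign
    }
    // stop at the first non-digit so a trailing '\n' from getline is not parsed
    while (*str >= '0' && *str <= '9') {
        num = (num*10) + (*str - '0');
        ++str;
    }
    return neg? -num: num;
}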

int64_t min_strtok(char *line, char **save) {
if (line) *save = line;
char *p = strchr(*save, ',');
if (p) *p = 0;
int64_t num = min_atoi(*save);
if (p) *save = p+1;
return num;

#ifdef QUERYB
static uint16_t last_update = 0;
static sensor_t last_keeper = {0};
static sensor_t ball = {0};
static uint8_t keeper_counter[TEAM__COUNT] = {0};
#define MIN_DISTANCE 3500
void *handle_event(void *b) {
sensor_t *s = (sensor_t *)b;
if(!in_field(s)) return NULL;

#ifdef QUERYB
if(s->sid > 12 && s->sid < 97) {
uint8_t *counter = &sample_counter[s->sid];
if(*counter == SAMPLE_SIZE) {
travel_distance[get_team(s->sid)][get_player(s->sid)] += dst_circle(&sensor_list[s->sid], s);
memcpy(&sensor_list[s->sid], s, sizeof *s);
*counter = 0;
} else {

uint16_t cur = 0;
if (last_update + 30 < (cur = (s->ts - START_TIME) / PICO_STEP)) {
last_update = cur;
if(s->sid <= 12) {
ball = *s;

int8_t team = -1;
if(s->sid == 97 || s->sid == 98) {
if (last_keeper.sid && last_keeper.sid == s->sid) memcpy(&last_keeper, s, sizeof *s);
team = TEAM_A;
} else if (s->sid == 99 || s->sid == 100) {
if (last_keeper.sid && last_keeper.sid == s->sid) memcpy(&last_keeper, s, sizeof *s);
team = TEAM_B;

double distance = 0.0;
if (last_keeper.sid && (distance = dst_sphere(&last_keeper, &ball)) > MIN_DISTANCE)
last_keeper.sid = 0;

if (!last_keeper.sid && team >= 0) {
if ((distance = dst_sphere(s, &ball)) <= MIN_DISTANCE) {
last_keeper = *s;
printf("%" PRIu64 ",%d,%d\n", s->ts, s->sid, keeper_counter[team]);
return NULL;

uint8_t in_field(sensor_t *s) {
return (-54290 < s->x && s->x < 54290) && (-33945 < s->y && s->y < 33945);

uint8_t get_team(uint8_t sid) {
for(uint8_t t = 0; t < TEAM__COUNT; ++t)
for(uint8_t p = 0; p < PLAYER__COUNT; ++p)
for(uint8_t s = 0; s < SENSOR__COUNT; ++s)
if(players[t][p][s] == sid) return t;

uint8_t get_player(uint8_t sid) {
for(uint8_t t = 0; t < TEAM__COUNT; ++t)
for(uint8_t p = 0; p < PLAYER__COUNT; ++p)
for(uint8_t s = 0; s < SENSOR__COUNT; ++s)
if(players[t][p][s] == sid) return p;