
How to Summarize Real Time Event Data Using Siddhi CEP


These days I am working on a task to improve the performance of wso2telco analytics, which is based on WSO2 DAS. Our product summarizes data using Apache Spark. When the data volume grows it hangs, even though we have applied WSO2 incremental processing.

Hence we decided to do one level of data summarizing in real time, as soon as data arrives at the WSO2 Siddhi event processor. Spark will summarize per hour, while Siddhi will summarize per minute.

To summarize data in real time we can use the Siddhi window feature. A batch window holds the events that arrive during a specified time and then releases all the accumulated events at once. Since the events are released together, we can apply aggregate functions to summarize them.
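As a rough illustration of that idea, here is a minimal Python sketch of the concept (not Siddhi's implementation). It collects events into fixed-length batches and produces one count per group for each batch. The event shape `(timestamp_ms, api)` and the function name `summarize_batches` are made up for the example.

```python
from collections import Counter

def summarize_batches(events, window_ms):
    """Group (timestamp_ms, api) events into fixed-length batches and
    return one {api: count} summary per batch, ordered by batch start."""
    batches = {}
    for ts, api in events:
        start = ts - (ts % window_ms)  # start of the batch this event falls in
        batches.setdefault(start, Counter())[api] += 1
    return [(start, dict(counts)) for start, counts in sorted(batches.items())]

# Hypothetical events: three in the first minute, one in the second.
events = [(1_000, "sms"), (20_000, "sms"), (59_999, "payment"), (60_000, "sms")]
for start, counts in summarize_batches(events, 60_000):
    print(start, counts)
```

Each printed line corresponds to one released batch, which is the kind of per-minute record the rest of this post tries to produce with Siddhi.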

The following describes the attempts I made to do this.

Attempt 01


This was the approach suggested by the WSO2 support team:

https://github.com/wso2/analytics-apim/blob/master/features/org.wso2.analytics.apim.feature/src/main/resources/template-manager/executionplans/APIMAnalytics-RequestSummarizer-RequestSummarizer-realtime1.siddhiql#L48
-- requests -- second
-- global
from requests#window.externalTimeBatch(requestTime, 1 sec, 0, 10 sec, true)
select requestTime as startTime, meta_clientType, consumerKey, context, api_version, api, version,
max(requestTime) as requestTime, userId, hostName, apiPublisher, count() as total_request_count,
resourceTemplate, method, applicationName, tenantDomain, userAgent, resourcePath, request, applicationId,
tier, throttledOut, clientIp, applicationOwner
group by consumerKey, context, api_version, userId, hostName, apiPublisher, resourceTemplate, method, userAgent, clientIp 
insert into perSecondRequestsTemp;

from perSecondRequestsTemp
select meta_clientType, (startTime - (startTime % 1000)) as startTime, convert((startTime - (startTime % 1000)), 'string') as facetStartTime,
time:extract(startTime, 'year') as year, time:extract(startTime, 'month') as month,
time:extract(startTime, 'day') as day, time:extract(startTime, 'hour') as hour,
time:extract(startTime, 'minute') as minute, time:extract(startTime, 'second') as second,
consumerKey, context, api_version, api, version,
requestTime, userId, hostName, apiPublisher, total_request_count,
resourceTemplate, method, applicationName, tenantDomain, userAgent, resourcePath, request, applicationId,
tier, throttledOut, clientIp, applicationOwner
insert into perSecondRequests;


-- minute
-- global
from perSecondRequests
select startTime, meta_clientType, str:concat(facetStartTime, '_', consumerKey, '_', context, '_',
    api_version, '_', userId, '_', hostName, '_', apiPublisher, '_', resourceTemplate, '_', method, '_', userAgent, '_', clientIp)
as uniqueId, year, month, day, hour, minute, second, consumerKey, context, api_version, api, version,
requestTime, userId, hostName, apiPublisher, total_request_count, resourceTemplate, method, applicationName,
tenantDomain, userAgent, resourcePath, request, applicationId, tier, throttledOut, clientIp, applicationOwner
insert into perSecondRequestsWithUniqueId;

from perSecondRequestsWithUniqueId#window.uniqueExternalTimeBatch(uniqueId, startTime, 60 sec, 59999 millisec, 10 sec, true)
select startTime, meta_clientType, consumerKey, context, api_version, api, version,
max(requestTime) as requestTime, userId, hostName, apiPublisher, sum(total_request_count) as total_request_count,
resourceTemplate, method, applicationName, tenantDomain, userAgent, resourcePath, request, applicationId, tier,
throttledOut, clientIp, applicationOwner
group by consumerKey, context, api_version, userId, hostName, apiPublisher, resourceTemplate, method, userAgent, clientIp 
insert into perMinuteRequestsTemp;

I did not try this, as it looked more complex than I expected. So I went looking for a simpler solution.
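Although I did not run the plan, its two-level idea can be sketched in a few lines of Python. Timestamps are floored to the second with the same `startTime - (startTime % 1000)` arithmetic the query uses, events are counted per second, and the per-second counts are then summed per minute. This is a simplified model with a single hypothetical grouping key, not the plan itself.

```python
from collections import defaultdict

def per_second_counts(events):
    """Count (timestamp_ms, key) events per (second_start_ms, key)."""
    counts = defaultdict(int)
    for ts, key in events:
        second_start = ts - (ts % 1000)  # same flooring as the query
        counts[(second_start, key)] += 1
    return dict(counts)

def per_minute_totals(second_counts):
    """Sum per-second counts into (minute_start_ms, key) totals."""
    totals = defaultdict(int)
    for (second_start, key), n in second_counts.items():
        minute_start = second_start - (second_start % 60_000)
        totals[(minute_start, key)] += n
    return dict(totals)

# Hypothetical events: (timestamp in ms, grouping key).
events = [(1_250, "k1"), (1_900, "k1"), (2_100, "k1"), (61_000, "k1")]
seconds = per_second_counts(events)
minutes = per_minute_totals(seconds)
print(seconds)
print(minutes)
```

Note that the second level sums the first-level counts instead of counting rows, which mirrors the `sum(total_request_count)` in the per-minute query above.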


Attempt 02


This was my first attempt, written using window.externalTimeBatch.

/* Enter a unique ExecutionPlan */
@Plan:name('ExecutionPlan-depAnalytics-perMinSummary2')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')

/* define streams/tables and write queries here ... */


@Import('org.wso2telco.analytics.hub.stream.processedStatistics:1.0.0')
define stream processedevnts (api string, resourcePath string, method string, responseTime long, serviceTime long, serviceProvider string, apiPublisher string, applicationName string, requestId string, operatorId string, responseCode string, isSuccess bool, msisdn string, direction string, jsonBody string, serviceProviderId string, spUserId string, spConsumerKey string, errorMessageId string, errorText string, responseTimeRange string, year int, month int, day int, hour int, operatorName string, apiPublisherID string, apiID string, department string, applicationId string);

@Export('wso2telco.dep.analytics.perMinProcessedStatistics:1.0.0')
define stream minsummary (api string, serviceProvider string, apiPublisher string, applicationName string, operatorId string, isSuccess bool, msisdn string, direction string, totalCount long, serviceProviderId string, errorMessageId string, responseTimeRange string, year int, month int, day int, hour int, min int, operatorName string, apiPublisherID string, apiID string, department string, applicationId string);


@info('Query01')
from processedevnts #window.externalTimeBatch(responseTime, 60 sec)
select api , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction , cast(count(api),'long') as totalCount , serviceProviderId , errorMessageId ,responseTimeRange , year , month , day , hour , time:extract(responseTime,'minute')  as min , operatorName , apiPublisherID , apiID , department , applicationId
group by api , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction, serviceProviderId , errorMessageId, responseTimeRange , year , month , day , hour,operatorName , apiPublisherID , apiID , department , applicationId
insert into minsummary;

from minsummary#log('nuwan')
insert into minsummary2;

The first difficulty with this solution was that a one-minute window starts only when an event arrives.

The second obstacle was that the end of the window is triggered only when another event arrives at the system. So the calculation was based entirely on the timestamps of the events received.

This approach didn't let me summarise data by minutes on the clock.
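Both problems can be reproduced with a small Python model of an event-driven batch window. This is a simplified sketch of the behaviour I observed, not Siddhi's actual implementation, and the function name `external_time_batches` is made up for the example.

```python
def external_time_batches(timestamps, window_ms):
    """Simplified model of an event-driven batch window: the first event
    fixes the window start, and a batch is only emitted when a later
    event arrives with a timestamp at or past the window end."""
    emitted, current, start = [], [], None
    for ts in timestamps:
        if start is None:
            start = ts  # window boundaries follow the first event
        if ts >= start + window_ms:
            emitted.append((start, current))
            # advance in whole windows until the new event fits
            start += ((ts - start) // window_ms) * window_ms
            current = []
        current.append(ts)
    return emitted, (start, current)  # second value: batch still waiting

# Timestamps in milliseconds; the first event arrives 20 s into a minute.
emitted, pending = external_time_batches([20_000, 50_000, 85_000], 60_000)
print(emitted)
print(pending)
```

In this run the first window starts at 20 s instead of on the minute, and the event at 85 s stays in the pending batch until some later event pushes it out.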

Attempt 03


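For this attempt I added a third parameter to the window, intending it to make the window expire after a given time, and set it to one minute. (In the five-parameter call used in Attempt 01, the third argument is 0 and the fourth is 10 sec, which suggests the third position is the start time and the fourth the timeout, so check the parameter order against the Siddhi documentation for your version.)
#window.externalTimeBatch(responseTime, 60 sec, 60 sec)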

This attempt expired the time window without depending on the next incoming event. Still, it had an issue: the start time of the window depended on when events arrived.

Attempt 04


Consequently, I looked for an option to expire the time window based on the clock time. Then I found

          #window.cron('0 0/1 * 1/1 * ? *')

This Quartz-style cron expression fires at second 0 of every minute, so the window expires on the clock and not on event arrival.

/* Enter a unique ExecutionPlan */
@Plan:name('ExecutionPlan-depAnalytics-perMinSummary')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')

/* define streams/tables and write queries here ... */
@Import('org.wso2telco.analytics.hub.stream.processedStatistics:1.0.0')
define stream processedevnts (api string, resourcePath string, method string, responseTime long, serviceTime long, serviceProvider string, apiPublisher string, applicationName string, requestId string, operatorId string, responseCode string, isSuccess bool, msisdn string, direction string, jsonBody string, serviceProviderId string, spUserId string, spConsumerKey string, errorMessageId string, errorText string, responseTimeRange string, year int, month int, day int, hour int, operatorName string, apiPublisherID string, apiID string, department string, applicationId string);

@Export('wso2telco.dep.analytics.perMinProcessedStatistics:1.0.0')
define stream minsummary (api string, responseTime long , serviceProvider string, apiPublisher string, applicationName string, operatorId string, isSuccess bool, msisdn string, direction string, totalCount long, serviceProviderId string, errorMessageId string, responseTimeRange string, year int, month int, day int, hour int, min int, operatorName string, apiPublisherID string, apiID string, department string, applicationId string);

@info('Query01')
from processedevnts #window.cron('0 0/1 * 1/1 * ? *')
select api, max(responseTime) as responseTime , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction , cast(count(api),'long') as totalCount , serviceProviderId , errorMessageId ,responseTimeRange , year , month , day , hour , time:extract(responseTime,'minute')  as min , operatorName , apiPublisherID , apiID , department , applicationId
group by api , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction, serviceProviderId , errorMessageId, responseTimeRange , year , month , day , hour,operatorName , apiPublisherID , apiID , department , applicationId
insert into minsummary;

from minsummary#log('nuwan')
insert into minsummary2;

With this cron expression, the window expires the captured events based on the clock time. Therefore you can use Attempt 04 as a summarization method for real-time data.
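To see what "based on the clock time" means in practice, the following Python sketch lists when a once-a-minute schedule would fire after a given moment. It only models the schedule (second 0 of every minute), not the Siddhi window itself. The date and the function name `next_minute_boundaries` are arbitrary choices for the example.

```python
from datetime import datetime, timedelta

def next_minute_boundaries(now, count):
    """Return the next `count` times at which a once-a-minute schedule
    (second 0 of every minute) fires after `now`."""
    first = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return [first + timedelta(minutes=i) for i in range(count)]

now = datetime(2018, 1, 1, 7, 59, 20)  # arbitrary example time
for fire_time in next_minute_boundaries(now, 3):
    print(fire_time.time())
```

Whatever time the first event arrives, the release points stay on whole minutes.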

Explanation

The one-minute Siddhi summary is used to calculate the hourly summary records. External time windows are not aligned to the clock, which makes it harder to map a minute summary to an hourly summary. For example, for a window from 7:59:20 to 8:00:20, which hour does it belong to?
window.cron makes sure the Siddhi window is synced with the clock. The cron expression fires at second 00 of every minute, so each window covers exactly 60 seconds of clock time. This makes it easy to derive the hourly summary from the minute summaries.
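The 7:59:20 example can be checked with a short Python sketch that lists the clock hours a 60-second window touches. The helper name `hours_touched` and the date are made up for illustration.

```python
from datetime import datetime, timedelta

def hours_touched(window_start, length=timedelta(seconds=60)):
    """Return the distinct clock hours covered by
    [window_start, window_start + length)."""
    last_instant = window_start + length - timedelta(microseconds=1)
    return sorted({window_start.hour, last_instant.hour})

event_driven = hours_touched(datetime(2018, 1, 1, 7, 59, 20))
clock_aligned = hours_touched(datetime(2018, 1, 1, 7, 59, 0))
print(event_driven)
print(clock_aligned)
```

A window that starts on a whole minute always falls inside a single hour, so its summary can be added to exactly one hourly record.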

The second point is that if external time windows are used instead of cron windows, windows are created and completed only based on the events received by the stream. The window is created when the first event arrives, and it is expired by the first event received after the window period has passed. Window expiration therefore depends on the next event to reach the stream, which makes the timing of window expiration inaccurate.


Attempt 05

Even though Attempt 04 releases events based on the clock time, I came across another problem: I saw duplicate events, because the window was working on real-time data. Adding another time window to group them solved the issue.

/* Enter a unique ExecutionPlan */
@Plan:name('ExecutionPlan-depAnalytics-perMinSummary22')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')

/* define streams/tables and write queries here ... */


@Import('org.wso2telco.analytics.hub.stream.processedStatistics:1.0.0')
define stream processedevnts (api string, resourcePath string, method string, responseTime long, serviceTime long, serviceProvider string, apiPublisher string, applicationName string, requestId string, operatorId string, responseCode string, isSuccess bool, msisdn string, direction string, jsonBody string, serviceProviderId string, spUserId string, spConsumerKey string, errorMessageId string, errorText string, responseTimeRange string, year int, month int, day int, hour int, operatorName string, apiPublisherID string, apiID string, department string, applicationId string);

@Export('wso2telco.dep.analytics.perMinProcessedStatistics:1.0.0')
define stream minsummaryReal (api string, responseTime long , serviceProvider string, apiPublisher string, applicationName string, operatorId string, isSuccess bool, msisdn string, direction string, totalCount long, serviceProviderId string, errorMessageId string, responseTimeRange string, year int, month int, day int, hour int, min int, operatorName string, apiPublisherID string, apiID string, department string, applicationId string);


from processedevnts
select api, responseTime , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction, serviceProviderId , errorMessageId, responseTimeRange , year , month , day ,
hour, time:extract(responseTime, 'minute') as min, operatorName , apiPublisherID , apiID , department , applicationId
insert into processedevntsTemp;


@info('Query01')
from processedevntsTemp #window.cron('0 0/1 * 1/1 * ? *')
select api, max(responseTime) as responseTime , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction , cast(count(api),'long') as totalCount , serviceProviderId , errorMessageId ,responseTimeRange , year , month , day , hour ,  min , operatorName , apiPublisherID , apiID , department , applicationId
group by api , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction, serviceProviderId , errorMessageId, responseTimeRange , year , month , day , hour, min ,operatorName , apiPublisherID , apiID , department , applicationId
insert into perSecondRequestsTemp;


from perSecondRequestsTemp#window.timeBatch(5 sec)

select api, max(responseTime) as responseTime , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction , cast(count(api),'long') as totalCount , serviceProviderId , errorMessageId ,responseTimeRange , year , month , day , hour ,  min , operatorName , apiPublisherID , apiID , department , applicationId
group by api , serviceProvider , apiPublisher , applicationName , operatorId , isSuccess , msisdn , direction, serviceProviderId , errorMessageId, responseTimeRange , year , month , day , hour, min ,operatorName , apiPublisherID , apiID , department , applicationId
insert into minsummaryReal;
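One visible difference from Attempt 04 is that the minute is now extracted before the window and added to the group by clause. The Python sketch below shows the effect of having the minute in the grouping key, assuming two events from different minutes end up in the same batch (for example, if one arrives late). It is only a model of the grouping, with made-up event data.

```python
from collections import Counter
from datetime import datetime

def count_by_key(events, with_minute):
    """Count (timestamp, api) events per (api, hour[, minute]) key."""
    counts = Counter()
    for ts, api in events:
        key = (api, ts.hour, ts.minute) if with_minute else (api, ts.hour)
        counts[key] += 1
    return dict(counts)

# Two events that straddle a minute boundary but land in the same batch.
batch = [
    (datetime(2018, 1, 1, 7, 58, 59), "sms"),
    (datetime(2018, 1, 1, 7, 59, 1), "sms"),
]
without_minute = count_by_key(batch, with_minute=False)
with_minute = count_by_key(batch, with_minute=True)
print(without_minute)
print(with_minute)
```

With the minute in the key, each minute gets its own summary row instead of being merged into one.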
