Skip to main content

WSO2 Siddhi Advance Example

I would like to present you today how to write an advance siddhi query for wso2 das. I would skip basic steps hoping you are already familiar with siddhi event processing language and DAS server.


Siddhi - Is an event processing language created by WSO2. this language has a very similar behavior to SQL query language, except siddhi query triggered by a receiving event. siddhi syntaxes bit upside down when compared to traditional SQL.


DAS - DAS is the data analytic server introduced by WSO2.

Introduction


What my example doing is when DAS server is receiving an event it flows through my siddhi example.



DAS is connected to an external system and DAS receives events. this event has json body where event-type is hidden. based on the event type we will be pricing the event and pushing updated stream data to a new stream. so to make a result we will have to process the event using siddhi java extension.


Before explaining the siddhi query, let me explain you various functions I have used in the siddhi query

Lowercase


This function helps us to change the value to lower case.

str:lower(api) == 'payment'


Cast


The selected value can be cast to different values as follows. it can be a string or int.

cast(0.0 ,'double') as chargeAmount



Calling java extension


Following code will call java extension and loads data to the query returned from the java class.


from tempPaymentPreProcessedResponse  [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]

Null check  for select


When you making a select query and if you receiving null value instead of a double value it will break your code. so you have to make sure null value is not generated. but there is no straight forward to the mechanism to handle nulls. fortunately, you can use coalesce() method. it will accept any number of arguments and returns first nonnull parameter. so

str:coalesce(chargeAmount, 0.0) as chargeAmount

the query will return 0.0 if chargeAmount is null.

Null Check at Java Extension


Below code checks for null when calling java extension.

from tempPaymentPreProcessedResponse
    [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]



Calling javascript function


You can call a javascript function to ease code filteration.
you can call the java script as following,

[checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == false]


This is the javascript function

define function checkPurchaseCategoryCodeValidity[JavaScript] return bool {                                                                                                                                               
    var str1 = data[0];
    var str2 = data[1];
       var responce = false;
    if( str1 ){
        if((str1.length == str2) && (!isNaN(str1)) ){
            responce = true;
        }
    }                                                                                                                       
    return responce;
};



Loading data from MySQL table


@From(eventtable='rdbms', datasource.name='WSO2TELCO_RATE_DB', table.name='billingAccumelator')
define table billingAccumelator (
    api string,   
    totalAmount double
    );


This mechanism will call external MySQL table and load data to memory.


Updating Stream

There's no way of updating a record in Siddhi stream as far as I know. but if you pass the same record with the same primary key value it will update.

Log

Will print the whole stream in the log after printing "payment ............."

from getProcessedResponse#log('payment .............')



Flow


Let me explain the flow of the example. my main objective is to price the event received and have an accumulated total for the received event. this accumulated total will be used for the next incoming message. that means accumulated total to be shared among events. below is the basic flow what my siddhi is doing.


1. receive event from the publisher
2. filter payment API
3. get purchase code and amount from jsonbody
4. if the purchase code is a 12 digit number extract the merchant id
5. calll java extension to calculate the pricing for the record.
6. save the accumulated total in a MySQL table
7. push the priced record into a priced stream
8. push the accumulated record to an accumulated stream




Siddhi Query


Now I will explain you the simplified version of my siddhi query.

Import stream


@Import('org.wso2telco.analytics.hub.stream.processedStatistics:1.0.0')
define stream getProcessedResponse (
    api string,  applicationName string,  jsonBody string    );

this is the stream defined for receiving data for siddhi query. org.wso2telco.analytics.hub.stream.processedStatistics is the stream name referring.


Export stream


@Export('wso2telco.billing.processedStatistics:1.0.0')
define stream pricingProcessedResponse (
    api string,
      chargeAmount double,
    merchant string,
    price double,
);

wso2telco.billing.processedStatistics is the stream to publish priced event.


Common data


this mechanism will store common data not required for the processing will store in the temp table and will use later.


define table commonData (    
    apiID string,
    applicationName string
    );



 Java script function


This function will find the passed parameter is a number and having given length.


define function checkPurchaseCategoryCodeValidity[JavaScript] return bool {
                                                                                                                                               
    var str1 = data[0];
    var str2 = data[1];
  
    var responce = false;
    if( str1 ){
        if((str1.length == str2) && (!isNaN(str1)) ){
            responce = true;
        }
    }                                                                                                                       
    return responce;
};




Filter payment APIs


This query can filter payment APIs.


@info(name = 'QUERY: filter payment')
from getProcessedResponse
    [str:lower(api) == 'payment']#log('payment .............')
select
        api,       
        chargeAmount ,
        jsonBody,       
        merchant
insert into tempPaymentPreProcessedResponse;



process json



This step will process json body using java extension. read the data from the input stream and output Java processed data.


@info(name = 'QUERY: Creates a temporary stream for payment')
from tempPaymentPreProcessedResponse
    [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]
select
        api,        
        cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.amount'),'double') as chargeAmount ,
        str:coalesce(cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.purchaseCategoryCode'),'string'),'')
         as purchaseCategoryCode,merchant,jsonBody,
insert into tempPaymentJsonProcessedResponse;


from tempPaymentPreProcessedResponse     [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]

Is used to filter records having amountTransactio value as not null.


cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.amount'),'double') as chargeAmount

will extract the charge amount value from jsonbody.


str:coalesce(cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.purchaseCategoryCode'),'string'),'')
         as purchaseCategoryCode

this step will extract purchaseCategoryCode from json body and replaces a '' if value is null.

Finally, inserts the record to new temp stream.



Select if purchase category code is 12 digit


This will use the javascript function to find the purchase category code is a length 12 number. 4 to 6 digits are considered as merchant id.


str:substr(purchaseCategoryCode, 0,3) as merchant

So will extract the value.


@info(name = 'QUERY: Select if purchasecategory code is 12 digit number')
from tempPaymentJsonProcessedResponse
[checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == true]
select
        api,        
        chargeAmount ,
        str:substr(purchaseCategoryCode, 0,3) as merchant        
insert into tempPricingProcessedResponse;



Select if purchasecategory code is NOT 12 digit number


If the purchase category code is not a 12 digit long number onbehalf value is considered as merchant id.


@info(name = 'QUERY: Select if purchasecategory code is NOT 12 digit number')
from tempPaymentJsonProcessedResponse
    [checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == false]
select
        api,        
        chargeAmount ,
        cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.onBehalfOf'),'string')  as merchant         
insert into tempPricingProcessedResponse;



Pricing


Now the pricing part comes


Join accumulated summary to the new event1


Now we need the common data to proceed. let's make an inner join with common data to processed data.



@info(name='QUERY: join accumelated summary to the new event1')
      from tempPricingProcessedResponse
        inner join commonData
          on tempPricingProcessedResponse.api == commonData.api
      select
        api,
        commonData.applicationName ,   
        chargeAmount ,   
        merchant
    insert into tempPricingProcessedResponseJoinCommon;



Join accumulated summary to the new event


Since the accumulated summary table can be null when the first record arrives this left join helps. so we will always have a record to process even if the accumulated record is null. price column will be place holder for the priced column.


@info(name='QUERY: join accumelated summary to the new event')
      from tempPricingProcessedResponseJoinCommon
        left outer join billingAccumelator
          on tempPricingProcessedResponseJoinCommon.api == billingAccumelator.api

      select
       
        str:coalesce(billingAccumelator.totalAmount, 0.0) as total,
       
         applicationName,
        tempPricingProcessedResponseJoinCommon.api ,
   
        str:coalesce(chargeAmount, 0.0) as chargeAmount ,
   
        merchant ,
        0.0 as price
   
       
    insert into tempEventWithAccumelatedSummary;



Call siddhi billing extension


Call java extension to do the pricing.


@info(name='QUERY: call siddhi billing extension')
      from tempEventWithAccumelatedSummary
        #wso2TelcoHubAnalytics:getBillingProcessor()
          #log('--------calling siddhi billing extension----------------------------------------------------------------------------------- ')
      select
        
        total,
       
        api ,
       
        applicationName ,
       
        chargeAmount ,
       
        merchant
        ,price
    insert into tempPricedEventTable;



Mysql record update


First, will delete the existing record and add a new record to make sure updating not to make an exception for the first record when no record found in the database.


@info(name='QUERY: delete accumelated record in mysql table')
      from tempPricedEventTable
        #log('-----------------1---delete-mysql-----------------------------------------------------------------------------------------')
      select api
      delete billingAccumelator
      on billingAccumelator.api == api

add new record.


@info(name='QUERY: add new record to mysql table')
      from tempPricedEventTable
          #log('----------------2-to-mysql-table------------------------------------------------------------------------------------------')
      select
     
        api ,
       
   
        total as totalAmount
      insert into billingAccumelator; 
 


Push priced record to stream


Update the record to stream, so priced record is updated in the second stream.


@info(name='QUERY: push priced record to stream')
      from tempPricedEventTable
          #log('---------------4-priced record----------------------------------------------------------------------------------------------')
      select
   
        api ,
       
        applicationName ,
   
        chargeAmount ,
       
        merchant ,
        price
   
      insert into pricingProcessedResponse;



Code

Siddhi Query Complete Code


/* Enter a unique ExecutionPlan */
@Plan:name('ExecutionPlan-Pricing-Processor')

/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')

/* define streams/tables and write queries here ... */

/*
*****************************************************
**************************** PART 01 ****************
*****************************************************
*/

@Import('org.wso2telco.analytics.hub.stream.processedStatistics:1.0.0')
define stream getProcessedResponse (
    api string,  applicationName string,  jsonBody string    );

@Export('wso2telco.billing.processedStatistics:1.0.0')
define stream pricingProcessedResponse (
    api string,
      chargeAmount double,
    merchant string,
    price double,
);
   

   
@From(eventtable='rdbms', datasource.name='WSO2TELCO_RATE_DB', table.name='billingAccumelator')
define table billingAccumelator (
    api string,   
    totalAmount double
    );

/* common columns will be added to final stream later in the coide*/
define table commonData (
    
    apiID string,
    applicationName string
    );
   
/*JavaScript function validates the given purchase category code is a 12 length number*/   
define function checkPurchaseCategoryCodeValidity[JavaScript] return bool {
                                                                                                                                               
    var str1 = data[0];
    var str2 = data[1];
  
    var responce = false;
    if( str1 ){
        if((str1.length == str2) && (!isNaN(str1)) ){
            responce = true;
        }
    }                                                                                                                       
    return responce;
};






/*
*****************************************************
****************** PART 02 ****************
*****************************************************
*/


@info(name = 'QUERY: filter payment')
from getProcessedResponse
    [str:lower(api) == 'payment']#log('payment .............')
select
        api,       
        chargeAmount ,
        jsonBody,       
        merchant
insert into tempPaymentPreProcessedResponse;

@info(name = 'QUERY: Creates a temporary stream for payment')
from tempPaymentPreProcessedResponse
    [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]
select
        api,        
        cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.amount'),'double') as chargeAmount ,
        str:coalesce(cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.purchaseCategoryCode'),'string'),'')
         as purchaseCategoryCode,merchant,jsonBody,
insert into tempPaymentJsonProcessedResponse;

@info(name = 'QUERY: Select if purchasecategory code is 12 digit number')
from tempPaymentJsonProcessedResponse
[checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == true]
select
        api,        
        chargeAmount ,
        str:substr(purchaseCategoryCode, 0,3) as merchant        
insert into tempPricingProcessedResponse;

@info(name = 'QUERY: Select if purchasecategory code is NOT 12 digit number')
from tempPaymentJsonProcessedResponse
    [checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == false]
select
        api,        
        chargeAmount ,
        cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.onBehalfOf'),'string')  as merchant        
insert into tempPricingProcessedResponse;


/*
*****************************************************
********** PART 11 - pricing ************************
*****************************************************
*/

@info(name='QUERY: join accumelated summary to the new event1')
      from tempPricingProcessedResponse
        inner join commonData
          on tempPricingProcessedResponse.api == commonData.api
      select
        api,
        commonData.applicationName ,   
        chargeAmount ,   
        merchant
    insert into tempPricingProcessedResponseJoinCommon;

@info(name='QUERY: join accumelated summary to the new event')
      from tempPricingProcessedResponseJoinCommon
        left outer join billingAccumelator
          on tempPricingProcessedResponseJoinCommon.api == billingAccumelator.api

      select
       
        str:coalesce(billingAccumelator.totalAmount, 0.0) as total,
       
         applicationName,
        tempPricingProcessedResponseJoinCommon.api ,
   
        str:coalesce(chargeAmount, 0.0) as chargeAmount ,
   
        merchant ,
        0.0 as price
   
       
    insert into tempEventWithAccumelatedSummary;

@info(name='QUERY: call siddhi billing extension')
      from tempEventWithAccumelatedSummary
        #wso2TelcoHubAnalytics:getBillingProcessor()
          #log('--------calling siddhi billing extension----------------------------------------------------------------------------------- ')
      select
       
        total,
       
        api ,
       
        applicationName ,
       
        chargeAmount ,
       
        merchant
        ,price
    insert into tempPricedEventTable;

/*store accumelated billing total in a mysql table temporaly*/
@info(name='QUERY: delete accumelated record in mysql table')
      from tempPricedEventTable
        #log('-----------------1---delete-mysql-----------------------------------------------------------------------------------------')
      select api
      delete billingAccumelator
      on billingAccumelator.api == api


 
/*store accumelated billing total in a mysql table temporaly*/ 
@info(name='QUERY: add new record to mysql table')
      from tempPricedEventTable
          #log('----------------2-to-mysql-table------------------------------------------------------------------------------------------')
      select
     
        api ,
       
   
        total as totalAmount
      insert into billingAccumelator; 
 


/*finally insert the proced record to wso2telco.billing.processedStatistic stream*/ 
@info(name='QUERY: push priced record to stream')
      from tempPricedEventTable
          #log('---------------4-priced record----------------------------------------------------------------------------------------------')
      select
   
        api ,
       
        applicationName ,
   
        chargeAmount ,
       
        merchant ,
        price
   
      insert into pricingProcessedResponse;

/******* END of ExecutionPlan *****/

                   

                    


Comments

Popular posts from this blog

Oracle Database 12c installation on Ubuntu 16.04

This article describes how to install Oracle 12c 64bit database on Ubuntu 16.04 64bit. Download software  Download the Oracle software from OTN or MOS or get a downloaded zip file. OTN: Oracle Database 12c Release 1 (12.1.0.2) Software (64-bit). edelivery: Oracle Database 12c Release 1 (12.1.0.2) Software (64-bit)   Unpacking  You should have following two files downloaded now. linuxamd64_12102_database_1of2.zip linuxamd64_12102_database_2of2.zip Unzip and copy them to \tmp\databases NOTE: you might have to merge two unzipped folders to create a single folder. Create new groups and users Open a terminal and execute following commands. you might need root permission. groupadd -g 502 oinstall groupadd -g 503 dba groupadd -g 504 oper groupadd -g 505 asmadmin Now create the oracle user useradd -u 502 -g oinstall -G dba,asmadmin,oper -s /bin/bash -m oracle You will prompt to set to password. set a momorable password and write it down. ...

DBCA : No Protocol specified

when trying to execute dbca from linux terminal got this error message. now execute the command xhost, you probably receiving No protocol specified xhost:  unable to open display ":0" issue is your user is not allowed to access the x server. You can use xhost to limit access for X server for security reasons. probably you are logged in as oracle user. switch back to default user and execute xhost again. you should see something like SI:localuser:nuwan solution is adding the oracle to access control list xhost +SI:localuser:oracle now go back to oracle user and try dbca it should be working

Slow CPU after resume / Ubuntu 16.04 LTS too slow after suspend and resume

you might experience a slow performance in ubuntu 16.04 after resuming back from a sleep or hibernate. reason for this is it doesn't move away from powersave mode automatically. intel_pstate driver is the one making this problem, so we have to disable it and use acpi-cpufreq  driver. first lets check weather your competer is on powersave.  cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor If your answer is powersave we have to move it to performance state. get your cpu info by cat /proc/cpuinfo | grep MHz  probabaly you seeing 4 cores lets update scaling governers     echo 'performance' > /sys/devices/system/cpu/cpu0/cpufreq/scaling_governor     echo 'performance' > /sys/devices/system/cpu/cpu1/cpufreq/scaling_governor     echo 'performance' > /sys/devices/system/cpu/cpu2/cpufreq/scaling_governor     echo 'performance' > /sys/devices/system/cpu/cpu3/cpufreq/scaling_g...