Today I would like to show you how to write an advanced Siddhi query for WSO2 DAS. I will skip the basic steps, assuming you are already familiar with the Siddhi event processing language and the DAS server.
Siddhi - Siddhi is an event processing language created by WSO2. It behaves much like SQL, except that a Siddhi query is triggered by an incoming event. Its syntax reads a bit upside down compared to traditional SQL: the from clause comes first and the select clause follows.
DAS - DAS is the Data Analytics Server introduced by WSO2.
Introduction
In my example, every event that the DAS server receives flows through my Siddhi query.
DAS is connected to an external system and receives events from it. Each event carries a JSON body in which the event type is buried. Based on the event type, we price the event and push the updated data to a new stream. To produce that result, we have to process the event with a Siddhi Java extension.
Before explaining the Siddhi query itself, let me explain the various functions I have used in it.
Lowercase
str:lower() converts a value to lower case, so the comparison below matches regardless of how the API name is capitalised.
str:lower(api) == 'payment'
Cast
cast() casts the selected value to the given type, such as string, int or double:
cast(0.0 ,'double') as chargeAmount
Calling the Java extension
The following filter calls a function from the Java extension and uses the value returned by the Java class inside the query.
from tempPaymentPreProcessedResponse [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]
Null check for select
If a select query receives a null value where it expects a double, it will break your code, so you have to make sure that no null value gets through. There is no straightforward mechanism for handling nulls, but fortunately you can use the coalesce() method. It accepts any number of arguments and returns the first non-null one. So with
str:coalesce(chargeAmount, 0.0) as chargeAmount
the query returns 0.0 if chargeAmount is null.
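To make the behaviour concrete, here is a small JavaScript sketch of the "first non-null argument" rule. It only illustrates the semantics described above; it is not how Siddhi implements coalesce, and the function below is written just for this example.

```javascript
// Illustration only: return the first argument that is not null/undefined.
function coalesce(...values) {
  for (const value of values) {
    if (value !== null && value !== undefined) {
      return value;
    }
  }
  return null; // every argument was null
}

// A missing chargeAmount falls back to the default, a present one is kept.
console.log(coalesce(null, 0.0));
console.log(coalesce(12.5, 0.0));
```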
Null Check at Java Extension
The filter below checks for null when calling the Java extension, so only events whose JSON body contains amountTransaction pass through.
from tempPaymentPreProcessedResponse
    [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]
Calling a JavaScript function
You can define a JavaScript function to keep filter conditions simple.
You call the JavaScript function as follows:
[checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == false]
This is the JavaScript function:
define function checkPurchaseCategoryCodeValidity[JavaScript] return bool {
    // data[0]: the purchase category code, data[1]: the required length
    var code = data[0];
    var expectedLength = data[1];
    var response = false;
    if (code) {
        if ((code.length == expectedLength) && (!isNaN(code))) {
            response = true;
        }
    }
    return response;
};
Loading data from MySQL table
@From(eventtable='rdbms', datasource.name='WSO2TELCO_RATE_DB', table.name='billingAccumelator')
define table billingAccumelator (
api string,
totalAmount double
);
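This definition binds the Siddhi event table billingAccumelator to the external MySQL table of the same name through the WSO2TELCO_RATE_DB datasource, so that queries can read its data and write back to it.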
Updating Stream
As far as I know, there is no way to update a record in a Siddhi stream. However, if you publish a record with the same primary key value, the existing record is updated.
Log
#log() prints every event that passes through the stream to the log, prefixed with the message "payment .............".
from getProcessedResponse#log('payment .............')
Flow
Let me explain the flow of the example. My main objective is to price each received event and to keep an accumulated total. This accumulated total is used when pricing the next incoming message, which means the total has to be shared among events. Below is the basic flow of my Siddhi query.
1. Receive the event from the publisher.
2. Filter payment API events.
3. Get the purchase code and the amount from the JSON body.
4. If the purchase code is a 12-digit number, extract the merchant ID from it.
5. Call the Java extension to calculate the price for the record.
6. Save the accumulated total in a MySQL table.
7. Push the priced record into a priced stream.
8. Push the accumulated record to an accumulated stream.
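The steps above can be sketched outside DAS in a few lines of JavaScript. This is only a mental model, not the actual execution plan: the Map stands in for the MySQL table, the pricing tiers in priceEvent are invented (the real rule lives in the Java extension), and I assume the new total is simply the old total plus the charge amount.

```javascript
// In-memory stand-in for the billingAccumelator table: api -> totalAmount.
const accumulator = new Map();

// Hypothetical pricing rule; the rates and the threshold are made up.
function priceEvent(chargeAmount, totalSoFar) {
  const rate = totalSoFar >= 100 ? 0.25 : 0.5;
  return chargeAmount * rate;
}

function processEvent(event) {
  // Step 2: keep payment events only.
  if (event.api.toLowerCase() !== 'payment') {
    return null;
  }
  // Step 3: read the amount and purchase code from the JSON body.
  const tx = JSON.parse(event.jsonBody).amountTransaction;
  if (!tx) {
    return null;
  }
  const chargeAmount = tx.amount != null ? tx.amount : 0.0;
  const code = tx.purchaseCategoryCode != null ? tx.purchaseCategoryCode : '';
  // Step 4: a 12-digit code carries the merchant ID, otherwise use onBehalfOf.
  const merchant = /^[0-9]{12}$/.test(code) ? code.substring(0, 3) : tx.onBehalfOf;
  // Step 5: price the record using the total accumulated so far.
  const totalSoFar = accumulator.has(event.api) ? accumulator.get(event.api) : 0.0;
  const price = priceEvent(chargeAmount, totalSoFar);
  // Step 6: store the new total so the next event can see it (assumed rule).
  accumulator.set(event.api, totalSoFar + chargeAmount);
  // Step 7: the priced record that would go to the priced stream.
  return { api: event.api, chargeAmount, merchant, price };
}

const first = processEvent({
  api: 'payment',
  jsonBody: JSON.stringify({
    amountTransaction: { amount: 80, purchaseCategoryCode: '123456789012' }
  })
});
console.log(first, accumulator.get('payment'));
```

The point of the sketch is the shared state: the second payment event is priced against the total left behind by the first one.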
Siddhi Query
Now I will walk you through a simplified version of my Siddhi query.
Import stream
@Import('org.wso2telco.analytics.hub.stream.processedStatistics:1.0.0')
define stream getProcessedResponse (
api string, applicationName string, jsonBody string );
This is the stream through which the Siddhi query receives its data. org.wso2telco.analytics.hub.stream.processedStatistics is the name of the stream it refers to.
Export stream
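@Export('wso2telco.billing.processedStatistics:1.0.0')
define stream pricingProcessedResponse (
    api string,
    applicationName string,
    chargeAmount double,
    merchant string,
    price double
);
wso2telco.billing.processedStatistics is the stream to which the priced event is published. Its attributes have to match what the final query selects, which is why applicationName is listed here and why there is no comma after the last attribute.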
Common data
Attributes that are not needed during processing are kept in a temporary table and joined back to the event later. The simplified plan shown here does not include the query that fills this table.
define table commonData (
    apiID string,
    applicationName string
);
JavaScript function
This function checks whether the passed value is a number and has the given length.
define function checkPurchaseCategoryCodeValidity[JavaScript] return bool {
    // data[0]: the purchase category code, data[1]: the required length
    var code = data[0];
    var expectedLength = data[1];
    var response = false;
    if (code) {
        if ((code.length == expectedLength) && (!isNaN(code))) {
            response = true;
        }
    }
    return response;
};
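The function body is plain JavaScript, so the check can be tried outside DAS. The sketch below wraps the same logic in an ordinary function and adds a stricter, digits-only variant. isStrictDigitCode is a suggestion for illustration, not part of the original execution plan. It is worth knowing that isNaN accepts anything that converts to a number, so a 12-character value such as '1234567890.5' passes the original check.

```javascript
// Same logic as checkPurchaseCategoryCodeValidity, as a normal function.
function checkValidity(code, expectedLength) {
  if (!code) {
    return false; // null, undefined or empty string
  }
  return code.length == expectedLength && !isNaN(code);
}

// Hypothetical stricter variant: digits only, exact length.
function isStrictDigitCode(code, expectedLength) {
  return typeof code === 'string'
    && code.length === expectedLength
    && /^[0-9]+$/.test(code);
}

console.log(checkValidity('123456789012', 12));     // true
console.log(checkValidity('1234567890.5', 12));     // true, decimal point slips through
console.log(isStrictDigitCode('1234567890.5', 12)); // false
```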
Filter payment APIs
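This query lets only payment API events through. chargeAmount and merchant are not attributes of the import stream, so they are emitted here as placeholders and filled in by the later queries.
@info(name = 'QUERY: filter payment')
from getProcessedResponse
    [str:lower(api) == 'payment']#log('payment .............')
select
    api,
    cast(0.0, 'double') as chargeAmount,
    jsonBody,
    '' as merchant
insert into tempPaymentPreProcessedResponse;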
Process JSON
This step processes the JSON body using the Java extension. It reads events from the input stream and outputs the values extracted by the Java code.
@info(name = 'QUERY: Creates a temporary stream for payment')
from tempPaymentPreProcessedResponse
    [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]
select
    api,
    cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.amount'),'double') as chargeAmount,
    str:coalesce(cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.purchaseCategoryCode'),'string'),'')
        as purchaseCategoryCode,
    merchant,
    jsonBody
insert into tempPaymentJsonProcessedResponse;
from tempPaymentPreProcessedResponse [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]
keeps only the records whose amountTransaction value is not null.
cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.amount'),'double') as chargeAmount
extracts the charge amount from the JSON body.
str:coalesce(cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.purchaseCategoryCode'),'string'),'')
as purchaseCategoryCode
extracts purchaseCategoryCode from the JSON body and substitutes '' if the value is null.
Finally, the query inserts the record into a new temporary stream.
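The Java source of getJSONBody is not part of this post. As a rough mental model, and assuming it walks a dot-separated path through the parsed JSON and returns null when a segment is missing, its behaviour can be sketched in JavaScript as follows. The function name getJsonPath and the sample body are made up for this illustration.

```javascript
// Hypothetical stand-in for wso2TelcoHubAnalytics:getJSONBody (assumed behaviour).
function getJsonPath(jsonBody, path) {
  let node;
  try {
    node = JSON.parse(jsonBody);
  } catch (e) {
    return null; // body is not valid JSON
  }
  for (const key of path.split('.')) {
    if (node === null || typeof node !== 'object' || !(key in node)) {
      return null; // a segment of the path is missing
    }
    node = node[key];
  }
  return node;
}

// Made-up sample body shaped like the paths used in the query above.
const sampleBody = JSON.stringify({
  amountTransaction: { amount: 25, purchaseCategoryCode: '123456789012' }
});

console.log(getJsonPath(sampleBody, 'amountTransaction.amount'));     // 25
console.log(getJsonPath(sampleBody, 'amountTransaction.onBehalfOf')); // null
```

Under that assumption, the "is null" filter drops any event without an amountTransaction object, and the coalesce call covers a missing purchaseCategoryCode.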
Select if purchase category code is 12 digit
This query uses the JavaScript function to check whether the purchase category code is a 12-digit number. Digits 4 to 6 are considered the merchant ID, and str:substr, which takes a zero-based start index and a length, extracts the value:
str:substr(purchaseCategoryCode, 0,3) as merchant
As written, (0, 3) returns the first three digits; use (3, 3) if the merchant ID sits in digits 4 to 6 of your codes.
@info(name = 'QUERY: Select if purchasecategory code is 12 digit number')
from tempPaymentJsonProcessedResponse
    [checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == true]
select
    api,
    chargeAmount,
    str:substr(purchaseCategoryCode, 0,3) as merchant
insert into tempPricingProcessedResponse;
Select if purchasecategory code is NOT 12 digit number
If the purchase category code is not a 12-digit number, the onBehalfOf value is used as the merchant ID.
@info(name = 'QUERY: Select if purchasecategory code is NOT 12 digit number')
from tempPaymentJsonProcessedResponse
    [checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == false]
select
    api,
    chargeAmount,
    cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.onBehalfOf'),'string') as merchant
insert into tempPricingProcessedResponse;
Pricing
Now comes the pricing part.
Join accumulated summary to the new event1
Now we need the common data to proceed, so let's inner join the common data with the processed data. The join condition uses apiID, the name of the column in the commonData table.
@info(name='QUERY: join accumelated summary to the new event1')
from tempPricingProcessedResponse
    inner join commonData
    on tempPricingProcessedResponse.api == commonData.apiID
select
    tempPricingProcessedResponse.api,
    commonData.applicationName,
    chargeAmount,
    merchant
insert into tempPricingProcessedResponseJoinCommon;
Join accumulated summary to the new event
The accumulated summary table may hold no row when the first record arrives, and this is where the left outer join helps: we always get a record to process, even when there is no accumulated record yet, and coalesce turns the missing total into 0.0. The price column is a placeholder for the value that the pricing step fills in.
@info(name='QUERY: join accumelated summary to the new event')
from tempPricingProcessedResponseJoinCommon
    left outer join billingAccumelator
    on tempPricingProcessedResponseJoinCommon.api == billingAccumelator.api
select
    str:coalesce(billingAccumelator.totalAmount, 0.0) as total,
    applicationName,
    tempPricingProcessedResponseJoinCommon.api,
    str:coalesce(chargeAmount, 0.0) as chargeAmount,
    merchant,
    0.0 as price
insert into tempEventWithAccumelatedSummary;
Call siddhi billing extension
Call the Java extension to do the pricing. The billing processor is attached to the stream with #, in the same way as #log().
@info(name='QUERY: call siddhi billing extension')
from tempEventWithAccumelatedSummary
    #wso2TelcoHubAnalytics:getBillingProcessor()
    #log('--------calling siddhi billing extension----------------------------------------------------------------------------------- ')
select
    total,
    api,
    applicationName,
    chargeAmount,
    merchant,
    price
insert into tempPricedEventTable;
MySQL record update
Instead of updating the record in place, I first delete the existing record and then add a new one. This way the first event, for which no record exists in the database yet, does not cause an exception.
@info(name='QUERY: delete accumelated record in mysql table')
from tempPricedEventTable
    #log('-----------------1---delete-mysql-----------------------------------------------------------------------------------------')
select api
delete billingAccumelator
    on billingAccumelator.api == api;
Then add the new record.
@info(name='QUERY: add new record to mysql table')
from tempPricedEventTable
    #log('----------------2-to-mysql-table------------------------------------------------------------------------------------------')
select
    api,
    total as totalAmount
insert into billingAccumelator;
Push priced record to stream
Finally, push the priced record to the export stream, so that it is published on the second stream.
@info(name='QUERY: push priced record to stream')
from tempPricedEventTable
#log('---------------4-priced record----------------------------------------------------------------------------------------------')
select
api ,
applicationName ,
chargeAmount ,
merchant ,
price
insert into pricingProcessedResponse;
Code
Siddhi Query Complete Code
/* Enter a unique ExecutionPlan */
@Plan:name('ExecutionPlan-Pricing-Processor')
/* Enter a unique description for ExecutionPlan */
-- @Plan:description('ExecutionPlan')
/* define streams/tables and write queries here ... */
/*
*****************************************************
**************************** PART 01 ****************
*****************************************************
*/
@Import('org.wso2telco.analytics.hub.stream.processedStatistics:1.0.0')
define stream getProcessedResponse (
api string, applicationName string, jsonBody string );
@Export('wso2telco.billing.processedStatistics:1.0.0')
define stream pricingProcessedResponse (
    api string,
    applicationName string,
    chargeAmount double,
    merchant string,
    price double
);
@From(eventtable='rdbms', datasource.name='WSO2TELCO_RATE_DB', table.name='billingAccumelator')
define table billingAccumelator (
api string,
totalAmount double
);
/* common columns will be added to the final stream later in the code */
define table commonData (
    apiID string,
    applicationName string
);
/* JavaScript function that validates whether the given purchase category code is a number of length 12 */
define function checkPurchaseCategoryCodeValidity[JavaScript] return bool {
    // data[0]: the purchase category code, data[1]: the required length
    var code = data[0];
    var expectedLength = data[1];
    var response = false;
    if (code) {
        if ((code.length == expectedLength) && (!isNaN(code))) {
            response = true;
        }
    }
    return response;
};
/*
*****************************************************
****************** PART 02 ****************
*****************************************************
*/
@info(name = 'QUERY: filter payment')
from getProcessedResponse
    [str:lower(api) == 'payment']#log('payment .............')
select
    api,
    cast(0.0, 'double') as chargeAmount, -- placeholder, filled in by the next query
    jsonBody,
    '' as merchant                       -- placeholder, filled in later
insert into tempPaymentPreProcessedResponse;
@info(name = 'QUERY: Creates a temporary stream for payment')
from tempPaymentPreProcessedResponse
    [(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction') is null) == false]
select
    api,
    cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.amount'),'double') as chargeAmount,
    str:coalesce(cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.purchaseCategoryCode'),'string'),'')
        as purchaseCategoryCode,
    merchant,
    jsonBody
insert into tempPaymentJsonProcessedResponse;
@info(name = 'QUERY: Select if purchasecategory code is 12 digit number')
from tempPaymentJsonProcessedResponse
[checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == true]
select
api,
chargeAmount ,
str:substr(purchaseCategoryCode, 0,3) as merchant
insert into tempPricingProcessedResponse;
@info(name = 'QUERY: Select if purchasecategory code is NOT 12 digit number')
from tempPaymentJsonProcessedResponse
[checkPurchaseCategoryCodeValidity(purchaseCategoryCode, 12 ) == false]
select
api,
chargeAmount ,
cast(wso2TelcoHubAnalytics:getJSONBody(jsonBody,'amountTransaction.onBehalfOf'),'string') as merchant
insert into tempPricingProcessedResponse;
/*
*****************************************************
********** PART 11 - pricing ************************
*****************************************************
*/
@info(name='QUERY: join accumelated summary to the new event1')
from tempPricingProcessedResponse
    inner join commonData
    on tempPricingProcessedResponse.api == commonData.apiID
select
    tempPricingProcessedResponse.api,
    commonData.applicationName,
    chargeAmount,
    merchant
insert into tempPricingProcessedResponseJoinCommon;
@info(name='QUERY: join accumelated summary to the new event')
from tempPricingProcessedResponseJoinCommon
left outer join billingAccumelator
on tempPricingProcessedResponseJoinCommon.api == billingAccumelator.api
select
str:coalesce(billingAccumelator.totalAmount, 0.0) as total,
applicationName,
tempPricingProcessedResponseJoinCommon.api ,
str:coalesce(chargeAmount, 0.0) as chargeAmount ,
merchant ,
0.0 as price
insert into tempEventWithAccumelatedSummary;
@info(name='QUERY: call siddhi billing extension')
from tempEventWithAccumelatedSummary
#wso2TelcoHubAnalytics:getBillingProcessor()
#log('--------calling siddhi billing extension----------------------------------------------------------------------------------- ')
select
total,
api ,
applicationName ,
chargeAmount ,
merchant,
price
insert into tempPricedEventTable;
/* delete the previous accumulated billing total from the MySQL table */
@info(name='QUERY: delete accumelated record in mysql table')
from tempPricedEventTable
    #log('-----------------1---delete-mysql-----------------------------------------------------------------------------------------')
select api
delete billingAccumelator
    on billingAccumelator.api == api;
/* store the new accumulated billing total in the MySQL table temporarily */
@info(name='QUERY: add new record to mysql table')
from tempPricedEventTable
#log('----------------2-to-mysql-table------------------------------------------------------------------------------------------')
select
api ,
total as totalAmount
insert into billingAccumelator;
/* finally insert the priced record into the wso2telco.billing.processedStatistics stream */
@info(name='QUERY: push priced record to stream')
from tempPricedEventTable
#log('---------------4-priced record----------------------------------------------------------------------------------------------')
select
api ,
applicationName ,
chargeAmount ,
merchant ,
price
insert into pricingProcessedResponse;
/******* END of ExecutionPlan *****/