ELK Logstash Filter writing for response time, request and response correlating

My ELK setup was already up and running: it read a log file and showed the entries on a Kibana dashboard, with no business logic implemented.
The new business requirement was to write a CSV file from a log file containing the request, the response and the response time, and also to show them in Kibana. The problem I faced was that the request and the response arrive as two separate events in the log file, so the requirement can be split as below.
  1. it should read the requests and responses from a file
  2. correlate them, as they do not come in a single event
  3. calculate the response time
  4. create a CSV file with the request, response and response time on the same line
As we had already configured the system to read log records from a file, there was no extra work needed to get both requests and responses into Elasticsearch.
We decided to go with Logstash, as it is the tool recommended in the ELK stack for complex processing such as correlating events.
Logstash filters are written in a custom file with a .conf extension, placed in /etc/logstash/conf.d/yourfilename.conf. Logstash accepts any file in this folder with a .conf extension. A conf file is separated into three parts: input, filter and output.
Input
This defines how input is received by Logstash. In my example, Filebeat sends log data to logstash:5044.
input {
    beats {
        host => "localhost"
        port => 5044
    }
}
The input for our system is the Filebeat instance we had already configured.
Output
This part specifies where to send the data once processing is done.
As a first step I wanted to create the CSV file with whatever data I had, so I decided to use the csv output plugin as a second output in our ELK setup. This makes sure emitted events are published both to the CSV file and to the Elasticsearch server used by Kibana.
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
    csv {
        path => "csv_reports/transactions.csv"
        fields => ["tid","user","api"]
    }
}
This is the abc.conf file located in the Logstash conf.d folder. This output makes sure events are sent to Elasticsearch, where Kibana reads them, and that the given fields are written to the CSV file.

Filter Implementation

This is where we implement the logic that caters to the business requirement. There are many Logstash filters documented on the elastic.co site that can be combined to satisfy various requirements.

I will split this part in two. The first part describes the problems I faced while writing the solution and the second part gives the final implementation.

Filters Part I Effort

1. Creating new events
My development was done locally and I didn't have access to the system producing the logs, so new events were published by copying and pasting the same event again and again into the log file being read.
2. Breaking the log JSON into fields
We can't use the grok filter to split the JSON data, as the JSON format can change per request. The kv filter helped us split it: fields can be separated on ',' and then each value can be extracted using ':'.
Later I had to stop splitting the JSON body, because it creates fields dynamically, so Elasticsearch started creating fields (columns) dynamically as well, which uses a lot of data and space.
Also, when trying to write the data to a file with the csv output, the fields have to be configured up front, which is not possible in our scenario.
So the conclusion was not to split the JSON for the moment.
3. Sending different fields to different outputs
If you need to send different fields to different outputs, you can use the clone filter to make a copy of the event and handle it separately. The copy can be identified by checking its type, for example cloned.
I didn't use this in my final implementation.
4. The grok filter didn't process response data
This was because responses and requests have different log formats, so I decided to handle them in separate grok filters.
5. Handling complex data types in the grok filter
Data like a UUID has a complex pattern, and grok patterns such as WORD had problems matching it. When the shape of the data is unknown, the simplest mechanism is the GREEDYDATA pattern, which extracts everything without validating it and reads up to the next part of the pattern we have specified. In my case I added the comma that follows the value in the source right after GREEDYDATA.
6. Converting JSON to CSV
I wanted to convert the JSON into a CSV-like string. I'm still working on it.
7. adding a new field to an event
code => "event.set('request2', 'new add field testing')"
8. Correlating requests and responses
The aggregate filter is used here. It let us persist the data from the request and wait until the response arrives. When the response arrives, the request data is read from the map and everything is sent as a single event.
9. If the response doesn't arrive, the request is lost as well
push_map_as_event_on_timeout => true 
This parameter helped us emit the request after the given timeout value.
10. Distinguishing whether a transaction completed or timed out
I added a new field to the event holding the transaction type. Its value is request, full or response; correlated events are marked as full.
11. Events started duplicating
When a request arrived it was persisted in the map and also emitted as an event. To overcome this, the request event should be dropped once it has been persisted in the map, before it reaches the output. This also helped remove unwanted fields like the very long JWT token.
12. Response arrives after timeout
This should be flagged. The correlation step still runs, but the response remains visible as a separate record.
13. Removing the source
To save some space, the source (the original message field) is removed after it has gone through the grok filter.
14. Getting the response time
The stock Logstash filters didn't help us get the response time, so we had to fall back on Ruby code. Once the aggregated event is created, it is passed into a ruby filter to compute the response time.
time2.to_time.to_i - time1.to_time.to_i
Initially I used the above method to get the response time, but the output was zero because to_i works in whole seconds. To calculate milliseconds I had to use the following method.
DateTime.parse(event.get('response-time')).to_time.to_f*1000
15. Responses coming after the timeout had null values
Responses arriving after the timeout had null values, because the values they look up live in the request, which has already expired.
16. Other events that have nothing to do with requests and responses started being captured by the system
Checking whether the transaction id is null fixed the issue, as those events didn't have that id.
17. Ruby code bug
The Ruby scripts had some issues handling '-' characters, and getting rid of them solved the issues.
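
The response time calculation from item 14 can be tried outside Logstash in plain Ruby. The sketch below is only an illustration: the two timestamps are taken from the sample log entries in this post, and the helper name response_time_ms is made up for this example and is not part of Logstash.

```ruby
require 'date'

# Hypothetical helper (not a Logstash API): difference between two
# log timestamps in milliseconds.
def response_time_ms(request_time, response_time)
  # to_f keeps the fractional seconds; to_i would truncate them
  start_ms  = DateTime.parse(request_time).to_time.to_f * 1000
  finish_ms = DateTime.parse(response_time).to_time.to_f * 1000
  (finish_ms - start_ms).round
end

request_time  = '2018-06-22 11:36:19,886'
response_time = '2018-06-22 11:36:19,997'

# Whole seconds: both events fall inside the same second, so this is zero
seconds = DateTime.parse(response_time).to_time.to_i -
          DateTime.parse(request_time).to_time.to_i

millis = response_time_ms(request_time, response_time)
puts "seconds=#{seconds} millis=#{millis}"
```

With these two sample timestamps the whole-second difference is 0 while the millisecond difference is 111, which is why the to_i version was of no use for fast responses.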

Filters Part II Final

These are the log entries created by the application. API_REQUEST_ID acts as the key for correlating the two events.
Request
[2018-06-22 11:36:19,886] INFO {REQUEST_RESPONSE_LOGGER} - TRANSACTION:request,API_REQUEST_ID:urn:uuid:e86a7610,APPLICATION_ID:9,API_NAME:smsmessaging,API_PUBLISHER:admin,API_VERSION:v1,API_CONTEXT:/smsmessaging/v1,USER_ID:admin,jwzToken:eyJ0,body: { "emailVerified":"true", "phoneNumber":"0718783373", "phoneNumberCountryCode":"", "zoneinfo":"", "BillingSegment":"", "isLostStolen":"", "idHash":"", "SubscriptionActivity":"", "LengthOfTenure":"", "pairingChange":"", "isRoaming":"No", "roamingCountry":"UK", "isDivertSet":"", "locationCountry":"", "deviceChange":""}
Response
[2018-06-22 11:36:19,997] INFO {REQUEST_RESPONSE_LOGGER} - TRANSACTION:response,API_REQUEST_ID:urn:uuid:e86a7610,body: { "msisdn":"0718783373", "imsi":"0718783373", "title":"abc", "firstName":"Nuwan", "lastName":"Senanayake", "dob":"", "identificationType":"", "identificationNumber":"", "onBehalfOf":"", "purchaseCategoryCode":"", "accountType":"", "ownerType":"", "status":""}
As you can see, the two log entry formats are different. For Logstash filtering to work the format has to be fixed, so we have to handle requests and responses separately.
if [message] =~ "TRANSACTION:request" {
This condition checks whether the request keyword exists in the log line.
The next step is to extract key-value pairs from the log record so that it can be processed by Logstash filters. The grok filter is used to get this done, as below.
grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}]  %{LOGLEVEL:level} \{%{DATA:logtype}} -  TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},APPLICATION_ID:%{NUMBER:application},API_NAME:%{WORD:api},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},USER_ID:%{GREEDYDATA:user},jwzToken:%{GREEDYDATA:jwztoken},body:%{GREEDYDATA:body}" }
}
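
To see what this pattern pulls out of a request line, it can help to approximate it with a plain Ruby regular expression. This is only a rough sketch and not what grok compiles to: the character classes are simplified, the spacing is matched loosely with \s+ because the exact number of spaces in the real log is an assumption, and the sample line is the request entry from this post with a shortened body.

```ruby
# Rough Ruby approximation of the request grok pattern (illustration only).
# The greedy .* parts play the role of GREEDYDATA: each one reads on until
# the literal text that follows it (for example ",APPLICATION_ID:") matches.
request_pattern = /
  \[(?<timestamp>[^\]]+)\]\s+
  (?<level>[A-Z]+)\s+
  \{(?<logtype>[^}]*)\}\s+-\s+
  TRANSACTION:(?<transaction>\w+),
  API_REQUEST_ID:(?<tid>.*),
  APPLICATION_ID:(?<application>\d+),
  API_NAME:(?<api>\w+),
  API_PUBLISHER:(?<publisher>\w+),
  API_VERSION:(?<api_v>\w+),
  API_CONTEXT:(?<context>.*),
  USER_ID:(?<user>.*),
  jwzToken:(?<jwztoken>.*),
  body:(?<body>.*)
/x

# Sample request line from this post, body shortened for readability
line = '[2018-06-22 11:36:19,886] INFO {REQUEST_RESPONSE_LOGGER} - ' \
       'TRANSACTION:request,API_REQUEST_ID:urn:uuid:e86a7610,APPLICATION_ID:9,' \
       'API_NAME:smsmessaging,API_PUBLISHER:admin,API_VERSION:v1,' \
       'API_CONTEXT:/smsmessaging/v1,USER_ID:admin,jwzToken:eyJ0,' \
       'body: { "isRoaming":"No", "roamingCountry":"UK"}'

match = request_pattern.match(line)
fields = match ? match.named_captures : {}

puts fields['tid']         # the correlation id, colons included
puts fields['application']
puts fields['context']
```

The tid value contains colons and digits, which is why a WORD-style match fails on it and a greedy match up to the next comma-prefixed key works.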



filter {
    if [message] =~ "TRANSACTION:request" {
        grok {
            match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}]  %{LOGLEVEL:level} \{%{DATA:logtype}} -  TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},APPLICATION_ID:%{NUMBER:application},API_NAME:%{WORD:api},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},USER_ID:%{GREEDYDATA:user},jwzToken:%{GREEDYDATA:jwztoken},body:%{GREEDYDATA:body}" }
        }
        aggregate { # store the required request data in a map
            task_id => "%{tid}"
            code => "
                map['reqBody'] = event.get('body')
                map['user'] = event.get('user')
                map['application'] = event.get('application')
                map['api'] = event.get('api')
                map['timestamp'] = event.get('timestamp')
            "
            map_action => "create"
        }
        drop {} # drop the request event once it is stored in the map, to save indexing space in the Elasticsearch server
    }
    if [message] =~ "TRANSACTION:response" {
        grok {
            match => { "message" => "\[%{TIMESTAMP_ISO8601:response-time}]  %{LOGLEVEL:level} \{%{DATA:logtype}} -  TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},body:%{GREEDYDATA:response}" }
            remove_field => ["message"]
        }
        aggregate { # merge the stored request data into the response event
            task_id => "%{tid}"
            code => "
                event.set('request', map['reqBody'])
                event.set('user', map['user'])
                event.set('application', map['application'])
                event.set('api', map['api'])
                event.set('request-time', map['timestamp'])
                event.set('transaction', 'full')
            "
            map_action => "update"
            end_of_task => true
            push_map_as_event_on_timeout => true
            timeout => 120
            timeout_task_id_field => "tid"
            timeout_code => "
                event.set('response','Response-timeout')
                event.set('type','request-response')
                event.set('transaction', 'request')
            "
        }
        ruby { # response time in milliseconds; nil if either timestamp is missing
            init => "require 'time'"
            code => "duration = (DateTime.parse(event.get('response-time')).to_time.to_f*1000 - DateTime.parse(event.get('request-time')).to_time.to_f*1000) rescue nil; event.set('service-time', duration); "
        }
    }
}
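
The correlation logic in this filter can be hard to picture from the config alone, so here is a toy model of it in plain Ruby. It is a sketch under stated assumptions and not the aggregate plugin's implementation: the method names on_request, on_response and on_timeout are hypothetical, events are plain hashes, the timeout is triggered by hand instead of by a timer, and the second tid is invented for the example.

```ruby
# Toy model of the request/response correlation (illustration only).
# maps holds one entry per task id (tid), like the aggregate filter's map.

# Request: remember the needed fields and emit nothing (the event is dropped)
def on_request(maps, event)
  maps[event['tid']] = {
    'reqBody' => event['body'],
    'user' => event['user'],
    'timestamp' => event['timestamp']
  }
  nil
end

# Response: merge the stored request fields into the event and end the task
def on_response(maps, event)
  map = maps.delete(event['tid'])
  return event if map.nil? # request expired or never seen: event stays as it is

  event.merge(
    'request' => map['reqBody'],
    'user' => map['user'],
    'request-time' => map['timestamp'],
    'transaction' => 'full'
  )
end

# Timeout: push whatever is left in the map as an event of its own
def on_timeout(maps, tid)
  map = maps.delete(tid)
  return nil if map.nil?

  map.merge('tid' => tid, 'response' => 'Response-timeout', 'transaction' => 'request')
end

maps = {}
request = { 'tid' => 'urn:uuid:e86a7610', 'transaction' => 'request',
            'body' => '{ "isRoaming":"No" }', 'user' => 'admin',
            'timestamp' => '2018-06-22 11:36:19,886' }
response = { 'tid' => 'urn:uuid:e86a7610', 'transaction' => 'response',
             'response' => '{ "firstName":"Nuwan" }',
             'response-time' => '2018-06-22 11:36:19,997' }

on_request(maps, request)
full = on_response(maps, response) # correlated, marked as full
late = on_response(maps, response) # map already gone, stays a plain response

on_request(maps, request.merge('tid' => 'urn:uuid:hypothetical'))
timed_out = on_timeout(maps, 'urn:uuid:hypothetical') # no response ever came

puts full['transaction'], late['transaction'], timed_out['transaction']
```

The three results correspond to the transaction types described in Part I: full for a correlated pair, response for a response whose request has already expired, and request for a request that timed out without a response.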

Post Implementation for Production

Once our business requirement is satisfied, there are some more things to do before sending this to production.
if [source] =~ "request-response-logger" {
This makes sure our filter is applied only to the logs read from the request-response-logger.log file, as the same Logstash instance is used to process some other logs as well.
mutate { replace => { "type" => "request-response" } }
There is a field called type that comes with the incoming event and holds a rather lengthy word. We replace it with our own text and use it later when doing the output, as shown in the code snippet below.
if [type] == "request-response" {
 elasticsearch {
  hosts => [ "localhost:9200" ]
 }
 csv {
  path => "csv_reports/transactions.csv"
  fields => ["tid","user","api"]
 }
}
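
The fields list above only contains simple values. If the request body or the response time were added to it, the JSON body would need quoting, because it contains commas and double quotes. The Ruby sketch below shows roughly what a CSV writer has to do in that case. It is an assumption-laden illustration and not the csv output plugin's code: csv_field and csv_line are hypothetical helpers, the body is shortened, and 111 is the millisecond difference between the two sample timestamps in this post.

```ruby
# Hypothetical helper: quote one value in the usual CSV way. Values with a
# comma, a double quote or a line break are wrapped in double quotes, and
# embedded double quotes are doubled.
def csv_field(value)
  text = value.to_s
  return text unless text.match?(/[",\r\n]/)

  '"' + text.gsub('"', '""') + '"'
end

# Hypothetical helper: build one CSV line from the chosen event fields
def csv_line(event, fields)
  fields.map { |name| csv_field(event[name]) }.join(',')
end

event = {
  'tid' => 'urn:uuid:e86a7610',
  'user' => 'admin',
  'api' => 'smsmessaging',
  'request' => '{ "isRoaming":"No", "roamingCountry":"UK"}',
  'service-time' => 111
}

simple_line = csv_line(event, %w[tid user api])
quoted_line = csv_line(event, %w[tid request service-time])

puts simple_line
puts quoted_line
```

The first line needs no quoting at all, while in the second the whole JSON body ends up as a single quoted column between the tid and the response time.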
