ELK: Writing Logstash Filters for Response Time and Request/Response Correlation
Now my ELK setup is up and running: it reads a log file and shows it on a Kibana dashboard, but basically no business logic has been implemented yet.
The new business requirement was to produce a CSV file from the log file containing the request, the response, and the response time, and also to show them on Kibana. The problem I faced was that the request and the response arrive as two separate events in the log file. So the requirement can be split as below.
- it should read the requests and responses from a file
- correlate them, as they do not arrive in a single event
- calculate the response time
- create a CSV file with the request, response, and response time on the same line
As we have already configured our system to read log records from a file, there is no extra work needed to get both requests and responses into Elasticsearch.
We decided to go with Logstash, as it is the tool recommended in the ELK stack for complex processing such as correlating events.
Logstash filters are written in a custom file with a .conf extension placed in /etc/logstash/conf.d/yourfilename.conf; Logstash will pick up any file in this folder with a .conf extension. A conf file is separated into three parts: input, filter, and output.
Input
This defines how input reaches Logstash. In my example, Filebeat sends log data to logstash:5044.
input {
  beats {
    host => "localhost"
    port => 5044
  }
}
The input for our system is the Filebeat instance we already configured.
Output
This part specifies where to send data once the processing is done.
As the first step I wanted to create the CSV file with whatever data I had, so I decided to use the csv output as a second output in our ELK setup. This makes sure the emitted events are published both to the CSV file and to the Elasticsearch server used by Kibana.
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
  csv {
    path => "csv_reports/transactions.csv"
    fields => ["tid","user","api"]
  }
}
This is the abc.conf file located in the Logstash conf.d folder. This output section makes sure events are sent to Elasticsearch (for Kibana) and that the given fields are written to the CSV file.
Filter Implementation
This is where we implement our logic to cater for the business requirement. There are many Logstash filters listed on the elastic.co site that can be used to satisfy various requirements.
I will separate this part into two: the first part describes the problems I faced while writing the solution, and the second part shows the final implementation.
Filters Part I: Effort
1. Creating new events
My development was done locally and I did not have access to the system producing the logs, so publishing new events was done by copying and pasting the same event over and over into the log file being read.
2. Breaking the log JSON into fields
We cannot use the grok filter to split the JSON data, as the JSON format can change per request. The kv filter helped us split the JSON data: fields can be extracted using ',' as the separator and values using ':'. A minimal sketch follows below.
Later I had to stop splitting the JSON body, because it creates fields dynamically, which made Elasticsearch create fields (and Kibana columns) dynamically, using a lot of data and space.
Also, when trying to use the csv output to write data to the file, it requires the fields to be configured up front, which does not work with a dynamic field set.
So the conclusion was not to split the JSON for the moment.
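For reference, this is roughly what the kv filter attempt looked like, assuming the JSON text has already been extracted into a field called body:
filter {
  kv {
    # split the body into key/value pairs: pairs separated by ',', keys and values by ':'
    source => "body"
    field_split => ","
    value_split => ":"
  }
}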
3. Sending different fields to different outputs
If you need to send different fields to different outputs, you can use the clone filter to make a copy of the event and handle it separately, checking the type of the cloned copy.
I did not use this in my final implementation.
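A rough sketch of how that could look; the clone filter copies the event and sets the copy's type field to the given name (for-csv here is just an example name):
filter {
  clone {
    clones => ["for-csv"]
  }
}
output {
  if [type] == "for-csv" {
    # the cloned copy goes only to the CSV file
    csv {
      path => "csv_reports/transactions.csv"
      fields => ["tid","user","api"]
    }
  } else {
    # the original event goes to Elasticsearch as usual
    elasticsearch { hosts => [ "localhost:9200" ] }
  }
}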
4. Grok filter did not process response data
This was because responses and requests have different log formats, so I decided to handle them in separate grok filters.
5. Handling complex data types in the grok filter
Data like a UUID has a complex pattern, and grok patterns like WORD had problems breaking it. When the data format is not predictable, the easiest mechanism is the GREEDYDATA pattern, which captures everything without validating; it reads up to the next literal or pattern specified after it. In my case, after the GREEDYDATA I added the comma that is present in the source.
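For example, a fragment of the kind of pattern this leads to; %{GREEDYDATA:tid} captures everything up to the literal comma that follows it:
filter {
  grok {
    match => { "message" => "API_REQUEST_ID:%{GREEDYDATA:tid},APPLICATION_ID:%{NUMBER:application}" }
  }
}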
6. Converting JSON to CSV
I wanted to convert the JSON into a CSV-like string. Still working on it.
7. Adding a new field to an event
code => "event.set('request2', 'new add field testing')"
8. Correlating requests and responses
The aggregate filter is used here. It helped us persist data from the request and wait until the response arrives. When the response arrives, the request data is extracted from the map and everything is sent as a single event.
9. If the response never arrives, the request also goes missing
push_map_as_event_on_timeout => true
This parameter makes the aggregate filter emit the stored request data as its own event after the given timeout.
10. Distinguishing completed transactions from timeouts
A new field holding the transaction type was added to the event; its value is request, response, or full. Correlated events are marked as full.
11. Events started duplicating
When a request arrived it was persisted in the map and also emitted as an event. To overcome this, once the event is persisted in the map it is dropped before it reaches the output. This also helped remove unwanted fields such as the very long JWT token.
12. Response arrives after the timeout
This needs to be noticeable: the aggregate filter still processes the late response, but since the request data has already been flushed, the record remains visible as a separate, uncorrelated record.
13. Removing the source
To save some space, the source message is removed after it has gone through the grok filter.
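In the final filter this is done with remove_field on the grok filter itself (see Part II); a standalone equivalent using mutate would look like this:
filter {
  mutate {
    # drop the raw log line once grok has extracted the fields we need
    remove_field => ["message"]
  }
}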
14. Getting the response time
The built-in Logstash filters did not help us calculate the response time, so we had to fall back on the ruby filter. Once the aggregated event is created, it is sent into this filter to compute the response time.
time2.to_time.to_i - time1.to_time.to_i
Initially I used the above method to get the response time, but the output was zero because it only has second-level resolution (the request and response fall within the same second). To calculate milliseconds I had to use the following method.
DateTime.parse(event.get('response-time')).to_time.to_f*1000
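Wrapped into a complete ruby filter, the millisecond calculation looks roughly like this (the same code appears in the final filter in Part II):
filter {
  ruby {
    init => "require 'time'"
    # response time in milliseconds = response timestamp minus request timestamp
    code => "
      duration = DateTime.parse(event.get('response-time')).to_time.to_f * 1000 -
                 DateTime.parse(event.get('request-time')).to_time.to_f * 1000
      event.set('service-time', duration)
    "
  }
}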
15. Responses coming after the timeout had null values
Responses coming after the timeout had null values, because the values they look up belong to the request map entry, which has already expired.
16. Other events that have nothing to do with requests and responses started being captured by the system
Checking whether the transaction id is null fixed the issue, as those events do not carry that id.
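A sketch of that kind of guard, assuming unrelated events are simply dropped when grok did not produce a tid field:
filter {
  if ![tid] {
    # no API_REQUEST_ID, so this event is not part of a request/response pair
    drop {}
  }
}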
17. Ruby code bug
The Ruby scripts had some issues handling "-" characters, and getting rid of them solved the problem.
Filters Part II: Final
These are the log entries created by the application. API_REQUEST_ID acts as the key for correlating the two events.
Request [2018-06-22 11:36:19,886] INFO {REQUEST_RESPONSE_LOGGER} - TRANSACTION:request,API_REQUEST_ID:urn:uuid:e86a7610,APPLICATION_ID:9,API_NAME:smsmessaging,API_PUBLISHER:admin,API_VERSION:v1,API_CONTEXT:/smsmessaging/v1,USER_ID:admin,jwzToken:eyJ0,body: { "emailVerified":"true", "phoneNumber":"0718783373", "phoneNumberCountryCode":"", "zoneinfo":"", "BillingSegment":"", "isLostStolen":"", "idHash":"", "SubscriptionActivity":"", "LengthOfTenure":"", "pairingChange":"", "isRoaming":"No", "roamingCountry":"UK", "isDivertSet":"", "locationCountry":"", "deviceChange":""}
Response [2018-06-22 11:36:19,997] INFO {REQUEST_RESPONSE_LOGGER} - TRANSACTION:response,API_REQUEST_ID:urn:uuid:e86a7610,body: { "msisdn":"0718783373", "imsi":"0718783373", "title":"abc", "firstName":"Nuwan", "lastName":"Senanayake", "dob":"", "identificationType":"", "identificationNumber":"", "onBehalfOf":"", "purchaseCategoryCode":"", "accountType":"", "ownerType":"", "status":""}
As you can see, the two log entry formats are different. In order to apply Logstash filtering, the format needs to be fixed, so we handle requests and responses separately.
if [message] =~ "TRANSACTION:request" {
This condition checks whether the request keyword exists in the log message.
The next step is to extract the key-value pairs from the log record so that the record can be processed with Logstash filters. The grok filter is used to get this done, as below.
grok { match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},APPLICATION_ID:%{NUMBER:application},API_NAME:%{WORD:api},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},USER_ID:%{GREEDYDATA:user},jwzToken:%{GREEDYDATA:jwztoken},body:%{GREEDYDATA:body}" } }
The complete filter section, with the aggregate correlation and the ruby response-time calculation added, looks like this:
filter {
  if [message] =~ "TRANSACTION:request" {
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},APPLICATION_ID:%{NUMBER:application},API_NAME:%{WORD:api},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},USER_ID:%{GREEDYDATA:user},jwzToken:%{GREEDYDATA:jwztoken},body:%{GREEDYDATA:body}" }
    }
    aggregate {
      # store the required request data in a map keyed by the transaction id
      task_id => "%{tid}"
      code => "
        map['reqBody'] = event.get('body')
        map['user'] = event.get('user')
        map['application'] = event.get('application')
        map['api'] = event.get('api')
        map['timestamp'] = event.get('timestamp')
      "
      map_action => "create"
    }
    # drop the request event once it is persisted in the map, to save indexing space in Elasticsearch
    drop {}
  }
  if [message] =~ "TRANSACTION:response" {
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:response-time}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},body:%{GREEDYDATA:response}" }
      remove_field => ["message"]
    }
    aggregate {
      task_id => "%{tid}"
      # pull the stored request data into the response event and emit one correlated event
      code => "
        event.set('request', map['reqBody'])
        event.set('user', map['user'])
        event.set('application', map['application'])
        event.set('api', map['api'])
        event.set('request-time', map['timestamp'])
        event.set('transaction', 'full')
      "
      map_action => "update"
      end_of_task => true
      push_map_as_event_on_timeout => true
      timeout => 120
      timeout_task_id_field => "tid"
      # if no response arrives within the timeout, emit the stored request data on its own
      timeout_code => "
        event.set('response','Response-timeout')
        event.set('type','request-response')
        event.set('transaction', 'request')
      "
    }
    ruby {
      init => "require 'time'"
      # service time in milliseconds; rescue nil covers events missing one of the timestamps
      code => "
        duration = (DateTime.parse(event.get('response-time')).to_time.to_f*1000 - DateTime.parse(event.get('request-time')).to_time.to_f*1000) rescue nil
        event.set('service-time', duration)
      "
    }
  }
}
Post Implementation for Production
Once the business requirement is satisfied, there are a few more things to do before sending this to production.
if [source] =~ "request-response-logger" {
This makes sure our filter is applied only to logs read from the request-response-logger.log file, as the same Logstash instance is used to process some other logs as well.
mutate { replace => { "type" => "request-response" } }
There is a field called type that comes with the incoming event and holds a rather lengthy value. We replace it with our own text and use it later in the output, as shown in the code snippet below.
if [type] == "request-response" { elasticsearch { hosts => [ "localhost:9200" ] } csv { path => "csv_reports/transactions.csv" fields => ["tid","user","api"] } }