OBA Logstash White Paper

HPE Operations Bridge Analytics

Software Version: 3.00 and 3.01

 

Starting with Operations Bridge Analytics (OBA) version 3.0, you can use the integrated OBA Kafka topic for direct log data ingestion, as an alternative to integrating log data via either ArcSight Logger or Splunk.

Kafka is a technology that processes streams of data efficiently and in real time. This white paper describes how you can use data collection software, such as Logstash, to pass log data to the OBA Kafka topic for processing. As with previous versions, you can continue to integrate log data via ArcSight Logger or Splunk. For more information on those integrations, see the OBA online help or the OBA User Guide.

Information and examples in this document should be sufficient for you to complete the integration; you do not need to become an expert on Kafka itself.

This document includes the following sections:

Limitations
Overview
Logstash Configuration
Kafka Interface Description
Field Specifics
Kafka Network Specifics
Accessing Log Data in OBA
Security
Deployment
Additional Logstash Examples

Limitations

It is not recommended that you use this data integration method for custom metric collections. When possible, implement metric collections as described in the OBA User Guide. While some metrics can be defined in the OBA Kafka topic, for example in the custom_int1 field, this method is not as flexible as custom CSV collection configurations, where you can control the field names and tagging options.

All data put into Kafka is forwarded to the OBA Event and Log Analytics subsystem, which relies on meaningful message fields to create groupings and attach significance to data. Log Analytics is not designed for use on metric collections. If your logs do not contain relevant message text that you want processed for significance relative to other log or event data you are integrating into OBA, do not use this method.

The log data that is fed into the OBA Kafka topic cannot be removed, because it goes into shared tables in Vertica. Therefore, unlike a custom CSV collection, you cannot simply delete the collection to remove unwanted data. For this reason, it is recommended that you test your formatted output by writing it to a file or terminal window before beginning an integration with the OBA Kafka processor.

Overview

OBA can now ingest log data via Kafka. The OBA Kafka bus is preconfigured and automatically started by OBA. There is no need to configure anything in OBA for this mechanism to work. Kafka listens on TCP port 9092 for incoming connections. Log files are gathered by a data collection technology, such as Logstash, and sent to the OBA Kafka bus.

Any tool can be used to feed data into Kafka, as long as it formats the data properly. In this paper, we use Logstash for the examples because it is a popular technology that enables you to process data from a variety of sources and format it into an output stream that works well with OBA's Kafka topic. This solution was tested with Logstash version 5.3.0.

Logstash Configuration

Information on Logstash installation is out of the scope of this document. For more information on Logstash, see https://www.elastic.co/products/logstash.

If you want to use Logstash to populate the OBA Kafka topic, the following Logstash configuration example, created on a Red Hat 6.x system, can serve as a guide.

# /etc/logstash/conf.d/syslogexample.conf for Ops Bridge Analytics
#
input {
  # read new content going into the local default syslog file:
  file {
    path => "/var/log/messages"
  }
}
filter {
  # For Analytics, we want most of the text of the message in one field
  # so only the timestamp and hostname are extracted:
  grok {
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:hostname} %{GREEDYDATA:message}" }
    # overwrite the message field, removing the timestamp
    overwrite => ["message"]
  }
  # Populate fields for Analytics
  # - hardcode the severity (better would be to use rsyslog and parse it
  # however for syslogd messages file, the priority is not preserved)
  # - set device vendor and product fields for this particular input:
  mutate {
    add_field => {
      "severity" => "Medium"
      "device_vendor" => "RHLinux"
      "device_product" => "syslog"
    }
  }
  # Set timestamp to original from log - note Time Zone should be edited for your environment:
  date {
    match => [ "syslog_timestamp", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss", "MMM d HH:mm:ss" ]
    timezone => "America/Denver"
    target => "timestamp"
  }
  # Reformat timestamp so that it matches the required input format for OBA:
  ruby {
    code => "event.set('timestamp', event.get('timestamp').time.strftime('%Y-%m-%d %H:%M:%S'))"
  }
}
output {
  # Change destination server for the kafka output for OBA:
  # commented out for testing..
  # kafka {
  #  codec => json
  #  topic_id => "opsa_default.opsa.data.log.1.0.di.input"
  #  bootstrap_servers => "<your-opsa-collector-system-hostname>:9092"
  #  compression_type => "gzip"
  #}
  # Output options for testing:
  stdout { codec => rubydebug }
  file {
   path => "/tmp/logstashout.json" 
   codec => json
  }
}

The various sections and syntax conventions in this Logstash configuration file are explained on the Elastic Logstash web pages. The example here includes only a very simple input section, which reads a common log file from the system on which Logstash is running. The filter section parses the log lines and populates the required fields for OBA. The output section directs the output, in this case to Logstash's stdout and to a /tmp/logstashout.json file.

It is recommended that you verify that Logstash produces the desired output before actually sending it to OBA. You can run Logstash as the root user with this configuration file as follows:

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslogexample.conf

You must have root privileges to access /var/log/messages. By default, Logstash only processes new messages that are appended to the monitored input file after Logstash has started. You can change that behavior, as described later, but for now, to test your Logstash feed, append a line to /var/log/messages by executing the following command in a different root terminal window:

logger "testing testing 123"

A processed, formatted message similar to the sketch above should appear on Logstash's stdout and in the output file. The output file is in JSON format, which can be difficult to read, but it contains the same basic structure as the rubydebug output sent to stdout. When you are ready, press Ctrl + C to stop Logstash and then edit the syslogexample.conf file to replace the output section with something similar to the following (change the hostname destination for Kafka to match your environment):

 output {
  # Change destination server for the kafka output for OBA:
  kafka {
    codec => json
    topic_id => "opsa_default.opsa.data.log.1.0.di.input"
    bootstrap_servers => "<your-opsa-collector-system-hostname>:9092"
    compression_type => "gzip"
  }
  # Other options for testing:
  #stdout { codec => rubydebug }
  #file {
  # path => "/tmp/logstashout.json" 
  # codec => json
  #}
}

Re-execute Logstash using the revised configuration, as above. Again, use the Linux logger command to send a local syslog message into the monitored messages file. After a few minutes, you should be able to view your messages in the OBA Search user interface.

If you want to change the configuration again, this time to process the input file from the beginning instead of only new entries, you can try this replacement for the input section:

input {
  # read the local default syslog file from the start:
  file {
    path => "/var/log/messages"
    # The start_position setting is evaluated only on the first start because
    # logstash keeps a database of file offsets read previously. By setting
    # the sincedb path to /dev/null it will bypass and always read from the
    # beginning of the file - this can be useful but ONLY recommended while testing. 
    # See the logstash file input documentation for more details!
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

There are many different ways to feed data into Logstash. You do not need to install it on every system from which you want to pull data. For example, you might use Logstash to process syslog data that is sent over the network from many other systems, or you might have a central Logstash, which receives logfiles sent from integrated programs, such as Filebeat. How you deploy Logstash in your environment is up to you.
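
For example, a central Logstash instance could receive syslog traffic over the network and log lines shipped by Filebeat agents with an input section similar to the following sketch (the port numbers are illustrative; see the Logstash input plugin documentation for the available options):

input {
  # receive syslog messages sent over the network from other systems:
  syslog {
    port => 5514
  }
  # receive log lines shipped by Filebeat agents:
  beats {
    port => 5044
  }
}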

For performance reasons, it is not recommended that you install Logstash directly onto your OBA system. If you have no other choice but to place Logstash on the OBA host, then install it in a totally new directory structure (not under /opt/HP/), and ensure you have enough resources on the system to compensate for the additional load. For example, on a virtual machine, you must add more cores and memory (depending on your Logstash load), beyond the recommendations for OBA itself.

Kafka Interface Description

The Kafka topic for collecting logs is the following:

<tenant_id>.opsa.data.log.1.0.di.input

Gzip compression is enabled for this topic. For optimal disk space efficiency, make sure to enable gzip compression on any Kafka producer.

To populate OBA with log data using this interface, you need to send JSON-formatted data to the Kafka topic. For more information about JSON, see http://www.json.org/.

For example, use the Kafka console producer that is available in the OBA installation:

/opt/HP/opsa/kafka/bin/kafka-console-producer.sh --broker-list <collector ip address>:9092 --topic opsa_default.opsa.data.log.1.0.di.input

Each line that is sent to the Kafka topic must be a properly formatted JSON object. The JSON object needs to contain at least the following fields: timestamp, severity, message, hostname, and at least one of device_vendor, device_product, or device_version (see the field table below).

The following is an example of a valid JSON object:

{"timestamp": "2016-11-14 11:08:09+0100", "message": "message 123", "hostname": "myhost.com", "device_product": "product123", "device_vendor": "vendor123", "device_version": "1", "severity": "High"}

The following table shows the fields that can be used when collecting log files. It is not possible to create additional fields.

Name | Data type (length/format) | Required | Notes
timestamp | date (yyyy-MM-dd HH:mm:ss[Z]) | true | The current timestamp will be used if this field is missing, but a warning will be printed in the logs. See the additional explanations on timestamp handling below.
severity | string (240) | true | Must be one of the following: Very-High, High, Medium, Low. Note: These values are case sensitive.
message | string (10000) | true | The message field must exist and must not be an empty string.
hostname | string (300) | true | This is a free text field that can take any value. To work properly with host reconciliation, it should be an FQDN or an IPv4 address.
path | string (1065) | |
device_vendor | string (300) | true | Any value is allowed. Only one of device_vendor, device_product, and device_version is required. It is used by log analytics to identify the type of log, for example apache.
device_product | string (300) | true | Any value is allowed. Only one of device_vendor, device_product, and device_version is required. It is used by log analytics to identify the type of log, for example accesslog.
device_version | string (48) | true | Any value is allowed. Only one of device_vendor, device_product, and device_version is required. It is used by log analytics to identify the type of log, for example 1.
external_id | string (120) | |
event_state | string (30) | |
event_priority | string (30) | |
source_address | string (48) | |
source_host | string (300) | |
source_agent | string (300) | |
source_severity | string (189) | | This field can be used to store the original severity that came from the log itself, for example "INFO" or "ERROR".
timezone | string (30) | |
device_receipt_time | date (epoch) | |
end_time | date (epoch) | |
custom_string1 | string (300) | |
custom_string2 | string (300) | |
custom_string3 | string (300) | |
custom_string4 | string (300) | |
custom_string5 | string (300) | |
custom_string6 | string (300) | |
custom_float1 | float | |
custom_float2 | float | |
custom_float3 | float | |
custom_int1 | integer | |
custom_int2 | integer | |
custom_int3 | integer | |
custom_int4 | integer | |
custom_int5 | integer | |
custom_int6 | integer | |
custom_int7 | integer | |
custom_time1 | date (yyyy-MM-dd HH:mm:ss) | | Time in UTC. No timezone allowed.
custom_time2 | date (yyyy-MM-dd HH:mm:ss) | | Time in UTC. No timezone allowed.

If the length of a string exceeds the maximum size, the string is truncated.

The following example shows a JSON object using all fields:

{"timestamp": "2016-11-10 10:02:01", "severity": "High", "message": "my message", "hostname": "myhost", "path": "mypath", "device_vendor": "myvendor", "device_product": "myproduct", "device_version": "myversion", "external_id": "myexternalid", "event_state": "mystate", "event_priority": "myprio", "source_address": "myaddress", "source_host": "mysourcehost", "source_agent": "mysourceagent", "agent_severity": "ERROR", "timezone": "mytz", "device_receipt_time": "1478782142", "end_time": "1478782143", "custom_string1": "string1", "custom_string2": "string2", "custom_string3": "string3", "custom_string4": "string4", "custom_string5": "string5", "custom_string6": "string6", "custom_float1": 1.5, "custom_float2": 2.4, "custom_float3": 3.6, "custom_int1": 1, "custom_int2": 2, "custom_int3": 3, "custom_int4": 4, "custom_int5": 5, "custom_int6": 6, "custom_int7": 7, "custom_time1": "2016-11-10 10:03:01", "custom_time2": "2016-11-10 10:04:01"}

Field Specifics

severity, source_severity. The value of severity has a meaning for OBA. Message weights are calculated based on the value that is used here. The severity is therefore a normalized value that must be one of the following: Very-High, High, Medium, Low.

For each log source, make sure that the severities used in the data are mapped to one of the above values. The severities are case sensitive and must begin with a capital letter.

An example mapping for typical log4j severities could be the following: FATAL and ERROR map to Very-High, WARN maps to High, INFO maps to Medium, and DEBUG and TRACE map to Low.

The original severity can also be retained in OBA by putting it into the field source_severity.

timestamp. For optimal performance, the timestamp has to be properly formatted so that OBA can parse it. If the format contains no timezone information, the timestamp is assumed to be in UTC. For example:

2016-12-05 13:15:01

If the timestamp is not in UTC, then the timezone information has to be specified as part of the timestamp. The timestamp in the following example is located in the timezone UTC+1:

2016-12-05 13:15:01+0100

If OBA fails to parse the value of the timestamp, or if no timestamp is specified at all, the current time is used as a fallback and a warning is printed into the logs. This situation should be avoided because it can lead to performance problems.

Note: This timestamp information does not apply to the custom time fields custom_time1 and custom_time2. Those fields always have to be specified in format yyyy-MM-dd HH:mm:ss and cannot contain a timezone. They are stored in the database as is, without making adjustments.

device_vendor, device_product, device_version. It is very important to choose good values for these fields. OBA internally uses the concatenation of these three values to identify different log sources for message clustering and analysis. That means that each source (or type) of log should use one fixed combination of these values. Optimally, no single source or type of log should have many thousands of message templates, and conversely, you should avoid creating thousands of sources.

For example, if OBA is ingesting Apache access logs and Vertica database logs, you might use the following values:

Log source | device_vendor | device_product | device_version
Apache access logs | Apache | accesslog | 1
Vertica | HPE | Vertica | 8.01

Do not generate values for device_vendor, device_product, and device_version that are different for each message. This would result in too many combinations and could seriously limit the performance of OBA, impacting analytics.

message. The value of the message field is used as input for OBA's log analytics. For optimal results, the message field should not contain a timestamp. While it is not strictly necessary to remove timestamps, OBA's analytics features work better and more efficiently without them.

Other fields. All other fields can be used to store data that was extracted from the message. The data can then be used in AQL to group or filter the data.

Kafka Network Specifics

Kafka runs on all OBA collectors and listens on TCP port 9092 for incoming connections. Kafka is a distributed bus, and its data storage is automatically distributed among all the collector hosts you have registered for a particular tenant. For that reason, the Kafka producer (for example, Logstash) must be able to connect to all OBA collectors, not just the one you specify in the bootstrap_servers setting in Logstash. Because you cannot predict which collector Logstash will send data to, the Logstash server must be able to reach all of your registered collector servers. Configure any firewalls accordingly. This is especially important when the Kafka producer runs in a different subnet than the OBA collectors and there is a firewall in between.
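
For example, before starting the integration, you could check from the Logstash host that TCP port 9092 is reachable on every registered collector. A minimal sketch using netcat (the collector hostnames are placeholders):

for host in collector1.example.com collector2.example.com; do
  nc -vz "$host" 9092
done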

Accessing Log Data in OBA

PQL. In PQL, the log data can be accessed using a host search. A host search returns a metric query for all custom integer and float fields that contain data. In addition, it generates log analytics (aqllogsummary), text search, and anomaly panes.

AQL. The following AQL uses the metrics custom_int1 and custom_float1:

[metricQuery({opsa_data_log}, {((i.hostname ilike "*"))}, {i.hostname}, {i.custom_int1, i.custom_float1})]

The default labels can be changed using "as" in AQL:

[metricQuery({opsa_data_log}, {((i.hostname ilike "*"))}, {i.hostname}, {i.custom_int1 as "my integer", i.custom_float1 as "my float"})]

XQL. Log messages can be found using the OBA text search.

By default, OBA searches in all log data, which includes self monitoring data and events. To see only the logs that were read via the Kafka integration, filter for Text: * | where source_type="PlainLog".

Security

To configure OBA to use SSL with Kafka, see the OBA Hardening Guide.

If Kafka is configured to use SSL, the Kafka producer needs to be configured to use SSL as well. If Logstash is used, follow the instructions provided by Logstash:

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html#plugins-outputs-kafka-ssl

Deployment

Running third-party software on any machine that runs an OBA server or collector is generally not supported.

If Logstash is used to collect logs, follow the Logstash recommendations for deployment and scaling (https://www.elastic.co/guide/en/logstash/current/deploying-and-scaling.html).

Additional Logstash Examples

Severity Mapping

The field severity must be set to one of the following values: "Very-High", "High", "Medium", or "Low". The following example filter section maps common logging severities to the required OBA values. The original severities are taken from a field named source_severity and the result is written to the severity field:

    if [source_severity] == "ERROR" {
      mutate { add_field => [ "severity", "Very-High" ]}
    } else {
      if [source_severity] == "FATAL" {
        mutate { add_field => [ "severity", "Very-High" ]}
      } else {
        if [source_severity] =~ "WARN" {
          mutate { add_field => [ "severity", "High" ]}
        } else {
          if [source_severity] == "INFO" {
            mutate { add_field => [ "severity", "Medium" ]}
          } else {
            mutate { add_field => [ "severity", "Low" ]}
          }
        }
      }
    }

An alternative to the nested if-else syntax is to install the optional translate filter plugin for Logstash and use it to create the mapping, as in the following example:

 translate {
  dictionary => [
   "FATAL", "Very-High",
   "ERROR", "Very-High",
   "WARN", "High",
   "INFO", "Medium",
   "DEBUG", "Low",
   "TRACE", "Low"
  ]
  field => "source_severity"
  destination => "severity" 
 }
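
The translate filter is not bundled with every Logstash package. If it is missing from your installation, it can typically be added with the Logstash plugin manager (path shown for the default RPM installation used earlier):

/usr/share/logstash/bin/logstash-plugin install logstash-filter-translate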

Example configuration including severity mapping

For the next example, consider the following log message:

2017-04-17 16:25:47 INFO [main] modules:932 - JBoss Modules version 1.3.5.Final

input {
 file {
  path => "/tmp/<testfile>"
  add_field => {
   "device_vendor" => "my_vendor"
   "device_product" => "my_product"
  }
 }
}
filter {
 grok {
  match => {
  # timestamp can be used as is - it is already in the output format expected by OBA
  "message" => "^(?<timestamp>\d+-\d+-\d+ \d+:\d+:\d+) (?<source_severity>[A-Z]+) (?<message>.*)"
  }
  overwrite => ["message"]
 }
 mutate {
  # rename field host (generated by file input plugin) to hostname (as expected by OBA)
  rename => { "host" => "hostname" }
 }
 # use the contents of field 'source_severity' to set the OBA required field 'severity'
 if [source_severity] == "ERROR" {
   mutate { add_field => [ "severity", "Very-High" ]}
 } else if [source_severity] == "FATAL" {
   mutate { add_field => [ "severity", "Very-High" ]}
 } else if [source_severity] =~ "WARN" {
   mutate { add_field => [ "severity", "High" ]}
 } else if [source_severity] == "INFO" {
   mutate { add_field => [ "severity", "Medium" ]}
 } else {
   mutate { add_field => [ "severity", "Low" ]}
 }
}
output {
 kafka {
  codec => json
  topic_id => "opsa_default.opsa.data.log.1.0.di.input"
  bootstrap_servers => "<your-opsa-collector-system-hostname>:9092"
  compression_type => "gzip"
 }
}

Timestamp formatting

Timestamps must be properly formatted, using the format yyyy-MM-dd HH:mm:ss, optionally followed by a timezone offset as described earlier. If this format is not followed, the timestamps are not recognized.

Consider the following log message:

2017-04-15 00:56:31 INFO [main] modules:932 - JBoss Modules version 1.3.5.Final

This message already contains the timestamp in the correct format. It just has to be parsed out of the message and stored in the appropriate field. This is achieved by using the following Logstash config:

grok {
    match => {
        "message" => "^(?<timestamp>\d+-\d+-\d+ \d+:\d+:\d+) (?<severity>[A-Z]+) (?<message>.*)"
    }
    overwrite => ["message"]
}

This grok filter parses the message field and creates values for timestamp and severity. The timestamp is already in the correct format, so it can be used as is, although, as noted earlier, it is assumed to be in UTC because it lacks timezone information. The severity still has to be mapped to one of the OBA values using the severity mapping filter shown above; if you reuse that filter, capture the raw value into source_severity instead of severity, as in the combined example earlier.

Now consider the following log message:

INFO 04/15/17 12:56 AM:liquibase: Successfully acquired change log lock

Extracting the timestamp in the appropriate format is a bit trickier here. The following configuration achieves that:

# grok filter is responsible for string parsing. Parsed timestamp is stored as a string inside a field labeled 'timestamp'
grok {
    match => {
        "message" => "^(?<severity>[a-zA-Z]+) (?<timestamp>\d+/\d+/\d+ \d+:\d+ [a-zA-Z]+):liquibase: (?<message>.*)"
    }
    overwrite => ["message"]
}
# date filter converts extracted time string into a date object
date {
    match => ["timestamp", "M/d/yy h:m a"]
    target => "timestamp"
    # note here is where a local timezone setting could be set, if desired, since the log timestamp does not have the time zone:
    timezone => "America/Denver"
}
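
Because the date filter stores a Logstash timestamp object in the timestamp field, it still has to be converted back into the plain yyyy-MM-dd HH:mm:ss string that OBA expects. A minimal sketch, reusing the ruby filter from the first example in this paper:

ruby {
    code => "event.set('timestamp', event.get('timestamp').time.strftime('%Y-%m-%d %H:%M:%S'))"
}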