To create an index, an index mapping is needed - In general, I think for collecting filebased logs - the filebeat template suits me.

Make a copy of filebeat.json from the zip package at and change filebeat.json name and the content likewise.


Then upload and create index.

root@elkserver:curl -XPUT http://localhost:9200/.kibana/index-pattern/camlog-* -d @camlog.json


Then, copy filebeat-index-template.json to camlog-index-template.json (and change the content likewise)

root@elkserver:~# curl -XPUT 'http://localhost:9200/_template/camlog?pretty' -d@camlog-index-template.json
  "acknowledged" : true


The collection on moserver is (this is a part of it)

        - /data/camera-data/Fordor.log
        - /data/camera-data/Baghus.log
      document_type: camlog
      input_type: log


This is shipped to Logstash, where output is configured for ElasticSearch- notice the if for type "camlog":

output {
  if [type] == "pinglog" {
    elasticsearch {
      hosts => ["localhost:9200"]
      sniffing => true
      manage_template => false
      index => "pinglog-%{+YYYY.MM.dd}"
      document_type => "%{[@metadata][type]}"
    if [type] == "camlog" {
      elasticsearch {
        hosts => ["localhost:9200"]
        sniffing => true
        manage_template => false
        index => "camlog-%{+YYYY.MM.dd}"
        document_type => "%{[@metadata][type]}"
      elasticsearch {
        hosts => ["localhost:9200"]
        sniffing => true
        manage_template => false
        index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
        document_type => "%{[@metadata][type]}"

Test and restart:

root@elkserver:/etc/logstash/conf.d# /etc/init.d/logstash configtest
Configuration OK
root@elkserver:/etc/logstash/conf.d# /etc/init.d/logstash restart
Killing logstash (pid 16262) with SIGTERM
Waiting logstash (pid 16262) to die...
Waiting logstash (pid 16262) to die...
Waiting logstash (pid 16262) to die...
Waiting logstash (pid 16262) to die...
logstash stopped.
logstash started.


As I have ensured logs are pretty much key=value for the values I need to collect, I use kv to get the fields, and a mutate to ensure the string value for "pingtime=" becomes a float (otherwise it cant be used in a Visualization):

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
filter {
  if [type] == "apache" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    geoip {
      source => "clientip"
      target => "geoip"
      database => "/etc/logstash/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    mutate {
      convert => [ "[geoip][coordinates]", "float"]
    if [type] == "pinglog" {
      kv {}
      mutate {
        convert => { "pingtime" => "float" }


This should bring pinglogs in the index "pinglogs"