Centralización de logs con Logstash, Filebeat y ElasticSearch

Continuando con el proyecto de centralización de logs que ya había empezado con la instalación de ElasticSearch y de Kibana, llega el turno de la recogida y envío de logs al servidor centralizador mediante Logstash.

Lo que hace Logstash es enviar los datos directamente a ElasticSearch y luego los visualizaremos con Kibana.

Instalación de Logstash en Linux CentOS

Para montar esta prueba, voy a instalar Logstash en un Linux CentOS 7, ejecutando el siguiente comando:

yum install logstash -y

Configuración de Logstash

Una vez instalado el producto, configuraremos el puerto de escucha del servicio creando el siguiente fichero:

[root@elkbn ~]# cat /etc/logstash/conf.d/02-beats-input.conf
input {
  beats {
    port => 5044
  }
}
[root@elkbn ~]#

Seguidamente, vamos a crear un filtro para darle un formato a los logs del sistema operativo que queremos guardar, creando el siguiente fichero de configuración (obtenido de la documentación oficial de ElasticSearch):

[root@elkbn ~]# cat /etc/logstash/conf.d/10-syslog-filter.conf
filter {
  if [fileset][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }
}
[root@elkbn ~]#

Finalmente, creamos el fichero de configuración con la IP y puerto de ElasticSearch, que será a donde enviemos los logs recogidos por el «input» configurado anteriormente por el puerto 5044:

[root@elkbn ~]# cat /etc/logstash/conf.d/30-elasticsearch-output.conf
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
[root@elkbn ~]#

En este caso, el servidor de ElasticSearch está arrancado en el mismo servidor local donde hemos instalado Kibana y Logstash. Ya nos sirve para hacer la prueba de concepto.

Arranque del servicio

Una vez instalado y creados los ficheros de configuración de Logstash, ha llegado la hora de arrancar el servicio.

[root@elkbn ~]# systemctl start logstash
[root@elkbn ~]#

[root@elkbn ~]# systemctl enable logstash
Created symlink from /etc/systemd/system/multi-user.target.wants/logstash.service to /etc/systemd/system/logstash.service.
[root@elkbn ~]#

[root@elkbn ~]# systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2019-06-27 09:31:33 UTC; 10s ago
 Main PID: 5123 (java)
   CGroup: /system.slice/logstash.service
           └─5123 /bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djruby....

Jun 27 09:31:33 elkbn systemd[1]: Started logstash.
[root@elkbn ~]#

Instalación y configuración de Filebeat

Filebeat es un servicio capaz de enviar datos a ElasticSearch y a Logstash. Lo instalaremos para enviar los logs que queremos a Logstash.

Lo primero de todo es instalarlo:

yum install filebeat -y

En el fichero de configuración /etc/filebeat/filebeat.yml indicamos en qué IP y puerto está escuchando el servicio de ElasticSearch y de Logstash:

#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------

output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

En esta prueba están todos los servicios configurados en el mismo servidor local pero dejo deshabilitado el servicio de ElasticSearch.

Habilitamos el módulo System para capturar los logs del sistema operativo:

[root@elkbn ~]# filebeat modules enable system
Enabled system
[root@elkbn ~]#

[root@elkbn ~]# filebeat modules list
Enabled:
system  <-- Módulo habilitado

Disabled:
apache
auditd
cisco
coredns
elasticsearch
envoyproxy
haproxy
icinga
iis
iptables
kafka
kibana
logstash
mongodb
mysql
nats
netflow
nginx
osquery
panw
postgresql
rabbitmq
redis
santa
suricata
traefik
zeek
[root@elkbn ~]#

Esto significa que hemos activado una plantilla por defecto para la recogida de logs del sistema.

Deshabilitarlo es igual de sencillo:

[root@elkbn filebeat]# filebeat modules disable system
Disabled system
[root@elkbn filebeat]#

[root@elkbn filebeat]# filebeat modules list
Enabled:

Disabled:
apache
auditd
cisco
coredns
elasticsearch
envoyproxy
haproxy
icinga
iis
iptables
kafka
kibana
logstash
mongodb
mysql
nats
netflow
nginx
osquery
panw
postgresql
rabbitmq
redis
santa
suricata
system
traefik
zeek
[root@elkbn filebeat]#

Y cargamos la plantilla de filebeat en ElasticSearch para el tratamiento de los logs del sistema:

[root@elkbn ~]# filebeat setup --index-management -E output.logstash.enabled=false -E 'output.elasticsearch.hosts=["localhost:9200"]'
Index setup finished.
[root@elkbn ~]#

En Kibana ya podemos ver el índice que acabamos de crear:

Kibana - Visualización de la Plantilla de Filebeat en ElasticSearch

Arrancamos el servicio de Filebeat:

[root@elkbn ~]# systemctl start filebeat
[root@elkbn ~]#

[root@elkbn ~]# systemctl enable filebeat
Created symlink from /etc/systemd/system/multi-user.target.wants/filebeat.service to /usr/lib/systemd/system/filebeat.service.
[root@elkbn ~]#

[root@elkbn ~]# systemctl status filebeat
● filebeat.service - Filebeat sends log files to Logstash or directly to Elasticsearch.
   Loaded: loaded (/usr/lib/systemd/system/filebeat.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2019-06-27 09:53:58 UTC; 9s ago
     Docs: https://www.elastic.co/products/beats/filebeat
 Main PID: 6420 (filebeat)
   CGroup: /system.slice/filebeat.service
           └─6420 /usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/fil...

Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.439Z        INFO        cfgfile/reload.go:172        Config reloader started
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        log/input.go:148        Configured paths: [/var/log/auth.log* /var/log/secure*]
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        log/input.go:148        Configured paths: [/var/log/messages* /var/log/syslog*]
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        input/input.go:114        Starting input of type: log; ID: 14859378308450008226
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        input/input.go:114        Starting input of type: log; ID: 6524930022985820847
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        cfgfile/reload.go:227        Loading of config files completed.
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.441Z        INFO        log/harvester.go:253        Harvester started for file: /var/log/secure
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.441Z        INFO        log/harvester.go:253        Harvester started for file: /var/log/messages
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.526Z        INFO        pipeline/output.go:95        Connecting to backoff(async(tcp://localhost:5044))
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.527Z        INFO        pipeline/output.go:105        Connection to backoff(async(tcp://localhost:5044)) established
[root@elkbn ~]#

Si nos volvemos a conectar a la consola de Kibana, podemos visualizar los logs configurados desde la interfaz WEB:

Visualizar logs desde Kibana

Personalización de recogida de logs

Si lo que queremos es personalizar la recogida de logs del sistema en vez de utilizar una plantilla predefinida, editaremos el fichero /etc/filebeat/filebeat.yml indicando el path absoluto de los logs que queremos recoger. Por ejemplo, el boot.log.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:    
    - /var/log/messages
    - /var/log/boot.log

Una vez configurado el fichero filebeat.yml, reiniciamos el servicio:

systemctl restart filebeat

Te podría interesar

COMPÁRTEME

1 comentario en «Centralización de logs con Logstash, Filebeat y ElasticSearch»

  1. Hola, yo quiero simplemente mandar logs de una maquina con filebeat a otra maquina que tiene elasticsearch y verlos con kibana y me estoy haciendo un lío con toda esta movida.

    Un saludo.

    Responder

Deja un comentario