Centralización de logs con Logstash, Filebeat y ElasticSearch

Continuando con el proyecto de centralización de logs que ya había empezado con la instalación de ElasticSearch y de Kibana, llega el turno de la recogida y envío de logs al servidor centralizador mediante Logstash.

Lo que hace Logstash es enviar los datos directamente a ElasticSearch y luego los visualizaremos con Kibana.

Instalación de Logstash en Linux CentOS

Para montar esta prueba, voy a instalar Logstash en un Linux CentOS 7, ejecutando el siguiente comando:

yum install logstash -y

Configuración de Logstash

Una vez instalado el producto, configuraremos el puerto de escucha del servicio creando el siguiente fichero:

[root@elkbn ~]# cat /etc/logstash/conf.d/02-beats-input.conf
input {
  beats {
    port => 5044
  }
}
[root@elkbn ~]#

Seguidamente, vamos a crear un filtro para darle un formato a los logs del sistema operativo que queremos guardar, creando el siguiente fichero de configuración (obtenido de la documentación oficial de ElasticSearch):

[root@elkbn ~]# cat /etc/logstash/conf.d/10-syslog-filter.conf
filter {
  if [fileset][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|\n)*"
        }
        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[system][auth][ssh][geoip]"
      }
    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
        remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }
}
[root@elkbn ~]#

Finalmente, creamos el fichero de configuración con la IP y puerto de ElasticSearch, que será a donde enviemos los logs recogidos por el «input» configurado anteriormente por el puerto 5044:

[root@elkbn ~]# cat /etc/logstash/conf.d/30-elasticsearch-output.conf
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
[root@elkbn ~]#

En este caso, el servidor de ElasticSearch está arrancado en el mismo servidor local donde hemos instalado Kibana y Logstash. Ya nos sirve para hacer la prueba de concepto.

Arranque del servicio

Una vez instalado y creados los ficheros de configuración de Logstash, ha llegado la hora de arrancar el servicio.

[root@elkbn ~]# systemctl start logstash
[root@elkbn ~]#

[root@elkbn ~]# systemctl enable logstash
Created symlink from /etc/systemd/system/multi-user.target.wants/logstash.service to /etc/systemd/system/logstash.service.
[root@elkbn ~]#

[root@elkbn ~]# systemctl status logstash
● logstash.service - logstash
   Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2019-06-27 09:31:33 UTC; 10s ago
 Main PID: 5123 (java)
   CGroup: /system.slice/logstash.service
           └─5123 /bin/java -Xms1g -Xmx1g -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -Djava.awt.headless=true -Dfile.encoding=UTF-8 -Djruby....

Jun 27 09:31:33 elkbn systemd[1]: Started logstash.
[root@elkbn ~]#

Instalación y configuración de Filebeat

Filebeat es un servicio capaz de enviar datos a ElasticSearch y a Logstash. Lo instalaremos para enviar los logs que queremos a Logstash.

Lo primero de todo es instalarlo:

yum install filebeat -y

En el fichero de configuración /etc/filebeat/filebeat.yml indicamos en qué IP y puerto está escuchando el servicio de ElasticSearch y de Logstash:

#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------

output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

En esta prueba están todos los servicios configurados en el mismo servidor local pero dejo deshabilitado el servicio de ElasticSearch.

Habilitamos el módulo System para capturar los logs del sistema operativo:

[root@elkbn ~]# filebeat modules enable system
Enabled system
[root@elkbn ~]#

[root@elkbn ~]# filebeat modules list
Enabled:
system  <-- Módulo habilitado

Disabled:
apache
auditd
cisco
coredns
elasticsearch
envoyproxy
haproxy
icinga
iis
iptables
kafka
kibana
logstash
mongodb
mysql
nats
netflow
nginx
osquery
panw
postgresql
rabbitmq
redis
santa
suricata
traefik
zeek
[root@elkbn ~]#

Esto significa que hemos activado una plantilla por defecto para la recogida de logs del sistema.

Deshabilitarlo es igual de sencillo:

[root@elkbn filebeat]# filebeat modules disable system
Disabled system
[root@elkbn filebeat]#

[root@elkbn filebeat]# filebeat modules list
Enabled:

Disabled:
apache
auditd
cisco
coredns
elasticsearch
envoyproxy
haproxy
icinga
iis
iptables
kafka
kibana
logstash
mongodb
mysql
nats
netflow
nginx
osquery
panw
postgresql
rabbitmq
redis
santa
suricata
system
traefik
zeek
[root@elkbn filebeat]#

Y cargamos la plantilla de filebeat en ElasticSearch para el tratamiento de los logs del sistema:

[root@elkbn ~]# filebeat setup --index-management -E output.logstash.enabled=false -E 'output.elasticsearch.hosts=["localhost:9200"]'
Index setup finished.
[root@elkbn ~]#

En Kibana ya podemos ver el índice que acabamos de crear:

Kibana - Visualización de la Plantilla de Filebeat en ElasticSearch

Arrancamos el servicio de Filebeat:

[root@elkbn ~]# systemctl start filebeat
[root@elkbn ~]#

[root@elkbn ~]# systemctl enable filebeat
Created symlink from /etc/systemd/system/multi-user.target.wants/filebeat.service to /usr/lib/systemd/system/filebeat.service.
[root@elkbn ~]#

[root@elkbn ~]# systemctl status filebeat
● filebeat.service - Filebeat sends log files to Logstash or directly to Elasticsearch.
   Loaded: loaded (/usr/lib/systemd/system/filebeat.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2019-06-27 09:53:58 UTC; 9s ago
     Docs: https://www.elastic.co/products/beats/filebeat
 Main PID: 6420 (filebeat)
   CGroup: /system.slice/filebeat.service
           └─6420 /usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/fil...

Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.439Z        INFO        cfgfile/reload.go:172        Config reloader started
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        log/input.go:148        Configured paths: [/var/log/auth.log* /var/log/secure*]
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        log/input.go:148        Configured paths: [/var/log/messages* /var/log/syslog*]
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        input/input.go:114        Starting input of type: log; ID: 14859378308450008226
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        input/input.go:114        Starting input of type: log; ID: 6524930022985820847
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.440Z        INFO        cfgfile/reload.go:227        Loading of config files completed.
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.441Z        INFO        log/harvester.go:253        Harvester started for file: /var/log/secure
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.441Z        INFO        log/harvester.go:253        Harvester started for file: /var/log/messages
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.526Z        INFO        pipeline/output.go:95        Connecting to backoff(async(tcp://localhost:5044))
Jun 27 09:53:58 elkbn filebeat[6420]: 2019-06-27T09:53:58.527Z        INFO        pipeline/output.go:105        Connection to backoff(async(tcp://localhost:5044)) established
[root@elkbn ~]#

Si nos volvemos a conectar a la consola de Kibana, podemos visualizar los logs configurados desde la interfaz WEB:

Visualizar logs desde Kibana

Personalización de recogida de logs

Si lo que queremos es personalizar la recogida de logs del sistema en vez de utilizar una plantilla predefinida, editaremos el fichero /etc/filebeat/filebeat.yml indicando el path absoluto de los logs que queremos recoger. Por ejemplo, el boot.log.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:    
    - /var/log/messages
    - /var/log/boot.log

Una vez configurado el fichero filebeat.yml, reiniciamos el servicio:

systemctl restart filebeat

¿Te ha gustado? ¡Compártelo!

Share on facebook
Share on google
Share on twitter
Share on linkedin
Share on whatsapp

Tal vez también te gustaría leer...

Tutorial de Ansible

Recientemente me he estado mirando el funcionamiento de Ansible para automatizar tareas de manera masiva en servidores Linux remotos mediante esta aplicación. Hasta ahora utilizo otra pero como Ansible está cada vez más extendida y considero que vale la pena mirárselo. Más aún si RedHat está apostando por esta herramienta como estándar de automatización. Su

Leer más »

Insertar el botón «Continuar leyendo» en WordPress

Lo normal es presentar en la portada de un blog varios artículos pero no mostrar el contenido completo de cada uno de ellos porque la portada sería muy larga. Es una buena práctica insertar un botón o enlace en cada uno de los artículos de la portada que indique «Continuar leyendo» y que el usuario

Leer más »

Crear un Espinner con las Hojas de Cálculo de Google

Si no sabes lo que es un Espinner consiste en crear contenido diferente con un único texto a través de variables. Mejor lo explico con un ejemplo: Pongamos por caso que tenemos el siguiente texto con variables: Lo que hay dentro de cada llave son diferentes variables para cada palabra clave. Cuando procesemos este texto

Leer más »

Tipos de Enlaces WEB: Nofollow, Sponsored y UGC

Hasta hace muy poco, Google distinguía entre dos tipos de enlaces hacia otras URLs: Follow o, también conocidos como «DoFollow», son aquellos «puntúan» en el posicionamiento de una página WEB. NoFollow son justo lo contrario. Es decir, podemos insertar un enlace en un artículo, un foro, etc. pero no traspasaremos autoridad de nuestra página a

Leer más »

WP All Import – Importar fichero CSV y XML en WordPress

El plugin de WordPress WP All Import es muy útil para importar a WordPress una cantidad ingente de datos. El problema es que es de pago pero nos ahorrará mucho tiempo en estos casos. Imaginaos el tiempo que podéis tardar en crear 200 entradas en WordPress de forma manual. Con WP All Import, el proceso

Leer más »