ELK #07 LogStash

จากที่ได้กล่าวถึงมายาวนานในเรื่อง ELK  และ  ELK #02 ที่ได้กล่าวถึงการติดตั้ง LogStash ไว้เบื้องต้น ในบทความนี้จะมาลงลึก ถึงกระบวนการทำงานของ LogStash ซึ่งเป็นส่วนสำคัญในการเปลี่ยนข้อมูล Unstructured ให้เป็น Structured

ตอนนี้ เราจะทำงานใน /etc/logstash/conf.d/

Simple input – output plugin

สร้างไฟล์ 01-input-file.conf มีเนื้อหาดังนี้

input {
	file {
		path => ["/tmp/input.txt"]
		mode => "tail"
		}
}

ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash อ่านไฟล์ /tmp/input.txt โดยให้อ่านบรรทัดล่าสุด (ต่อจาก Checkpoint ก่อนหน้า) เข้ามา โดยถ้าไม่กำหนด mode => “tail” ระบบจะอ่านไฟล์ก็ต่อเมื่อ มีการสร้างไฟล์ใหม่เท่านั้น

สร้างไฟล์ 98-output-file.conf มีเนื้อหาดังนี้

output {
        file {
                path => "/tmp/output.txt"
        }
}

ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash เขียนไฟล์ /tmp/output.txt

เมื่อปรับเปลี่ยน configuration ต้องทำการ Restart Service

service logstash restart

ลองส่งข้อมูลเข้าไปในไฟล์ /tmp/input.txt ด้วยคำสั่ง

echo "Hello World 1" >> /tmp/input.txt

ดูผลลัพธ์ใน /tmp/output.txt

cat /tmp/output.txt
{"path":"/tmp/input.txt","@version":"1","message":"Hello World 1","@timestamp":"2018-09-11T03:42:33.645Z","host":"elk1"}

แสดงให้เห็นว่า ระบบ LogStash สามารถรับข้อมูลจากไฟล์ และส่งข้อมูลออกไปยังไฟล์ได้

Filter Plugin

ก่อนอื่น Stop Service ด้วยคำสั่ง

service logstash stop

ในการจัดการข้อมูลก่อนบันทึก เช่นการกรอง การจัดรูปแบบ LogStash ทำงานผ่าน Filter Plugin ซึ่งมีหลายรูปแบบ (https://www.elastic.co/guide/en/logstash/current/filter-plugins.html) แต่ในที่นี้ จะใช้ grok เหมาะกับข้อมูล Unstructured อย่าง syslog เป็นต้น ซึ่งมักจะเป็น Log ที่ให้มนุษย์อ่านได้ง่าย แต่ไม่ค่อยเหมาะสำหรับให้คอมพิวเตอร์เอาไปใช้งานต่อ ซึ่ง LogStash มีไว้ให้แล้วกว่า 120 ตัว

ตัวอย่าง grok-pattern

ต่อไป สร้าง 44-filter-basic.conf มีเนื้อหาดังนี้

filter {
        grok {
                match => {
                        "message" => "%{IP:ipaddress} %{NUMBER:size}"
                }
        }
}

จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

service logstash start

แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt

echo "192.168.1.1 120" >> /tmp/input.txt

และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย

{"message":"192.168.1.1 120","@version":"1","path":"/tmp/input.txt","@timestamp":"2018-09-11T04:56:03.662Z","size":"120","host":"elk1","ipaddress":"192.168.1.1"}

แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้

Example : Postfix Log

ก่อนอื่น Stop Service ด้วยคำสั่ง

service logstash stop

เนื่องจาก Log แต่ละชนิด แต่ละซอฟต์แวร์มีความหลากหลายมาก แต่ดีที่มีผู้เชี่ยวชาญเค้าเขียน Pattern เอาไว้ให้ ให้ใช้คำสั่งต่อไปนี้ สร้างไดเรคทอรี่ /etc/logstash/patterns.d/ และ ดาวน์โหลด มาเก็บไว้

mkdir /etc/logstash/patterns.d
wget https://raw.githubusercontent.com/logstash-plugins/logstash-patterns-core/master/patterns/grok-patterns -O /etc/logstash/patterns.d/grok-patterns
wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/postfix.grok -O /etc/logstash/patterns.d/postfix.grok

ในกรณีของ Postfix จากนั้น ดาวน์โหลด Filter Plugin มาเก็บไว้ใน /etc/logstash/conf.d/ ด้วยคำสั่ง

wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/50-filter-postfix.conf -O /etc/logstash/conf.d/50-filter-postfix.conf

และ ต้องสร้างอีกไฟล์ เพื่อเตรียมข้อมูล ชื่อ 49-filter-postfix-prepare.conf ใน /etc/logstash/conf.d/ เนื้อหาตามนี้

filter {
	grok {
    		match => { "message" => "%{SYSLOGTIMESTAMP} %{SYSLOGHOST} %{DATA:program}(?:\[%{POSINT}\])?: %{GREEDYDATA:message}" }
    		overwrite => "message"
	}
}

จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

service logstash start

แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt

echo "Sep 11 12:05:26 mailscan postfix/smtp[105836]: 268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)" >> /tmp/input.txt

และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย

{"program":"postfix/smtp","postfix_delay":43.0,"postfix_dsn":"2.0.0","postfix_relay_port":25,"message":"268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)","path":"/tmp/input.txt","postfix_queueid":"268E04DFFE6","postfix_delay_conn_setup":0.01,"@version":"1","host":"elk1","postfix_to":"xxx.y@psu.ac.th","postfix_relay_hostname":"mail.psu.ac.th","postfix_delay_transmission":0.01,"tags":["_grokparsefailure","_grok_postfix_success"],"postfix_smtp_response":"250 2.0.0 Ok: queued as DE294461637","postfix_delay_before_qmgr":43.0,"postfix_relay_ip":"192.168.107.11","@timestamp":"2018-09-11T07:57:20.354Z","postfix_delay_in_qmgr":0.0,"postfix_status":"sent"}

แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้

From Syslog to ElasticSearch

จากตัวอย่างข้างต้น เราทำงานกับไฟล์ /tmp/input.txt และ /tmp/output.txt ต่อไปนี้ จะเป็นการ รับ Input จาก syslog จริง ๆ จากเซิร์ฟเวอร์ ผ่าน Filter และส่งผลออกไปเก็บใน ElasticSearch

ก่อนอื่น Stop Service ด้วยคำสั่ง

service logstash stop

สร้างไฟล์ 02-input-syslog.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
( เปิดรับ syslog ที่ tcp/5514 )

input {
        syslog {
                port => "5514"
        }
}

สร้างไฟล์ 99-output-elasticsearch.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
( ในที่นี้ ใช้ ElasticSearch บน localhost ที่ tcp/9200 และ ไม่ได้ตั้ง Security ใด ๆ )

output {
        elasticsearch {
                hosts => ["localhost:9200"]
        }
}

จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

service logstash start

ที่เซิร์ฟเวอร์ที่จะส่ง Log มาเก็บใน ElasticSearch ผ่าน LogStash ให้แก้ไข /etc/rsyslog.d/50-default.conf ชี้ mail.* ไปยัง LogStash  ที่ tcp/5514

mail.* @@logstash.ip:5514

หากทุกอย่างเรียบร้อย ก็จะสามารถดูผลจาก Kibana ได้อย่างสวยงาม

สามารถนำข้อมูลไปใช้วิเคราะห์ได้ต่อไป