จากที่ได้กล่าวถึงมายาวนานในเรื่อง ELK และ ELK #02 ที่ได้กล่าวถึงการติดตั้ง LogStash ไว้เบื้องต้น ในบทความนี้จะมาลงลึก ถึงกระบวนการทำงานของ LogStash ซึ่งเป็นส่วนสำคัญในการเปลี่ยนข้อมูล Unstructured ให้เป็น Structured
ตอนนี้ เราจะทำงานใน /etc/logstash/conf.d/
Simple input – output plugin
สร้างไฟล์ 01-input-file.conf มีเนื้อหาดังนี้
input {
file {
path => ["/tmp/input.txt"]
mode => "tail"
}
}
ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash อ่านไฟล์ /tmp/input.txt โดยให้อ่านบรรทัดล่าสุด (ต่อจาก Checkpoint ก่อนหน้า) เข้ามา โดยถ้าไม่กำหนด mode => “tail” ระบบจะอ่านไฟล์ก็ต่อเมื่อ มีการสร้างไฟล์ใหม่เท่านั้น
สร้างไฟล์ 98-output-file.conf มีเนื้อหาดังนี้
output {
file {
path => "/tmp/output.txt"
}
}
ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash เขียนไฟล์ /tmp/output.txt
เมื่อปรับเปลี่ยน configuration ต้องทำการ Restart Service
service logstash restart
ลองส่งข้อมูลเข้าไปในไฟล์ /tmp/input.txt ด้วยคำสั่ง
echo "Hello World 1" >> /tmp/input.txt
ดูผลลัพธ์ใน /tmp/output.txt
cat /tmp/output.txt
{"path":"/tmp/input.txt","@version":"1","message":"Hello World 1","@timestamp":"2018-09-11T03:42:33.645Z","host":"elk1"}
แสดงให้เห็นว่า ระบบ LogStash สามารถรับข้อมูลจากไฟล์ และส่งข้อมูลออกไปยังไฟล์ได้
Filter Plugin
ก่อนอื่น Stop Service ด้วยคำสั่ง
service logstash stop
ในการจัดการข้อมูลก่อนบันทึก เช่นการกรอง การจัดรูปแบบ LogStash ทำงานผ่าน Filter Plugin ซึ่งมีหลายรูปแบบ (https://www.elastic.co/guide/en/logstash/current/filter-plugins.html) แต่ในที่นี้ จะใช้ grok เหมาะกับข้อมูล Unstructured อย่าง syslog เป็นต้น ซึ่งมักจะเป็น Log ที่ให้มนุษย์อ่านได้ง่าย แต่ไม่ค่อยเหมาะสำหรับให้คอมพิวเตอร์เอาไปใช้งานต่อ ซึ่ง LogStash มีไว้ให้แล้วกว่า 120 ตัว
ต่อไป สร้าง 44-filter-basic.conf มีเนื้อหาดังนี้
filter {
grok {
match => {
"message" => "%{IP:ipaddress} %{NUMBER:size}"
}
}
}
จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)
service logstash start
แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt
echo "192.168.1.1 120" >> /tmp/input.txt
และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย
{"message":"192.168.1.1 120","@version":"1","path":"/tmp/input.txt","@timestamp":"2018-09-11T04:56:03.662Z","size":"120","host":"elk1","ipaddress":"192.168.1.1"}
แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้
Example : Postfix Log
ก่อนอื่น Stop Service ด้วยคำสั่ง
service logstash stop
เนื่องจาก Log แต่ละชนิด แต่ละซอฟต์แวร์มีความหลากหลายมาก แต่ดีที่มีผู้เชี่ยวชาญเค้าเขียน Pattern เอาไว้ให้ ให้ใช้คำสั่งต่อไปนี้ สร้างไดเรคทอรี่ /etc/logstash/patterns.d/ และ ดาวน์โหลด มาเก็บไว้
mkdir /etc/logstash/patterns.d
wget https://raw.githubusercontent.com/logstash-plugins/logstash-patterns-core/master/patterns/grok-patterns -O /etc/logstash/patterns.d/grok-patterns
wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/postfix.grok -O /etc/logstash/patterns.d/postfix.grok
ในกรณีของ Postfix จากนั้น ดาวน์โหลด Filter Plugin มาเก็บไว้ใน /etc/logstash/conf.d/ ด้วยคำสั่ง
wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/50-filter-postfix.conf -O /etc/logstash/conf.d/50-filter-postfix.conf
และ ต้องสร้างอีกไฟล์ เพื่อเตรียมข้อมูล ชื่อ 49-filter-postfix-prepare.conf ใน /etc/logstash/conf.d/ เนื้อหาตามนี้
filter {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP} %{SYSLOGHOST} %{DATA:program}(?:\[%{POSINT}\])?: %{GREEDYDATA:message}" }
overwrite => "message"
}
}
จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)
service logstash start
แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt
echo "Sep 11 12:05:26 mailscan postfix/smtp[105836]: 268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)" >> /tmp/input.txt
และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย
{"program":"postfix/smtp","postfix_delay":43.0,"postfix_dsn":"2.0.0","postfix_relay_port":25,"message":"268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)","path":"/tmp/input.txt","postfix_queueid":"268E04DFFE6","postfix_delay_conn_setup":0.01,"@version":"1","host":"elk1","postfix_to":"xxx.y@psu.ac.th","postfix_relay_hostname":"mail.psu.ac.th","postfix_delay_transmission":0.01,"tags":["_grokparsefailure","_grok_postfix_success"],"postfix_smtp_response":"250 2.0.0 Ok: queued as DE294461637","postfix_delay_before_qmgr":43.0,"postfix_relay_ip":"192.168.107.11","@timestamp":"2018-09-11T07:57:20.354Z","postfix_delay_in_qmgr":0.0,"postfix_status":"sent"}
แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้
From Syslog to ElasticSearch
จากตัวอย่างข้างต้น เราทำงานกับไฟล์ /tmp/input.txt และ /tmp/output.txt ต่อไปนี้ จะเป็นการ รับ Input จาก syslog จริง ๆ จากเซิร์ฟเวอร์ ผ่าน Filter และส่งผลออกไปเก็บใน ElasticSearch
ก่อนอื่น Stop Service ด้วยคำสั่ง
service logstash stop
สร้างไฟล์ 02-input-syslog.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
( เปิดรับ syslog ที่ tcp/5514 )
input {
syslog {
port => "5514"
}
}
สร้างไฟล์ 99-output-elasticsearch.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
( ในที่นี้ ใช้ ElasticSearch บน localhost ที่ tcp/9200 และ ไม่ได้ตั้ง Security ใด ๆ )
output {
elasticsearch {
hosts => ["localhost:9200"]
}
}
จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)
service logstash start
ที่เซิร์ฟเวอร์ที่จะส่ง Log มาเก็บใน ElasticSearch ผ่าน LogStash ให้แก้ไข /etc/rsyslog.d/50-default.conf ชี้ mail.* ไปยัง LogStash ที่ tcp/5514
mail.* @@logstash.ip:5514
หากทุกอย่างเรียบร้อย ก็จะสามารถดูผลจาก Kibana ได้อย่างสวยงาม
สามารถนำข้อมูลไปใช้วิเคราะห์ได้ต่อไป