Tag: logstash

  • ELK #07 LogStash

    จากที่ได้กล่าวถึงมายาวนานในเรื่อง ELK  และ  ELK #02 ที่ได้กล่าวถึงการติดตั้ง LogStash ไว้เบื้องต้น ในบทความนี้จะมาลงลึก ถึงกระบวนการทำงานของ LogStash ซึ่งเป็นส่วนสำคัญในการเปลี่ยนข้อมูล Unstructured ให้เป็น Structured

    ตอนนี้ เราจะทำงานใน /etc/logstash/conf.d/

    Simple input – output plugin

    สร้างไฟล์ 01-input-file.conf มีเนื้อหาดังนี้

    input {
    	file {
    		path => ["/tmp/input.txt"]
    		mode => "tail"
    		}
    }
    

    ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash อ่านไฟล์ /tmp/input.txt โดยให้อ่านบรรทัดล่าสุด (ต่อจาก Checkpoint ก่อนหน้า) เข้ามา โดยถ้าไม่กำหนด mode => “tail” ระบบจะอ่านไฟล์ก็ต่อเมื่อ มีการสร้างไฟล์ใหม่เท่านั้น

    สร้างไฟล์ 98-output-file.conf มีเนื้อหาดังนี้

    output {
            file {
                    path => "/tmp/output.txt"
            }
    }
    

    ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash เขียนไฟล์ /tmp/output.txt

    เมื่อปรับเปลี่ยน configuration ต้องทำการ Restart Service

    service logstash restart
    

    ลองส่งข้อมูลเข้าไปในไฟล์ /tmp/input.txt ด้วยคำสั่ง

    echo "Hello World 1" >> /tmp/input.txt

    ดูผลลัพธ์ใน /tmp/output.txt

    cat /tmp/output.txt
    {"path":"/tmp/input.txt","@version":"1","message":"Hello World 1","@timestamp":"2018-09-11T03:42:33.645Z","host":"elk1"}

    แสดงให้เห็นว่า ระบบ LogStash สามารถรับข้อมูลจากไฟล์ และส่งข้อมูลออกไปยังไฟล์ได้

    Filter Plugin

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    ในการจัดการข้อมูลก่อนบันทึก เช่นการกรอง การจัดรูปแบบ LogStash ทำงานผ่าน Filter Plugin ซึ่งมีหลายรูปแบบ (https://www.elastic.co/guide/en/logstash/current/filter-plugins.html) แต่ในที่นี้ จะใช้ grok เหมาะกับข้อมูล Unstructured อย่าง syslog เป็นต้น ซึ่งมักจะเป็น Log ที่ให้มนุษย์อ่านได้ง่าย แต่ไม่ค่อยเหมาะสำหรับให้คอมพิวเตอร์เอาไปใช้งานต่อ ซึ่ง LogStash มีไว้ให้แล้วกว่า 120 ตัว

    ตัวอย่าง grok-pattern

    ต่อไป สร้าง 44-filter-basic.conf มีเนื้อหาดังนี้

    filter {
            grok {
                    match => {
                            "message" => "%{IP:ipaddress} %{NUMBER:size}"
                    }
            }
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt

    echo "192.168.1.1 120" >> /tmp/input.txt

    และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย

    {"message":"192.168.1.1 120","@version":"1","path":"/tmp/input.txt","@timestamp":"2018-09-11T04:56:03.662Z","size":"120","host":"elk1","ipaddress":"192.168.1.1"}

    แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้

    Example : Postfix Log

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    เนื่องจาก Log แต่ละชนิด แต่ละซอฟต์แวร์มีความหลากหลายมาก แต่ดีที่มีผู้เชี่ยวชาญเค้าเขียน Pattern เอาไว้ให้ ให้ใช้คำสั่งต่อไปนี้ สร้างไดเรคทอรี่ /etc/logstash/patterns.d/ และ ดาวน์โหลด มาเก็บไว้

    mkdir /etc/logstash/patterns.d
    wget https://raw.githubusercontent.com/logstash-plugins/logstash-patterns-core/master/patterns/grok-patterns -O /etc/logstash/patterns.d/grok-patterns
    wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/postfix.grok -O /etc/logstash/patterns.d/postfix.grok
    
    

    ในกรณีของ Postfix จากนั้น ดาวน์โหลด Filter Plugin มาเก็บไว้ใน /etc/logstash/conf.d/ ด้วยคำสั่ง

    wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/50-filter-postfix.conf -O /etc/logstash/conf.d/50-filter-postfix.conf
    

    และ ต้องสร้างอีกไฟล์ เพื่อเตรียมข้อมูล ชื่อ 49-filter-postfix-prepare.conf ใน /etc/logstash/conf.d/ เนื้อหาตามนี้

    filter {
    	grok {
        		match => { "message" => "%{SYSLOGTIMESTAMP} %{SYSLOGHOST} %{DATA:program}(?:\[%{POSINT}\])?: %{GREEDYDATA:message}" }
        		overwrite => "message"
    	}
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt

    echo "Sep 11 12:05:26 mailscan postfix/smtp[105836]: 268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)" >> /tmp/input.txt

    และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย

    {"program":"postfix/smtp","postfix_delay":43.0,"postfix_dsn":"2.0.0","postfix_relay_port":25,"message":"268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)","path":"/tmp/input.txt","postfix_queueid":"268E04DFFE6","postfix_delay_conn_setup":0.01,"@version":"1","host":"elk1","postfix_to":"xxx.y@psu.ac.th","postfix_relay_hostname":"mail.psu.ac.th","postfix_delay_transmission":0.01,"tags":["_grokparsefailure","_grok_postfix_success"],"postfix_smtp_response":"250 2.0.0 Ok: queued as DE294461637","postfix_delay_before_qmgr":43.0,"postfix_relay_ip":"192.168.107.11","@timestamp":"2018-09-11T07:57:20.354Z","postfix_delay_in_qmgr":0.0,"postfix_status":"sent"}

    แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้

    From Syslog to ElasticSearch

    จากตัวอย่างข้างต้น เราทำงานกับไฟล์ /tmp/input.txt และ /tmp/output.txt ต่อไปนี้ จะเป็นการ รับ Input จาก syslog จริง ๆ จากเซิร์ฟเวอร์ ผ่าน Filter และส่งผลออกไปเก็บใน ElasticSearch

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    สร้างไฟล์ 02-input-syslog.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
    ( เปิดรับ syslog ที่ tcp/5514 )

    input {
            syslog {
                    port => "5514"
            }
    }
    

    สร้างไฟล์ 99-output-elasticsearch.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
    ( ในที่นี้ ใช้ ElasticSearch บน localhost ที่ tcp/9200 และ ไม่ได้ตั้ง Security ใด ๆ )

    output {
            elasticsearch {
                    hosts => ["localhost:9200"]
            }
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    ที่เซิร์ฟเวอร์ที่จะส่ง Log มาเก็บใน ElasticSearch ผ่าน LogStash ให้แก้ไข /etc/rsyslog.d/50-default.conf ชี้ mail.* ไปยัง LogStash  ที่ tcp/5514

    mail.* @@logstash.ip:5514
    

    หากทุกอย่างเรียบร้อย ก็จะสามารถดูผลจาก Kibana ได้อย่างสวยงาม

    สามารถนำข้อมูลไปใช้วิเคราะห์ได้ต่อไป

  • ELK #5 การประยุกต์ใช้ ELK ในงานด้าน GIS

    คราวนี้ มาดูการประยุกต์ใช้ ELK ในงานด้าน GIS

    ต่อจาก ELK #01 > ELK #02 > ELK #03 > ELK #04 ซึ่งเป็นการติดตั้งทั้งหมด คราวนี้มาดูการประยุกต์ใช้งานกันบ้าง

    โจทย์มีอยู่ว่า มีการไปเก็บข้อมูลในภาคสนาม แล้วมีการบันทึก พิกัดด้วย GPS เป็น Latitude กับ Longitude พร้อมกับค่าบางอย่าง ทั้งหมดถูกเก็บไว้ในฐานข้อมูล MySQL

    การนำข้อมูลเข้า ELK ก็เลย Export ข้อมูลจาก MySQL มาเป็น CSV File ประกอบด้วย

    id,LATITUDE,LONGITUDE,something

    ตัวอย่างข้อมูล มีดังนี้

    id,LATITUDE,LONGITUDE,something
    1,6.97585,100.448963,100
    2,6.975627,100.450841,19
    3,6.973472,100.449196,65
    4,6.973468,100.449104,53
    5,6.973455,100.449135,33
    6,6.973252,100.44888,13
    7,6.985862,100.45292,85
    8,6.993386,100.416214,90
    9,7.005465,100.447984,1

    นำข้อมูลเข้า ELK ผ่านทาง Logstash

    ใน  ELK #2 ได้อธิบายขั้นตอนการติดตั้ง Logstash ไว้แล้วนั้น ต่อไปเป็นการนำข้อมูลชนิด CSV เข้าไปใส่ใน Elasticsearch

    Logstash จะอ่าน “กระบวนการทำงาน” หรือเรียกว่า Pipeline จากไฟล์ Configuration ซึ่งประกอบด้วย 3 ส่วนหลักๆ คือ Input, Filter และ Output

    input {
       stdin { }
    }

    ในส่วน input นี้ จะเป็นการอ่าน STDIN หรือ ทาง Terminal

    filter {
     csv {
       separator => ","
       columns => [
         "id","latitude","longitude","something"
       ]
     }
     if [id] == "id" {
       drop { }
     } else {
       # continue processing data
       mutate {
         remove_field => [ "message" ]
       }
       mutate {
         convert => { "something" => "integer" }
         convert => { "longitude" => "float" }
         convert => { "latitude" => "float" }
       }
       mutate {
         rename => {
           "longitude" => "[geoip][location][lon]"
           "latitude" => "[geoip][location][lat]"
         }
       }
     }
    }

    ในส่วนของ filter นี้ เริ่มจาก เลือกใช้ Filter Plugin ชื่อ “csv” เพื่อจัดการไฟล์ CSV โดยกำหนด “separator” เป็น “,” แล้วกำหนดว่ามีชื่อ Column เป็น “id”,”latitude”,”longitude”,”something”

    จากนั้น ก็ตรวจสอบว่า ถ้าข้อมูลที่อ่านเข้ามา ใน Column “id” มีค่าเป็น “id” (ซึ่งก็คือบรรทัดหัวตารางของไฟล์ csv นั่นเอง) ก้ให้ “drop” ไป

    แต่หากไม่ใช่ ก็ให้ทำดังนี้ (mutate คือการแก้ไข)

    • remove field ชื่อ message (ซึ่งจะปรากฏเป็น Default อยู่ ก็เลยเอาออกเพราะไม่จำเป็น)
    • convert หรือ เปลี่ยน “ชนิด”  ของแต่ละ field เป็นไปตามที่ต้องการ ได้แก่ ให้ something เป็น Integer, latitude และ longitude เป็น float
    • rename จาก latitude เป็น [geoip][location][lat] และ longitude เป็น [geoip][location][lon] ซึ่งตรงนี้สำคัญ เพราะ geoip.location Field ข้อมูลชนิก “geo_point” ซึ่งจำเป็นต่อการนำไปใช้งานเกำหนดตำแหน่งพิกัดบนแผนที่ (เป็น Field ที่สร้างจาก Template พื้นฐานของ Logstash ซึ่งจะไม่กล่าวถึงในบทความนี้)
    output {
     stdout { codec => rubydebug }
     elasticsearch {
       hosts => ["http://your.elastic.host:9200"]
     }
    }

    ในส่วนของ Output จะกำหนดว่า ข้อมูลที่อ่านจาก csv และผ่าน filter ตามที่กล่าวมาข้างต้น จะส่งไปที่ใน จากการกำหนดนี้ บอกว่า จะส่งออกไป

    • stdout คือ การแสดงผลออกมาทาง terminal โดยมีรูปแบบเป็น rubydebug (รูปแบบหนึ่ง)
    • Elasticsearch ซึ่งอยู่ที่ http://your.elastic.host:9200

    จากนั้น Save ไฟล์นี้ แล้วตั้งชื่อว่า gis.conf

    แล้วใช้คำสั่ง

    cat sample1.csv | /usr/share/logstash/bin/logstash -f gis.conf

    การแสดงผลข้อมูลใน Elasticsearch ผ่าน Kibana

    จากบทความก่อนหน้า ได้แสดงวิธีการติดตั้ง Kibana และเชื่อมต่อกับ Elasticsearch แล้ว โดยจะเข้าถึง Kibana ได้ทางเว็บไซต์ http://your.kibana.host:5601

    ในกระบวนการของ Logstash ข้างต้น จะไปสร้าง Elasticsearch Index ชื่อ “logstash-YYYY-MM-DD”, ใน Kibana ก็จะต้องไป คลิกที่ Setting (รูปเฟือง) จากนั้นคลิกที่ Index Pattern โดยให้ไปอ่าน index ซึ่งมีชื่อเป็น Pattern คือ “logstash-*” จากนั้น คลิกปุ่ม Create

    จะได้ผลประมาณนี้

    ต่อไป คลิกที่ Discover ก็จะเห็นข้อมูลเข้ามา

    แสดงข้อมูลในรูปแบบของ Tile Map

    คลิกที่ Visualization > Create a visualization

    เลือก Tile Map

    เลือก Index ที่ต้องการ ในที่นี้คือ logstash-*

    คลิก Geo Coordinates

    จากนั้น คลิก Apply แล้วคลิก Fit Data Bound

    ก็จะได้เฉพาะ พื้นที่ทีมีข้อมุล

    วิธีใส่ Map Server อื่น

    ปัญหาของ Defaul Map Service ที่มากับ Kibana คือ Elastic Map Service นั้น จะจำกัดระดับในการ Zoom จึงต้องหา WMS (Web Map Service) อื่นมาใช้แทน ต้องขอบคุณ คุณนพัส กังวานตระกูล สถานวิจัยสารสนเทศภูมิศาสตร์ทรัพยากรธรรมชาติและสิ่งแวดล้อม ศูนย์ภูมิภาคเทคโนโลยีอวกาศและภูมิสารสนเทศ (ภาคใต้)  สำหรับคำแนะนำในการใช้งาน WMS และระบบ GIS ตลอดมาครับ 🙂

    โดย เราจะใช้ WMS ของ Longdo Map API : http://api.longdo.com/map/doc/
    ข้อมูลการใช้งาน เอามาจาก http://api.longdo.com/map/doc/demo/advance/02-layer.php

    วิธีการตั้งค่าใน Kibana

    คลิกที่ Option > WMS compliant map server
    แล้วกรอกข้อมูล

    URL : https://ms.longdo.com/mapproxy/service
    Layer: bluemarble_terrain
    Version: 1.3.0
    Format: image/png
    Attribute: Longdo API

    จากนั้นคลิก Apply

    จากนั้นให้ Save พร้อมตั้งชื่อ

    ซึ่ง Longdo Map API สามารถ Zoom ได้ละเอียดพอสมควร

    สามารถนำเสนอระบบ GIS ได้บน Website ทันที

    หวังว่าจะเป็นประโยชน์ครับ