Search results for: “elk”

  • ELK #09 Anomaly Detection (Case Study)

    ระบบ PSU Email ให้บริการผู้ใช้ของมหาวิทยาลัยสงขลานครินทร์ ซึ่งมีการใช้งานจากทั่วโลก ทั้งระบบประกอบขึ้นจากคอมพิวเตอร์หลายเครื่อง การจะตรวจสอบ Log เมื่อเกิด Incident ขึ้น อาจจะต้องใช้ระยะเวลานาน และเป็นการยากพอสมควรที่จะเชื่อมโยงความสัมพันธ์ของเหตุการณ์ และสรุปออกมาเป็นรายงานได้ จึงเริ่มใช้ ELK สำหรับรวบรวม Log ของทั้งระบบไว้ที่ส่วนกลาง และพัฒนาต่อยอดเพื่อการตรวจจับความผิดปรกติต่าง ๆ ได้

    ในบทความนี้ จะนำเสนอวิธีการใช้ ELK เพื่อตรวจจับ การ Login ที่ผิดปรกติบน PSU Email โดยจะสนใจ ผู้ใช้ที่มีการ Login จากนอกประเทศเป็นหลัก

    การส่ง Log จาก Server เข้า ELK

    ที่เครื่อง Server แต่ละเครื่อง กำหนดให้ส่ง Log จาก /etc/rsyslog.d/50-default.conf เข้าไปที่ your.logstash.server:port ตามที่กำหนดไว้

    การสร้าง Logstash Filter

    ที่ Logstash Server

    • Input เพื่อรับข้อมูลจาก syslog ที่ port ที่ต้องการ เช่นในที่นี้เป็น 5516 เป็นต้น
    • Filter ใช้ Grok Plugin เพื่อจับข้อมูล จาก message แบ่งเป็นส่วน ๆ ตามลักษณะ แล้วตั้งชื่อตาม Field ตามต้องการ ในที่นี้คือ description, username, domainname, clientip, actiondate, actiontime เป็นต้น (ตัวที่สำคัญในตอนนี้คือ username และ clientip)
    • Output ตั้งว่าให้ส่งผลไปยัง Elasticsearch ที่ “your.elasticsearch.server” ที่ port 9200

    [ตรงนี้มีกระบวนการบางอย่าง ซึ่งค่อยมาลงรายละเอียด]

    เมื่อมี Log ไหลเข้าสู่ Logstash และ ถูกประมวลผลแล้ว ก็จะเข้าสู่ Elasticsearch แล้ว ก็นำไปใช้งานบน Kibana

    หลังจากนั้น สามารถ Search ข้อมูล และใส่ Fields ที่สนใจ เช่น Time, Username, geoip.country_name และ description ได้ แล้ว Save เอาไว้ใช้งานต่อ ในที่นี้ ตั้งชื่อว่า squirrelmail-geoip

    จากนั้น สามารถเอาไปสร้างเป็น Visualization แบบ Coordinate Map ได้ เช่น ดูว่า มีการ Login Success / Failed Login / Sent จากที่ไหนบ้างในโลก

    จะเห็นได้ว่า ส่วนใหญ่ ใช้งานจากในประเทศไทย (วงกลมสีแดงเข้ม ๆ) ส่วนนอกประเทศ จะเป็นวงสีเหลืองเล็ก ๆ

    การตรวจหาการใช้งานที่ผิดปรกติ

    สร้าง Search ใหม่ กรองเฉพาะ ที่มี (exist) Username และ ไม่เป็น N/A และ มี (exist) geoip.country_code และ ไม่ใช่ Thailand แล้ว Save ไว้ใช้งานต่อไป ในที่ตั้งชื่อว่า squirrelmail-geoip-outside-th

    จากนั้น เอาไปสร้าง Visualization แบบ Vertical Bar
    กำหนดให้
    Y Axis เป็นจำนวน
    X Axis เป็น Username
    โดยที่ Group by geoip.country_name และ description
    ก็จะทำให้รู้ว่า ใครบ้างที่ มีการใช้งานนอกประเทศ และ เป็นการใช้งานแบบไหน

    จะเห็นได้ว่า จะมีบางคนที่ แสดงสีแค่สีเดียว กับบางคนมีหลายสี เนื่องจาก มีหลายประเทศ และ หลายประเภทการใช้งาน เราสามารถ กรองเอาเฉพาะ ข้อมูลที่สนใจได้ โดยคลิกที่ Inspect แล้วกดเครื่องหมาย + กับข้อมูลที่ต้องการ เช่น description ที่เป็น “Failed webmail login” ก็ได้

    ก็จะกรองเฉพาะ Username ที่มีการ Login จากต่างประเทศ แต่ไม่สำเร็จ จากภาพด้านล่าง แสดงว่า 3 คนนี้ น่าจะโดนอะไรเข้าแล้ว

    หรือ ถ้าจะกรองข้อมูล เฉพาะคนที่ “Failed webmail login” และ “Message sent via webmail” ก็ได้ แต่ต้องเปลี่ยน ชนิดการ Filter เป็น “is one of”

    ผลที่ได้ดังภาพ แต่เนื่องจาก ก็ยังเป็น 3 คนนี้อยู่ จะเห็นได้ว่า คน ๆ เดียว (ซ้ายสุด) มีการ Login จากหลายประเทศ ภายใน 24 ชั่วโมง

    ต่อไป ถ้าเราสนใจเฉพาะ คนที่ “ส่งอีเมล” จากนอกประเทศ ในเวลาที่กำหนด จะได้ผลประมาณนี้

    พบว่า คนซ้ายสุด คนเดิมนั่นแหล่ะ แต่เราจะมาดูรายละเอียด ก็คลิกที่ปุ่ม Inspect แล้ว เลือก Include เฉพาะ Username นั้น

    ก็พบว่า คนนี้มีการส่ง email ออกจากประเทศ USA, Canada, Panama, Argentina, Mexico แล้วบินมา UK ภายในวันเดียว –> ทำได้ไง !!! (ดังภาพด้านล่าง)

    เมื่อลองตรวจสอบ ก็จะพบว่า Username นี้ มีพฤติกรรม ส่ง Spam จริง ๆ ก็จะจัดการ “จำกัดความเสียหาย” ต่อไป

    วิธีการที่กล่าวมาข้างต้น สามารถสร้างเป็น Process อัตโนมัติ (เว้นแต่ขั้นตอนการ จำกัดความเสียหาย จะ Automatic ก็ได้ แต่ตอนนี้ขอ Manual ก่อน) เอาไว้สำหรับ Monitoring ได้ โดยอาจจะสั่งให้ เฝ้าดู 1 ชั่วโมงล่าสุด และ Refresh ทุก 1 นาที ดังภาพ

    หวังว่าจะเป็นประโยชน์

    ส่วนรายละเอียด คอยติดตามตอนต่อไปครับ

  • ELK #08 Oracle Audit Trail

    ต่อจา ELK #07 – Logstash คราวนี้ มาใช้งานจริง โดยใช้ ELK เพื่อเก็บ Log ของ Oracle Audit Trail

    1. Oracle Audit Trail บน Database Server เก็บ Log ในรูปแบบ XML โดยแต่ละ Event จะมี tag <AuditRecord> … </AuditRecord> คุมอยู่ ที่แตกต่างจาก Log ทั่วไปคือ ในแต่ละ Event จะมีเครื่องหมาย CRLF (การขึ้นบรรทัดใหม่) เป็นระยะ ๆ
    2. ออกแบบให้ Logstash รับข้อมูล (Input Plugin) จาก TCP Port 5515 ซึ่งต้องใช้ Codec ในการรวบ Multiline ในแต่ละ Event เข้าด้วยกัน โดยหา pattern “<AuditRecord>” เป็นจุดเริ่มต้น ส่วนบรรทัดที่ไม่เจอ Pattern ดังกล่าวนั้นการตั้งค่า negate => “true” เป็นการบอกว่า “ให้ดำเนินการต่อไป” โดยจะเอาบรรทัดที่ตามมาจากนี้ ต่อท้าย ด้วยการตั้งค่า what=> “previous”
    3. ในส่วนของ Filter Plugin จะอ่านค่าจาก “message” และ ส่งสิ่งที่ถอดจาก XML ได้ ไปยัง “doc”
    4. ในส่วของ Output Plugin จะส่งออกไปยัง ElasticSearch ที่ TCP port 9200

    ดัง Configuration ต่อไปนี้

    input {
       syslog {
          port => 5515
          codec => multiline {
               pattern => "<AuditRecord>"
               negate  => "true"
               what    => "previous"
          }
       }
    }
    filter {
       xml {
          source => "message"
          target => "doc"
       }
    }
    output {
      elasticsearch {
         hosts => ["elk.server:9200"]
      }
    }

    จากนั้น ทาง Oracle Database Server ทำการเปิด Audit Trail แล้วเขียน Log ลงไฟล์ แล้วเขียน Cron เพื่อ Netcat ไฟล์ส่งมาให้ Lostash ที่เปิด Port TCP 5515 ไว้รอรับ

    ผลที่ได้คือ

    โดยวิธีนี้ จะเป็นการนำ Log ซึ่งจากเดิมเป็น Text Format นำมาเป็น NoSQL ได้ ซึ่งจะสามารถ Query ข้อมูลได้ง่ายยิ่งขึ้น

    หวังว่าจะเป็นประโยชน์ครับ

  • ELK #07 LogStash

    จากที่ได้กล่าวถึงมายาวนานในเรื่อง ELK  และ  ELK #02 ที่ได้กล่าวถึงการติดตั้ง LogStash ไว้เบื้องต้น ในบทความนี้จะมาลงลึก ถึงกระบวนการทำงานของ LogStash ซึ่งเป็นส่วนสำคัญในการเปลี่ยนข้อมูล Unstructured ให้เป็น Structured

    ตอนนี้ เราจะทำงานใน /etc/logstash/conf.d/

    Simple input – output plugin

    สร้างไฟล์ 01-input-file.conf มีเนื้อหาดังนี้

    input {
    	file {
    		path => ["/tmp/input.txt"]
    		mode => "tail"
    		}
    }
    

    ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash อ่านไฟล์ /tmp/input.txt โดยให้อ่านบรรทัดล่าสุด (ต่อจาก Checkpoint ก่อนหน้า) เข้ามา โดยถ้าไม่กำหนด mode => “tail” ระบบจะอ่านไฟล์ก็ต่อเมื่อ มีการสร้างไฟล์ใหม่เท่านั้น

    สร้างไฟล์ 98-output-file.conf มีเนื้อหาดังนี้

    output {
            file {
                    path => "/tmp/output.txt"
            }
    }
    

    ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash เขียนไฟล์ /tmp/output.txt

    เมื่อปรับเปลี่ยน configuration ต้องทำการ Restart Service

    service logstash restart
    

    ลองส่งข้อมูลเข้าไปในไฟล์ /tmp/input.txt ด้วยคำสั่ง

    echo "Hello World 1" >> /tmp/input.txt

    ดูผลลัพธ์ใน /tmp/output.txt

    cat /tmp/output.txt
    {"path":"/tmp/input.txt","@version":"1","message":"Hello World 1","@timestamp":"2018-09-11T03:42:33.645Z","host":"elk1"}

    แสดงให้เห็นว่า ระบบ LogStash สามารถรับข้อมูลจากไฟล์ และส่งข้อมูลออกไปยังไฟล์ได้

    Filter Plugin

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    ในการจัดการข้อมูลก่อนบันทึก เช่นการกรอง การจัดรูปแบบ LogStash ทำงานผ่าน Filter Plugin ซึ่งมีหลายรูปแบบ (https://www.elastic.co/guide/en/logstash/current/filter-plugins.html) แต่ในที่นี้ จะใช้ grok เหมาะกับข้อมูล Unstructured อย่าง syslog เป็นต้น ซึ่งมักจะเป็น Log ที่ให้มนุษย์อ่านได้ง่าย แต่ไม่ค่อยเหมาะสำหรับให้คอมพิวเตอร์เอาไปใช้งานต่อ ซึ่ง LogStash มีไว้ให้แล้วกว่า 120 ตัว

    ตัวอย่าง grok-pattern

    ต่อไป สร้าง 44-filter-basic.conf มีเนื้อหาดังนี้

    filter {
            grok {
                    match => {
                            "message" => "%{IP:ipaddress} %{NUMBER:size}"
                    }
            }
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt

    echo "192.168.1.1 120" >> /tmp/input.txt

    และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย

    {"message":"192.168.1.1 120","@version":"1","path":"/tmp/input.txt","@timestamp":"2018-09-11T04:56:03.662Z","size":"120","host":"elk1","ipaddress":"192.168.1.1"}

    แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้

    Example : Postfix Log

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    เนื่องจาก Log แต่ละชนิด แต่ละซอฟต์แวร์มีความหลากหลายมาก แต่ดีที่มีผู้เชี่ยวชาญเค้าเขียน Pattern เอาไว้ให้ ให้ใช้คำสั่งต่อไปนี้ สร้างไดเรคทอรี่ /etc/logstash/patterns.d/ และ ดาวน์โหลด มาเก็บไว้

    mkdir /etc/logstash/patterns.d
    wget https://raw.githubusercontent.com/logstash-plugins/logstash-patterns-core/master/patterns/grok-patterns -O /etc/logstash/patterns.d/grok-patterns
    wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/postfix.grok -O /etc/logstash/patterns.d/postfix.grok
    
    

    ในกรณีของ Postfix จากนั้น ดาวน์โหลด Filter Plugin มาเก็บไว้ใน /etc/logstash/conf.d/ ด้วยคำสั่ง

    wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/50-filter-postfix.conf -O /etc/logstash/conf.d/50-filter-postfix.conf
    

    และ ต้องสร้างอีกไฟล์ เพื่อเตรียมข้อมูล ชื่อ 49-filter-postfix-prepare.conf ใน /etc/logstash/conf.d/ เนื้อหาตามนี้

    filter {
    	grok {
        		match => { "message" => "%{SYSLOGTIMESTAMP} %{SYSLOGHOST} %{DATA:program}(?:\[%{POSINT}\])?: %{GREEDYDATA:message}" }
        		overwrite => "message"
    	}
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt

    echo "Sep 11 12:05:26 mailscan postfix/smtp[105836]: 268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)" >> /tmp/input.txt

    และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย

    {"program":"postfix/smtp","postfix_delay":43.0,"postfix_dsn":"2.0.0","postfix_relay_port":25,"message":"268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)","path":"/tmp/input.txt","postfix_queueid":"268E04DFFE6","postfix_delay_conn_setup":0.01,"@version":"1","host":"elk1","postfix_to":"xxx.y@psu.ac.th","postfix_relay_hostname":"mail.psu.ac.th","postfix_delay_transmission":0.01,"tags":["_grokparsefailure","_grok_postfix_success"],"postfix_smtp_response":"250 2.0.0 Ok: queued as DE294461637","postfix_delay_before_qmgr":43.0,"postfix_relay_ip":"192.168.107.11","@timestamp":"2018-09-11T07:57:20.354Z","postfix_delay_in_qmgr":0.0,"postfix_status":"sent"}

    แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้

    From Syslog to ElasticSearch

    จากตัวอย่างข้างต้น เราทำงานกับไฟล์ /tmp/input.txt และ /tmp/output.txt ต่อไปนี้ จะเป็นการ รับ Input จาก syslog จริง ๆ จากเซิร์ฟเวอร์ ผ่าน Filter และส่งผลออกไปเก็บใน ElasticSearch

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    สร้างไฟล์ 02-input-syslog.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
    ( เปิดรับ syslog ที่ tcp/5514 )

    input {
            syslog {
                    port => "5514"
            }
    }
    

    สร้างไฟล์ 99-output-elasticsearch.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
    ( ในที่นี้ ใช้ ElasticSearch บน localhost ที่ tcp/9200 และ ไม่ได้ตั้ง Security ใด ๆ )

    output {
            elasticsearch {
                    hosts => ["localhost:9200"]
            }
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    ที่เซิร์ฟเวอร์ที่จะส่ง Log มาเก็บใน ElasticSearch ผ่าน LogStash ให้แก้ไข /etc/rsyslog.d/50-default.conf ชี้ mail.* ไปยัง LogStash  ที่ tcp/5514

    mail.* @@logstash.ip:5514
    

    หากทุกอย่างเรียบร้อย ก็จะสามารถดูผลจาก Kibana ได้อย่างสวยงาม

    สามารถนำข้อมูลไปใช้วิเคราะห์ได้ต่อไป

  • ELK #6 วิธีการติดตั้ง ELK และ Geoserver แบบ Docker ให้ทำงานร่วมกัน

    จาก ELK #5 การประยุกต์ใช้ ELK ในงานด้าน GIS และ การสร้าง Web Map Service (WMS) บน Geoserver ก็จะเห็นถึงการนำไปใช้เบื้องต้น

    >> ขอบคุณ คุณนพัส กังวานตระกูล สถานวิจัยสารสนเทศภูมิศาสตร์ทรัพยากรธรรมชาติและสิ่งแวดล้อม ศูนย์ภูมิภาคเทคโนโลยีอวกาศและภูมิสารสนเทศ (ภาคใต้) สำหรับความรู้มากมายครับ <<

     

    ต่อไปนี้ จะเป็นขั้นตอนการติดตั้ง ELK และ Geoserver แบบ Docker โดยผมได้สร้าง Github Repository เอาไว้ ซึ่งได้แก้ไขให้ระบบสามารถเก็บข้อมูลไว้ภายนอก

    Prerequisite

    1. ถ้าเป็น Windows ก็ต้องติดตั้ง Docker Toolbox หรือ Docker for Windows ให้เรียบร้อย
    2. ถ้าเป็น Linux ก็ติดตั้ง docker-ce ให้เรียบร้อย (เรียนรู้เกี่ยวกับ Docker ได้จาก ติดตั้ง docker 17.06.0 CE บน Ubuntu)

    ขั้นตอนการติดตั้ง

    1. สร้าง Folder ชื่อ Docker เอาไว้ในเครื่อง เช่นใน Documents หรือ จะเป็น D:\ หรืออะไรก็แล้วแต่
    2. เปิด Terminal หรือ Docker Quickstart Terminal จากนั้นให้ cd เข้าไปมา Folder “Docker” ที่สร้างไว้
    3. ดึง ELK ลงมา ด้วยคำสั่ง
      git clone https://github.com/deviantony/docker-elk.git
    4. ดึง Geoserver ลงมา ด้วยคำสั่ง (อันนี้ผมทำต่อยอดเค้าอีกทีหนึ่ง ต้นฉบับคือ https://hub.docker.com/r/fiware/gisdataprovider/)
      git clone https://github.com/nagarindkx/geoserver.git
    5. เนื่องจาก ไม่อยากจะไปแก้ไข Git ของต้นฉบับ เราจึงต้องปรับแต่งนิดหน่อยเอง
      ให้แก้ไขไฟล์ docker-elk/docker-compose.yml
      โดยจะเพิ่ม Volume  “data” เพื่อไป mount ส่วนของ data directory ของ Elasticsearch ออกมาจาก Containerแก้ไขจาก

      elasticsearch:
       build: elasticsearch/
       volumes:
       - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml

      เป็น

      elasticsearch:
       build: elasticsearch/
       volumes:
       - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
       - ./elasticsearch/data:/usr/share/elasticsearch/data
    6. สร้าง docker-elk/elasticsearch/data
      mkdir docker-elk/elasticsearch/data
    7. แก้ไขไฟล์ docker-elk/logstash/pipeline/logstash.conf ตามต้องการ เช่น ใส่ filter
      filter {
       csv {
         separator => ","
         columns => [
      	"cid","name","lname","pid","house","road","diagcode","latitude","longitude","village","tambon","ampur","changwat"
         ]
       }
       if [cid] == "CID" {
         drop { }
       } else {
         # continue processing data
         mutate {
           remove_field => [ "message" ]
         }
         mutate {
           convert => { "longitude" => "float" }
           convert => { "latitude" => "float" }
         }
         mutate {
           rename => {
             "longitude" => "[geoip][location][lon]"
             "latitude" => "[geoip][location][lat]"
           }
         }
       }
      }
    8. จาก Terminal ให้เข้าไปใน docker-elk แล้ว start ด้วยคำสั่ง
      cd docker-elk
      docker-compose up -d
    9. จาก Terminal ให้เข้าไปใน geoserver แล้ว start ด้วยคำสั่ง
      cd ../geoserver
      docker-compose up -d

    ถึงขั้นตอนนี้ ก็จะได้ ELK และ Geoserver ทำงานขึ้นแล้ว

    ELK: http://localhost:5601

    Geoserver: http://localhost:9090/geoserver/web

     

    ขั้นตอนต่อไป จะเป็นการ นำข้อมูลเข้า และ เชื่อ Kibana กับ Geoserver

    วิธีการนำข้อมูลเข้า Elasticsearch

    เนื่องจาก pipeline ของ Logstash กำหนดว่า จะรับข้อมูลทาง TCP Port 5000 จึงใช้วิธี netcat ไฟล์เข้าไป ด้วยคำสั่ง (ตัวอย่างนี้ ใช้ข้อมูลจากไฟล์ sample.csv)

    cat sample.csv | nc localhost 5000

    วิธีการดึง Map จาก Geoserver มาใช้งานใน Kibana

    ทำตามขั้นตอนที่กล่าวไว้ใน การสร้าง Web Map Service (WMS) บน Geoserver ซึ่งจะได้ URL ของ Layer Preview มา ประมาณนี้
    http://localhost:9090/geoserver/test/wms?service=WMS&version=1.1.0&request=GetMap&layers=test:hadyai_vil&styles=&bbox=631866.963048935,748605.6609660918,677997.0295239205,791055.6681053439&width=768&height=706&srs=EPSG:32647&format=application/openlayers

    ทำตามขั้นตอนที่กล่าวไว้ใน ELK #5 การประยุกต์ใช้ ELK ในงานด้าน GIS ในส่วนของ วิธีใส่ Map Server อื่น แล้วเอา URL นี้ไปใส่ และรายละเอียดเกี่ยวกับ Layer, version, format ตามที่กำหนดไว้ ก็จะสามารถเอา Map ที่เราต้องการ พร้อม Shape File มาใช้งานได้

    หวังว่าจะเป็นประโยชน์ครับ

  • ELK #5 การประยุกต์ใช้ ELK ในงานด้าน GIS

    คราวนี้ มาดูการประยุกต์ใช้ ELK ในงานด้าน GIS

    ต่อจาก ELK #01 > ELK #02 > ELK #03 > ELK #04 ซึ่งเป็นการติดตั้งทั้งหมด คราวนี้มาดูการประยุกต์ใช้งานกันบ้าง

    โจทย์มีอยู่ว่า มีการไปเก็บข้อมูลในภาคสนาม แล้วมีการบันทึก พิกัดด้วย GPS เป็น Latitude กับ Longitude พร้อมกับค่าบางอย่าง ทั้งหมดถูกเก็บไว้ในฐานข้อมูล MySQL

    การนำข้อมูลเข้า ELK ก็เลย Export ข้อมูลจาก MySQL มาเป็น CSV File ประกอบด้วย

    id,LATITUDE,LONGITUDE,something

    ตัวอย่างข้อมูล มีดังนี้

    id,LATITUDE,LONGITUDE,something
    1,6.97585,100.448963,100
    2,6.975627,100.450841,19
    3,6.973472,100.449196,65
    4,6.973468,100.449104,53
    5,6.973455,100.449135,33
    6,6.973252,100.44888,13
    7,6.985862,100.45292,85
    8,6.993386,100.416214,90
    9,7.005465,100.447984,1

    นำข้อมูลเข้า ELK ผ่านทาง Logstash

    ใน  ELK #2 ได้อธิบายขั้นตอนการติดตั้ง Logstash ไว้แล้วนั้น ต่อไปเป็นการนำข้อมูลชนิด CSV เข้าไปใส่ใน Elasticsearch

    Logstash จะอ่าน “กระบวนการทำงาน” หรือเรียกว่า Pipeline จากไฟล์ Configuration ซึ่งประกอบด้วย 3 ส่วนหลักๆ คือ Input, Filter และ Output

    input {
       stdin { }
    }

    ในส่วน input นี้ จะเป็นการอ่าน STDIN หรือ ทาง Terminal

    filter {
     csv {
       separator => ","
       columns => [
         "id","latitude","longitude","something"
       ]
     }
     if [id] == "id" {
       drop { }
     } else {
       # continue processing data
       mutate {
         remove_field => [ "message" ]
       }
       mutate {
         convert => { "something" => "integer" }
         convert => { "longitude" => "float" }
         convert => { "latitude" => "float" }
       }
       mutate {
         rename => {
           "longitude" => "[geoip][location][lon]"
           "latitude" => "[geoip][location][lat]"
         }
       }
     }
    }

    ในส่วนของ filter นี้ เริ่มจาก เลือกใช้ Filter Plugin ชื่อ “csv” เพื่อจัดการไฟล์ CSV โดยกำหนด “separator” เป็น “,” แล้วกำหนดว่ามีชื่อ Column เป็น “id”,”latitude”,”longitude”,”something”

    จากนั้น ก็ตรวจสอบว่า ถ้าข้อมูลที่อ่านเข้ามา ใน Column “id” มีค่าเป็น “id” (ซึ่งก็คือบรรทัดหัวตารางของไฟล์ csv นั่นเอง) ก้ให้ “drop” ไป

    แต่หากไม่ใช่ ก็ให้ทำดังนี้ (mutate คือการแก้ไข)

    • remove field ชื่อ message (ซึ่งจะปรากฏเป็น Default อยู่ ก็เลยเอาออกเพราะไม่จำเป็น)
    • convert หรือ เปลี่ยน “ชนิด”  ของแต่ละ field เป็นไปตามที่ต้องการ ได้แก่ ให้ something เป็น Integer, latitude และ longitude เป็น float
    • rename จาก latitude เป็น [geoip][location][lat] และ longitude เป็น [geoip][location][lon] ซึ่งตรงนี้สำคัญ เพราะ geoip.location Field ข้อมูลชนิก “geo_point” ซึ่งจำเป็นต่อการนำไปใช้งานเกำหนดตำแหน่งพิกัดบนแผนที่ (เป็น Field ที่สร้างจาก Template พื้นฐานของ Logstash ซึ่งจะไม่กล่าวถึงในบทความนี้)
    output {
     stdout { codec => rubydebug }
     elasticsearch {
       hosts => ["http://your.elastic.host:9200"]
     }
    }

    ในส่วนของ Output จะกำหนดว่า ข้อมูลที่อ่านจาก csv และผ่าน filter ตามที่กล่าวมาข้างต้น จะส่งไปที่ใน จากการกำหนดนี้ บอกว่า จะส่งออกไป

    • stdout คือ การแสดงผลออกมาทาง terminal โดยมีรูปแบบเป็น rubydebug (รูปแบบหนึ่ง)
    • Elasticsearch ซึ่งอยู่ที่ http://your.elastic.host:9200

    จากนั้น Save ไฟล์นี้ แล้วตั้งชื่อว่า gis.conf

    แล้วใช้คำสั่ง

    cat sample1.csv | /usr/share/logstash/bin/logstash -f gis.conf

    การแสดงผลข้อมูลใน Elasticsearch ผ่าน Kibana

    จากบทความก่อนหน้า ได้แสดงวิธีการติดตั้ง Kibana และเชื่อมต่อกับ Elasticsearch แล้ว โดยจะเข้าถึง Kibana ได้ทางเว็บไซต์ http://your.kibana.host:5601

    ในกระบวนการของ Logstash ข้างต้น จะไปสร้าง Elasticsearch Index ชื่อ “logstash-YYYY-MM-DD”, ใน Kibana ก็จะต้องไป คลิกที่ Setting (รูปเฟือง) จากนั้นคลิกที่ Index Pattern โดยให้ไปอ่าน index ซึ่งมีชื่อเป็น Pattern คือ “logstash-*” จากนั้น คลิกปุ่ม Create

    จะได้ผลประมาณนี้

    ต่อไป คลิกที่ Discover ก็จะเห็นข้อมูลเข้ามา

    แสดงข้อมูลในรูปแบบของ Tile Map

    คลิกที่ Visualization > Create a visualization

    เลือก Tile Map

    เลือก Index ที่ต้องการ ในที่นี้คือ logstash-*

    คลิก Geo Coordinates

    จากนั้น คลิก Apply แล้วคลิก Fit Data Bound

    ก็จะได้เฉพาะ พื้นที่ทีมีข้อมุล

    วิธีใส่ Map Server อื่น

    ปัญหาของ Defaul Map Service ที่มากับ Kibana คือ Elastic Map Service นั้น จะจำกัดระดับในการ Zoom จึงต้องหา WMS (Web Map Service) อื่นมาใช้แทน ต้องขอบคุณ คุณนพัส กังวานตระกูล สถานวิจัยสารสนเทศภูมิศาสตร์ทรัพยากรธรรมชาติและสิ่งแวดล้อม ศูนย์ภูมิภาคเทคโนโลยีอวกาศและภูมิสารสนเทศ (ภาคใต้)  สำหรับคำแนะนำในการใช้งาน WMS และระบบ GIS ตลอดมาครับ 🙂

    โดย เราจะใช้ WMS ของ Longdo Map API : http://api.longdo.com/map/doc/
    ข้อมูลการใช้งาน เอามาจาก http://api.longdo.com/map/doc/demo/advance/02-layer.php

    วิธีการตั้งค่าใน Kibana

    คลิกที่ Option > WMS compliant map server
    แล้วกรอกข้อมูล

    URL : https://ms.longdo.com/mapproxy/service
    Layer: bluemarble_terrain
    Version: 1.3.0
    Format: image/png
    Attribute: Longdo API

    จากนั้นคลิก Apply

    จากนั้นให้ Save พร้อมตั้งชื่อ

    ซึ่ง Longdo Map API สามารถ Zoom ได้ละเอียดพอสมควร

    สามารถนำเสนอระบบ GIS ได้บน Website ทันที

    หวังว่าจะเป็นประโยชน์ครับ

  • ELK #04

    คราวนี้มาติดตั้งบน Docker บ้าง

    1. ถ้าเครื่อง Server เป็น Ubuntu 16.04 ทำตามขั้นตอนนี้เพื่อให้สามารถใช้งาน Docker ได้
      วิธีการติดตั้ง Docker บน Ubuntu 16.04
    2. เนื่องจาก Elasticsearch 5.x ใช้ Virtual Memory มากขึ้น ลองใช้คำสั่งนี้ดูค่าปัจจุบัน
       sysctl vm.max_map_count

      ค่า Default น่าจะประมาณนี้
      vm.max_map_count = 65530 ให้ทำการเพิ่มด้วยคำสั่งนี้

      sudo -i
      sudo echo "vm.max_map_count=262144" >> /etc/sysctl.conf
      exit

      จากนั้นให้ทำการ Reboot

    3. ติดตั้ง docker image ของ sebp/elk ด้วยคำสั่ง
       sudo docker pull sebp/elk

      โดย Default จะได้ Lastest Version

    4. ใช้คำสี่งต่อไปนี้ เพื่อ Start ELK ขึ้นมา โดยเปิด port ให้ Kibana: 5601, Elasticsearch: 9200, Logstash: 5044 และทำงานเป็นแบบ Detach หรือ Background นั่นเอง
      sudo docker run -d -p 5601:5601 -p 9200:9200 -p 5044:5044 -it --name elk sebp/elk

      หรือถ้าจะใช้ Docker Compose ก็สามารถใช้งานด้วยวิธีการนี้
      เริ่มจาก ติดตั้ง Docker Compose ด้วยคำสั่ง

      sudo -i
      curl -L https://github.com/docker/compose/releases/download/1.14.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
      chmod +x /usr/local/bin/docker-compose
      exit

      จากนั้น สร้างไฟล์ /path/to/your/config/elk.yml เนื้อหาดังนี้

      elk:
       image: sebp/elk
       ports:
       - "5601:5601"
       - "9200:9200"
       - "5044:5044"

      จากนั้นก็ Start

      sudo /usr/local/bin/docker-compose -f /path/to/your/config/elk.yml  up -d elk

      หากต้องการให้ container ทำการ start ทุกครั้งที่ Reboot ใช้คำสั่ง

      sudo crontab -e

      แล้วใส่บรรทัดนี้ต่อท้ายไฟล์

      @reboot /usr/local/bin/docker-compose -f /home/mama/elk.yml up -d elk
    5. ทดสอบว่า Container ที่กำลังทำงานอยู่มีอะไรบ้าง
      sudo docker ps

      วิธีดูว่า มี Container อะไรบ้าง (ทั้งที่ทำงานและไม่ทำงาน)

      sudo docker ps -a

      วิธีดูว่าเกิดอะไรขึ้นกับ Container (ในที่นี้ ชื่อ elk)

      sudo docker logs elk
    6. ถึงจุดนี้ ก็สามารถใช้งาน Kibana ทาง web url: http://your.host:5601 ได้แล้ว

     

    Reference:

    https://elk-docker.readthedocs.io/

     

  • ELK #03

    วิธีการติดตั้ง Kibana บน Ubuntu 16.04

    1. ก่อนอื่น Update
      sudo apt -y update ; sudo apt -y upgrade
    2. ติดตั้ง Java JDK
      sudo apt -y install default-jdk
    3. Download และติดตั้ง
      wget https://artifacts.elastic.co/downloads/kibana/kibana-5.4.2-amd64.deb
      sudo dpkg -i kibana-5.4.2-amd64.deb
    4. จากนั้นแก้ไขไฟล์ /etc/kibana/kibana.yml
      เพิ่มบรรทัดสุดท้าย

      server.host: "192.168.xxx.yyy"
      elasticsearch.url: "http://your.elastic.host:9200"
    5. จากนั้น Start Service
      sudo service kibana start
    6. เปิด Web Browser ไปที่
      http://192.168.xxx.yyy:5601
  • ELK #02

    ขั้นตอนการติดตั้ง Logstash บน Ubuntu 16.04

    1. ก่อนอื่น Update
      sudo apt -y update ; sudo apt -y upgrade
    2. ติดตั้ง Java JDK
      sudo apt -y install default-jdk
    3. Download และติดตั้ง
      wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.2.deb
      sudo dpkg -i logstash-5.4.2.deb
    4. Start Logstash Service
      sudo service logstash start
    5. ต่อไป สร้าง Configuration ไว้ใน /etc/logstash/conf.d/
      เช่น จะสร้าง Pipeline ที่อ่านจาก File /tmp/test.log แล้ว ส่งไปที่ Elasticsearch โดยตรง
      ให้สร้างไฟล์ /etc/logstash/conf.d/file.conf ดังนี้

      input {
              file {
                      path => "/tmp/test.log"
                      type=> "test"
              }
      }
      output {
              file {
                      path => "/tmp/output.txt"
              }
      }
      
    6. เมื่อลองใช้คำสั่ง
       echo "$(date): New World" >> /tmp/test.log
      

      ก็จะปรากฏไฟล์ /tmp/output.txt ขึ้น

    7. ต่อไป ลองเปลี่ยน Output เป็น Elasticsearch โดยการสร้างไฟล์ /etc/logstash/conf.d/es.conf
      input {
              file {
                      path => "/tmp/test.log"
                      type=> "test"
              }
      }
      output {
              elasticsearch {
                      hosts => ["http://your.elastic.host:9200"]
              }
      }
      
      
    8. เมื่อลองใช้คำสั่ง
       echo "$(date): New World" >> /tmp/test.log
      

      ก็จะปรากฏบรรทัดใหม่ใน /tmp/output.txt และ มีการเขียนไปบน Elasticsearch ด้วย

    9. ลองเปิด Web Browser แล้วใช้คำสั่งต่อไปนี้
      http://your.elastic.host:9200/_cat/indices?v
      ก็จะได้ผลลัพธ์ประมาณนี้
    10. จากนั้น วิธีที่จะแสดงผลที่เก็บไว้ใน Elasticsearch ให้เปิด URL นี้
      http://your.elastic.host:9200/logstash-2017.06.24/_search?q=*
      ก็จะได้ผลลัพธ์ประมาณนี้

    แล้วยังไง ??? รอดูตอนต่อไป

  • ELK #01

    ELK = ElasticSearch + LogStash + Kibana

    วิธีการติดตั้ง ElasticSearch บน Ubuntu 16.04

    1. ก่อนอื่น Update
      sudo apt -y update ; sudo apt -y upgrade
    2. ติดตั้ง Java JDK
      sudo apt -y install default-jdk
    3. Download และติดตั้ง
      wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.2.deb
      sudo dpkg -i elasticsearch-5.4.2.deb
      sudo update-rc.d elasticsearch defaults 95 10
    4. แก้ไขไฟล์ /etc/elasticsearch/elasticsearch.yml โดยเพิ่มบรรทัดสุดท้าย
      cluster.name: my-cluster-name
      network.host: [_site_]
      node.name: ${HOSTNAME}
    5. เริ่มทำงาน
      sudo -i service elasticsearch start
    6. ทดสอบการทำงาน โดยใช้คำสั่ง
      curl -XGET “${HOSTNAME}:9200/?pretty”
    7. ดู Log ได้ที่
      sudo tail -f /var/log/elasticsearch/my-cluster-name.log