Tag: elk

  • [กันลืม] Elasticsearch API พื้นฐาน

    INDEX

    วิธีดูว่ามี index อะไรบ้าง

    GET /_cat

    Response

    =^.^=
    /_cat/allocation
    /_cat/shards
    /_cat/shards/{index}
    /_cat/master
    /_cat/nodes
    /_cat/tasks
    /_cat/indices
    /_cat/indices/{index}
    /_cat/segments
    /_cat/segments/{index}
    /_cat/count
    /_cat/count/{index}
    

    เมื่อทราบว่ามี index อะไรบ้าง ต้องการดูรายละเอียด ใส่ query string parameter (qrs) “v”

    GET /_cat/indices?v

    Response

    health status index                    uuid                   pri rep docs.count docs.deleted store.size pri.store.size
    yellow open   logstash-2020.06.20      JeGeb67mSNKN3gCgC9AWCQ   1   1    4573283            0    808.7mb        808.7mb
    yellow open   logstash-2020.05.30      Bg8-hUUAS0m4VbGs0-1lIw   1   1    4124813            0    721.9mb        721.9mb
    yellow open   logstash-2020.05.31      qJX9VbaySG2ssBXMTwmapA   1   1    4333363            0    772.6mb        772.6mb
    yellow open   logstash-2020.06.21      9on104uKQnGllS5QZNyUKg   1   1    4712048            0    869.2mb        869.2mb
    yellow open   logstash-2020.06.22      lVfX4tHjSUeMF640wmIzLw   1   1    8441356            0      1.6gb          1.6gb
    yellow open   logstash-2020.06.01      P_9wbTd7Qo6YNme-NRvFLQ   1   1    7360129            0      1.3gb          1.3gb
    yellow open   logstash-2020.06.23      xvQRTdNHTt2wKfiSlvR87A   1   1    7631987            0      1.4gb          1.4gb
    yellow open   logstash-2020.06.02      H-kPw4FXQ-W1AphBfjtcMw   1   1    7344418            0      1.3gb          1.3gb
    

    ต้องการทราบว่า แต่ละ Fields มีความหมายอย่างไร ใช้ qrs ‘help’

    GET /_cat/indices?help

    Response

    health                           | h                              | current health status                                                                                            
    status                           | s                              | open/close status                                                                                                
    index                            | i,idx                          | index name                                                                                                       
    uuid                             | id,uuid                        | index uuid                                                                                                       
    pri                              | p,shards.primary,shardsPrimary | number of primary shards                                                                                         
    rep                              | r,shards.replica,shardsReplica | number of replica shards                                                                                         
    docs.count                       | dc,docsCount                   | available docs                                                                                                   
    docs.deleted                     | dd,docsDeleted                 | deleted docs                                                                                                     
    creation.date                    | cd                             | index creation date (millisecond value)                                                                          
    creation.date.string             | cds                            | index creation date (as string)                                                                                  
    store.size                       | ss,storeSize                   | store size of primaries & replicas 
    

    ต้องการแสดงเฉพาะบาง Fields ใช้ qrs ‘h=’

    GET /_cat/indices?h=idx,dc,ss&v

    Response

    idx                           dc      ss
    logstash-2020.06.20      4573283 808.7mb
    logstash-2020.05.30      4124813 721.9mb
    logstash-2020.05.31      4333363 772.6mb
    logstash-2020.06.21      4712048 869.2mb
    logstash-2020.06.22      8441356   1.6gb
    logstash-2020.06.23      7631987   1.4gb
    logstash-2020.06.01      7360129   1.3gb
    logstash-2020.06.02      7344418   1.3gb
    logstash-2020.06.24      7300718   1.4gb
    

    ต้องการดูขนาดจัดเก็บ ใช้ qrs ‘bytes=’

    GET /_cat/indices?h=idx,dc,ss&bytes=b&v

    Response

    idx                           dc         ss
    logstash-2020.05.30      4124813  756971768
    logstash-2020.06.20      4573283  848085505
    logstash-2020.05.31      4333363  810175019
    logstash-2020.06.21      4712048  911450929
    logstash-2020.06.22      8441356 1736003983
    logstash-2020.06.01      7360129 1455314526
    logstash-2020.06.23      7631987 1559554324
    logstash-2020.06.24      7300718 1506134380
    logstash-2020.06.02      7344418 1484297643
    logstash-2020.06.25      8409242 1747862513
    logstash-2020.06.03      4424701  860877705
    

    ต้องการเรียงลำดับ ใช้ qrs ‘s=’ และ สามารถกำกับ ‘:desc’, ‘:asc’

    GET /_cat/indices?h=idx,dc,ss&bytes=b&s=ss:desc&v

    ลบ INDEX

    DELETE /kx01

    DOCUMENTS

    Document เป็น JSON ที่มีรายละเอียดเกี่ยวกับการสร้างขึ้นมา เช่น _id, _version และ _source ซึ่ง source หรือ (stored fields)

    create / update if exist

    POST /kx01/_doc/1
    {
      "name": "kanakorn",
      "HN": "1746436"
    }
    

    โดยใน Index เดียวกับ เก็บ Document คนละ Schema กันก็ได้

    POST /kx01/_doc/2
    {
      "HR": "100",
      "RR": "88",
      "age": 10
    }
    

    Check if exist

    HEAD /kx01/_doc/1/

    Response

    200 - OK

    get a source (stored fields)

    GET /kx01/_doc/1/

    Response

    {
      "_index" : "kx01",
      "_type" : "_doc",
      "_id" : "1",
      "_version" : 1,
      "_seq_no" : 0,
      "_primary_term" : 1,
      "found" : true,
      "_source" : {
        "name" : "kanakorn",
        "HN" : "1234567"
      }
    }

    get only document value

    GET /kx01/_source/1/

    Response

    {
      "name" : "kanakorn",
      "HN" : "1234567"
    }

    อื่น ๆ

    • Source filtering
      • source_include
      • source_exclude
  • ELK #09 Anomaly Detection (Case Study)

    ระบบ PSU Email ให้บริการผู้ใช้ของมหาวิทยาลัยสงขลานครินทร์ ซึ่งมีการใช้งานจากทั่วโลก ทั้งระบบประกอบขึ้นจากคอมพิวเตอร์หลายเครื่อง การจะตรวจสอบ Log เมื่อเกิด Incident ขึ้น อาจจะต้องใช้ระยะเวลานาน และเป็นการยากพอสมควรที่จะเชื่อมโยงความสัมพันธ์ของเหตุการณ์ และสรุปออกมาเป็นรายงานได้ จึงเริ่มใช้ ELK สำหรับรวบรวม Log ของทั้งระบบไว้ที่ส่วนกลาง และพัฒนาต่อยอดเพื่อการตรวจจับความผิดปรกติต่าง ๆ ได้

    ในบทความนี้ จะนำเสนอวิธีการใช้ ELK เพื่อตรวจจับ การ Login ที่ผิดปรกติบน PSU Email โดยจะสนใจ ผู้ใช้ที่มีการ Login จากนอกประเทศเป็นหลัก

    การส่ง Log จาก Server เข้า ELK

    ที่เครื่อง Server แต่ละเครื่อง กำหนดให้ส่ง Log จาก /etc/rsyslog.d/50-default.conf เข้าไปที่ your.logstash.server:port ตามที่กำหนดไว้

    การสร้าง Logstash Filter

    ที่ Logstash Server

    • Input เพื่อรับข้อมูลจาก syslog ที่ port ที่ต้องการ เช่นในที่นี้เป็น 5516 เป็นต้น
    • Filter ใช้ Grok Plugin เพื่อจับข้อมูล จาก message แบ่งเป็นส่วน ๆ ตามลักษณะ แล้วตั้งชื่อตาม Field ตามต้องการ ในที่นี้คือ description, username, domainname, clientip, actiondate, actiontime เป็นต้น (ตัวที่สำคัญในตอนนี้คือ username และ clientip)
    • Output ตั้งว่าให้ส่งผลไปยัง Elasticsearch ที่ “your.elasticsearch.server” ที่ port 9200

    [ตรงนี้มีกระบวนการบางอย่าง ซึ่งค่อยมาลงรายละเอียด]

    เมื่อมี Log ไหลเข้าสู่ Logstash และ ถูกประมวลผลแล้ว ก็จะเข้าสู่ Elasticsearch แล้ว ก็นำไปใช้งานบน Kibana

    หลังจากนั้น สามารถ Search ข้อมูล และใส่ Fields ที่สนใจ เช่น Time, Username, geoip.country_name และ description ได้ แล้ว Save เอาไว้ใช้งานต่อ ในที่นี้ ตั้งชื่อว่า squirrelmail-geoip

    จากนั้น สามารถเอาไปสร้างเป็น Visualization แบบ Coordinate Map ได้ เช่น ดูว่า มีการ Login Success / Failed Login / Sent จากที่ไหนบ้างในโลก

    จะเห็นได้ว่า ส่วนใหญ่ ใช้งานจากในประเทศไทย (วงกลมสีแดงเข้ม ๆ) ส่วนนอกประเทศ จะเป็นวงสีเหลืองเล็ก ๆ

    การตรวจหาการใช้งานที่ผิดปรกติ

    สร้าง Search ใหม่ กรองเฉพาะ ที่มี (exist) Username และ ไม่เป็น N/A และ มี (exist) geoip.country_code และ ไม่ใช่ Thailand แล้ว Save ไว้ใช้งานต่อไป ในที่ตั้งชื่อว่า squirrelmail-geoip-outside-th

    จากนั้น เอาไปสร้าง Visualization แบบ Vertical Bar
    กำหนดให้
    Y Axis เป็นจำนวน
    X Axis เป็น Username
    โดยที่ Group by geoip.country_name และ description
    ก็จะทำให้รู้ว่า ใครบ้างที่ มีการใช้งานนอกประเทศ และ เป็นการใช้งานแบบไหน

    จะเห็นได้ว่า จะมีบางคนที่ แสดงสีแค่สีเดียว กับบางคนมีหลายสี เนื่องจาก มีหลายประเทศ และ หลายประเภทการใช้งาน เราสามารถ กรองเอาเฉพาะ ข้อมูลที่สนใจได้ โดยคลิกที่ Inspect แล้วกดเครื่องหมาย + กับข้อมูลที่ต้องการ เช่น description ที่เป็น “Failed webmail login” ก็ได้

    ก็จะกรองเฉพาะ Username ที่มีการ Login จากต่างประเทศ แต่ไม่สำเร็จ จากภาพด้านล่าง แสดงว่า 3 คนนี้ น่าจะโดนอะไรเข้าแล้ว

    หรือ ถ้าจะกรองข้อมูล เฉพาะคนที่ “Failed webmail login” และ “Message sent via webmail” ก็ได้ แต่ต้องเปลี่ยน ชนิดการ Filter เป็น “is one of”

    ผลที่ได้ดังภาพ แต่เนื่องจาก ก็ยังเป็น 3 คนนี้อยู่ จะเห็นได้ว่า คน ๆ เดียว (ซ้ายสุด) มีการ Login จากหลายประเทศ ภายใน 24 ชั่วโมง

    ต่อไป ถ้าเราสนใจเฉพาะ คนที่ “ส่งอีเมล” จากนอกประเทศ ในเวลาที่กำหนด จะได้ผลประมาณนี้

    พบว่า คนซ้ายสุด คนเดิมนั่นแหล่ะ แต่เราจะมาดูรายละเอียด ก็คลิกที่ปุ่ม Inspect แล้ว เลือก Include เฉพาะ Username นั้น

    ก็พบว่า คนนี้มีการส่ง email ออกจากประเทศ USA, Canada, Panama, Argentina, Mexico แล้วบินมา UK ภายในวันเดียว –> ทำได้ไง !!! (ดังภาพด้านล่าง)

    เมื่อลองตรวจสอบ ก็จะพบว่า Username นี้ มีพฤติกรรม ส่ง Spam จริง ๆ ก็จะจัดการ “จำกัดความเสียหาย” ต่อไป

    วิธีการที่กล่าวมาข้างต้น สามารถสร้างเป็น Process อัตโนมัติ (เว้นแต่ขั้นตอนการ จำกัดความเสียหาย จะ Automatic ก็ได้ แต่ตอนนี้ขอ Manual ก่อน) เอาไว้สำหรับ Monitoring ได้ โดยอาจจะสั่งให้ เฝ้าดู 1 ชั่วโมงล่าสุด และ Refresh ทุก 1 นาที ดังภาพ

    หวังว่าจะเป็นประโยชน์

    ส่วนรายละเอียด คอยติดตามตอนต่อไปครับ

  • ELK #08 Oracle Audit Trail

    ต่อจา ELK #07 – Logstash คราวนี้ มาใช้งานจริง โดยใช้ ELK เพื่อเก็บ Log ของ Oracle Audit Trail

    1. Oracle Audit Trail บน Database Server เก็บ Log ในรูปแบบ XML โดยแต่ละ Event จะมี tag <AuditRecord> … </AuditRecord> คุมอยู่ ที่แตกต่างจาก Log ทั่วไปคือ ในแต่ละ Event จะมีเครื่องหมาย CRLF (การขึ้นบรรทัดใหม่) เป็นระยะ ๆ
    2. ออกแบบให้ Logstash รับข้อมูล (Input Plugin) จาก TCP Port 5515 ซึ่งต้องใช้ Codec ในการรวบ Multiline ในแต่ละ Event เข้าด้วยกัน โดยหา pattern “<AuditRecord>” เป็นจุดเริ่มต้น ส่วนบรรทัดที่ไม่เจอ Pattern ดังกล่าวนั้นการตั้งค่า negate => “true” เป็นการบอกว่า “ให้ดำเนินการต่อไป” โดยจะเอาบรรทัดที่ตามมาจากนี้ ต่อท้าย ด้วยการตั้งค่า what=> “previous”
    3. ในส่วนของ Filter Plugin จะอ่านค่าจาก “message” และ ส่งสิ่งที่ถอดจาก XML ได้ ไปยัง “doc”
    4. ในส่วของ Output Plugin จะส่งออกไปยัง ElasticSearch ที่ TCP port 9200

    ดัง Configuration ต่อไปนี้

    input {
       syslog {
          port => 5515
          codec => multiline {
               pattern => "<AuditRecord>"
               negate  => "true"
               what    => "previous"
          }
       }
    }
    filter {
       xml {
          source => "message"
          target => "doc"
       }
    }
    output {
      elasticsearch {
         hosts => ["elk.server:9200"]
      }
    }

    จากนั้น ทาง Oracle Database Server ทำการเปิด Audit Trail แล้วเขียน Log ลงไฟล์ แล้วเขียน Cron เพื่อ Netcat ไฟล์ส่งมาให้ Lostash ที่เปิด Port TCP 5515 ไว้รอรับ

    ผลที่ได้คือ

    โดยวิธีนี้ จะเป็นการนำ Log ซึ่งจากเดิมเป็น Text Format นำมาเป็น NoSQL ได้ ซึ่งจะสามารถ Query ข้อมูลได้ง่ายยิ่งขึ้น

    หวังว่าจะเป็นประโยชน์ครับ

  • ELK #07 LogStash

    จากที่ได้กล่าวถึงมายาวนานในเรื่อง ELK  และ  ELK #02 ที่ได้กล่าวถึงการติดตั้ง LogStash ไว้เบื้องต้น ในบทความนี้จะมาลงลึก ถึงกระบวนการทำงานของ LogStash ซึ่งเป็นส่วนสำคัญในการเปลี่ยนข้อมูล Unstructured ให้เป็น Structured

    ตอนนี้ เราจะทำงานใน /etc/logstash/conf.d/

    Simple input – output plugin

    สร้างไฟล์ 01-input-file.conf มีเนื้อหาดังนี้

    input {
    	file {
    		path => ["/tmp/input.txt"]
    		mode => "tail"
    		}
    }
    

    ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash อ่านไฟล์ /tmp/input.txt โดยให้อ่านบรรทัดล่าสุด (ต่อจาก Checkpoint ก่อนหน้า) เข้ามา โดยถ้าไม่กำหนด mode => “tail” ระบบจะอ่านไฟล์ก็ต่อเมื่อ มีการสร้างไฟล์ใหม่เท่านั้น

    สร้างไฟล์ 98-output-file.conf มีเนื้อหาดังนี้

    output {
            file {
                    path => "/tmp/output.txt"
            }
    }
    

    ในส่วนนี้ เป็นการกำหนดว่า ให้ LogStash เขียนไฟล์ /tmp/output.txt

    เมื่อปรับเปลี่ยน configuration ต้องทำการ Restart Service

    service logstash restart
    

    ลองส่งข้อมูลเข้าไปในไฟล์ /tmp/input.txt ด้วยคำสั่ง

    echo "Hello World 1" >> /tmp/input.txt

    ดูผลลัพธ์ใน /tmp/output.txt

    cat /tmp/output.txt
    {"path":"/tmp/input.txt","@version":"1","message":"Hello World 1","@timestamp":"2018-09-11T03:42:33.645Z","host":"elk1"}

    แสดงให้เห็นว่า ระบบ LogStash สามารถรับข้อมูลจากไฟล์ และส่งข้อมูลออกไปยังไฟล์ได้

    Filter Plugin

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    ในการจัดการข้อมูลก่อนบันทึก เช่นการกรอง การจัดรูปแบบ LogStash ทำงานผ่าน Filter Plugin ซึ่งมีหลายรูปแบบ (https://www.elastic.co/guide/en/logstash/current/filter-plugins.html) แต่ในที่นี้ จะใช้ grok เหมาะกับข้อมูล Unstructured อย่าง syslog เป็นต้น ซึ่งมักจะเป็น Log ที่ให้มนุษย์อ่านได้ง่าย แต่ไม่ค่อยเหมาะสำหรับให้คอมพิวเตอร์เอาไปใช้งานต่อ ซึ่ง LogStash มีไว้ให้แล้วกว่า 120 ตัว

    ตัวอย่าง grok-pattern

    ต่อไป สร้าง 44-filter-basic.conf มีเนื้อหาดังนี้

    filter {
            grok {
                    match => {
                            "message" => "%{IP:ipaddress} %{NUMBER:size}"
                    }
            }
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt

    echo "192.168.1.1 120" >> /tmp/input.txt

    และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย

    {"message":"192.168.1.1 120","@version":"1","path":"/tmp/input.txt","@timestamp":"2018-09-11T04:56:03.662Z","size":"120","host":"elk1","ipaddress":"192.168.1.1"}

    แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้

    Example : Postfix Log

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    เนื่องจาก Log แต่ละชนิด แต่ละซอฟต์แวร์มีความหลากหลายมาก แต่ดีที่มีผู้เชี่ยวชาญเค้าเขียน Pattern เอาไว้ให้ ให้ใช้คำสั่งต่อไปนี้ สร้างไดเรคทอรี่ /etc/logstash/patterns.d/ และ ดาวน์โหลด มาเก็บไว้

    mkdir /etc/logstash/patterns.d
    wget https://raw.githubusercontent.com/logstash-plugins/logstash-patterns-core/master/patterns/grok-patterns -O /etc/logstash/patterns.d/grok-patterns
    wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/postfix.grok -O /etc/logstash/patterns.d/postfix.grok
    
    

    ในกรณีของ Postfix จากนั้น ดาวน์โหลด Filter Plugin มาเก็บไว้ใน /etc/logstash/conf.d/ ด้วยคำสั่ง

    wget https://raw.githubusercontent.com/whyscream/postfix-grok-patterns/master/50-filter-postfix.conf -O /etc/logstash/conf.d/50-filter-postfix.conf
    

    และ ต้องสร้างอีกไฟล์ เพื่อเตรียมข้อมูล ชื่อ 49-filter-postfix-prepare.conf ใน /etc/logstash/conf.d/ เนื้อหาตามนี้

    filter {
    	grok {
        		match => { "message" => "%{SYSLOGTIMESTAMP} %{SYSLOGHOST} %{DATA:program}(?:\[%{POSINT}\])?: %{GREEDYDATA:message}" }
        		overwrite => "message"
    	}
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    แล้วส่งข้อมูลต่อไปนี้ต่อท้ายไฟล์ /tmp/input.txt

    echo "Sep 11 12:05:26 mailscan postfix/smtp[105836]: 268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)" >> /tmp/input.txt

    และเมื่อดูผลใน /tmp/output.txt จะพบบรรทัดสุดท้าย

    {"program":"postfix/smtp","postfix_delay":43.0,"postfix_dsn":"2.0.0","postfix_relay_port":25,"message":"268E04DFFE6: to=, relay=mail.psu.ac.th[192.168.107.11]:25, delay=43, delays=43/0/0.01/0.01, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as DE294461637)","path":"/tmp/input.txt","postfix_queueid":"268E04DFFE6","postfix_delay_conn_setup":0.01,"@version":"1","host":"elk1","postfix_to":"xxx.y@psu.ac.th","postfix_relay_hostname":"mail.psu.ac.th","postfix_delay_transmission":0.01,"tags":["_grokparsefailure","_grok_postfix_success"],"postfix_smtp_response":"250 2.0.0 Ok: queued as DE294461637","postfix_delay_before_qmgr":43.0,"postfix_relay_ip":"192.168.107.11","@timestamp":"2018-09-11T07:57:20.354Z","postfix_delay_in_qmgr":0.0,"postfix_status":"sent"}

    แสดงให้เห็นว่า สามารถใช้ filter นี้ แยกแยะข้อมูลเบื้องต้นได้

    From Syslog to ElasticSearch

    จากตัวอย่างข้างต้น เราทำงานกับไฟล์ /tmp/input.txt และ /tmp/output.txt ต่อไปนี้ จะเป็นการ รับ Input จาก syslog จริง ๆ จากเซิร์ฟเวอร์ ผ่าน Filter และส่งผลออกไปเก็บใน ElasticSearch

    ก่อนอื่น Stop Service ด้วยคำสั่ง

    service logstash stop
    

    สร้างไฟล์ 02-input-syslog.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
    ( เปิดรับ syslog ที่ tcp/5514 )

    input {
            syslog {
                    port => "5514"
            }
    }
    

    สร้างไฟล์ 99-output-elasticsearch.conf ไว้ใน /etc/logstash/conf.d/ เนื้อหาดังนี้
    ( ในที่นี้ ใช้ ElasticSearch บน localhost ที่ tcp/9200 และ ไม่ได้ตั้ง Security ใด ๆ )

    output {
            elasticsearch {
                    hosts => ["localhost:9200"]
            }
    }
    

    จากนั้น Start Service ด้วยคำสั่ง (รอสักครู่ด้วย)

    service logstash start
    

    ที่เซิร์ฟเวอร์ที่จะส่ง Log มาเก็บใน ElasticSearch ผ่าน LogStash ให้แก้ไข /etc/rsyslog.d/50-default.conf ชี้ mail.* ไปยัง LogStash  ที่ tcp/5514

    mail.* @@logstash.ip:5514
    

    หากทุกอย่างเรียบร้อย ก็จะสามารถดูผลจาก Kibana ได้อย่างสวยงาม

    สามารถนำข้อมูลไปใช้วิเคราะห์ได้ต่อไป

  • ELK #6 วิธีการติดตั้ง ELK และ Geoserver แบบ Docker ให้ทำงานร่วมกัน

    จาก ELK #5 การประยุกต์ใช้ ELK ในงานด้าน GIS และ การสร้าง Web Map Service (WMS) บน Geoserver ก็จะเห็นถึงการนำไปใช้เบื้องต้น

    >> ขอบคุณ คุณนพัส กังวานตระกูล สถานวิจัยสารสนเทศภูมิศาสตร์ทรัพยากรธรรมชาติและสิ่งแวดล้อม ศูนย์ภูมิภาคเทคโนโลยีอวกาศและภูมิสารสนเทศ (ภาคใต้) สำหรับความรู้มากมายครับ <<

     

    ต่อไปนี้ จะเป็นขั้นตอนการติดตั้ง ELK และ Geoserver แบบ Docker โดยผมได้สร้าง Github Repository เอาไว้ ซึ่งได้แก้ไขให้ระบบสามารถเก็บข้อมูลไว้ภายนอก

    Prerequisite

    1. ถ้าเป็น Windows ก็ต้องติดตั้ง Docker Toolbox หรือ Docker for Windows ให้เรียบร้อย
    2. ถ้าเป็น Linux ก็ติดตั้ง docker-ce ให้เรียบร้อย (เรียนรู้เกี่ยวกับ Docker ได้จาก ติดตั้ง docker 17.06.0 CE บน Ubuntu)

    ขั้นตอนการติดตั้ง

    1. สร้าง Folder ชื่อ Docker เอาไว้ในเครื่อง เช่นใน Documents หรือ จะเป็น D:\ หรืออะไรก็แล้วแต่
    2. เปิด Terminal หรือ Docker Quickstart Terminal จากนั้นให้ cd เข้าไปมา Folder “Docker” ที่สร้างไว้
    3. ดึง ELK ลงมา ด้วยคำสั่ง
      git clone https://github.com/deviantony/docker-elk.git
    4. ดึง Geoserver ลงมา ด้วยคำสั่ง (อันนี้ผมทำต่อยอดเค้าอีกทีหนึ่ง ต้นฉบับคือ https://hub.docker.com/r/fiware/gisdataprovider/)
      git clone https://github.com/nagarindkx/geoserver.git
    5. เนื่องจาก ไม่อยากจะไปแก้ไข Git ของต้นฉบับ เราจึงต้องปรับแต่งนิดหน่อยเอง
      ให้แก้ไขไฟล์ docker-elk/docker-compose.yml
      โดยจะเพิ่ม Volume  “data” เพื่อไป mount ส่วนของ data directory ของ Elasticsearch ออกมาจาก Containerแก้ไขจาก

      elasticsearch:
       build: elasticsearch/
       volumes:
       - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml

      เป็น

      elasticsearch:
       build: elasticsearch/
       volumes:
       - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
       - ./elasticsearch/data:/usr/share/elasticsearch/data
    6. สร้าง docker-elk/elasticsearch/data
      mkdir docker-elk/elasticsearch/data
    7. แก้ไขไฟล์ docker-elk/logstash/pipeline/logstash.conf ตามต้องการ เช่น ใส่ filter
      filter {
       csv {
         separator => ","
         columns => [
      	"cid","name","lname","pid","house","road","diagcode","latitude","longitude","village","tambon","ampur","changwat"
         ]
       }
       if [cid] == "CID" {
         drop { }
       } else {
         # continue processing data
         mutate {
           remove_field => [ "message" ]
         }
         mutate {
           convert => { "longitude" => "float" }
           convert => { "latitude" => "float" }
         }
         mutate {
           rename => {
             "longitude" => "[geoip][location][lon]"
             "latitude" => "[geoip][location][lat]"
           }
         }
       }
      }
    8. จาก Terminal ให้เข้าไปใน docker-elk แล้ว start ด้วยคำสั่ง
      cd docker-elk
      docker-compose up -d
    9. จาก Terminal ให้เข้าไปใน geoserver แล้ว start ด้วยคำสั่ง
      cd ../geoserver
      docker-compose up -d

    ถึงขั้นตอนนี้ ก็จะได้ ELK และ Geoserver ทำงานขึ้นแล้ว

    ELK: http://localhost:5601

    Geoserver: http://localhost:9090/geoserver/web

     

    ขั้นตอนต่อไป จะเป็นการ นำข้อมูลเข้า และ เชื่อ Kibana กับ Geoserver

    วิธีการนำข้อมูลเข้า Elasticsearch

    เนื่องจาก pipeline ของ Logstash กำหนดว่า จะรับข้อมูลทาง TCP Port 5000 จึงใช้วิธี netcat ไฟล์เข้าไป ด้วยคำสั่ง (ตัวอย่างนี้ ใช้ข้อมูลจากไฟล์ sample.csv)

    cat sample.csv | nc localhost 5000

    วิธีการดึง Map จาก Geoserver มาใช้งานใน Kibana

    ทำตามขั้นตอนที่กล่าวไว้ใน การสร้าง Web Map Service (WMS) บน Geoserver ซึ่งจะได้ URL ของ Layer Preview มา ประมาณนี้
    http://localhost:9090/geoserver/test/wms?service=WMS&version=1.1.0&request=GetMap&layers=test:hadyai_vil&styles=&bbox=631866.963048935,748605.6609660918,677997.0295239205,791055.6681053439&width=768&height=706&srs=EPSG:32647&format=application/openlayers

    ทำตามขั้นตอนที่กล่าวไว้ใน ELK #5 การประยุกต์ใช้ ELK ในงานด้าน GIS ในส่วนของ วิธีใส่ Map Server อื่น แล้วเอา URL นี้ไปใส่ และรายละเอียดเกี่ยวกับ Layer, version, format ตามที่กำหนดไว้ ก็จะสามารถเอา Map ที่เราต้องการ พร้อม Shape File มาใช้งานได้

    หวังว่าจะเป็นประโยชน์ครับ

  • ELK #5 การประยุกต์ใช้ ELK ในงานด้าน GIS

    คราวนี้ มาดูการประยุกต์ใช้ ELK ในงานด้าน GIS

    ต่อจาก ELK #01 > ELK #02 > ELK #03 > ELK #04 ซึ่งเป็นการติดตั้งทั้งหมด คราวนี้มาดูการประยุกต์ใช้งานกันบ้าง

    โจทย์มีอยู่ว่า มีการไปเก็บข้อมูลในภาคสนาม แล้วมีการบันทึก พิกัดด้วย GPS เป็น Latitude กับ Longitude พร้อมกับค่าบางอย่าง ทั้งหมดถูกเก็บไว้ในฐานข้อมูล MySQL

    การนำข้อมูลเข้า ELK ก็เลย Export ข้อมูลจาก MySQL มาเป็น CSV File ประกอบด้วย

    id,LATITUDE,LONGITUDE,something

    ตัวอย่างข้อมูล มีดังนี้

    id,LATITUDE,LONGITUDE,something
    1,6.97585,100.448963,100
    2,6.975627,100.450841,19
    3,6.973472,100.449196,65
    4,6.973468,100.449104,53
    5,6.973455,100.449135,33
    6,6.973252,100.44888,13
    7,6.985862,100.45292,85
    8,6.993386,100.416214,90
    9,7.005465,100.447984,1

    นำข้อมูลเข้า ELK ผ่านทาง Logstash

    ใน  ELK #2 ได้อธิบายขั้นตอนการติดตั้ง Logstash ไว้แล้วนั้น ต่อไปเป็นการนำข้อมูลชนิด CSV เข้าไปใส่ใน Elasticsearch

    Logstash จะอ่าน “กระบวนการทำงาน” หรือเรียกว่า Pipeline จากไฟล์ Configuration ซึ่งประกอบด้วย 3 ส่วนหลักๆ คือ Input, Filter และ Output

    input {
       stdin { }
    }

    ในส่วน input นี้ จะเป็นการอ่าน STDIN หรือ ทาง Terminal

    filter {
     csv {
       separator => ","
       columns => [
         "id","latitude","longitude","something"
       ]
     }
     if [id] == "id" {
       drop { }
     } else {
       # continue processing data
       mutate {
         remove_field => [ "message" ]
       }
       mutate {
         convert => { "something" => "integer" }
         convert => { "longitude" => "float" }
         convert => { "latitude" => "float" }
       }
       mutate {
         rename => {
           "longitude" => "[geoip][location][lon]"
           "latitude" => "[geoip][location][lat]"
         }
       }
     }
    }

    ในส่วนของ filter นี้ เริ่มจาก เลือกใช้ Filter Plugin ชื่อ “csv” เพื่อจัดการไฟล์ CSV โดยกำหนด “separator” เป็น “,” แล้วกำหนดว่ามีชื่อ Column เป็น “id”,”latitude”,”longitude”,”something”

    จากนั้น ก็ตรวจสอบว่า ถ้าข้อมูลที่อ่านเข้ามา ใน Column “id” มีค่าเป็น “id” (ซึ่งก็คือบรรทัดหัวตารางของไฟล์ csv นั่นเอง) ก้ให้ “drop” ไป

    แต่หากไม่ใช่ ก็ให้ทำดังนี้ (mutate คือการแก้ไข)

    • remove field ชื่อ message (ซึ่งจะปรากฏเป็น Default อยู่ ก็เลยเอาออกเพราะไม่จำเป็น)
    • convert หรือ เปลี่ยน “ชนิด”  ของแต่ละ field เป็นไปตามที่ต้องการ ได้แก่ ให้ something เป็น Integer, latitude และ longitude เป็น float
    • rename จาก latitude เป็น [geoip][location][lat] และ longitude เป็น [geoip][location][lon] ซึ่งตรงนี้สำคัญ เพราะ geoip.location Field ข้อมูลชนิก “geo_point” ซึ่งจำเป็นต่อการนำไปใช้งานเกำหนดตำแหน่งพิกัดบนแผนที่ (เป็น Field ที่สร้างจาก Template พื้นฐานของ Logstash ซึ่งจะไม่กล่าวถึงในบทความนี้)
    output {
     stdout { codec => rubydebug }
     elasticsearch {
       hosts => ["http://your.elastic.host:9200"]
     }
    }

    ในส่วนของ Output จะกำหนดว่า ข้อมูลที่อ่านจาก csv และผ่าน filter ตามที่กล่าวมาข้างต้น จะส่งไปที่ใน จากการกำหนดนี้ บอกว่า จะส่งออกไป

    • stdout คือ การแสดงผลออกมาทาง terminal โดยมีรูปแบบเป็น rubydebug (รูปแบบหนึ่ง)
    • Elasticsearch ซึ่งอยู่ที่ http://your.elastic.host:9200

    จากนั้น Save ไฟล์นี้ แล้วตั้งชื่อว่า gis.conf

    แล้วใช้คำสั่ง

    cat sample1.csv | /usr/share/logstash/bin/logstash -f gis.conf

    การแสดงผลข้อมูลใน Elasticsearch ผ่าน Kibana

    จากบทความก่อนหน้า ได้แสดงวิธีการติดตั้ง Kibana และเชื่อมต่อกับ Elasticsearch แล้ว โดยจะเข้าถึง Kibana ได้ทางเว็บไซต์ http://your.kibana.host:5601

    ในกระบวนการของ Logstash ข้างต้น จะไปสร้าง Elasticsearch Index ชื่อ “logstash-YYYY-MM-DD”, ใน Kibana ก็จะต้องไป คลิกที่ Setting (รูปเฟือง) จากนั้นคลิกที่ Index Pattern โดยให้ไปอ่าน index ซึ่งมีชื่อเป็น Pattern คือ “logstash-*” จากนั้น คลิกปุ่ม Create

    จะได้ผลประมาณนี้

    ต่อไป คลิกที่ Discover ก็จะเห็นข้อมูลเข้ามา

    แสดงข้อมูลในรูปแบบของ Tile Map

    คลิกที่ Visualization > Create a visualization

    เลือก Tile Map

    เลือก Index ที่ต้องการ ในที่นี้คือ logstash-*

    คลิก Geo Coordinates

    จากนั้น คลิก Apply แล้วคลิก Fit Data Bound

    ก็จะได้เฉพาะ พื้นที่ทีมีข้อมุล

    วิธีใส่ Map Server อื่น

    ปัญหาของ Defaul Map Service ที่มากับ Kibana คือ Elastic Map Service นั้น จะจำกัดระดับในการ Zoom จึงต้องหา WMS (Web Map Service) อื่นมาใช้แทน ต้องขอบคุณ คุณนพัส กังวานตระกูล สถานวิจัยสารสนเทศภูมิศาสตร์ทรัพยากรธรรมชาติและสิ่งแวดล้อม ศูนย์ภูมิภาคเทคโนโลยีอวกาศและภูมิสารสนเทศ (ภาคใต้)  สำหรับคำแนะนำในการใช้งาน WMS และระบบ GIS ตลอดมาครับ 🙂

    โดย เราจะใช้ WMS ของ Longdo Map API : http://api.longdo.com/map/doc/
    ข้อมูลการใช้งาน เอามาจาก http://api.longdo.com/map/doc/demo/advance/02-layer.php

    วิธีการตั้งค่าใน Kibana

    คลิกที่ Option > WMS compliant map server
    แล้วกรอกข้อมูล

    URL : https://ms.longdo.com/mapproxy/service
    Layer: bluemarble_terrain
    Version: 1.3.0
    Format: image/png
    Attribute: Longdo API

    จากนั้นคลิก Apply

    จากนั้นให้ Save พร้อมตั้งชื่อ

    ซึ่ง Longdo Map API สามารถ Zoom ได้ละเอียดพอสมควร

    สามารถนำเสนอระบบ GIS ได้บน Website ทันที

    หวังว่าจะเป็นประโยชน์ครับ