Tag: spark

  • การใช้งาน Google Datalab Notebook บน Dataproc เพื่อสร้าง Machine Learning Model เบื้องต้น

    ต่อจาก สร้าง Hadoop และ Spark Cluster เพื่องานด้าน Data Science ด้วย Google Cloud Dataproc + Datalab

    1. จาก Google Cloud Datalab คลิก Notebookแล้ว ตั้งชื่อ Demo01

      เลือได้ว่า จะใช้ Python2 หรือ Python3 ในที่นี้จะเลือก Python3
    2. ตรวจสอบรุ่นของ Spark ที่ใช้งานด้วยคำสั่ง
      spark.version

      แล้วกดปุ่ม Shift+Enter เพื่อ Run

    3. สามารถใช้คำสั่งไปย้ง Shell ซึ่งเป็น Linux ได้ โดยใช้เครื่องหมาย ! นำหน้า
      ในที่นี้ จะ Download iris dataset จาก https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data มาไว้ในเครื่อง mycluster-m ด้วย คำสั่ง

      ! wget https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data

      แล้ว เอาไปใส่ใน HDFS ด้วยคำสั่ง

      ! hdfs dfs -put iris.data /

      จะได้ผลประมาณนี้

    4. จาก Machine Learning #01 – Python with iris dataset ซึ่งเดิมใช้ sklearn จะเปลี่ยนเป็น Spark MLlib เพื่อใช้ความสามารถของ Spark Cluster ได้ เริ่มต้นจาก Import Library ที่จำเป็นดังนี้
      # Import Libaries
      from pyspark.ml import Pipeline
      from pyspark.ml.evaluation import MulticlassClassificationEvaluator
      from pyspark.ml.classification import LogisticRegression
      from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
      from pyspark.ml.feature import VectorAssembler
      from pyspark.sql.types import *
    5. จากนั้น สร้าง Spark Dataframe (Concept จะคล้ายกับ Pandas แต่มีรายละเอียดที่มากกว่า)
      # get into DataFrame
      csvFile = spark.read.csv('/iris.data', inferSchema=True)
      diz = {"Iris-setosa":"1", "Iris-versicolor":"2", "Iris-virginica":"3" }
      df = csvFile.na.replace(diz,1,"_c4")
      df2 = df.withColumn("label",df["_c4"].cast(IntegerType())) \
      .withColumnRenamed("_c0","sepal_length") \
      .withColumnRenamed("_c1","sepal_width") \
      .withColumnRenamed("_c2","petal_length") \
      .withColumnRenamed("_c3","petal_width") 
      train,test = df2.randomSplit([0.75,0.25])

      เริ่มจาก ให้ spark session (spark) อ่านไฟล์ CSV จาก HDFS /iris.data โดยระบุว่า ให้กำหนด Data Type อัตโนมัติ (inforSchema=True) และไฟล์นี้ไม่มี Header

      Dataset นี้ ประกอบด้วย 5 columns เมื่อ Spark อ่านข้อมูลเข้ามา จะตั้งชื่อ column เป็น _c0, _c1, _c2, _c3, _c4 โดย _c4 จะเป็น label ของชนิดของดอก iris ซึ่งกำหนดเป็น String => Iris-setosa, Iris-vesicolor, Iris-virginica ในการใช้งาน Logistic Regression ขั้นตอนต่อไป ไม่สามารถนำเข้าข้อมูลชนิด String เพื่อไปใช้งานได้ จึงต้องทำการเปลี่ยน จาก “Iris-setosa” เป็น “1” แล้วทำการเปลี่ยน “1” ซึ่งเป็น String ให้เป็น Integer ด้วย ฟังก์ชั่น cast และตั้งชื่อว่า column ว่า “label”

      จากนั้น ทำการเปลี่ยนชื่อ column _c0, _c1, _c2, _c3 เป็นชื่อตามต้องการ

      สุดท้าย ใช้ randomSplit([0.75, 0.25]) เพื่อแบ่งข้อมูลสำหรับ train 75% และ test 25%

    6. ลอง แสดง Schema ดู
      df2.printSchema()

      ได้ผลดังนี้


      และใช้คำสั่งนี้ เพื่อดูข้อมูล

      df2.show()

      ได้ผลประมาณนี้

    7. ใน Spark 2.x จะมี Concept ของการใช้ Pipeline เพื่อให้สามารถออกแบบการทดลอง ปรับค่า Meta Parameter ต่าง ๆ ของโมเดล และทำงานอย่างเป็นระบบยิ่งขึ้น (ในขั้นตอนนี้ ขอไม่ปรับค่าใด ๆ ก่อน)
      # Model
      assembler = VectorAssembler(
      inputCols=["sepal_length","sepal_width","petal_length","petal_width"],
      outputCol="features")
      lr = LogisticRegression()
      paramGrid = ParamGridBuilder().build()
      
      #Pipeline
      pipeline = Pipeline(stages=[assembler, lr])

      ในการใช้งาน Logistic Regression ต้องกำหนดค่า field คือ features โดยกำหนดให้มาจาก Column sepal_length, sepal_width, petal_length, petal_width ส่วน label ได้กำหนดในขั้นก่อนหน้าแล้ว

      จากนั้นสร้าง lr เป็น instant ของ LogisticRegression

      ในการปรับค่า Parameter จะมาใส่ใน ParamGridBuilder ซึ่งจะไม่กล่าวถึงในขั้นนี้

      สุดท้าย นำ assembler และ lr มาเข้าสู่ stage วิธีการนี้ทำให้การทำซ้ำขั้นตอนต่าง ๆ ใน Pipeline สะดวกยิ่งขึ้น (ต้องเห็นกระบวนการที่ซับซ้อนกว่านี้ จึงจะเห็นประโยชน์)

    8.  ขั้นตอนสำคัญ  pipeline มาแล้ว ก็ต้องนำมาสร้าง model โดยการ Train ด้วยชุดข้อมูล “train”
      model = pipeline.fit(train)
      predictions = model.transform(train)

      แล้ว นำ model ที่ได้ มาทดลอง predictions ด้วย transform() บนข้อมูล train ผลที่ได้ คือ ผลการ Predict จาก Model

    9. ต่อไป คือ การตรวจสอบว่า Model ที่สร้างขึ้น มีความแม่นยำแค่ไหน ในที่นี้ จะใช้ MulticlassClassificationEvaluator เพราะ label มีมากว่า 2 ชนิด
      evaluator=MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

      แล้วนำ เปรียบเทียบว่า สิ่งที่ predict ได้จาก model

      evaluator.evaluate(predictions)

      ถูกต้องมากน้อยขนาดไหน กับข้อมูล test

      evaluator.evaluate(model.transform(test))
    10. ผลที่ได้ ประมาณนี้
      โดยจะเห็นได้ว่า มีความถูกต้อง 0.9521 … หรือ 95.21% นั่นเอง

     

  • สร้าง Hadoop และ Spark Cluster เพื่องานด้าน Data Science ด้วย Google Cloud Dataproc + Datalab

    จาก Ambari #01: ติดตั้ง Ambari Server , Ambari #02 ติดตั้ง Ambari Agent , Ambari #04 การสร้าง Hadoop ด้วย Ambari บน AWS และ GCP #01 วิธีการสร้าง Virtual Machine บน Google Cloud Platform จะเห็นได้ว่า ก็ยังมีความยุ่งยากอยู่ อีกทั้ง หากต้องการใช้ PySpark ก็ต้องตามติดตั้ง Python Packages ต้องปรับค่ามากมาย และหากต้องการขยายระบบ ก็มีงานต้องทำอีกเยอะ

    ในบทความนี้ จะแนะนำอีกวิธีหนึ่ง คือ การใช้งาน Google Cloud Dataproc ซึ่งจะทำให้เราได้ใช้ Hadoop + Spark Cluster ซึ่งได้รับการทดสอบเป็นอย่างดี อีกทั้งยังสามารถเลือกใช้ Spark รุ่นต่างๆได้อย่างง่ายได้ ทำให้สามารถโฟกัสไปยัง Data และ กระบวนทำ Machine Learning ได้เต็มที่

    1. ไปที่ Google Cloud Console เพื่อเลือก Project ที่จะทำงานด้วย และเปิดช้งาน Cloud Dataproc และ Compute Engine APIs และ ในที่นี้ จะมี project-id คือ kx-dataproc-01 (สามารถสร้างในชื่อที่ต้องการเองได้)
      https://console.cloud.google.com/
    2. เปิดใช้งาน Google Cloud Dataproc
      https://console.cloud.google.com/dataproc/
    3. เปิด GCLOUD COMMAND
    4. ในที่นี้ จะสร้าง Cluster ชื่อ mycluster ใน project-id ชื่อ kx-dataproc-01 แล้วให้ copy คำสั่งต่อไปนี้ลงไปใน gcloud command แล้วกดปุ่ม Enter
      gcloud dataproc clusters create mycluster --project kx-dataproc-01 --initialization-actions gs://dataproc-initialization-actions/datalab/datalab.sh
    5. ใช้เวลาประมาณ 5 นาที ก็จะได้ Hadoop + Spark Cluster ที่มี 1 Master และ 2 Workers
      ซึ่ง Master จะชื่อว่า mycluster-m
      และ Workers จะชื่อ mycluster-w-0 และ mycluster-w-1
    6. ต่อไป ทำ SSH Tunnel จาก Master คือ mycluster-m Port 8080 ออกมา โดยพิมพ์คำสั่งต่อไปนี้
      gcloud compute ssh mycluster-m --project kx-dataproc-01 --zone=asia-southeast1-a -- -4 -N -L 8080:mycluster-m:8080

      โดย
      –project ไว้สำหรับระบุชื่อ project-id
      –zone ไว้ระบุ Zone ที่ Cluster อยู่
      — ไว้เป็นตัวคั่น (separator) ว่าหลังจากนี้เป็นคำสั่งของ ssh
      -4 บอกว่า ติดต่อด้วย IPv4
      -N บอกว่า ไม่ต้องเปิด Shell ของเครื่อง Master
      -L บอกว่า จะ Forward Port 8080 ไปยังเครื่อง mycluster-m ที่ port 8080

    7. จากนั้น เปิด Web Preview on port 8080
    8. ก็จะได้ Google Cloud Datalab ซึ่งติดต่อกับ Hadoop+Spark ที่อยู่บน Google Cloud Dataproc ได้แล้ว

    Reference:

    https://cloud.google.com/dataproc/docs/tutorials/dataproc-datalab

  • Ambari #08 ปรับแต่ง pyspark ให้สามารถใช้งาน spark.ml ได้ ด้วย conda package management

    เราสามารถใช้งาน Spark ในด้าน Machine Learning ด้วย pyspark แต่ปัญหาอยู่ที่ว่า python ที่ติดตั้งบน Ubuntu 14.04 นั้น ไม่มี package ที่จำเป็นต้องใช้ ได้แก่ numpy, scipy, scikit-learn, matplotlib ซึ่งขั้นตอนการติดตั้ง ก็จะยุ่งยาก เพราะต้อง compile code เองด้วย

    แต่ปัจจุบัน มีเครื่องมือที่เรียกว่า “conda” ทำหน้าที่ติดตั้ง package ที่ต้องการได้สะดวก ในที่นี้ จะเลือกใช้ python 2.7 และ จะติดตั้งลงไปใน /opt/conda

    ขั้นตอนการติดตั้ง conda

    1. ไปเลือก setup script จาก https://conda.io/miniconda.html
    2. ในการนี้ ขอให้ทำในสิทธิ์ของ root
      sudo su
      cd
    3. Download script
      wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh
    4. จากนั้น ใช้คำสั่งต่อไปนี้ เพื่อติดตั้ง conda ลงไปใน /opt/conda และ เลือกใช้ค่า default
      bash Miniconda2-latest-Linux-x86_64.sh -p /opt/conda -b
    5. ติดตั้ง scikit-learn package ซึ่งจะติดตั้ง package อื่นๆที่จำเป็นสำหรับ spark.ml เข้ามาด้วย
      /opt/conda/bin/conda install scikit-learn -y
    6. ทำขั้นตอน 3-6 กับ “ทุกๆ node” ใน Hadoop Cluster

    ต่อไปตั้งค่า Zeppelin ให้สามารถใช้งาน conda แทน python เดิม

    1. เปิด Zeppelin ขึ้นมา
    2. คลิก Interpreter > ค้นหา spark

      แล้วคลิก edit
    3. จากนั้น หาเลื่อนหาค่า pyspark.python แล้วแก้ไขเป็น /opt/conda/bin/python แล้วคลิก save
    4. จากนั้นก็จะสามารถใช้งาน spark.ml ได้แล้ว
  • Spark #04 – Pyspark connect to MySQL

    ในบทความนี้ จะกล่าวถึง การดึงข้อมูลจาก MySQL ผ่าน JDBC เพื่อนำมาใช้งานใน Spark ด้วยภาษา Python ซึ่งจะใช้ Library Pyspark

    ในขั้นตอนนี้ขอกล่าวเฉพาะวิธีการก่อน (รายละเอียดจะตามมาทีหลัง)

    1. สร้าง SparkSession ตั้งชื่อว่า myspark
      from pyspark.sql import SparkSession
      myspark = SparkSession \
       .builder \
       .appName("Python Spark SQL basic example") \
       .config("spark.some.config.option", "some-value") \
       .getOrCreate()
    2. ติดต่อ MySQL และสร้าง View ชื่อ myuser
      myuser=myspark.read.jdbc(url="jdbc:mysql://mysql/mysql",table="user", properties={
       'user': 'user1', 'password': '123456'}
       )
      myuser.createOrReplaceTempView(name="myuser")
    3. จากนั้นก็จะสามารถ Query ข้อมูลที่เก็บไว้มาใช้งานใน Spark ได้
      myspark.sql(sqlQuery="select user,host from myuser where user='user1'").show()

    ซึ่งต่อจากนี้ จะสามารถใช้ความสามารถของ Spark ซึ่งทำงานด้าน Distributed Computing ได้ดี มาปรับปรุงความเร็วในการ Query ที่ซับซ้อน เช่นการ JOIN ได้ โดยจะกล่าวในบทความต่อๆไป

  • Spark #03: Query Apache Access Log with Spark SQL

    ต่อจาก

    บทความนี้ จะกล่าวถึงการนำเข้าไฟล์ Apache Access Log เข้าไปเก็บไว้ใน Hadoop HDFS แล้ว ให้ Apache Spark Cluster เข้าไปค้นหาข้อมูล โดยใช้ภาษา SQL ผ่าน Spark SQL API

    นำ Apache Access Log เข้า HDFS

    1. ให้ Copy Apache Access Log ที่มีอยู่มาเก็บไว้ในเครื่องที่สามารถติดต่อ Hadoop HDFS ได้ (ในที่นี้ ชื่อไฟล์เป็น apache.access.log)
    2. ใช้คำสั่งต่อไป
      (แทน /test/ ด้วย Path ที่สร้างไว้ใน HDFS)

      hdfs dfs -copyFromLocal apache.access.log /test/
    3. เมื่อไปดูผ่าน Web UI ของ Hadoop HDFS ก็จะเห็นไฟล์อยู่ดังนี้

    วิธี Query ข้อมูลจาก Zeppelin ไปยัง Spark Cluster

    1. เปิด Zeppelin Web UI แล้วสร้าง Note ใหม่ โดยคลิกที่ Create new node
      แล้วใส่ชื่อ Note เช่น Query Apache Access Log
      ตั้ง Default Interpreter เป็น Spark
      แล้วคลิก Create Note
    2. ใส่ Code ต่อไปนี้ลงไป
    3. ด้านขวามือบน จะมีรูปเฟือง ให้คลิก แล้วเลือก Insert New
    4. แล้วใส่ข้อความนี้ลงไป
    5. จากนั้นคลิก Run all paragraphs
    6. ผลที่ได้

    ตอนต่อไปจะมาอธิบายวิธีการเขียนคำสั่งครับ

  • Spark #02: Cluster Installation

    ต่อจาก Spark #01: Standalone Installation

    Apache Spark ทำงานแบบ Master – Slave โดย Spark Cluster Component ดังภาพ


    ภาพจาก http://spark.apache.org/docs/latest/img/cluster-overview.png

    การใช้งาน Apache Spark จะใช้ผ่านการเขียนโปรแกรมด้วยภาษา Scala, Java, Python หรือ R แล้วสั่งการผ่าน “Driver” ซึ่งจะทำการส่งการไปยัง “Worker” เพื่อให้ Execute ตามที่ต้องการ การสร้าง Cluster จะมี Cluster Manager เป็น Standalone, Apache Mesos และ Hadoop YARN [1]

    ในบทความนี้ จะกล่าวถึงเฉพาะ การติดตั้ง Apache Spark Cluster แบบ Standalone คือใช้ Apache Spark เองเป็น Cluster Manager

    1. ติดตั้ง Ubuntu 16.04 อีกเครื่องหนึ่ง แล้วติดตั้งตามขึ้นตอนที่กล่าวใน Spark #01: Standalone Installation ข้อ 1-2 เท่านั้น (ไม่ต้อง Start Master ขึ้นมา)
    2. ตอนนี้จะมีเครื่อง Master และ เครื่อง Slave ซึ่งแนะนำให้ทำ Password-less SSH จากเครื่อง Master ไปยัง Slave เพื่อสะดวกต่อการใช้งาน
    3. ที่เครื่อง Master ใช้คำสั่งต่อไปนี้ เพื่อสร้างไฟล์ spark-env.sh ซึ่งเป็นตัวกำหนดการทำงานต่างๆของ Spark Cluster โดยในที่นี้ จะ SPARK_MASTER_HOST เป็น IP ของเครื่อง Master (แทนที่ 192.168.XXX.YYY ด้วย IP ของ Master )
      cp conf/spark-env.sh.template conf/spark-env.sh
      
      echo "SPARK_MASTER_HOST=192.168.XXX.YYY" >> conf/spark-env.sh
    4. ที่เครื่อง Master ใช้คำสั่งต่อไปนี้ เพื่อสร้างไฟล์ slaves ซึ่งจะกำหนดว่า เครื่องใดบ้างจะเป็น Slave ของ Cluster นี้ (หากมี Slave หลายเครื่อง ก็ใส่ IP ลงไปในไฟล์ conf/slaves ให้หมด)
      cp conf/slaves.template conf/slaves
      
      echo "192.168.XXX.ZZZ" >> conf/slaves
    5. ที่เครื่อง Master ใช้คำสั่งต่อไปนี้ เพื่อเชื่อมต่อ Cluster
      sbin/start-all.sh

      หมายเหตุ: หากไม่ได้ทำ Password-less SSH ก็จะต้องใส่ Password ทีละเครื่องจนเสร็จ

    6. เมื่อเสร็จเรียบร้อย ก็จะสามารถดูสถานะได้ที่ http://192.168.XXX.YYY:8080 ดังภาพ

    ประมาณนี้

    บทความต่อไป จะลงรายละเอียดเกี่ยวกับการเขียน Program เพื่อทำงานบน Spark Cluster

    Reference:

    1. http://spark.apache.org/docs/latest/cluster-overview.html

     

  • Spark #01: Standalone Installation

    Apache Spark : เป็นระบบ Data Processing ในระดับ Large-Scale ซึ่งทำงานได้เร็วกว่า Apache Hadoop MapReduce 100 เท่าบน Memory และ 10 เท่าบน Disk สามารถพัฒนาโปรแกรมเพื่อทำงานบน Spark ได้หลายภาษา ทั้ง Java, Scala, Python และ R อีกทั้งยังมี Library ทำงานกับ SQL, Machine Learning และ Graph Parallel Computation

    ในบทความนี้ จะกล่าวถึงเฉพาะวิธีการติดตั้ง Apache Spark เบื้องต้น บน Ubuntu 16.04 จำนวน 1 Machine ก่อน

    1. ไปที่ http://spark.apache.org/downloads.html
      เลือก Release, Package ที่ต้องการ แล้วเลือก Copy Link จาก Download Link มาได้เลย
    2. ที่ Ubuntu Server ใช้คำสั่งต่อไปนี้ ติดตั้ง Java และ Download Spark มาไว้บนเครื่อง
      sudo apt-get install default-jre openjdk-8-jdk-headless
      cat <<EOF >> .bashrc 
      export SPARK_HOME=/home/mama/spark
      export PATH=\$PATH:\$SPARK_HOME/bin
      EOF
      wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
      tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz ; ln -s spark-2.1.0-bin-hadoop2.7 spark
      cd spark
    3. แล้วทำการ Start Spark Master Server ด้วยคำสั่ง
      sbin/start-master.sh
    4. จากนั้น สามารถเรียกดู Web UI ได้ที่ port 8080 (Default) และต่อไป เครื่อง Worker หรือ เครื่องที่จะมาเข้า Cluster จะติดต่อเครื่องนี้ผ่าน port 7077 (Default)
    5. สามารถใช้งาน Spark Shell ซึ่ง จะเป็นภาษา Scala แบบ Interactive ด้วยคำสั่ง
      bin/spark-shell
    6. สามารถดู Jobs ที่ทำงานได้ผ่านทาง Web UI ที่ port 4040 (Default)

    ประมาณนี้ก่อน ในบทความต่อไปจะเป็นการสร้าง Spark Cluster

    Reference:

    1. http://spark.apache.org/