Tag: pyspark

  • การใช้งาน Google Datalab Notebook บน Dataproc เพื่อสร้าง Machine Learning Model เบื้องต้น

    ต่อจาก สร้าง Hadoop และ Spark Cluster เพื่องานด้าน Data Science ด้วย Google Cloud Dataproc + Datalab

    1. จาก Google Cloud Datalab คลิก Notebookแล้ว ตั้งชื่อ Demo01

      เลือได้ว่า จะใช้ Python2 หรือ Python3 ในที่นี้จะเลือก Python3
    2. ตรวจสอบรุ่นของ Spark ที่ใช้งานด้วยคำสั่ง
      spark.version

      แล้วกดปุ่ม Shift+Enter เพื่อ Run

    3. สามารถใช้คำสั่งไปย้ง Shell ซึ่งเป็น Linux ได้ โดยใช้เครื่องหมาย ! นำหน้า
      ในที่นี้ จะ Download iris dataset จาก https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data มาไว้ในเครื่อง mycluster-m ด้วย คำสั่ง

      ! wget https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data

      แล้ว เอาไปใส่ใน HDFS ด้วยคำสั่ง

      ! hdfs dfs -put iris.data /

      จะได้ผลประมาณนี้

    4. จาก Machine Learning #01 – Python with iris dataset ซึ่งเดิมใช้ sklearn จะเปลี่ยนเป็น Spark MLlib เพื่อใช้ความสามารถของ Spark Cluster ได้ เริ่มต้นจาก Import Library ที่จำเป็นดังนี้
      # Import Libaries
      from pyspark.ml import Pipeline
      from pyspark.ml.evaluation import MulticlassClassificationEvaluator
      from pyspark.ml.classification import LogisticRegression
      from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
      from pyspark.ml.feature import VectorAssembler
      from pyspark.sql.types import *
    5. จากนั้น สร้าง Spark Dataframe (Concept จะคล้ายกับ Pandas แต่มีรายละเอียดที่มากกว่า)
      # get into DataFrame
      csvFile = spark.read.csv('/iris.data', inferSchema=True)
      diz = {"Iris-setosa":"1", "Iris-versicolor":"2", "Iris-virginica":"3" }
      df = csvFile.na.replace(diz,1,"_c4")
      df2 = df.withColumn("label",df["_c4"].cast(IntegerType())) \
      .withColumnRenamed("_c0","sepal_length") \
      .withColumnRenamed("_c1","sepal_width") \
      .withColumnRenamed("_c2","petal_length") \
      .withColumnRenamed("_c3","petal_width") 
      train,test = df2.randomSplit([0.75,0.25])

      เริ่มจาก ให้ spark session (spark) อ่านไฟล์ CSV จาก HDFS /iris.data โดยระบุว่า ให้กำหนด Data Type อัตโนมัติ (inforSchema=True) และไฟล์นี้ไม่มี Header

      Dataset นี้ ประกอบด้วย 5 columns เมื่อ Spark อ่านข้อมูลเข้ามา จะตั้งชื่อ column เป็น _c0, _c1, _c2, _c3, _c4 โดย _c4 จะเป็น label ของชนิดของดอก iris ซึ่งกำหนดเป็น String => Iris-setosa, Iris-vesicolor, Iris-virginica ในการใช้งาน Logistic Regression ขั้นตอนต่อไป ไม่สามารถนำเข้าข้อมูลชนิด String เพื่อไปใช้งานได้ จึงต้องทำการเปลี่ยน จาก “Iris-setosa” เป็น “1” แล้วทำการเปลี่ยน “1” ซึ่งเป็น String ให้เป็น Integer ด้วย ฟังก์ชั่น cast และตั้งชื่อว่า column ว่า “label”

      จากนั้น ทำการเปลี่ยนชื่อ column _c0, _c1, _c2, _c3 เป็นชื่อตามต้องการ

      สุดท้าย ใช้ randomSplit([0.75, 0.25]) เพื่อแบ่งข้อมูลสำหรับ train 75% และ test 25%

    6. ลอง แสดง Schema ดู
      df2.printSchema()

      ได้ผลดังนี้


      และใช้คำสั่งนี้ เพื่อดูข้อมูล

      df2.show()

      ได้ผลประมาณนี้

    7. ใน Spark 2.x จะมี Concept ของการใช้ Pipeline เพื่อให้สามารถออกแบบการทดลอง ปรับค่า Meta Parameter ต่าง ๆ ของโมเดล และทำงานอย่างเป็นระบบยิ่งขึ้น (ในขั้นตอนนี้ ขอไม่ปรับค่าใด ๆ ก่อน)
      # Model
      assembler = VectorAssembler(
      inputCols=["sepal_length","sepal_width","petal_length","petal_width"],
      outputCol="features")
      lr = LogisticRegression()
      paramGrid = ParamGridBuilder().build()
      
      #Pipeline
      pipeline = Pipeline(stages=[assembler, lr])

      ในการใช้งาน Logistic Regression ต้องกำหนดค่า field คือ features โดยกำหนดให้มาจาก Column sepal_length, sepal_width, petal_length, petal_width ส่วน label ได้กำหนดในขั้นก่อนหน้าแล้ว

      จากนั้นสร้าง lr เป็น instant ของ LogisticRegression

      ในการปรับค่า Parameter จะมาใส่ใน ParamGridBuilder ซึ่งจะไม่กล่าวถึงในขั้นนี้

      สุดท้าย นำ assembler และ lr มาเข้าสู่ stage วิธีการนี้ทำให้การทำซ้ำขั้นตอนต่าง ๆ ใน Pipeline สะดวกยิ่งขึ้น (ต้องเห็นกระบวนการที่ซับซ้อนกว่านี้ จึงจะเห็นประโยชน์)

    8.  ขั้นตอนสำคัญ  pipeline มาแล้ว ก็ต้องนำมาสร้าง model โดยการ Train ด้วยชุดข้อมูล “train”
      model = pipeline.fit(train)
      predictions = model.transform(train)

      แล้ว นำ model ที่ได้ มาทดลอง predictions ด้วย transform() บนข้อมูล train ผลที่ได้ คือ ผลการ Predict จาก Model

    9. ต่อไป คือ การตรวจสอบว่า Model ที่สร้างขึ้น มีความแม่นยำแค่ไหน ในที่นี้ จะใช้ MulticlassClassificationEvaluator เพราะ label มีมากว่า 2 ชนิด
      evaluator=MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

      แล้วนำ เปรียบเทียบว่า สิ่งที่ predict ได้จาก model

      evaluator.evaluate(predictions)

      ถูกต้องมากน้อยขนาดไหน กับข้อมูล test

      evaluator.evaluate(model.transform(test))
    10. ผลที่ได้ ประมาณนี้
      โดยจะเห็นได้ว่า มีความถูกต้อง 0.9521 … หรือ 95.21% นั่นเอง

     

  • สร้าง Hadoop และ Spark Cluster เพื่องานด้าน Data Science ด้วย Google Cloud Dataproc + Datalab

    จาก Ambari #01: ติดตั้ง Ambari Server , Ambari #02 ติดตั้ง Ambari Agent , Ambari #04 การสร้าง Hadoop ด้วย Ambari บน AWS และ GCP #01 วิธีการสร้าง Virtual Machine บน Google Cloud Platform จะเห็นได้ว่า ก็ยังมีความยุ่งยากอยู่ อีกทั้ง หากต้องการใช้ PySpark ก็ต้องตามติดตั้ง Python Packages ต้องปรับค่ามากมาย และหากต้องการขยายระบบ ก็มีงานต้องทำอีกเยอะ

    ในบทความนี้ จะแนะนำอีกวิธีหนึ่ง คือ การใช้งาน Google Cloud Dataproc ซึ่งจะทำให้เราได้ใช้ Hadoop + Spark Cluster ซึ่งได้รับการทดสอบเป็นอย่างดี อีกทั้งยังสามารถเลือกใช้ Spark รุ่นต่างๆได้อย่างง่ายได้ ทำให้สามารถโฟกัสไปยัง Data และ กระบวนทำ Machine Learning ได้เต็มที่

    1. ไปที่ Google Cloud Console เพื่อเลือก Project ที่จะทำงานด้วย และเปิดช้งาน Cloud Dataproc และ Compute Engine APIs และ ในที่นี้ จะมี project-id คือ kx-dataproc-01 (สามารถสร้างในชื่อที่ต้องการเองได้)
      https://console.cloud.google.com/
    2. เปิดใช้งาน Google Cloud Dataproc
      https://console.cloud.google.com/dataproc/
    3. เปิด GCLOUD COMMAND
    4. ในที่นี้ จะสร้าง Cluster ชื่อ mycluster ใน project-id ชื่อ kx-dataproc-01 แล้วให้ copy คำสั่งต่อไปนี้ลงไปใน gcloud command แล้วกดปุ่ม Enter
      gcloud dataproc clusters create mycluster --project kx-dataproc-01 --initialization-actions gs://dataproc-initialization-actions/datalab/datalab.sh
    5. ใช้เวลาประมาณ 5 นาที ก็จะได้ Hadoop + Spark Cluster ที่มี 1 Master และ 2 Workers
      ซึ่ง Master จะชื่อว่า mycluster-m
      และ Workers จะชื่อ mycluster-w-0 และ mycluster-w-1
    6. ต่อไป ทำ SSH Tunnel จาก Master คือ mycluster-m Port 8080 ออกมา โดยพิมพ์คำสั่งต่อไปนี้
      gcloud compute ssh mycluster-m --project kx-dataproc-01 --zone=asia-southeast1-a -- -4 -N -L 8080:mycluster-m:8080

      โดย
      –project ไว้สำหรับระบุชื่อ project-id
      –zone ไว้ระบุ Zone ที่ Cluster อยู่
      — ไว้เป็นตัวคั่น (separator) ว่าหลังจากนี้เป็นคำสั่งของ ssh
      -4 บอกว่า ติดต่อด้วย IPv4
      -N บอกว่า ไม่ต้องเปิด Shell ของเครื่อง Master
      -L บอกว่า จะ Forward Port 8080 ไปยังเครื่อง mycluster-m ที่ port 8080

    7. จากนั้น เปิด Web Preview on port 8080
    8. ก็จะได้ Google Cloud Datalab ซึ่งติดต่อกับ Hadoop+Spark ที่อยู่บน Google Cloud Dataproc ได้แล้ว

    Reference:

    https://cloud.google.com/dataproc/docs/tutorials/dataproc-datalab

  • Ambari #08 ปรับแต่ง pyspark ให้สามารถใช้งาน spark.ml ได้ ด้วย conda package management

    เราสามารถใช้งาน Spark ในด้าน Machine Learning ด้วย pyspark แต่ปัญหาอยู่ที่ว่า python ที่ติดตั้งบน Ubuntu 14.04 นั้น ไม่มี package ที่จำเป็นต้องใช้ ได้แก่ numpy, scipy, scikit-learn, matplotlib ซึ่งขั้นตอนการติดตั้ง ก็จะยุ่งยาก เพราะต้อง compile code เองด้วย

    แต่ปัจจุบัน มีเครื่องมือที่เรียกว่า “conda” ทำหน้าที่ติดตั้ง package ที่ต้องการได้สะดวก ในที่นี้ จะเลือกใช้ python 2.7 และ จะติดตั้งลงไปใน /opt/conda

    ขั้นตอนการติดตั้ง conda

    1. ไปเลือก setup script จาก https://conda.io/miniconda.html
    2. ในการนี้ ขอให้ทำในสิทธิ์ของ root
      sudo su
      cd
    3. Download script
      wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh
    4. จากนั้น ใช้คำสั่งต่อไปนี้ เพื่อติดตั้ง conda ลงไปใน /opt/conda และ เลือกใช้ค่า default
      bash Miniconda2-latest-Linux-x86_64.sh -p /opt/conda -b
    5. ติดตั้ง scikit-learn package ซึ่งจะติดตั้ง package อื่นๆที่จำเป็นสำหรับ spark.ml เข้ามาด้วย
      /opt/conda/bin/conda install scikit-learn -y
    6. ทำขั้นตอน 3-6 กับ “ทุกๆ node” ใน Hadoop Cluster

    ต่อไปตั้งค่า Zeppelin ให้สามารถใช้งาน conda แทน python เดิม

    1. เปิด Zeppelin ขึ้นมา
    2. คลิก Interpreter > ค้นหา spark

      แล้วคลิก edit
    3. จากนั้น หาเลื่อนหาค่า pyspark.python แล้วแก้ไขเป็น /opt/conda/bin/python แล้วคลิก save
    4. จากนั้นก็จะสามารถใช้งาน spark.ml ได้แล้ว
  • Spark #04 – Pyspark connect to MySQL

    ในบทความนี้ จะกล่าวถึง การดึงข้อมูลจาก MySQL ผ่าน JDBC เพื่อนำมาใช้งานใน Spark ด้วยภาษา Python ซึ่งจะใช้ Library Pyspark

    ในขั้นตอนนี้ขอกล่าวเฉพาะวิธีการก่อน (รายละเอียดจะตามมาทีหลัง)

    1. สร้าง SparkSession ตั้งชื่อว่า myspark
      from pyspark.sql import SparkSession
      myspark = SparkSession \
       .builder \
       .appName("Python Spark SQL basic example") \
       .config("spark.some.config.option", "some-value") \
       .getOrCreate()
    2. ติดต่อ MySQL และสร้าง View ชื่อ myuser
      myuser=myspark.read.jdbc(url="jdbc:mysql://mysql/mysql",table="user", properties={
       'user': 'user1', 'password': '123456'}
       )
      myuser.createOrReplaceTempView(name="myuser")
    3. จากนั้นก็จะสามารถ Query ข้อมูลที่เก็บไว้มาใช้งานใน Spark ได้
      myspark.sql(sqlQuery="select user,host from myuser where user='user1'").show()

    ซึ่งต่อจากนี้ จะสามารถใช้ความสามารถของ Spark ซึ่งทำงานด้าน Distributed Computing ได้ดี มาปรับปรุงความเร็วในการ Query ที่ซับซ้อน เช่นการ JOIN ได้ โดยจะกล่าวในบทความต่อๆไป