การใช้งาน Google Datalab Notebook บน Dataproc เพื่อสร้าง Machine Learning Model เบื้องต้น

ต่อจาก สร้าง Hadoop และ Spark Cluster เพื่องานด้าน Data Science ด้วย Google Cloud Dataproc + Datalab

  1. จาก Google Cloud Datalab คลิก Notebookแล้ว ตั้งชื่อ Demo01

    เลือได้ว่า จะใช้ Python2 หรือ Python3 ในที่นี้จะเลือก Python3
  2. ตรวจสอบรุ่นของ Spark ที่ใช้งานด้วยคำสั่ง
    spark.version

    แล้วกดปุ่ม Shift+Enter เพื่อ Run

  3. สามารถใช้คำสั่งไปย้ง Shell ซึ่งเป็น Linux ได้ โดยใช้เครื่องหมาย ! นำหน้า
    ในที่นี้ จะ Download iris dataset จาก https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data มาไว้ในเครื่อง mycluster-m ด้วย คำสั่ง

    ! wget https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data

    แล้ว เอาไปใส่ใน HDFS ด้วยคำสั่ง

    ! hdfs dfs -put iris.data /

    จะได้ผลประมาณนี้

  4. จาก Machine Learning #01 – Python with iris dataset ซึ่งเดิมใช้ sklearn จะเปลี่ยนเป็น Spark MLlib เพื่อใช้ความสามารถของ Spark Cluster ได้ เริ่มต้นจาก Import Library ที่จำเป็นดังนี้
    # Import Libaries
    from pyspark.ml import Pipeline
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql.types import *
  5. จากนั้น สร้าง Spark Dataframe (Concept จะคล้ายกับ Pandas แต่มีรายละเอียดที่มากกว่า)
    # get into DataFrame
    csvFile = spark.read.csv('/iris.data', inferSchema=True)
    diz = {"Iris-setosa":"1", "Iris-versicolor":"2", "Iris-virginica":"3" }
    df = csvFile.na.replace(diz,1,"_c4")
    df2 = df.withColumn("label",df["_c4"].cast(IntegerType())) \
    .withColumnRenamed("_c0","sepal_length") \
    .withColumnRenamed("_c1","sepal_width") \
    .withColumnRenamed("_c2","petal_length") \
    .withColumnRenamed("_c3","petal_width") 
    train,test = df2.randomSplit([0.75,0.25])

    เริ่มจาก ให้ spark session (spark) อ่านไฟล์ CSV จาก HDFS /iris.data โดยระบุว่า ให้กำหนด Data Type อัตโนมัติ (inforSchema=True) และไฟล์นี้ไม่มี Header

    Dataset นี้ ประกอบด้วย 5 columns เมื่อ Spark อ่านข้อมูลเข้ามา จะตั้งชื่อ column เป็น _c0, _c1, _c2, _c3, _c4 โดย _c4 จะเป็น label ของชนิดของดอก iris ซึ่งกำหนดเป็น String => Iris-setosa, Iris-vesicolor, Iris-virginica ในการใช้งาน Logistic Regression ขั้นตอนต่อไป ไม่สามารถนำเข้าข้อมูลชนิด String เพื่อไปใช้งานได้ จึงต้องทำการเปลี่ยน จาก “Iris-setosa” เป็น “1” แล้วทำการเปลี่ยน “1” ซึ่งเป็น String ให้เป็น Integer ด้วย ฟังก์ชั่น cast และตั้งชื่อว่า column ว่า “label”

    จากนั้น ทำการเปลี่ยนชื่อ column _c0, _c1, _c2, _c3 เป็นชื่อตามต้องการ

    สุดท้าย ใช้ randomSplit([0.75, 0.25]) เพื่อแบ่งข้อมูลสำหรับ train 75% และ test 25%

  6. ลอง แสดง Schema ดู
    df2.printSchema()

    ได้ผลดังนี้


    และใช้คำสั่งนี้ เพื่อดูข้อมูล

    df2.show()

    ได้ผลประมาณนี้

  7. ใน Spark 2.x จะมี Concept ของการใช้ Pipeline เพื่อให้สามารถออกแบบการทดลอง ปรับค่า Meta Parameter ต่าง ๆ ของโมเดล และทำงานอย่างเป็นระบบยิ่งขึ้น (ในขั้นตอนนี้ ขอไม่ปรับค่าใด ๆ ก่อน)
    # Model
    assembler = VectorAssembler(
    inputCols=["sepal_length","sepal_width","petal_length","petal_width"],
    outputCol="features")
    lr = LogisticRegression()
    paramGrid = ParamGridBuilder().build()
    
    #Pipeline
    pipeline = Pipeline(stages=[assembler, lr])

    ในการใช้งาน Logistic Regression ต้องกำหนดค่า field คือ features โดยกำหนดให้มาจาก Column sepal_length, sepal_width, petal_length, petal_width ส่วน label ได้กำหนดในขั้นก่อนหน้าแล้ว

    จากนั้นสร้าง lr เป็น instant ของ LogisticRegression

    ในการปรับค่า Parameter จะมาใส่ใน ParamGridBuilder ซึ่งจะไม่กล่าวถึงในขั้นนี้

    สุดท้าย นำ assembler และ lr มาเข้าสู่ stage วิธีการนี้ทำให้การทำซ้ำขั้นตอนต่าง ๆ ใน Pipeline สะดวกยิ่งขึ้น (ต้องเห็นกระบวนการที่ซับซ้อนกว่านี้ จึงจะเห็นประโยชน์)

  8.  ขั้นตอนสำคัญ  pipeline มาแล้ว ก็ต้องนำมาสร้าง model โดยการ Train ด้วยชุดข้อมูล “train”
    model = pipeline.fit(train)
    predictions = model.transform(train)

    แล้ว นำ model ที่ได้ มาทดลอง predictions ด้วย transform() บนข้อมูล train ผลที่ได้ คือ ผลการ Predict จาก Model

  9. ต่อไป คือ การตรวจสอบว่า Model ที่สร้างขึ้น มีความแม่นยำแค่ไหน ในที่นี้ จะใช้ MulticlassClassificationEvaluator เพราะ label มีมากว่า 2 ชนิด
    evaluator=MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")

    แล้วนำ เปรียบเทียบว่า สิ่งที่ predict ได้จาก model

    evaluator.evaluate(predictions)

    ถูกต้องมากน้อยขนาดไหน กับข้อมูล test

    evaluator.evaluate(model.transform(test))
  10. ผลที่ได้ ประมาณนี้
    โดยจะเห็นได้ว่า มีความถูกต้อง 0.9521 … หรือ 95.21% นั่นเอง