Categories
Computer Data

#14 ดึงข้อมูลจาก Database มาโชว์ใน Dashboard

อันนี้เป็นโปรเจ็คที่ 2 ของ Data Engineer ที่สร้าง Data Pipeline ดึงข้อมูลจาก Database สินค้ากับยอดขายและอัตราแลกเปลี่ยน ตัวบทความอธิบายขั้นตอนการสร้างแบบละเอียด

ต่อมาโปรเจคก่อนหน้าที่ทำ Data Pipeline ที่ดึงข้อมูลไฟล์ Excel จากเว็บไซต์ของกระทรวงอว. (กระทรวงการอุดมศึกษา วิทยาศาสตร์ วิจัยและนวัตกรรม) คราวนี้เรามาทำอีกโปรเจคหนึ่งที่สร้าง Data Pipeline มาดึงข้อมูลจากฐานข้อมูล (Database) เพื่อนำมาทำ Dashboard

แต่ก่อนจะไปทำถึง Data Pipeline เรามาพูดถึง Database กันเสียก่อน

Database

a person standing near data base wooden drawer
รูปโดย Tima Miroshnichenko จากเว็บ Pexels.com

ฐานข้อมูล (Database) คือเป็นพื้นที่เก็บข้อมูลสำหรับ Structured Data หรือ Semi-structured Data ที่ต้องการเข้าถึงข้อมูลอย่างรวดเร็ว ได้แก่ เว็บไซต์ หรือแอปมือถือ หรือระบบต่าง ๆ ที่มีผู้ใช้

โดย Database จัดอยู่ในหมวดหมู่ OLTP (Online Transaction Processing) ที่แตกต่างกับ OLAP (Online Analytical Processing) ที่ Data Warehouse อยู่ในหมวดหมู่นั้น ที่ออกแบบมาเพื่ออ่านข้อมูลเยอะ เพื่อนำไปวิเคราะห์ หรือทำ Data Visualization ที่เหมาะกับผู้ใช้อย่าง Data Analyst และ Data Scientist

Database สามารถแบ่งออกได้สองประเภทใหญ่ ๆ ได้แก่

  • SQL Database (หรือ RDBMS – Relational Database Management System) ที่เป็น Database สำหรับการเก็บข้อมูลลักษณะ Structured Data ที่ดึงข้อมูลได้โดย SQL
  • NoSQL Database เป็น Database อีกประเภทหนึ่งที่เป็น Semi-Structured Data ตัวอย่างฐานข้อมูลลักษณะนี้ได้แก่
    • MongoDB ที่เป็น Document stores ตัวอย่างเว็บที่ใช้ Database นี้คือเว็บ Pantip
    • Redis ที่เป็น Key-value stores
    • Cassandra ที่เป็น Wide column stores ตัวอย่างเว็บที่ใช้ Database นี้คือ FaceBook
    • และ Neo4j ที่เป็น Graph DBMS

ในบทความนี้เราจะพูดถึง Database ที่เป็น SQL (หรือ RDBMS) ที่มี Database ที่ดัง ๆ ก็ได้แก่ MySQL, PostgreSQL, MariaDB, Microsoft SQL Server, และ Oracle Database [1]

แนะนำ Database ประเภท SQL (หรือ RDBMS)

SQL Database (หรือ RDBMS – Relational Database Management System ที่เรียกเป็นภาษาไทยได้ว่าระบบจัดการฐานข้อมูลเชิงสัมพันธ์) เป็น Database สำหรับการเก็บข้อมูลลักษณะ Structured Data ที่ดึงข้อมูลได้โดย SQL ที่คิดค้นโดย E.F. Codd จากบริษัท IBM ช่วงปี 1970s

Database ชนิดนี้ ประกอบไปด้วยตาราง (Table) ที่เราสามารถมีได้หลาย Table โดยวิธีการเก็บข้อมูลมีลักษณะคล้ายกันกับเอกสาร Spreadsheet (เช่น Excel) ที่ช่วยธุรกิจในการเก็บ จัดการ และเชื่อมโยงข้อมูล โดยแต่ละ Table ประกอบไปด้วย

  • คอลัมน์ (Columns) เก็บข้อมูลคุณลักษณะ (Attributes) ร่วมกับระบุถึงชื่อคอลัมน์และชนิดข้อมูล
  • แถว (Rows หรือเรียกอีกอย่างว่า Record) เก็บข้อมูลค่าแต่ละค่าที่จำเพาะต่อชนิดข้อมูลที่กำหนดในแต่ละคอลัมน์
ส่วนประกอบของ Table

นอกจาก Columns และ Rows แล้ว ทุกตารางที่เก็บใน Database ประเภท RDBMS จะมีคอลัมน์หนึ่งที่เรียกว่า Primary Key ที่เป็นคีย์ที่ไม่ซ้ำกัน และไม่เป็นค่าช่องว่าง (Null) เพื่อใช้อ้างอิงในแต่ละแถว โดยเราสามารถนำคอลัมน์ Primary Key นี้ไปสร้างความสัมพันธ์ (Relation) กับตารางอื่น ๆ ผ่านการกำหนดคอลัมน์ที่เรียกว่า Foreign Key เพื่อให้ตารางที่มี Primary Key สามารถเชื่อมข้อมูลกับตารางอื่น และทำให้ข้อมูลใน Database สอดคล้องกัน [2]

Primary Key และ Foreign Key

ตัวอย่างการสร้าง Database ที่ใช้ Database ประเภทนี้ก็ทำได้โดย มีตารางอยู่ 2 ตาราง ได้แก่ ตาราง Customer และ ตาราง Order

ตาราง Customer เก็บข้อมูล ได้แก่

  • Customer ID (primary key)
  • Customer name
  • Billing address
  • Shipping address 

ในตาราง Customer ตัว Primary Key ของตารางนี้คือ Customer ID ที่เป็นค่าที่ระบุถึงลูกค้าใน Database โดยลูกค้าแต่ละท่านจะมีรหัสนี้เหมือนกันไม่ได้

ส่วนตาราง Order เก็บข้อมูลการซื้อ-ขายสินค้าที่ประกอบไปด้วย

  • Order ID (primary key)
  • Customer ID (foreign key)
  • Order date 
  • Shipping date
  • Order status

ตารางนี้เก็บ Primary Key สำหรับรหัสการซื้อ-ขายสินค้า (Order ID) โดยสามารถเชื่อมโยงถึงลูกค้าแต่ละท่านได้โดยการกำหนด Foreign Key เพื่อเชื่อมไปยังตาราง Customer

จากรายละเอียดข้างบนนี้ เราจะพบว่าทั้งสองตารางมีความสัมพันธ์กับโดยใช้ Customer ID ซึ่งหมายความว่าเราสามารถเรียกข้อมูลจากทั้ง 2 ตารางเพื่อสร้างรายการ หรือเพื่อนำข้อมูลไปใช้ประยุกต์กับงานอื่น ๆ ได้

ตัวอย่างเช่น ผู้บริหารร้านค้าสามารถสร้างรายงานถึงลูกค้าทุกคนที่ซื้อสินค้าในวันที่ระบุไว้ หรือสามารถดูข้อมูลลูกค้าที่สั่งสินค้าแล้วได้สินค้าล่าช้ากว่ากำหนดในเดือนที่แล้ว

จากตัวอย่างการสร้างตาราง Customer และตาราง Order นี้เป็นแบบง่าย แต่ในความเป็นจริง Database ซับซ้อนมากกว่านั้น แต่ยังมี Relation ระหว่างตารางเพื่อเชื่อมโยงข้อมูลระหว่างตารางกันได้ ทำให้เราสามารถอ้างอิงได้หลายตารางเลย ตราบใดที่ยังเข้ากันได้กับรูปแบบของ Relation ของ Database ของเราเองที่ได้ระบุไว้

เมื่อตารางถูกจัดเก็บแบบที่มีการระบุ Relation มาก่อนแล้ว เราสามารถเรียกดู (Query) ข้อมูลได้โดยการเขียนโค้ดด้วยภาษา SQL (Structured Query Language) [1]

ภาษา SQL (Structured Query Language)

ภาษา SQL (Structured Query Language) เป็นภาษาสำหรับการดึงข้อมูลจาก Database แบบ Structured Data โดย

  • ตัว Syntax ของภาษาเป็นภาษาอังกฤษที่คนใช้กันทั่วโลก
  • เป็นภาษาที่ใช้ในงานของ Data Analyst, Data Scientist และ Data Engineer
  • ถูกใช้งานใน Database และ Data Warehouse แทบทุกแบรนด์ รวมถึง NoSQL บางตัว

อย่างไรก็ดี ภาษา SQL มีหลายเวอร์ชันขึ้นกับผู้พัฒนาฐานข้อมูลนั้น ๆ ตัวอย่างเช่น Transact-SQL (หรือ T-SQL) ที่ถูกใช้งานในฐานข้อมูล Microsoft SQL (หรือ MSSQL)

ต่อมา นอกจากเรื่องเวอร์ชันของภาษา SQL แล้ว เรามาดูคำสั่งที่ใช้ SQL กันดีกว่า คำสั่งใน SQL สามารถจัดแบ่งเป็นหมวดหมู่ได้ทั้งหมด 4 หมวดหมู่ โดยสรุปได้ตามภาพด้านล่างนี้

Chart สรุปคำสั่ง SQL ตามหมวดหมู่

โดย

  • DDL ย่อมาจาก Data Definition Language
  • DML ย่อมาจาก Data Manipulation Language
  • DCL ย่อมาจาก Data Control Language
  • TCL ย่อมาจาก Transaction Control Language

อย่างไรก็ดี บางเว็บก็จะแยกคำสั่ง SELECT ไปอยู่ในหมวด DQL (Data Query Language) ครับ

เครื่องมือสำหรับการ Database SQL (หรือ RDBMS) บนคลาวด์

ในปัจจุบันข้อมูล Big Data ที่สร้างขึ้นมีขนาดใหญ่ขึ้นมากหลายเท่าตัวเมื่อเทียบกับ 10 ปีก่อน ข้อมูลที่สร้างขึ้นเหล่านี้

  • เติบโตมากขึ้นทุกนาที
  • กระจัดกระจายไปตาม Server ที่ิติดตั้งแบบ On-Premise และบนคลาวด์ ส่งผลให้การนำข้อมูลมาใช้เป็นไปได้ยาก
  • นำไปประมวลผลได้ล่าช้า

จุดนี้ส่งผลให้ธุรกิจเสียหายได้ ตัว Cloud Database ได้รับการออกแบบมาเพื่อแก้ปัญหาเหล่านี้

Cloud Database

Cloud Database เป็น Database ที่วางอยู่บนผู้ให้บริการคลาวด์ (Cloud Provider) ที่ทำหน้าที่เหมือนกันกับ Database แบบ SQL ทั่ว ๆ ไป อย่างไรก็ตาม บริการ Cloud Database จะแตกต่างกับทั่วไปตรงที่บริการนี้เป็นแบบ Managed database-as-a-service (DBaaS) ที่ผู้ให้บริการ Cloud นั้น ๆ จะเป็นคนดูแล [1]

จุดนี้จะผิดกับบริการ Unmanaged ที่ผู้ใช้จะต้องจัดการ และดูแลฐานข้อมูลเอาเองภายในทีมของบริษัท หรือบุคคลนั้น ๆ

ตัวอย่างของเครื่องมือ Cloud Database ได้แก่

  • Amazon RDS (Amazon Relational Database Service)
  • Azure SQL Database
  • Google Cloud SQL

มาถึงจุดนี้ก็คงจะมีคำถามแล้วว่า “เพราะอะไรถึงต้องมาใช้บริการพวกนี้ แล้วมันแตกต่างกันตรงไหน?” เหตุผลของการใช้บริการ Cloud Database ก็ได้แก่

  • ยืดหยุ่น (Flexible) สามารถปรับตั้งค่าสเปคเครื่องได้เลยตามความต้องการ แถมถ้า Database เกิดล่มขึ้นมา เราก็เปิดใช้ High Availability เพื่อให้บริการ Cloud SQL ก็สลับไปใช้ฐานข้อมูลตัวก็อปปี้ที่วางอยู่ในคนละที่แบบอัตโนมัติ [3]
  • เสถียร (Reliable) มีการรับประกันว่าจะไม่ล่มบ่อยโดยใช้ SLA (Service Level Agreement) แถมยังมีบริการสำรองข้อมูล (Backup) อัตโนมัติ
  • ปลอดภัย (Secure) ไม่ต้องจ้างยาม ติดตั้งระบบความปลอดภัยเองแบบ On-Premises
  • ประหยัด (Affordable)
  • และอื่น ๆ

ในบทความนี้จะใช้บริการของ Google Cloud Platform ที่มีชื่อว่า Google Cloud SQL

Google Cloud SQL

Google Cloud SQL

Google Cloud SQL เป็นบริการ SQL Database ที่ให้บริการในรูปแบบ Fully Managed ที่ให้ผู้ใช้สามารถติดตั้ง สร้าง ดูแล จัดการฐานข้อมูลได้สะดวกบน Infrastructure ของ Google Cloud Platform โดยเราไม่จำเป็นต้องมาวุ่นวายพวกนี้เลย เราแค่ไปโฟกัสกับการทำฐานข้อมูลเพื่อสร้าง Applications มาใช้ ตามสโลแกนของเว็บ Google ที่กล่าวไว้ว่า

Focus on your application, and leave the database to us

Cloud SQL for MySQL, PostgreSQL, and SQL Server | Google Cloud

บริการนี้เป็นบริการที่นิยมมาก โดยลูกค้าของ Google Cloud Platform 100 ราย ใช้บริการนี้มากกว่า 95% อีกด้วย แถมยังเครื่องมือสำหรับการจัดการ Database ทั้ง 3 เจ้าดัง ได้แก่ MySQL, PostgreSQL และ Microsoft SQL Server

การย้ายมาใช้บริการ Cloud SQL ก็ทำได้ง่ายมาก เราไม่ต้องเขียนโค้ดอะไรเพิ่มเลย ตัวเครื่องมือ Database ใช้ตัวเดิม (MySQL, PostgreSQL และ Microsoft SQL Server) ได้เลย ไม่ต้องทำอะไรเพิ่มเลย [4]

นอกจากนี้บริการ Cloud SQL ยังมีคุณสมบัติสำหรับการสำรองข้อมูลแบบอัตโนมัติ การทำซ้ำ (replication) และการป้องกันระบบคอมพิวเตอร์ล่มเหลว (failover) คุณยังสามารถรวมเข้ากับบริการอื่น ๆ ของ Google Cloud ได้ด้วย เช่น Google Kubernetes Engine และ BigQuery [5]

โจทย์

โจทย์ที่ได้จะเป็นโจทย์ที่นำมาจากคอร์สของ Road to Data Engineer (หรือ R2DE) ที่เป็นคอร์สที่ผู้อ่านสามารถซื้อได้จากเว็บไซต์ DataTH ผู้สอนของคอร์สเป็นบุคคลที่ทำงานทางด้าน Data Engineer มาก่อนตัวคอร์สสอนตั้งแต่พื้นฐานการเขียนโปรแกรม ไปถึงระดับที่สามารถสมัครงานในตำแหน่ง Junior Data Engineer ได้ ในทุกบทเรียนจะมี Workshop ให้ผู้เรียนได้ลองทำจริง

ตัวโจทย์ที่จะทำนี้ทางฝ่ายผลิตภัณฑ์และการตลาดอยากทราบว่าสินค้าชิ้นไหนขายดี เพื่อหาสินค้าที่ถูกใจลูกค้ามาวางขาย และจัดโปรได้เหมาะสม โดยเก็บข้อมูลไว้ในฐานข้อมูล (Database) และต้องการให้ทีมวิศวกรเตรียมข้อมูลให้ทางฝ่ายวิเคราะห์ข้อมูลเพื่อจะสร้าง Report หรือ Dashboard

ร้านค้า ภาพถ่ายโดย vnwayne fan บน Unsplash

เมื่อทราบตัวโจทย์แล้ว เราก็วางแผนทำ Data Pipeline เลย นิยามและรายละเอียดแบบคร่าว ๆ ของ Data Pipeline ที่เอามาจากบทความก่อนหน้าคือกระบวนการลำเลียงข้อมูลจากแหล่งข้อมูล (Data Source) มายังจุดหมาย (Destination) โดยมีทั้งหมด 4 ขั้นตอน ได้แก่

  • การนำเข้าข้อมูล (Ingestion)
  • การเปลี่ยนแปลงข้อมูล (Transformation)
  • การเก็บข้อมูล (Storage) ที่แบ่งได้เป็น Data Warehouse และ Data Lake
  • และปลายทางคือการวิเคราะห์ หรือนำข้อมูลไปใช้ประโยชน์ (Analysis)

โดยเราสามารถสรุปขั้นตอนได้ตามข้างล่างนี้

ส่วนเครื่องมือทั้งหมดที่ใช้ เราใช้ผ่าน Google Cloud Platform (GCP) ที่เป็นบริการคลาวด์ที่เราเช่าใช้ส่วนหนึ่งของ Data Center ผ่านระบบอินเตอร์เน็ต โดยไม่ต้องติดตั้งคอมพิวเตอร์เองแบบ On-Premises และคิดค่าใช้บริการตามจริง (OpEx หรือ Operational Expenditure) ที่เป็น Pay-as-you-go.

เรื่อง Region ในบทความนี้กำหนดเป็น us-central1 ทั้งหมด

การนำเข้าข้อมูล (Ingestion)

ในขั้นตอนนี้เป็นการนำเข้าข้อมูลจาก Database และ API ตามที่ได้แจ้งในโจทย์ข้างบนนี้ โดยตัว Database ที่ใช้เป็นแบบ SQL ที่แสดงรายการสินค้าที่ขาย และแสดงรายการสั่งซื้อ ส่วน API ที่ดึงข้อมูลมาจะเป็นการดึงข้อมูลอัตราแลกเปลี่ยนระหว่างเงินสกุลดอลลาร์สหรัฐฯ และสกุลเงินบาท

ข้อมูลของตัว Database ที่มีมาให้เป็นข้อมูลที่มีลักษณะเป็น Structured Data ที่เป็นข้อมูลที่มีลักษณะโครงสร้างที่แน่นอน สามารถแสดงผลในรูปแบบตารางได้ ส่วน API ที่ดาวน์โหลดมาเป็นข้อมูล JSON ที่มีลักษณะเป็น Semi-structured Data ที่มีความยืดหยุ่น และปรับโครงสร้างได้ในอนาคต

ขั้นตอนนี้เราจะใช้

  • MySQL ผ่านการใช้บริการ Google Cloud SQL
  • Apache Airflow ที่ใช้งานผ่าน Google Cloud Composer
  • และใช้ Google Cloud Storage เพื่อสร้าง Bucket สำหรับการทำ Data Lake

MySQL เป็นเครื่องมือจัดการ Database ที่เป็น Open Source ที่เดิมถูกพัฒนาโดย MySQL AB ต่อมาถูกซื้อโดย Oracle ที่รองรับคำสั่งภาษา SQL ร่วมกับรองรับภาษาเขียนโปรแกรมได้หลากหลาย ได้แก่ C, C++, Python, Java, PHP เป็นต้น

นอกจากนี้ เครื่องมือนี้ยังได้รับการออกแบบและปรับให้เหมาะสมต่อการพัฒนาโปรแกรม และเว็บ ที่รองรับการทำงานทุกแพลตฟอร์ม ร่วมกับใช้งานได้หลายผู้ใช้ (Multi-user)

สร้าง Instance บน Cloud SQL

การสร้าง Instance สำหรับการรัน MySQL บนบริการ Google Cloud SQL ทำได้ไม่กี่ขั้นตอน

ขั้นตอนแรก เข้าไปที่หน้า Google Cloud ก่อน จากนั้นเข้าไปที่หน้า Console แล้วเลือกไปยัง Cloud SQL เมื่อเลือกเข้าไปที่หน้า Cloud SQL แล้ว จะปรากฏหน้าจอเพื่อให้สร้าง Instance บน Google Cloud SQL จากนั้นกดปุ่ม Create Instance with Your Free Credits

หน้าจอแรกของบริการ Cloud SQL

กดปุ่ม Create Instance แล้วให้เลือกชนิดเครื่องมือ SQL Database ที่ต้องการ โดยมีให้เลือกเป็น MySQL, PostgreSQL และ SQL Server ในตัวอย่างนี้เราเลือก MySQL.

หน้าจอให้เลือก SQLDatabase ที่ต้องการ

เมื่อกดปุ่มเลือก MySQL แล้ว เรามาตั้งค่า Instance ของ Cloud SQL โดยให้

  • กำหนดชื่อ Instance ID
  • กำหนดรหัสผ่านของ user Root
  • เลือก Cloud SQL Edition ที่ต้องการโดยมีให้เลือกเป็น Enterprise Plus กับ Enterprise จุดนี้จะแตกต่างกันเรื่อง SLA, ประสิทธิภาพ, สเปค กับการสำรองข้อมูล และการกู้คืนข้อมูล โดยดูได้ตามภาพด้านล่างนี้
  • เลือก Preset สำหรับสเปคของเซิร์ฟเวอร์ที่ต้องการ โดยมีให้เลือกเป็น Production, Development และ Sandbox โดยแตกต่างกันตามภาพด้านล่างนี้เช่นกัน ในตัวอย่างให้เลือกที่ Sandbox
  • เลือก Region และ Zone โดยในตัวอย่างนี้เลือก us-central1 และเลือก Zone เป็น Single Zone
หน้าจอให้เลือก Cloud SQL Edition
หน้าจอให้เลือก Preset เลือกสเปคของเซิร์ฟเวอร์ตามที่ต้องการ

เมื่อตั้งค่าเสร็จเรียบร้อย กดปุ่ม Create Instance เพื่อสร้าง Instance ของ Cloud SQL ครับ ขั้นตอนนี้จะใช้ระยะเวลานานหน่อยครับ ก็ออกไปทำธุระอย่างอื่นก่อนได้เลย

เมื่อสร้าง Instance ของ Google Cloud SQL สำเร็จแล้ว หน้าจอจะแสดงตามด้านล่างนี้

หน้าจอแสดง Instance ของ Google Cloud SQL

หลังจากนั้น ให้เราสร้าง User เพื่อจัดการกับ Database ในตัว Instance ของ Google Cloud SQL โดยดูที่แท็บด้านซ้าย ให้เลือกไปที่ Users

แท็บแมนูในหน้า Instance ของ Google Cloud SQL

เมื่อเข้ามาที่หน้า Users แล้ว ให้กดไปที่ Add User Account จากนั้นหน้าต่าง Add a user account to instance <instance name> ก็จะปรากฏขึ้น โดยจะมีตัวเลือกสองตัวเลือกของการทำ Authentication ได้แก่

  • Built-in authentication อันนี้จะสร้าง User account ที่พิมพ์ Username และ Password ได้ตามปกติ โดยจะมีสิทธิการใช้งานแบบเดียวกันกับ root อย่างไรก็ตาม จุดนี้เราปรับได้ในภาพหลัง
  • Cloud IAM (Cloud Identity Access Management) อันนี้เรานำ Google Account หรือ Service Account ที่เกี่ยวข้องมาใช้ โดยเราจะต้องกำหนด Role ให้สามารถจัดการกับ Instance ได้ รายละเอียดเพิ่มเติมอ่านได้ในเอกสารของ Google Cloud
หน้าจอสร้าง User Account ใน Google Cloud SQL

ในตัวอย่างนี้ เราเลือก Built-in authentication แล้วกำหนด username และ password ตามที่ต้องการ จากนั้นกดปุ่ม Add

เมื่อเพิ่ม User แล้ว เราเข้าถึง Instance ได้ โดยพิมพ์คำสั่งใน Cloud Shell ตามด้านล่างนี้

gcloud sql connect < instance that you want to connect > --user=<username> --quiet

กดปุ่ม Enter แล้วจะมีหน้าต่างสอบถามว่าเราต้องการอนุญาตให้บัญชี Google Account นี้เข้าถึงคำสั่งนี้ได้หรือไม่ ให้เรากดที่ Authorize จากนั้นก็พิมพ์รหัสผ่านของผู้ใช้ที่เราสร้างขึ้น

เมื่อสร้างเสร็จแล้วจะปรากฏหน้าจอ MySQL ขึ้นมาตามด้านล่างนี้ โดยเราสามารถพิมพ์คำสั่ง SQL ได้ตามที่ต้องการ

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 2147
Server version: 8.0.31-google (Google)

Copyright (c) 2000, 2024, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

โดยตัวอย่างนี้ให้เราพิมพ์คำสั่ง CREATE DATABASE < preferred database name > ลงใน Prompt แล้วกด Enter เพื่อสร้าง Database เมื่อสร้างเสร็จแล้วให้พิมพ์คำสั่ง SHOW DATABASES ระบบจะแสดง Database ที่มีในระบบ

mysql> CREATE DATABASE < preferred database name >;
Query OK, 1 row affected (0.22 sec)

mysql> SHOW DATABASES;
+-----------------------------+
| Database                    |
+-----------------------------+
| information_schema          |
| mysql                       |
| performance_schema          |
| < preferred database name > |
| sys                         |
+-----------------------------+
5 rows in set (0.21 sec)

เมื่อสร้างฐานข้อมูลเสร็จแล้ว ให้เรา Dump ข้อมูลจาก Database เก่ามานำเข้าไปยัง Database ใหม่ที่เราสร้างขึ้นบน Google Cloud SQL

การ Dump SQL Database เพื่อเซฟเป็นไฟล์ SQL

ต่อมา เรามาพูดถึงการสร้าง Database และ Table บน Google Cloud SQL แต่ก่อนอื่น เราจะต้องนำข้อมูลจาก Database ที่มีมาให้ใน Workshop 1 ของคอร์ส R2DE เสียก่อน โดยข้อมูล Host, Username, Password, และชื่อ Database อันนี้นี้เข้าไปดูได้ในคอร์ส

ส่วนเครื่องมือที่ใช้เป็น Client ของ SQL Database ก็มีหลายตัว ได้แก่ MySQL Workbench, DBeaver กับ HeidiSQL เป็นต้น เครื่องมือตามข้างบนนี้ เราสามารถดาวน์โหลดเพื่อมาติดตั้งได้จากลิ้งค์ที่ให้ได้ครับ

เมื่อดาวน์โหลดและติดตั้งเสร็จแล้ว ให้เชื่อมต่อกับฐานข้อมูล แล้ว Dump Database ออกมาเป็นไฟล์ sql

การอัพโหลดไฟล์ไปยัง Google Cloud Storage

ต่อมา อัพโหลดไฟล์ไปยัง Google Cloud Storage การอัพโหลดไฟล์เข้าไปยัง Cloud Storage ทำได้โดย

  • กดเข้าไปที่หน้า Cloud Storage แล้วเลือก Bucket ตามที่ต้องการ จากนั้นกดปุ่มอัพโหลด
  • หรืออัพโหลดไฟล์เข้า Cloud Shell ก่อน จากนั้นพิมพ์คำสั่ง gs util cp < preferred_upload_file > gs://< preferred_upload_bucket >/< target_path >
  • อีกวิธีก็เป็นการเขียนโค้ดเพื่ออัพโหลดไฟล์ อันนี้เราสามารถเขียนได้หลายภาษา แต่ในบทความนี้เราจะเขียนโค้ดด้วยภาษาไพทอน

การเขียนโค้ดด้วยภาษาไพทอน ทำได้โดยการเข้าไปยังหน้า Cloud Shell แล้วกดที่ปุ่ม Open Editor เพื่อเปิดหน้าแก้ไขโค้ด ต่อมา เรามาเริ่มเขียนโค้ดกัน

ขั้นตอนแรกของการเขียนโค้ดคือนำเข้าไลบรารีที่เกี่ยวข้องเสียก่อน โดยไลบรารีสำหรับการอัพโหลดไฟล์ไปยัง Google Cloud Storage หรือดาวน์โหลดไฟล์จาก Cloud Storage อยู่ในไลบรารีของ google.cloud.storage การเขียนโค้ดทำได้ตามด้านล่างนี้

from google.cloud.storage import Client

ขั้นตอนต่อมา เราเรียกใช้คลาส Client ร่วมกับการเลือก Bucket ตามที่เราต้องการ

from google.cloud.storage import Client

client = Client()
bucket = client.bucket(bucket_name)

เมื่อเลือก Bucket ตามที่เราต้องการแล้ว จากนั้นพิมพ์ตำแหน่งที่อยู่ไฟล์ที่เราต้องการให้ปรากฏที่ปลายทาง ด้วยการเขียนโค้ดได้ตามด้านล่างนี้

from google.cloud.storage import Client

client = Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(“< target path >”)

จากนั้นสั่งให้ตัวโค้ดอัพโหลดไฟล์จากตำแหน่งที่มีอยู่ใน Google Cloud Shell ด้วยคำสั่ง upload_from_filename โดยการโค้ดได้ตามด้านล่างนี้

from google.cloud.storage import Client

client = Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(“< target path >”)

generation_match_precondition = None
blob.upload_from_filename(source_filenames,
    if_generation_match=generation_match_precondition)

ตัวอาร์กิวเมนต์ if_generation_match จะกำหนดไว้ในกรณีที่ไฟล์ที่มีชื่อเดียวกันอยู่บน Cloud Storage Bucket นั้น ๆ ว่ามีไพล์อยู่แล้วหรือไม่ ถ้าเรากำหนดค่านี้ไว้เท่ากับ 0 ตัวคำสั่ง upload_from_filename จะทำงานได้โดยไม่มีข้อผิดพลาดก็ต่อเมื่อไม่มีไฟล์นั้น ๆ อยู่บน Bucket แต่ถ้าเราต้องการเขียนทับไฟล์ไปเลย ไม่สนใจว่ามีไฟล์อยู่หรือไม่ ให้ใส่ว่า None

จากน้น พิมพ์ Command Line เพื่อรันคำสั่ง เมื่อรันเสร็จแล้ว ตัวโค้ดจะอัพโหลดไฟล์เพื่อเก็บไว้บน Bucket

อัพโหลดไฟล์จาก Bucket ไปยัง Cloud SQL

การอัพโหลดไฟล์จาก Bucket ขึ้นไปยัง Google Cloud SQL ทำได้หลายวิธี ในบทความนี้จะยกตัวอย่าง 2 วิธี ได้แก่

วิธีแรก เป็นการอัพโหลดไฟล์ผ่านทางหน้าเว็บของ Google Cloud SQL วิธีนี้ทำได้ไม่ยาก โดยให้เราเข้าไปที่ Instance ของ Google Cloud SQL ที่สร้างขึ้น แล้วกดไปปุ่ม Import เลือกไฟล์ใน Bucket ที่เราได้อัพโหลดไว้ แล้วเลือกตำแหน่ง Database ที่เราได้สร้างขึ้น จากนั้นกดปุ่ม Import เพื่อนำเข้าไฟล์ไปยัง Database

นำเข้าไฟล์ผ่านหน้าเว็บ

อีกวิธี เป็นการอัพโหลดไฟล์ผ่านการพิมพ์คำสั่ง อันนี้เราทำผ่าน Google Cloud Shell ได้เลย โดยการพิมพ์คำสั่งตามด้านล่างนี้ เมื่อพิมพ์เสร็จแล้วกด Enter

gcloud sql import sql < instance that you want to connect > < sql file path on Bucket on Google Cloud Storage > [--database=< database name>, -d <database name>] [--user=<username>]

ตัวไฟล์ก็จะนำเข้าไปยังฐานข้อมูลได้เช่นกัน

สร้าง Instance บน Google Cloud Composer

ขั้นตอนนี้ ให้เราสร้าง Instance บน Google Cloud Composer เพื่อใช้งานเครื่องมือ Apache Airflow สำหรับการทำ Data Pipeline Orchestration โดยให้เราสร้าง Environment เสียก่อน ด้วยการเลือก Composer 1 หรือ 2 ก็ได้

ในตัวอย่างนี้เราจะเลือกที่ Composer 2 เมื่อเลือกแล้ว ให้เรากรอกข้อมูลให้เรียบร้อย โดยเลือก Environment resources เป็น Small แล้วกดปุ่ม Create ขั้นตอนนี้จะใช้เวลานานจนกว่าจะเสร็จ

เมื่อสร้างเสร็จแล้ว ให้เราScale ส่วน Worker ขึ้นมาหน่อย จากเดิมที่ให้แรม 2GB เป็น 3.25GB เพื่อให้มีแรมใช้เพียงพอต่องานที่จะทำขึ้น ไม่อย่างนั้นจะขึ้นประมาณว่า Process นั้นโดย Killed เนื่องจากแรมไม่พอ วิธีการ Scale ทำได้โดยเข้าไปที่ Environment ที่สร้างขึ้น เลือกแท็บ Environment Configuration จากนั้นเลื่อนมาที่ Workloads Configuration แล้วกด Edit

หน้าจอแสดง Workloads Configuration

จากนั้นหน้าจอ Workloads Configuration จะปรากฏทางด้านขวา

หน้าจอ Workloads Configuration

ให้เลือกตรงส่วน Worker โดยเพิ่มแรมจาก 2 GB เป็น 3.25 GB จากนั้นกดปุ่ม Save แล้วรอให้ Composer จัดการซึ่งใช้เวลานานครับ 😀

เปลี่ยนจาก 2 GB เป็น 3.25 GB

นอกจากนี้ เราจะใช้แพคเกจ Pandas และ Requests ดังนั้นแล้ว เราจะติดตั้งแพคเกจ numpy pandas และ requests ที่แท็บ PYPI Packages จากนั้นก็รอเวลาให้มันติดตั้ง ซึ่งก็นานอีกเช่นกัน

เปิดใช้งาน Cloud SQL Auth Proxy

Cloud SQL Auth Proxy เป็นเครื่องมือเชื่อมต่อกับบริการ Cloud SQL โดยอนุญาตให้เราเชื่อมต่อเข้ากับ Instance อย่างปลอดภัยโดยไม่ต้องไปตั้งค่า Authorized networks หรือตั้งค่า SSL

ข้อดีของการใช้บริการนี้ ได้แก่

  • ตัว Proxy ทำให้การเชื่อมต่อเข้ากับ Instance ปลอดภัยมากขึ้นโดยเข้ารหัสการเชื่อมต่อ่เข้า และออกจาก Database โดยใช้ TLS 1.3 ร่วมกับการเข้ารหัสแบบ 256-bit AES cipher นอกจากนี้ยังมีการใช้ SSL certificate เพื่อยืนยันตัวตนระหว่าง server และ client ที่ไม่ขึ้นกับตัว Protocol การเชื่อมต่อ Database ส่งผลให้เราไม่ต้องตั้งค่า SSL certificate เลย
  • บริการนี้ทำให้เราเชื่อมต่อได้ง่ายโดยใช้ IAM permission เพื่อควบคุมการเชื่อมต่อเข้ากันกับ Cloud SQL instance
  • ไม่ต้องใช้ static IP address เพื่อเชื่อมต่อเข้ากับ Cloud SQL

บริการ Cloud SQL Auth Proxy ไม่จำเป็นต้องกำหนดวิธีการเชื่อมต่อแบบใหม่เลย เพียงแต่ใช้การเชื่อมต่อ IP แบบเดิมบนเครือข่าย VPC network (Virtual Private Cloud network) ก็ใช้ได้แล้ว [6]

วิธีการติดตั้ง Cloud SQL Auth Proxy ทำได้หลายวิธี โดยอาจจะติดตั้งเครื่องมือบนคอมพิวเตอร์ หรือบนเซิร์ฟเวอร์ใช้เองก็ได้ตามในเว็บ อย่างไรก็ตาม ในบทความนี้เราจะนำมาใช้งานกับ Google Cloud Composer ที่รันอยู่บนเครื่องมือ Kubernetes เราจำเป็นต้องสร้าง Auth Proxy บนระบบนี้

ตั้งค่า Kubernetes Engine API และอัพเดทการตั้งค่า

ขั้นตอนการติดตั้งทำได้โดยเริ่มจากการเปิดใช้งาน Kubernetes Engine API แล้วอัพเดทการตั้งค่า kubectl เพื่อให้ใช้งานกับ Kubernetes Cluster ที่สร้างโดย Google Cloud Composer ด้วยการพิมพ์คำสั่ง

gcloud container clusters get-credentials < Kubernetes Cluster Name > \
    --region=< region >

โดย kubectl เป็นฟังก์ชันสำหรับการติดต่อกับ Kubernetes Cluster เพื่อที่จะติดตั้ง และจัดการ Deployment, Pod และ Container

การดูชื่อ Kubernetes Cluster ทำได้โดยเข้าไปที่ Google Cloud Composer Environment ที่เราสร้างขึ้น ต่อมากดที่แท็บ Environment Configuration แล้วดูที่ GKE cluster

เราจะพบชื่อ Cluster ตรงบริเวณที่ขีดเส้นใต้สีส้มที่อยู่หลัง / อันหลังสุด อันนี้คือชื่อที่เราจะนำมาใช้กับคำสั่งข้างบนเพื่อตั้งค่าสำหรับ kubectl

ดูรายละเอียดของ Kubernetes Cluster ได้ที่บริเวณ GKE cluster

เมื่อพิมพ์คำสั่งเสร็จแล้ว หน้าจอจะปรากฏด้านล่างนี้

gcloud container clusters get-credentials < Kubernetes Cluster Name > --region=< Region >

Fetching cluster endpoint and auth data.
kubeconfig entry generated for < Kubernetes Cluster Name >

สร้าง Google Service Account

ขั้นตอนที่สอง จะเป็นสร้าง Google Service Account สำหรับการใช้งานกับ Kubernetes Cluster เพื่อติดตั้งกับ SQL Database ที่สร้างขึ้นบน Google Cloud SQL

วิธีสร้าง Google Service Account ทำได้โดยการใช้คำสั่ง

gcloud iam service-accounts create < service_account_name >

ต่อมากำหนด Permission ให้กับ Service Account ที่สร้างขึ้น

gcloud projects add-iam-policy-binding < project_id > \
      --member serviceAccount:< service_account_name >@< project_id >.iam.gserviceaccount.com \
      --role roles/cloudsql.client --condition None

ระบบจะขึ้นหน้าจอตามด้านล่างนี้เป็นอันว่าเราตั้งค่า Permission เรียบร้อย

Updated IAM policy for project [infra-tempo-410705].
bindings:
. . .

สร้าง service_account.yaml

ขั้นตอนที่สาม เราจะสร้างไฟล์ Service Account ในรูปแบบ yaml สำหรับการใช้งานผ่าน kubectl เพื่อเก็บข้อมูล Kubernetes Service Account

apiVersion: v1
kind: ServiceAccount
metadata:
  name: < ksa_account_name >

เขียนเสร็จแล้ว ให้บันทึกไฟล์เป็น service_account.yaml แล้วพิมพ์คำสั่ง

kubectl apply -f service_account.yaml

สร้าง Workload Identity

ขั้นตอนที่สี่ เรามาสร้าง Workload Identity ที่เป็นขั้นตอนการสร้าง Kubernetes Service Account (KMS)

gcloud container clusters update \
            --region < Region > < Kubernetes Cluster Name >  \
            --workload-pool < project_id >.svc.id.goog

เมื่อพิมพ์แล้ว กด Enter ผลลัพธ์จะแสดงตามด้านล่างนี้

Updating us-central1-test-9c7bf836-gke...done.                                                                                                                                                
Updated [https://container.googleapis.com/v1/projects/< project_id >/zones/< Region >/clusters/< Kubernetes Cluster Name >].

To inspect the contents of your cluster, go to: https://console.cloud.google.com/kubernetes/workload_/gcloud/< Region >/< Kubernetes Cluster Name >?project=< project_id >

เพิ่ม Role ให้กับ Service Account และเชื่อมบัญชีไปยัง KSA Account

ขั้นตอนที่ห้า เรานำ Service Account ที่สร้างขึ้นมา

  • ตั้งค่า Role โดยให้พิมพ์ Role iam.workloadIdentityUser
  • เชื่อมต่อบัญชี Kubernetes Service Account (KSA) เข้ากันกับ Google Service Account (GSA) โดยบัญชี GSA จะอนุญาตให้เข้าถึงจาก Workload Identity ที่เราระบุ KSA ไว้ในระบบ
gcloud iam service-accounts add-iam-policy-binding \
            --role roles/iam.workloadIdentityUser \
            --member serviceAccount:< project_id >.svc.id.goog[default/< ksa_account_name >] < service_account_name >@< project_id >.iam.gserviceaccount.com

พิมพ์เสร็จแล้ว กดปุ่ม Enter แล้วผลลัพธ์แสดงตามด้านล่างนี้

Updated IAM policy for serviceAccount [< service_account_name >@< project_id >.iam.gserviceaccount.com].
bindings:
- members:
  - serviceAccount:< project_id >.svc.id.goog[default/< ksa_account_name >]
  role: roles/iam.workloadIdentityUser
etag: 
version: 1

Annotate Service Account กับ KSA Account

ขั้นตอนที่หก ทำ Annotate Google Service Account โดยเชื่อมเข้ากับ KSA Account ที่ได้สร้างขึ้นในขั้นตอนที่สาม

kubectl annotate serviceaccount < ksa_account_name > \
            iam.gke.io/gcp-service-account=< service_account_name >@< project_id >.iam.gserviceaccount.com

ทำเสร็จแล้ว กดปุ่ม Enter แล้วจะปรากฏตามด้านล่างนี้

serviceaccount/< KSA Service Account > annotated

สร้าง Kubernetes Cluster เพื่อใช้ Cloud SQL Auth Proxy ผ่าน Sidecar pattern

ขั้นตอนที่เจ็ด สร้างไฟล์ yaml สำหรับการสร้าง Cloud SQL Auth Proxy บน Kubernetes cluster ผ่าน Sidecar pattern ที่เป็นการเพิ่ม Kubernetes Pod เข้าไปใน Cluster ที่มีอยู่แล้ว เพื่อที่จะ

  • ป้องกันไม่ให้ Cloud SQL เข้าถึงได้จากภายนอก ร่วมกับเข้ารหัสการเชื่อมต่อไปยัง Cloud SQL ทำให้ปลอดภัยมากขึ้น
  • ป้องกัน Single point of failure การติดต่อเข้าถึง Database จะไม่ขึ้นกับการติดต่อของแอพอื่น ทำให้ Database Cloud SQL เสถียรมากขึ้น
  • จำกัดการเข้าถึง Cloud SQL Auth Proxy ทำให้เรากำหนด IAM permissions รายแอพได้
  • กำหนด scope ของการเข้าถึง Database [7]

วิธีการเขียนไฟล์ yaml สำหรับการใช้งาน Kubernetes ทำได้ตามด้านล่างนี้

apiVersion: apps/v1
kind: Deployment
metadata:
  name: < deployment name >
spec:
  selector:
    matchLabels:
      app: < Kubernetes Cluster Name >
  template:
    metadata:
      labels:
        app: < Kubernetes Cluster Name >
    spec:
      containers:
      - args:
        - --structured-logs
        - --address=0.0.0.0
        - --port=3306
        - < Cloud SQL Connection Name >
        image: gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.1.0
        name: cloud-sql-proxy
        resources:
          requests:
            cpu: '1'
            memory: 2Gi
        securityContext:
          runAsNonRoot: true
      serviceAccountName: < KSA Name >

บันทึกไฟล์ในชื่อ proxy_with_workload_identity.yaml จากนั้นใช้คำสั่ง kubectl apply -f proxy_with_workload_identity.yaml

สร้าง service.yaml

ขั้นตอนสุดท้าย สร้าง Service สำหรับการรัน Kubernetes ขึ้นมา โดยพิมพ์การตั้งค่าตามด้านล่างนี้

apiVersion: v1
kind: Service
metadata:
  labels:
    run: < deployment name >-service
  name: < deployment name >-service
spec:
  ports:
  - port: 3306
    protocol: TCP
    targetPort: 3306
  selector:
    app: < Kubernetes Cluster Name >
  type: ClusterIP

จากนั้นบันทึกไฟล์เป็น service.yaml แล้วใช้คำสั่ง kubectl apply -f service.yaml จากนั้นเราสามารถใช้ Google Cloud Composer เพื่อเชื่อมต่อฐานข้อมูลได้โดยใช้ชื่อตามด้านล่างนี้เป็นชื่อ Host

< deployment name >-service.default.svc.cluster.local

ขั้นตอนตามข้างบนนี้ เราสามารถใช้เครื่องมือสำเร็จรูปอย่าง Google-Cloud-SQL-Composer-Proxy ที่มีบน GitHub มาใช้ได้โดยการ Clone ตัว Repository นี้มาใช้ โดยเครื่องมือนี้ใช้ง่ายและสะดวกกว่าตามที่เขียนไว้ข้างบนมาก เพียงแค่ตั้งค่าลงในไฟล์ ini แล้วสั่งให้ scripts/create.py < config name >.ini ก็ทำได้แล้ว

การเชื่อมต่อ Google Cloud Composer เข้ากับ Cloud SQL ผ่าน Cloud SQL Auth Proxy

ทำได้โดยการเข้าไปที่หน้า Google Cloud Composer เราจะพบหน้าจอตามด้านล่างนี้ ให้กดที่ Airflow ตามที่ได้ไฮไลค์ไว้

หน้าจอ Google Cloud Composer

จากนั้น ให้เลือกไปที่เมนู Admin แล้วเลือกที่ Connections

หน้าจอ Airflow

เลื่อนลงมาจนกว่าจะพบ mysql_default (คนละอันกับ mssql_default) ให้กด Edit record แล้วพิมพ์การตั้งค่าตามด้านล่างนี้

  • Host – < deployment name >-service.default.svc.cluster.local ตามที่เขียนไปข้างบน
  • Schema – ชื่อฐานข้อมูลที่เราต้องการ
  • Login – ชื่อผู้ใช้
  • Password – รหัสผ่าน
  • Port – 3306

จากนั้นกดปุ่ม Test เพื่อทดสอบ กรณีทดสอบผ่านจะปรากฏหน้าจอตามด้านล่างนี้

หน้าจอแสดงเมื่อการทดสอบการเชื่อมต่อระหว่าง Cloud Composer กับ Cloud SQL สำเร็จ

กดปุ่ม Save แค่นี้ เราก็พร้อมสำหรับการสร้าง Data Pipeline แล้ว

เขียน Data Pipeline ผ่านการใช้ Google Airflow

การเข้าไปใช้งาน Google Cloud Composer นี้เราสามารถเข้าไปสร้างผ่าน Google Cloud Console ได้ครับ ต่อมาเรามาเขียนโค้ดสำหรับการใช้งานผ่าน Apache Airflow โดยโค้ดที่เราจะเขียนเป็นตัวไฟล์สำหรับการทำ DAG (Directed Acyclic Graph)

สำหรับส่วนประกอบ และรายละเอียดเบื้องต้นของ DAG สามารถอ่านได้ในบทความที่แล้ว

เชื่อมต่อกับ MySQL

การเขียนโค้ดเพื่อเชื่อมต่อกับฐานข้อมูล MySQL ทำได้ไม่ยาก เพียงแค่

หนึ่ง นำเข้าคำสั่ง MySqlHook จากไลบรารี airflow.providers.mysql.hooks.mysql ด้วยการเขียนโค้ดตามด้านล่างนี้

from airflow.providers.mysql.hooks.mysql import MySqlHook

จากนั้น เรียกใช้ MySqlHook เพื่อเรียกใช้การเชื่อมต่อที่เราตั้งค่าไว้ใน Airflow เราทำได้โดยการเขียนโค้ดตามด้านล่างนี้


from airflow.providers.mysql.hooks.mysql import MySqlHook

. . .

def function_name():
    mysqlserver = MySqlHook("< configured MySQL connection name >")

ต่อมา เราเรียกดูข้อมูลใน Table แล้วเก็บไว้ใน Pandas DataFrame ได้โดยการเขียนโค้ดตามด้านล่างนี้

from airflow.providers.mysql.hooks.mysql import MySqlHook

. . .

def function_name():
    mysqlserver = MySqlHook("< configured MySQL connection name >")
    pandas_dataframe = mysqlserver.get_pandas_df(sql = "SELECT * FROM < table_name >")

แค่นี้ เราก็เรียกข้อมูลจาก Table ใน SQL Database ได้แล้ว

เขียนโค้ด

ต่อมา เรามาพิมพ์โค้ดแบบจริง ๆ จัง ๆ กันแล้ว โดยในไฟล์ DAG มีส่วนประกอบทั้งหมด 5 ส่วน ได้แก่

  1. Import modules
  2. Default arguments (args)
  3. Instantiate a DAG
  4. Tasks ที่เป็นการสร้าง Operator
  5. Setting up dependencies ที่เราสามารถกำหนดทิศทางของการทำงานในแต่ละ Operator
Import modules

เรานำเข้าไลบรารีที่จำเป็นต่อการใช้งานได้โดย

  • คลาส Client ในโมดูล google.cloud.storage
  • นำเข้าโมดูล requests และ pandas
  • คลาส DAG ในโมดูล airflow.models สำหรับใช้ในขั้นตอน Instantiate a DAG
  • คลาส Operator ได้แก่ PythonOperator, DataprocSubmitJobOperator และ GCSToBigQueryOperator ในโมดูล airflow.operators.python, airflow.providers.google.cloud.operators.dataproc และ airflow.providers.google.cloud.transfers.gcs_to_bigquery ตามลำดับ
  • และ days_ago ที่นำเข้าจากโมดูล airflow.utils.dates สำหรับการกำหนดให้ DAG นี้เริ่มต้นตั้งแต่เมื่อวาน และมีผลให้รันทันทีวันนี้เมื่อถึงเวลา
# Google Cloud Storage
from google.cloud.storage import Client

# Airflow
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Pandas and requests
import pandas as pd
import requests
Default Arguments

กำหนดค่าเริ่มต้นของโมดูล รวมถึงกำหนดค่าตัวแปรต่าง ๆ โดย

  • กำหนดชื่อ MySQL Connection ที่จะให้เชื่อมต่อจาก Cloud Composer ไปยัง Cloud SQL ตามที่ตั้งค่าไว้ในขั้นตอนก่อนหน้า
  • กำหนดที่อยู่ URL สำหรับการโหลดข้อมูล Currency Exchange API โดยที่อยู่ลิ้งค์สามารถดูได้ใน Workshop 1 ของคอร์ส R2DE
  • กำหนดชื่อไฟล์ที่ต้องการเก็บข้อมูลจากฐานข้อมูลแสดงรายการสินค้าที่ขาย (audible) และรายการซื้อสินค้า (audible_transaction) รวมถึงกำหนดชื่อไฟล์สำหรับการเก็บค่าอัตราแลกเปลี่ยน และเก็บไฟล์ผลลัพธ์สุดท้ายก่อนนำเข้าไปยัง Google BigQuery
# Pandas and requests
import pandas as pd
import requests

MYSQL_CONNECTION = "mysql_default"
CONVERSION_RATE_URL = "< currency conversion rate api >"

# path ที่จะใช้
mysql_output_path = "audible_data_merged.csv"
conversion_rate_output_path = "conversion_rate.csv"
final_output_path = "audible_output.csv"

ร่วมกับกำหนดพารามิเตอร์สำหรับการส่งคำสั่งไปยัง Dataproc

  • reference เราจำกำหนดชื่อ project_id ตามที่เราใช้งาน Project นั้น ๆ อยู๋
  • placement กำหนดชื่อ cluster_name ที่เราเปิดใน Google Cloud Dataproc
  • pyspark_job กำหนดไฟล์ไพทอนที่เราต้องการให้รันบน Google Cloud Dataproc ด้วยการกำหนดใน main_python_file_uri กรณีที่เก็บใน Google Cloud Storage เราเขียนที่อยู่ไฟล์โดยขึ้นต้นด้วย gs:// ตามด้านชื่อ Bucket และตำแหน่งไฟล์ที่เก็บ
# pyspark
PYSPARK_JOB = {
    "reference": {"project_id": "< project_id >"},
    "placement": {"cluster_name": "< dataproc cluster name >"},
    "pyspark_job": {"main_python_file_uri": "gs://< bucket name >/< pyspark path >"},
}
Instantiate a DAG

กำหนดค่าสำหรับการสร้าง DAG ร่วมกับเขียนคำอธิบายของ DAG ด้วย Markdown ผ่าน doc_md

with DAG(
    "< dag_name >",
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=["workshop"]
) as dag:

    dag.doc_md = """
        < dag markdown description >
    """

Tasks

ส่วนนี้เป็นการสร้าง Operator ขึ้นมาสำหรับกำหนดการทำงานภายใน DAG โดยในไฟล์นี้จะเป็นการดาวน์โหลดข้อมูลจาก Dataset ที่ระบุไว้ในส่วนต้นของบทความที่นำข้อมูลจาก Database และ API

ส่วนแรกเป็นการนำข้อมูลจาก Database

def get_data_from_mysql(transaction_path):
    # รับ transaction_path มาจาก task ที่เรียกใช้

    # เรียกใช้ MySqlHook เพื่อต่อไปยัง MySQL จาก connection ที่สร้างไว้ใน Airflow
    mysqlserver = MySqlHook(MYSQL_CONNECTION)
    
    # Query จาก database โดยใช้ Hook ที่สร้าง ผลลัพธ์ได้ pandas DataFrame
    audible_data = mysqlserver.get_pandas_df(sql="SELECT * FROM audible_data")
    audible_transaction = mysqlserver.get_pandas_df(sql="SELECT * FROM audible_transaction")

    # Merge data จาก 2 DataFrame เหมือนใน workshop1
    df = audible_transaction.merge(audible_data, how="left", left_on="book_id", right_on="Book_ID")

    # Save ไฟล์ CSV ไปที่ transaction_path
    df.to_csv(transaction_path, index=False)
    print(f"Output to {transaction_path}")

ส่วนต่อมาเป็นการนำข้อมูลจาก API

def get_conversion_rate(conversion_rate_path):
    r = requests.get(CONVERSION_RATE_URL)
    result_conversion_rate = r.json()
    df = pd.DataFrame(result_conversion_rate)

    # เปลี่ยนจาก index ที่เป็น date ให้เป็น column ชื่อ date แทน แล้วเซฟไฟล์ CSV
    df = df.reset_index().rename(columns={"index": "date"})
    df.to_csv(conversion_rate_path, index=False)
    print(f"Output to {conversion_rate_path}")

สุดท้ายเป็นโค้ดการอัพโหลดไฟล์ที่ทำเสร็จแล้ว ไว้บน Google Cloud Storage เพื่อกำหนดให้เป็น Data Lake

def upload_blob(bucket_name, source_filenames, target_blob_names):
    client = Client()
    bucket = client.bucket(bucket_name)

    for source_filename, target_blob_name in zip(source_filenames, target_blob_names):
        blob = bucket.blob(target_blob_name)
        generation_match_precondition = None
        blob.upload_from_filename(source_filename,
            if_generation_match=generation_match_precondition)

        print("[*] Uploaded")

def upload_output(bucket_name, output_path):
    upload_blob(bucket_name, output_path, output_path)
    print("Uploaded")

โค้ดฟังก์ชันทั้ง 3 ฟังก์ชันข้างบน เราสามารถเขียนโดยใช้ PythonOperator ได้โดย โดย python_callable เป็นการกำหนดฟังก์ชันที่เราต้องการเรียกใช้ และ op_kwargs เป็นการกำหนด Argument ให้กับฟังก์ชัน

t1 = PythonOperator(
    task_id = "get_data_mysql",
    python_callable = get_data_from_mysql,
    op_kwargs = {"transaction_path": mysql_output_path }
)

t2 = PythonOperator(
    task_id = "get_conversion_rate",
    python_callable = get_conversion_rate,
    op_kwargs = {"conversion_rate_path": conversion_rate_output_path }
)

# bucket_name, output_path
t3 = PythonOperator(
    task_id = "upload_to_bucket",
    python_callable = upload_output,
    op_kwargs = {
        "bucket_name": "< bucket name >",
        "output_path": [mysql_output_path, conversion_rate_output_path]
    }
)

หลังจากที่เขียน PythonOperator สำหรับการดาวน์โหลด Dataset และเก็บไว้ใน Google Cloud Storage แล้ว ส่วนนี้เป็นการสั่งให้ Google Dataproc ที่เราสร้าง Cluster ขึ้นมาประมวลผล โดยสร้าง Job ขึ้นมาใหม่ ผ่านการใช้งาน DataprocSubmitJobOperator

# Run Dataproc
t4 = DataprocSubmitJobOperator(
    task_id="pyspark_task", 
    job=PYSPARK_JOB, 
    region='< Region >',
    project_id= "< project_id >"
)

ถัดจากนี้ เมื่อสั่งให้ Dataproc ทำงานเสร็จเรียบร้อยแล้ว ตัว Dataproc จะนำข้อมูลผลลัพธ์เก็บไว้ใน Data Lake อย่าง Google Cloud Storage เราก็นำผลลัพธ์ที่ได้ในฟอร์แมต Parquet ไปอัพโหลดเข้าไปเก็บใน Data Warehouse อย่าง Google BigQuery ผ่านการใช้งาน GCSToBigQueryOperator

t5 = GCSToBigQueryOperator(
    task_id = "gcs_to_bigquery",
    bucket = "< bucket >",
    source_objects = ['audible_output.parquet/*.parquet'],
    source_format = 'PARQUET',
    destination_project_dataset_table = "< bigquery dataset >.< table >",
    write_disposition = 'WRITE_TRUNCATE'
)

โดยการกำหนด WRITE_TRUNCATE มีไว้เมื่อกรณีที่มีตารางอยู่แล้ว ตัว BigQuery จะเขียนทับไปเลย ซึ่งจะแตกต่างกับ

  • WRITE_APPEND ที่เขียนต่อจากตารางเดิมไปเลยกรณีที่มีตารางนี้อยู่ใน BigQuery
  • WRITE_EMPTY ที่สร้างตารางใหม่ และเขียนข้อมูลในนั้น กรณีที่มีข้อมูลอยู่แล้ว จะขึ้น Error แทน

Setting up dependencies

ส่วนสุดท้ายจะเป็นการกำหนดทิศทางการทำงานของแต่ละ Operator ที่เราสร้างขึ้นมาในขั้นตอนที่ 4 (Tasks) โดยเราจะกำหนดให้เป็น Fan-in เนื่องจากเรากำหนดให้ดาวน์โหลด Dataset ก่อน จากนั้นอัพโหลดไปยัง Google Cloud Storage ทีเดียว จากนั้นสั่งให้ Google Dataproc ประมวลผล แล้วนำข้อมูลผลลัพธ์ที่ได้อัพโหลดเข้า Google BigQuery

การเขียนโค้ดเขียนได้ตามด้านล่างนี้

[t1, t2] >> t3 >> t4 >> t5

เมื่อเขียนโค้ด DAG แล้ว ให้เรานำตัวโค้ดนี้เก็บไว้ที่ Google Cloud Storage bucket ที่ Google Cloud Composer สร้างขึ้น โดยเก็บในโฟลเดอร์ dags ซึ่งเมื่อมองจากตัว Composer เองจะเขียนที่อยู่โฟลเดอร์ได้เป็น /home/airflow/gcs/dags

การเปลี่ยนแปลงข้อมูล (Transformation)

ขั้นตอนนี้ เป็นการนำข้อมูลที่เก็บไว้ใน Data Lake มาผ่านขั้นตอนการเปลี่ยนแปลงข้อมูล Extract Transform Load (ETL) เพื่อที่จะนำข้อมูลที่เก็บไว้ใน Google Cloud Storage มาประมวลผลเพื่อทำความสะอาดข้อมูล (Data Cleansing) โดยกระบวนการที่ทำเป็นขั้นตอนการทำ Transform เพื่อที่จะให้ข้อมูลมีคุณภาพมากขึ้น และเพื่อป้องกันการเกิด Garbage in, Garbage out

เครื่องมือ และการสร้าง Instance บน Dataproc

เครื่องมือที่เราจะใช้คือ Apache Spark ที่ใช้งานผ่าน Google Dataproc โดยให้เราเปิดใช้งาน Instance นี้เสียก่อน ด้วย

  • การสร้าง Instance บน Google Compute Engine แล้วเลือกเป็น Single Node ที่ใช้สเปคแบบ n2-standard-4 ที่ให้แรม 16 GB เนื่องมาจากขั้นตอนการทำ Data Cleansing ใช้แรมเยอะ การกำหนดแรมที่ไม่เพียงพอต่อการใช้งานจะทำให้ Job ถูก Killed ได้
  • ระหว่างการสร้าง Instance เรากำหนดให้ใช้โค้ด pip-install.sh เพื่อติดตั้งแพคเกจ numpy กับ pandas ผ่านการพิมพ์ metadata PIP_PACKAGES

เมื่อกำหนดเสร็จแล้ว กด Create เพื่อสร้าง Instance

เขียนโค้ด

เมื่อเราสร้าง Cluster บน Google Dataproc เรียบร้อย เรามาเขียนโค้ดเพื่อใช้งานกับ Google Dataproc สำหรับการทำ Data Cleansing ครับ

นำเข้าไลบรารี

ก่อนอื่นเลย เราเขียนโค้ดสำหรับการนำเข้าไลบรารีก่อน โดยนำเข้าไลบรารีตามด้านล่างนี้

  • ไลบรารี numpy และ pandas
  • นำเข้า SparkContext จากไลบรารี pyspark
  • นำเข้า SparkSession จากไลบรารี pyspark.sql
  • นำเข้าชนิดตัวแปรใน Spark DataFrame จากไลบรารี pyspark.sql.types
  • นำเข้าฟังก์ชันสำหรับประมวลผลใน DataFrame ด้วยการนำเข้าโมดูล functions จากไลบรารี pyspark.sql
  • และนำเข้า storage จากไลบรารี googe.cloud เพื่อนำเข้าเครื่องมือสำหรับเชื่อมต่อกับ Google Cloud Storage
import pandas as pd
import numpy as np

# Spark-related libraries
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as f

# Google Cloud Storage
from google.cloud import storage

กำหนดค่าเริ่มต้น

กำหนดค่าเริ่มต้น โดยการกำหนดชื่อ Bucket บน Google Cloud Storage ด้วยการดาวน์โหลดไฟล์ตามชื่อกำหนด และอัพโหลดไฟล์ตามชื่อที่กำหนดเช่นกัน

# Define Variables
bucket_name = "< google cloud storage bucket >"
mysql_output_path = "audible_data_merged.csv"
conversion_rate_output_path = "conversion_rate.csv"

ดาวน์โหลดไฟล์จาก Data Lake

ขั้นตอนก่อนที่จะเริ่มทำ Data Cleansing เราจำเป็นต้องดาวน์โหลดไฟล์ข้อมูลที่เราได้เตรียมไว้บน Google Cloud Storage ในขั้นตอนการนำเข้าข้อมูล (Ingestion) มาใช้านบน Dataproc

การเขียนโค้ดสำหรับดาวน์โหลดข้อมูลจาก Google Cloud Storage ทำได้ตามด้านล่างนี้

# Download File
def download_file(bucket, filename):
    source = filename
    target_name = filename
    blob = bucket.blob(source)
    blob.download_to_filename(target_name)
    print("[*] Downloaded")

ข้อมูล Dataset การซื้อขายสินค้า

ข้อมูลนี้มีรายละเอียดแต่ละคอลัมน์ตามด้านล่างนี้

ชื่อคอลัมน์รายละเอียด
timestampเวลาที่ซื้อสินค้า
user_idรหัสผู้ใช้
book_idรหัสหนังสือ
countryประเทศ
Book Titleชื่อหนังสือ
Book Subtitle
Book Authorชื่อผู้แต่ง
Book Narratorชื่อผู้พูดให้กับหนังสือเล่มนั้น
Audio Runtimeระยะเวลาของไฟล์เสียง
Audiobook Typeชนิดของหนังสือเสียง
Categoriesหมวดหมู่
Ratingเรตติ้งหนังสือ
Total No. of Ratingsจำนวนของเรตติ้ง
Priceราคา
รายละเอียด Dataset การซื้อสินค้า

ข้อมูล Dataset อัตราแลกเปลี่ยน

ข้อมูลอัตราการแลกเปลี่ยนมีรายละเอียดแต่ละคอลัมน์ตามด้านล่างนี้

ชื่อคอลัมน์รายละเอียด
dateวันที่
conversion_rateอัตราแลกเปลี่ยน
รายละเอียด Dataset อัตราแลกเปลี่ยน

รวม Dataset ทั้งสอง Dataset

หลังจากที่ทราบรายละเอียดของแต่ละ Dataset แล้ว เรามาเขียนโค้ดตามด้านล่างนี้เพื่อรวมทั้งสอง Dataset โดย

  • ใช้คอลัมน์ date เพื่อทำ Left Join
  • แปลงราคาเอาสัญลักษณ์ $ ออก
  • แปลงหน่วยเป็น float ต่อมาคำนวณเงินโดยแปลงอัตราแลกเปลี่ยนจากหน่วยดอลลาร์สหรัฐฯ เป็นหน่วยเงินบาท (THBPrice)
  • ลบคอลัมน์ date และ book_id

รายละเอียดของโค้ดส่วนนี้ เอามาจากใน Workshop 1 ของคอร์ส R2DE ผู้อ่านสามารถเอาโค้ดจากในนั้นมาใส่ได้เลย

def download_and_merge(bucket, transaction_path, conversion_rate_path):
    # Download files
    download_file(bucket, transaction_path)
    download_file(bucket, conversion_rate_path)

    # Read file from CSV and convert to Pandas DataFrame
    transaction = pd.read_csv(transaction_path)
    conversion_rate = pd.read_csv(conversion_rate_path)

    # Join by using date
    # < This part is based on Google Colab of Workshop 1 from R2DE course >

    # Return value
    return final_df

เมื่อเขียนโค้ดเสร็จแล้ว ก็เขียนคำสั่งเพื่อเริ่มต้นการทำงานของ Google Cloud Storage Client แล้วดาวน์โหลด Dataset จาก Data Lake

# Download File
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
pandas_df = download_and_merge(bucket, mysql_output_path, conversion_rate_output_path)

เริ่มต้นการทำงาน PySpark

ส่วนนี้เริ่มต้นการทำงานของ PySpark กับเริ่มต้นการทำงานของ Google Cloud Storage Client จากนั้นดาวน์โหลดไฟล์ Dataset จาก Google Cloud Storage

# Create Spark Session
sc = SparkContext()
spark = SparkSession(sc)
print(f"The current spark version = { spark.version }.")

ทำ Data Cleansing

ต่อมา เรานำข้อมูลที่ได้ดาวน์โหลดและรวม Dataset มาเรียบร้อย หลังจากนั้นเราแปลงตัวแปรให้อยู่ในรูปแบบ Spark DataFrame โดยกำหนดให้แต่ละคอลัมน์เป็น String ยกเว้นคอลัมน์ Price และ THBPrice เป็น Double

# From the StackOverflow issues on this problem, I copied the corrected code on the latest version of Pandas. 
# Source: https://stackoverflow.com/questions/76404811/attributeerror-dataframe-object-has-no-attribute-iteritems
pd.DataFrame.iteritems = pd.DataFrame.items

# Load Data
fields = []
for field_name in pandas_df.columns:
    if field_name == 'Price' or field_name == 'THBPrice':
        fields.append(StructField(field_name, DoubleType(), True))
    else:
        fields.append(StructField(field_name, StringType(), True))
        
schema = StructType(fields)

dt = spark.createDataFrame(pandas_df, schema)

ต่อมา เมื่อตัวแปรเป็น Spark DataFrame เรียบร้อย เรามาทำ Data Cleansing โดย

  • แปลงคอลัมน์ timestamp ให้เป็นตัวแปร Timestamp
  • พบว่ามีการเขียนชื่อประเทศผิดจาก Japan เป็น Japane
  • มีชื่อ user_id ที่เขียนเลข 0 เกิด จาก ca86d172 เป็น ca86d17200
  • แปลง Missing Value ใน user_id เป็นค่า 00000000
  • เปลี่ยนชื่อคอลัมน์จาก Total No. of Ratings เป็น Total No of Ratings เนื่องมาจากเวลานำเข้าข้อมูลไปยัง Google BigQuery ตัวโค้ดจะแจ้งว่าเกิดความผิดพลาดเนื่องมาจากมีสัญลักษณ์จุด (.)

ตัวโค้ดเขียนได้ตามด้านล่างนี้ โค้ดบางส่วนผู้อ่านสามารถดูได้ใน Workshop 2 จากคอร์ส R2DE

# Preview Data
print("Preview Data")
print(dt.show())

< This part is based on Workshop 2 (Data Cleansing) of R2DE to convert to timestamp, to correct Japane to Japan, to correct user_id, to fill Null to be 00000000>

# We have a problem on loading to Google BigQuery, so we change.
print("Rename the column Total No. of Ratings")
dt_clean = dt_clean.withColumnRenamed('Total No. of Ratings', "Total No of Ratings")

การเก็บข้อมูล (Storage)

ในขั้นตอนนี้จะเป็นขั้นตอนถัดมาจากการเปลี่ยนแปลงข้อมูล (Transformation) ที่จะเป็นขั้นตอนการ Load ข้อมูลที่ผ่านการทำ Data Cleansing แล้วไปเก็บไว้ใน Data Lake ก่อน จากนั้นนำเข้าข้อมูลไปยัง Data Warehouse

ในบทความนี้จะเลือกเก็บข้อมูลที่ผ่านการทำ Transform ตามขั้นตอน Extract Transform Load (หรือ ETL) ที่เราจะเก็บข้อมูลทุกอย่างไว้ใน Data Lake ก่อนที่จะนำไปเก็บใน Data Warehouse โดยเก็บใน Google Cloud Storage จากนั้นนำไปเก็บใน Google BigQuery

การเก็บข้อมูลลงใน Data Lake โดยเก็บไว้ใน Google Cloud Storage ทำได้โดยการเขียนโค้ดตามด้านล่างนี้

# Upload
def upload_blob(bucket, source_filenames, target_blob_names):
    for source_filename, target_blob_name in zip(source_filenames, target_blob_names):
        blob = bucket.blob(target_blob_name)
        blob.upload_from_filename(source_filename,
            if_generation_match=None)

        print("[*] Uploaded")

def write_output(bucket_name, cleaned_data):
    print("Write to Parquet") 
    cleaned_data.coalesce(1).write.mode('overwrite').parquet(f"gs://{ bucket_name }/audible_output.parquet/")
    print("Finished writing")

# Write File
write_output(bucket_name, dt_clean)
spark.stop()
print("PySpark is stopped.")

โดยการเขียนไฟล์ในโค้ดจะเป็นการเขียนไฟล์ด้วยฟอร์แมต Parquet ที่เป็นการเก็บข้อมูลที่เป็น Columnar Storage ที่ช่วยให้ใช้พื้นที่น้อยลง เก็บข้อมูลได้มากขึ้น และมีการเก็บ metadata เอาไว้ในไฟล์ ฟอร์แมตนี้นิยมใช้งานใน Apache Hadoop หรือ Apache Spark และสามารถใช้ร่วมกับภาษาเขียนโปรแกรมได้หลายภาษา

ฟอร์แมต Parquet จะดีกว่าการใช้ CSV เนื่องมาจากไฟล์ฟอร์แมต CSV มีข้อเสีย ได้แก่

  • ไม่มีการบีบอัดข้อมูลทำให้ไฟล์ขนาดใหญ่
  • ไม่มีการกำหนด structure หรือ schema ของข้อมูล
  • ข้อมูลบาง row อาจจะมีความผิดพลาดหรือผิดประเภทได้
  • การเก็บเป็น row ไม่สะดวกต่อการใช้กับระบบที่เป็น Columnar
  • ต้องกำหนด Schema เองเวลานำเข้าข้อมูลไปยัง Google BigQuery ซึ่งผิดกับ Parquet ที่ตัว BigQuery ทราบว่าข้อมูลแต่ละคอลัมน์เป็นอย่างไร และพยายามแปลงชนิดตัวแปรให้อัตโนมัติ โดยแสดงตามตารางด้านล่างนี้
ชนิดตัวแปรใน Parquetคำอธิบายชนิดตัวแปรชนิดตัวแปรใน BigQuery
BOOLEANBOOLEAN
INT32INTEGER (UINT_8*, UINT_16, UINT_32, INT_8, INT_16, INT_32)INT64
INT32DECIMALNUMERIC, BIGNUMERIC, or STRING
INT32DATEDATE
INT64None, INTEGER (UINT_64, INT_64)INT64
INT64DECIMALNUMERIC, BIGNUMERIC, or STRING
INT64TIMESTAMP, precision=MILLIS (TIMESTAMP_MILLIS)TIMESTAMP
INT64TIMESTAMP, precision=MICROS (TIMESTAMP_MICROS)TIMESTAMP
INT96NoneTIMESTAMP
FLOATNoneFLOAT64
DOUBLENoneFLOAT64
BYTE_ARRAYNoneBYTES
BYTE_ARRAYSTRING (UTF8)STRING
FIXED_LEN_BYTE_ARRAYDECIMALNUMERIC, BIGNUMERIC, or STRING
FIXED_LEN_BYTE_ARRAYNoneBYTES

*U_INT คือ Unsigned Int

นอกจากนี้ การเก็บข้อมูลประเภท Columnar Storage มีข้อดี คือถ้าค่าในคอลัมน์เหมือนกันก็ไม่ต้องเก็บค่าซ้ำ ตัวโปรแกรมจะใช้วิธีอ่านจากค่าที่เคยเก็บแล้วได้เลย อย่างไรก็ดี บาง Database อาจจะมีการแอบทำ Normalization เบื้องหลังแต่ไอเดียเหมือนกัน

เมื่อเขียนโค้ดเสร็จแล้ว ให้เซฟไฟล์ที่มีชื่อตามที่กำหนดในไฟล์ DAG ที่ระบุไว้ในส่วน Default Argument ที่เขียนขึ้นในขั้นตอนการนำเข้าข้อมูล (Ingestion) ในตัวแปร PYSPARK_JOB ที่ตำแหน่ง Key pyspark_job -> main_python_file_uri

# pyspark
PYSPARK_JOB = {
    "reference": {"project_id": "< project_id >"},
    "placement": {"cluster_name": "< dataproc cluster name >"},
    "pyspark_job": {"main_python_file_uri": "gs://< bucket name >/< pyspark path >"},
}

เมื่อเซฟไฟล์แล้ว เรานำโค้ดไปอัพโหลดลง Google Cloud Storage จากนั้นสั่งงานผ่าน Google Composer เพื่อเรียกใช้งาน DAG ที่สร้างขึ้นในขั้นตอนการนำเข้าข้อมูล (Ingestion) โดยเข้าไปที่หน้าจอ Google Cloud Composer เลือก DAG ที่สร้างขึ้น จากนั้นกดปุ่ม Trigger DAG เพื่อเริ่มต้นการทำงานของ DAG

เมื่อกดปุ่มแล้ว ระบบจะเริ่มขั้นตอนการรัน Data Pipeline จาก

  • การดาวน์โหลดข้อมูลใน Dataset
  • แปลงข้อมูลร่วมกับทำ Data Cleansing โดยเก็บข้อมูลชั่วคราวไว้ใน Data Lake
  • แล้วนำข้อมูลทีผ่านการทำ Data Cleansing ที่อยู่ใน Data Lake ไปเก็บไว้ใน Google BigQuery

เมื่อทำเสร็จแล้ว มาเปิดดูที่หน้าจอ Google Cloud Composer จะปรากฏตามด้านล่างนี้

ผลลัพธ์ของการรัน Data Pipeline ใน Google Composer

และ Google BigQuery ที่เป็น Data Warehouse ก็จะปรากฏตามด้านล่างนี้เช่นกัน

ผลลัพธ์ของการรัน Data Pipeline ใน Google BigQuery

ต่อมา เราต้องการนำข้อมูลใน Table มาให้คนอื่นอ่าน (SELECT) ได้อย่างเดียว เราไม่ได้ต้องการให้ผู้ใช้แก้ไข หรือเข้าไปลบข้อมูลได้

ส่วนนี้เราจะสร้าง View ขึ้นมา โดยจุดนี้ไม่ต้องใช้พื้นที่จัดเก็บเลย เราทำได้โดยการใช้คำสั่ง SQL อย่าง SELECT จาก Table ที่ต้องการ โดยเราจะได้ข้อมูลสดใหม่อยู่เสมอ ขั้นตอนนี้ เราเขียนคำสั่ง SQL ได้ตามด้านล่างนี้ใน BigQuery

CREATE VIEW `< project_id >`.`< dataset >`.`< table >`
AS
SELECT `user_id`, `timestamp`, `Book_ID`, `Book_Title`, `Categories`, `country`, `THBPrice` FROM `< dataset >`.`< table >`;

เขียนโค้ดเสร็จแล้ว กดปุ่ม Run ตัว BigQuery จะสร้าง View ขึ้นมาใหม่ตามที่ต้องการครับ

หน้าจอแสดง View ที่สร้างขึ้น

วิเคราะห์ หรือนำข้อมูลไปใช้ประโยชน์ (Analysis)

ส่วนสุดท้ายเป็นขั้นตอนการวิเคราะห์ข้อมูล หรือการทำนข้อมูลไปใช้ประโยชน์ (Analysis) ส่วนนี้เป็นการนำข้อมูลที่ผ่านการแปลงสภาพมาเก็บไว้ในที่เหมาะสมเพื่อวิเคราะห์ข้อมูลต่อไป

ข้อมูลที่เราเก็บอยู่ใน Data Warehouse อย่าง Google BigQuery เรานำข้อมูลที่เก็บมาทำ Dashboard ที่เป็นขั้นตอนการทำ Data Visualization เพื่อแปลงข้อมูลให้อยู่ในรูปแบบกราฟเพื่อให้เข้าใจข้อมูลที่มีอยู่ได้ง่ายขึ้น

ในตัวอย่างนี้เราจะใช้งาน Google Looker Studio ทำ Interactive Dashboard ของเราทำหน้า Dashboard ได้ตามภาพตามโจทย์ที่ตั้งไว้ที่ต้องการทราบว่าสินค้าชิ้นไหนขายดี เพื่อหาสินค้าที่ถูกใจลูกค้ามาวางขาย และจัดโปรได้เหมาะสม โดยเก็บข้อมูลไว้ในฐานข้อมูล (Database) และต้องการให้ทีมวิศวกรเตรียมข้อมูลให้ทางฝ่ายวิเคราะห์ข้อมูลเพื่อจะสร้าง Report หรือ Dashboard

แต่ก่อนอื่น ทางทีมงาน Business Analyst (BA) จะไปคุยกับผู้ใช้ (ตัวอย่างเช่นทีมงาน Product และ Marketing) ว่าต้องการอะไรบ้าง จากนั้นทางทีมงานวาดแผนภาพ Wireframe ก่อนเพื่อเช็คดว่า Dashboard ตรงกับความต้องการหรือไม่ เพราะการปรับแก้ไขใน Wireframe ทำได้ง่ายกว่า โดยตัวอย่างภาพ Wireframe แสดงตามด้านล่างนี้

ภาพ Wireframe ของ Dashboard (ตัวภาพนำมาจากคอร์ส R2DE)

เมื่อทราบโจทย์ และตัว Wireframe แล้ว เรามาวาดไว้ 2 หน้า หน้าแรกที่แสดงยอดขายสินค้าตามโจทย์ที่ตั้งไว้ โดย

  • แสดงรายได้ทั้งหมด (Total Revenue)
  • แสดงจำนวนลูกค้า (Total Customer)
  • แสดงยอดการซื้อหนังสือตามประเทศ (Transaction by country)
  • แสดงหนังสือที่ขายดีที่สุด (Best-selling books)
  • และแสดงหมวดหนังสือที่ขายดี (Best-selling categories)
Dashboard หน้าแรก

หน้าต่อมาแสดงรายการสินค้าตามยอดขาย โดยผู้ใช้สามารถคัดกรองตามยอดขายขั้นต่ำได้ (Min Revenue)

Dashboard หน้าที่สอง

โดยเราได้เพิ่ม

  • Parameter ที่กำหนดต้นทุนการผลิตนศ ขั้นต่ำ (Min Revenue)
  • และ Calculated Field ที่เป็นการคัดกรองข้อมูลตามต้นทุนขั้นต่ำที่เราได้ระบุ (More than min rev) โดยการเขียน SQL เพื่อคัดกรองตามเงื่อนไขโดยใช้คำสั่ง CASE WHEN การใช้งานแสดงตามด้านล่างนี้ เมื่อสินค้ามียอดขายมากกว่าขั้นต่ำ ตัวโค้ดจะคืนค่า 1 และกรณีที่ไม่ตรงกับเงื่อนไข โค้ดจะคืนค่าเท่ากับ 0
CASE
WHEN SUM(THBPrice) > Min_Revenue
THEN 1
ELSE 0
END 

จากนั้น เราไปปรับใน Calculated Field ให้ทำงานในตารางหน้าที่สอง โดยกดเข้าไปที่เพิ่มตัวกรอง (Add a Filter) แล้วเลือก Calculated Field More than min rev โดยให้แสดงเฉพาะสินค้าที่มียอดขายมากกว่าขั้นต่ำ

ที่มา

[1] What Is A Relational Database (RDBMS)?  |  Google Cloud

[2] มาดูความแตกต่างของ Primary Key และ Foreign Key กันเถอะ!! – BorntoDev เริ่มต้นเรียน เขียนโปรแกรม ขั้นเทพ !

[3] สรุป Google Cloud Platform สำหรับทำ Big Data และ Machine Learning: ตอน 2) Storage & Database – เข้าใจ Data ง่าย ๆ กับ DataTH

[4] Cloud SQL documentation  |  Cloud SQL Documentation  |  Google Cloud

[5] Monster Connect | เจาะลึกบริการ Database SQL ของสองแบรนด์เจ้าใหญ่ Azure vs Google Cloud

[6] About the Cloud SQL Auth Proxy | Cloud SQL for MySQL | Google Cloud

[7] Connect to Cloud SQL from Google Kubernetes Engine | Cloud SQL for MySQL | Google Cloud

By Kittisak Chotikkakamthorn

อดีตนักศึกษาฝึกงานทางด้าน AI ที่ภาควิชาวิศวกรรมไฟฟ้า มหาวิทยาลัย National Chung Cheng ที่ไต้หวัน ที่กำลังหางานทางด้าน Data Engineer ที่มีความสนใจทางด้าน Data, Coding และ Blogging / ติดต่อได้ที่: contact [at] nickuntitled.com