Nick Untitled

Writing as my personal diary

Redis Queue - ตัวช่วยให้งานบน Web Server เป็น Asynchronous

Share: 

ปกติเวลาที่ฝั่ง Client ส่งข้อมูลไปที่ Server แล้ว เราจะรอระยะเวลาไม่นาน จากนั้น Server จะส่งข้อมูลกลับทาง Client ไว้สำหรับการประมวลผลต่อ ทีนี้เวลาที่เราส่งข้อมูลที่ใช้ระยะเวลาประมวลผลนานไปยัง Server แล้วประมวลผลผ่านไปซักพัก จะเกิดปัญหาหนึ่งคือ Request Timeout

เมื่อเกิดปัญหา Timeout แล้ว เราต้องการแก้ปัญหานี้ จะทำอย่างไร? ปัญหานี้ที่เคยอ่านในอินเตอร์เน็ตก็มีหลายวิธี ตัวอย่างเช่น แก้ที่ตัวเว็บเซิร์ฟเวอร์ว่าตั้งค่าให้มีระยะเวลานานเท่าไรก่อนที่จะขึ้นว่า Timeout หรือว่าไปแก้ที่ตัวโค้ดให้ใช้เวลาประมวลผลที่สั้นลง

อย่างไรก็ดี กรณีที่ตัวโค้ดมันแก้ไปให้ทำงานสั้นลงได้ลำบาก ตัวอย่างก็เป็นงานด้าน AI/ML ที่ต้องนำภาพมาประมวลผลเพื่อหาใบหน้าในภาพ (Face Detection) อันนี้มีวิธีหนึ่งที่ช่วยแก้ปัญหาได้คือใช้ตัว Redis Queue

แต่ก่อนอื่น เรามาแนะนำ Redis

Redis

Redis เป็นโปรแกรมฐานข้อมูลที่เป็น Open source ที่เก็บโครงสร้าง และข้อมูลไว้บน Memory โดยเราสามารถใช้งานเป็นฐานข้อมูลชั่วคราว หรือใช้ในการเก็บ Cache รวมถึงใช้ในการจัดการ Queue เพื่อไว้ใช้ประมวลผลสำหรับฟังก์ชันที่ต้องใช้ระยะเวลาประมวลผลที่มีระยะเวลานาน

อย่างไรก็ดี เราจะนำโปรแกรมนี้มาใช้งานแทนโปรแกรมฐานข้อมูลที่มีอยู่เดิมไม่ได้ทุกกรณี แต่ในบทความนี้เราจะเอามาใช้งานเรื่อง Queue ครับ

อ่านเพิ่มเติมได้ตามอินเตอร์เน็ตเลย อันนี้เป็นตัวอย่างหนึ่งครับ

ต่อจาก Redis แล้ว ต้องมีตัวช่วยคือ Redis Queue

Redis Queue

Redis Queue เป็นไลบรารีหนึ่งในภาษาไพทอนที่มีหน้าที่จัดการคิว และประมวลผลฟังก์ชันที่ต้องใช้ระยะเวลานานให้อยู่เบื้องหลังโดยการใช้ตัว Workers ไลบรารีนี้ใช้ฐานข้อมูล Redis สำหรับการจัดการเรื่อง Queue ครับ ไลบรารีนี้มีข้อดีคือจัดการกับ Queue ได้ง่ายกว่าการใช้ไลบรารีอย่าง Celery

การติดตั้ง และเขียนโค้ด

การติดตั้งไลบรารี และเขียนโค้ดขึ้นมาทำได้ไม่ยากจนเกินไปครับ เราสามารถติดตั้งได้โดย

1. ติดตั้งฐานข้อมูล Redis

เราสามารถติดตั้งฐานข้อมูล Redis ได้หลายวิธี แต่วิธีที่ง่ายกว่าวิธีอื่น เพราะไม่ต้องติดตั้ง หรือตั้งค่าอะไรซับซ้อน ก็คือการใช้ Container ผ่านการใช้เครื่องมืออย่าง Docker

เราพิมพ์คำสั่งตามด้านล่างนี้ก็ติดตั้งได้เลยครับ ขั้นแรกเป็นการดึงตัว Container จากฐานข้อมูล Docker Hub

docker pull redis

ต่อมา เริ่มการทำงานตัว Container ของ Redis

docker run -d -p 6379:6379 redis

เมื่อพิมพ์คำสั่งตามนี้แล้ว ตัว Container Docker ของ Redis ก็ทำงานเรียบร้อยครับ

2. ติดตั้งไลบรารีที่จำเป็นของไพทอน

เราติดตั้งไลบรารีที่จำเป็นของไพทอนได้โดยการใช้คำสั่ง pip หรือ conda หรืออย่างอื่นก็ได้ แต่ในบทความนี้จะใช้คำสั่ง pip โดยติดตั้งผ่านการสร้างโดยใช้คำสั่ง virtualenv ครับ

pip install redis rq

เมื่อพิมพ์คำสั่งแล้วกด Enter เราจะรอให้ตัวโปรแกรมติดตั้งไลบรารีครับ เมื่อติดตั้งเสร็จแล้ว เราสามารถเรียกใช้ไลบรารีทั้งสองได้แล้วครับ

ส่วนใครที่ใช้ Container เราเขียนเพิ่มได้ในที่ไฟล์ requirements.txt หรืออื่น ๆ ตามที่เราใช้งานอยู่ได้ครับ

3. การเรียกใช้งาน

เราสามารถเขียนเพื่อใช้งานตัว Redis Queue ได้ตามด้านล่างนี้ครับ

from redis import Redis
from rq import Queue, Connection
from rq.job import Job

4. การเชื่อมต่อเข้ากับฐานข้อมูล Redis

เราเชื่อมต่อกับฐานข้อมูลของ Redis ได้โดยใช้คำสั่งในไลบรารี redis ที่เราติดตั้งในขั้นตอนก่อนหน้านี้ได้โดยการพิมพ์คำสั่งตามด้านล่างครับ

redis_conn = Redis(host URL, port)

host URL คือที่อยู่ของเซิร์ฟเวอร์ที่ติดตั้งฐานข้อมูล Redis ไว้ ในกรณีนี้อยู่ในคอมเครื่องเดียวกัน เราพิมพ์ localhost แทนได้ แต่ถ้าอยู่ใน Container อาจจะต้องพิมพ์เป็นอย่างอื่นแทน ส่วน port ก็คือ port ของเซิร์ฟเวอร์ที่ลงตัว Redis ครับ

สำหรับการตั้งค่าอื่นเพิ่มเติม สามารถอ่านได้ในหน้านี้ครับ

4. การส่งฟังก์ชันที่ใช้เวลาประมวลผลนานไว้บน Redis สำหรับ Queue

การส่งฟังก์ชันที่ใช้ระยะเวลาประมวลผลนานไว้บน Redis สำหรับ Queue เพื่อให้ตัว Redis Worker ใช้งานฟังก์ชันนั้นในเบื้องหลัง แทนที่จะประมวลผลบน HTTP server สามารถทำได้ไม่ยาก เพียงใช้ฟังก์ชันที่มีอยู่ใน Redis Queue ที่มีชื่อว่า Enqueue

เราเรียกใช้ฟังก์ชัน Enqueue ได้โดยการพิมพ์ตามด้านล่างนี้ครับ

q = Queue(connection=redis_conn)
job = q.enqueue(long_running_function,  argument)

ตรงที่พิมพ์ long_running_function เป็นฟังก์ชันอะไรก็ได้ที่ใช้เวลาประมวลผลนานมากกว่าปกติ โดยฟังก์ชันนี้ต้องอยู่ในไฟล์ไพทอนที่__ไม่ใช่__ไฟล์หลักที่เริ่มต้นใช้งานครับ ตัวอย่างการใช้งานดูได้ตามด้านล่างนี้ เราจะแบ่งไฟล์เป็นสองไฟล์คือไฟล์ task.py ที่ใส่ฟังก์ชันหนึ่งที่ใช้ระยะเวลาทำงานนานมากกว่าปกติ ส่วนไฟล์ main.py เป็นไฟล์หลักที่เริ่มต้นทำงานครับ

ไฟล์ task.py

import time

def long_running_function(seconds):
    print("Starting num task")
    for num in range(seconds):
        print(num)
        time.sleep(1)
    print("Task to print_numbers completed")

ไฟล์หลัก main.py

from redis import Redis
from rq import Queue, Connection
from task import long_running_function

redis_conn = Redis('127.0.0.1', 6379)
q = Queue(connection=redis_conn)
job = q.enqueue(long_running_function,  5)
job_id = job.id

ทดลองเริ่มต้นการทำงานโดยพิมพ์คำสั่งตามข้อที่ 6 และ 7 ครับ ผลลัพธ์ที่ได้จะแสดงตามด้านล่างนี้

10:00:29 default: task.long_running_function(5) (XXXX)
Starting num task
0
1
2
3
4
Task to print_numbers completed
10:00:34 default: Job OK (XXX)
10:00:34 Result is kept for 500 seconds

แสดงว่าฟังก์ชันนี้ทำงานไดสำเร็จโดยไม่มีปัญหาอะไรครับ

กรณีที่มีมากกว่าหนึ่ง Argument

ส่วนกรณีที่มีมากกว่าหนึ่ง Argument เราสามารถดูตัวอย่างได้ตามด้านล่างนี้ครับ

q = Queue(connection=redis_conn)
job = q.enqueue(long_running_function,  
        args=('http://nvie.com',),
        kwargs={
            'description': 'Function description', # This is passed on to count_words_at_url
            'ttl': 15  # This is passed on to count_words_at_url function
        }))

นอกจากนี้ เราสามารถเพิ่มการตั้งค่าของคำสั่ง enqueue เพิ่มเติมได้ตามหน้านี้ของเว็บไลบรารี Redis Queue ครับ

การทำงานของฟังก์ชัน enqueue

เมื่อใส่คำสั่งนี้แล้ว ตัวโปรแกรมจะส่งฟังก์ชันนี้ไว้ใน Queue ของ Redis แล้ว ตัว Workers จะนำฟังก์ชันใน Queue นั้น ๆ มาประมวลผลครับ

ตัวคำสั่ง enqueue จะคืนค่าชนิดตัวแปรที่มีชื่อเรียกว่า Job ในตัวแปร job ตามที่เขียนไว้ข้างบนนี้ เราสามารถเรียกตัว id สำหรับการติดตามความก้าวหน้าได้โดยพิมพ์ job.id ครับ

5. การติดตามความก้าวหน้าของการทำงานฟังก์ชัน

หลังจากที่ได้ตัว id มาเรียบร้อยแล้ว เราสามารถติดตามการทำงานของฟังก์ชันที่ประมวลผลอยู่ใน Workers ได้โดยการพิมพ์คำสั่งตามด้านล่างนี้ครับ

job = Job.fetch('job_id', connection=redis_conn)

พิมพ์คำสั่งนี้แล้ว ตัวฟังก์ชันจะคืนค่าตัวแปร Job เพื่อใช้สำหรับการประมวลผลในขั้นตอนต่อไป เราติดตามความก้าวหน้าของฟังก์ชันได้โดยพิมพ์คำสั่ง

status = job.get_status()

ตัวฟังก์ชันนี้จะคืนค่าสถานะของฟังก์ชันที่ทำงานอยู่ โดยสถานะนี้แบ่งได้เป็น queued, started, deferred, finished, stopped, scheduled, canceled และ failed ครับ ผู้อ่านนำไปประยุกต์ใช้เพื่อติดตามความก้าวหน้าของฟังก์ชันได้ครับ

ส่วนของผมจะเช็คว่าฟังก์ชันนี้ทำงานได้สำเร็จ หรือไม่สำเร็จ หรืออื่น ๆ หรือไม่ เราพิมพ์ได้ตามด้านล่างนี้ครับ

ตัวอย่างที่ใช้การวนลูปในไพทอน

เราเพิ่มการนำเข้าไลบรารีเป็น

from time import sleep
import json

แล้วพิมพ์โค้ดเพิ่มตามด้านล่างนี้ (โค้ดส่วน setInterval เอามาจากเว็บ Stackoverflow)

def call_at_interval(period, callback, args):
    while True:
        sleep(period)
        if callback(*args) == False:
            break

def setInterval(period, callback, *args):
    Thread(target=call_at_interval, args=(period, callback, args)).start()

def polling(job_id):
    job = Job.fetch(job_id, connection=redis_conn)
    status = job.get_status()
    result = job.result

    if status == 'finished':
        print(json.dumps({
            'status': 'success',
            'result': result
        }))
        return False
    elif status == 'failed':
        print(json.dumps({
            'status': 'failed',
            'result': result
        }))
        return False

    print(json.dumps({
      'status': 'processing'
    }))
    return True

setInterval(1, polling, job_id)

ตัวอย่างที่ใช้ไลบรารีของ Flask

@app.route('/get_status/<string:task_id>', methods=['GET'])
def get_status(task_id):
    q = Queue(connection=redis_conn)
    job = Job.fetch(task_id, connection=redis_conn)
    status = job.get_status()
    result = job.result

    if status == 'finished':
        return json.dumps({
          'status': 'success',
          'result': result
        })
    elif status == 'failed':
        return json.dumps({
          'status': 'failed',
          'result': result
        })

    return json.dumps({
      'status': 'processing'
    })

ส่วน flask ตามข้างบนนี้คืออะไร Flask เป็นไลบรารีที่มีหน้าที่ทำตัว Web server ครับ รายละเอียดของไลบรารีนี้สามารถอ่านเพิ่มเติมได้ที่นี่ครับ

6. การเริ่มต้น Workers

เราเรียกใช้งานตัว Workers ได้โดยพิมพ์คำสั่งตามด้านล่างนี้ครับ

rq worker --with-scheduler

จากนั้นเรียกใช้งานไฟล์ไพทอนตามปกติ

7. การเริ่มต้นตัวโปรแกรม

เราเริ่มต้นตัวโปรแกรมที่ใช้ Redis Queue ตามข้างบนที่ผ่านมานี้ได้โดยใช้คำสั่ง

python <ชื่อไฟล์ .py>

หรือกรณีที่ใช้ Flask, FastAPI หรืออื่น ๆ เราสามารถเรียกใช้งานผ่าน gunicorn, uvicorn, uWSGI ได้ครับ

8. เพิ่มเติม

ฟังก์ชันอื่น ๆ ที่มีอยู่ในไลบรารี Redis Queue ผู้อ่านเข้าไปอ่านที่หน้าเว็บไซต์ตามลิ้งค์นี้ได้ครับ ตัว Reference ที่มีอยู่ในเว็บไซต์เข้าใจได้ไม่ยากจนเกินไปครับ

สรุป

ปกติการเรียกใช้งาน HTTP Request ทั่ว ๆ ไป เราจะเรียกจากตัว Client แล้วให้ทาง Server ประมวลผลตามปกติ ทีนี้กรณีที่ใช้ฟังก์ชัันที่ใช้ระยะเวลาการประมวลผลนานมากกว่าปกติ จะทำให้เกิด Timeout ขึ้นมา การใช้งานไลบรารี Redis Queue สามารถแก้ปัญหาที่จุดนี้ได้โดย

  1. Server รับ Request จากทาง Client แล้วใช้คำสั่งตามข้อที่ 3-5 เพื่อส่งฟังก์ชันนั้นไว้ใน Queue ของ Redis แล้วให้ทาง Workers ประมวลผลต่อ
  2. เมื่อส่ง Queue ไปแล้ว เราต้องการค่า job_id ออกมา แล้วติดตามความก้าวหน้าของฟังก์ชันนั้น โดยทำตามข้อที่ 4-5

ซึ่ง เราได้ทดลองใช้กับ web server ที่เขียนโดย Flask ไปแล้ว ตัว Redis Queue แก้ปัญหาเรื่อง Request Timeout ระหว่างการประมวลผลฟังก์ชันที่ต้องใช้ระยะเวลาประมวลผลนานครับ

, , , , , , , , ,