[python] 특정 주기/시각 마다 스케줄 수행 – schedule

[python] 특정 주기/시각 마다 스케줄 수행 – schedule

이번 글에서는 특정 주기과 특정 시각에 스케줄을 수행하는 방법에 대해 알아보겠습니다.

해당 기능을 구현하기 위해서 schedule 모듈을 사용하였습니다.(참고 링크)
해당 모듈은 사용법이 쉽고 직관적이여서 간단한 스케줄 작성 시 많이 사용됩니다.

schedule을 사용하기 위해 아래와 같이 모듈을 설치합니다.

pip install schedule
특정 주기

특정 주기마다 schedule을 수행하는 예제 코드는 아래와 같습니다.

import schedule
import time

def todo_job1()->None:
    print("todo job1")
    
if __name__ == "__main__":
    schedule.every(10).seconds.do(todo_job1) # 10초에 한번씩 실행
    schedule.every(1).minutes.do(todo_job1) # 10분에 한번씩 실행
    schedule.every(1).hours.do(todo_job1) # 1시간에 한번씩 실행
    schedule.every(1).days.do(todo_job1) # 1일에 한번씩 실행
    schedule.every().monday.do(todo_job1) # 매주 월요일 실행
    
    while True:
        schedule.run_pending() # 등록된 스케줄 체크
        time.sleep(1)
특정 시각

특정 시각에 schedule을 수행하는 예제 코드는 아래와 같습니다.

import schedule
import time

def todo_job1()->None:
    print("todo job1")
    
if __name__ == "__main__":
    schedule.every().minute.at(':50').do(todo_job1) # 매 분 50초가 되면 실행
    schedule.every().hour.at(':50').do(todo_job1) # 매 시간 50분에 실행
    schedule.every().day.at("10:55").do(todo_job1) # 매일 10:55 에 실행
    schedule.every().wednesday.at("13:15").do(todo_job1) # 매주 수요일 13:15 에 실행
    
    while True:
        schedule.run_pending() # 등록된 스케줄 체크
        time.sleep(1)
job 함수에 파라미터 전달

아래 todo_job2 함수의 text 파라미터에 값을 전달할 수도 있습니다.

import schedule
import time

def todo_job2(text)->None:
    print("todo job2:", text)
    
if __name__ == "__main__":
    schedule.every(10).seconds.do(todo_job2, "10초마다 실행") # 10초에 한번씩 실행
    schedule.every(1).minutes.do(todo_job2, "1분마다 실행") # 10분에 한번씩 실행
    schedule.every().hour.at(':50').do(todo_job2, "매 시간 50분마다 실행") # 매 시간 50분에 실행
    schedule.every().day.at("10:55").do(todo_job2, "매일 10:55 마다 실행") # 매일 10:55 에 실행
    
    while True:
        schedule.run_pending() # 등록된 스케줄 체크
        time.sleep(1)
주의점

공식 홈페이지에서도 schedule을 사용할 때 간단한 스케줄링에만 사용하고, 아래의 경우엔 다른 라이브러리를 찾아보라고 써있습니다.

  • 작업 지속성(재시작 사이의 스케줄 기억)
  • 정확한 타이밍(초 미만의 정확한 실행)
  • 동시 실행(멀티 쓰레드)
  • 현지화(시간대, 근무일 또는 공휴일)


동시 실행에 대해서는 아래와 같이 우회 할 수 있다고 합니다.

import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)

동시 실행의 정확한 설명은 여기를 참고하시기 바랍니다.
이상으로 스케줄에 대해 알아보았습니다.