이번 글에서는 특정 주기과 특정 시각에 스케줄을 수행하는 방법에 대해 알아보겠습니다.
해당 기능을 구현하기 위해서 schedule 모듈을 사용하였습니다.(참고 링크)
해당 모듈은 사용법이 쉽고 직관적이여서 간단한 스케줄 작성 시 많이 사용됩니다.
schedule을 사용하기 위해 아래와 같이 모듈을 설치합니다.
1 | pip install schedule |
특정 주기
특정 주기마다 schedule을 수행하는 예제 코드는 아래와 같습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | import schedule import time def todo_job1() - > None : print ( "todo job1" ) if __name__ = = "__main__" : schedule.every( 10 ).seconds.do(todo_job1) # 10초에 한번씩 실행 schedule.every( 1 ).minutes.do(todo_job1) # 10분에 한번씩 실행 schedule.every( 1 ).hours.do(todo_job1) # 1시간에 한번씩 실행 schedule.every( 1 ).days.do(todo_job1) # 1일에 한번씩 실행 schedule.every().monday.do(todo_job1) # 매주 월요일 실행 while True : schedule.run_pending() # 등록된 스케줄 체크 time.sleep( 1 ) |
특정 시각
특정 시각에 schedule을 수행하는 예제 코드는 아래와 같습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | import schedule import time def todo_job1() - > None : print ( "todo job1" ) if __name__ = = "__main__" : schedule.every().minute.at( ':50' ).do(todo_job1) # 매 분 50초가 되면 실행 schedule.every().hour.at( ':50' ).do(todo_job1) # 매 시간 50분에 실행 schedule.every().day.at( "10:55" ).do(todo_job1) # 매일 10:55 에 실행 schedule.every().wednesday.at( "13:15" ).do(todo_job1) # 매주 수요일 13:15 에 실행 while True : schedule.run_pending() # 등록된 스케줄 체크 time.sleep( 1 ) |
job 함수에 파라미터 전달
아래 todo_job2 함수의 text 파라미터에 값을 전달할 수도 있습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | import schedule import time def todo_job2(text) - > None : print ( "todo job2:" , text) if __name__ = = "__main__" : schedule.every( 10 ).seconds.do(todo_job2, "10초마다 실행" ) # 10초에 한번씩 실행 schedule.every( 1 ).minutes.do(todo_job2, "1분마다 실행" ) # 10분에 한번씩 실행 schedule.every().hour.at( ':50' ).do(todo_job2, "매 시간 50분마다 실행" ) # 매 시간 50분에 실행 schedule.every().day.at( "10:55" ).do(todo_job2, "매일 10:55 마다 실행" ) # 매일 10:55 에 실행 while True : schedule.run_pending() # 등록된 스케줄 체크 time.sleep( 1 ) |
주의점
공식 홈페이지에서도 schedule을 사용할 때 간단한 스케줄링에만 사용하고, 아래의 경우엔 다른 라이브러리를 찾아보라고 써있습니다.
- 작업 지속성(재시작 사이의 스케줄 기억)
- 정확한 타이밍(초 미만의 정확한 실행)
- 동시 실행(멀티 쓰레드)
- 현지화(시간대, 근무일 또는 공휴일)
동시 실행에 대해서는 아래와 같이 우회 할 수 있다고 합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | import threading import time import schedule def job(): print ( "I'm running on thread %s" % threading.current_thread()) def run_threaded(job_func): job_thread = threading.Thread(target = job_func) job_thread.start() schedule.every( 10 ).seconds.do(run_threaded, job) schedule.every( 10 ).seconds.do(run_threaded, job) schedule.every( 10 ).seconds.do(run_threaded, job) schedule.every( 10 ).seconds.do(run_threaded, job) schedule.every( 10 ).seconds.do(run_threaded, job) while 1 : schedule.run_pending() time.sleep( 1 ) |
동시 실행의 정확한 설명은 여기를 참고하시기 바랍니다.
이상으로 스케줄에 대해 알아보았습니다.