Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

python - os.listdir vs grep in a Prefect schedule

I'm scheduling tasks with Prefect like this:

#Python script
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
from datetime import timedelta
from datetime import datetime
from prefect.schedules import IntervalSchedule
import fnmatch
import os
import sys

schedule = IntervalSchedule(start_date=datetime.now() + timedelta(seconds=10),interval=timedelta(minutes=1))
can_start = True

with Flow("List files", schedule) as flow:
    
    if can_start:
        can_start = False
        file_names = os.listdir("/home/admin/data/raw")
        file_names = fnmatch.filter(file_names, "*fact*")
        process_common.map(file_names)
        can_start = True
    
out = flow.run()

But if files arrive in my directory after the first Prefect run, file_names remains empty during the second run and during all subsequent runs.

I tried fetching my files with a grep command instead, and then it works!

file_names = ShellTask(command="ls /home/admin/data/raw | grep fact", return_all=True, log_stderr=True, stream_output=True)

Does anyone know why this happens? Many thanks for your help.

Question from: https://stackoverflow.com/questions/65899426/os-listdir-vs-grep-in-a-prefect-schedule

1 Answer

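This is a common point of confusion: you are conflating build-time logic with runtime logic (see this SO post for another example). Plain Python statements inside the "with Flow(...)" block, including the os.listdir call, execute once while the flow is being built, so every scheduled run reuses the file list captured at that moment. A ShellTask, by contrast, is a task, so its command is executed again on each run, which is why the grep version picks up new files.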
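The difference can be seen without Prefect at all. The following is only a minimal sketch using the standard library: the temporary directory stands in for /home/admin/data/raw, and the names list_fact_files, run_once and fact_sales.csv are hypothetical, chosen for illustration. A value computed once stays stale, while a function called on every run sees files that arrived later.

```python
import fnmatch
import os
import tempfile


def list_fact_files(directory):
    """Return the names in `directory` that contain 'fact'."""
    return fnmatch.filter(os.listdir(directory), "*fact*")


with tempfile.TemporaryDirectory() as raw_dir:
    # "Build time": evaluated exactly once, like plain code placed
    # directly inside the Flow block.
    build_time_names = list_fact_files(raw_dir)

    # "Runtime": a callable that is evaluated only when a run happens,
    # which is roughly what wrapping the logic in a task achieves.
    def run_once():
        return list_fact_files(raw_dir)

    first_run = run_once()

    # A file arrives after the "flow" was built.
    open(os.path.join(raw_dir, "fact_sales.csv"), "w").close()

    second_run = run_once()

print(build_time_names)  # stale snapshot taken before the file arrived
print(first_run)
print(second_run)        # sees the newly arrived file
```

Here build_time_names plays the role of file_names in the question: it never changes, however many times run_once is called afterwards.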

All logic that you want to take effect at runtime should be encapsulated in a Prefect task. In your case, you may need to use Prefect's conditional tasks to achieve your outcome, although you might be able to get away with something much simpler.

In particular, the following code seems to have the desired outcome:

@task
def get_filenames():
    file_names = os.listdir("/home/admin/data/raw")
    file_names = fnmatch.filter(file_names, "*fact*")
    return file_names


with Flow("List files", schedule) as flow:
    file_names = get_filenames()  # a task call, so the directory is listed on every run
    process_common.map(file_names)  # if the list is empty, nothing will happen
    
out = flow.run()
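Note that the filtering itself is not what differs between the two approaches. As a small sketch (the file names below are made up for illustration), fnmatch.filter with the pattern "*fact*" selects the same names as a plain substring test, which is also what "grep fact" does on the output of ls:

```python
import fnmatch

# Hypothetical directory listing, for illustration only.
names = ["fact_sales.csv", "artifact.log", "dim_users.csv"]

# Glob-style match: any name containing "fact" anywhere.
matched = fnmatch.filter(names, "*fact*")

# Equivalent substring test, comparable to `ls | grep fact`.
substring = [name for name in names if "fact" in name]

print(matched)
print(matched == substring)
```

So the only thing that matters is when the listing is evaluated, not how it is filtered.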

Lastly, note that you can mark a task as "skipped" based on dynamic runtime conditions by raising a SKIP signal from inside the task.

