# ask-community
a
```python
contents = task(command='feast apply')
materialize = task(command='feast materialize 2021-11-08 2021-11-09')
getFeatures()
```
These are the three tasks in my flow, and I want them to run in the sequence shown above. The problem is that the flow runs the first task (`feast apply`), then the third task (`getFeatures()`), and only after that comes back to the second task (`materialize`). It seems it does not wait for the second task to finish before starting the third.
a
You could provide the keyword argument `upstream_tasks=[contents]` for the second task and `upstream_tasks=[materialize]` for the third task, so that they run in that sequence.
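To see why the order was not guaranteed, here is a minimal plain-Python model of dependency-driven scheduling. It is not Prefect's scheduler: it uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) as a stand-in, and the dependency maps are a hypothetical transcription of what `upstream_tasks=[...]` declares. Tasks with no edges between them are all ready at once, so a runner may start them in any order; chaining them leaves exactly one valid order.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical model: task name -> set of tasks it must wait for,
# i.e. what upstream_tasks=[...] would declare for that task.
unchained = {
    "feast apply": set(),
    "feast materialize": set(),
    "getFeatures": set(),
}
chained = {
    "feast apply": set(),
    "feast materialize": {"feast apply"},
    "getFeatures": {"feast materialize"},
}


def ready_at_start(dependencies):
    """Tasks a runner could start immediately, before anything has finished."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    return sorted(sorter.get_ready())


# Without edges, every task is ready immediately, so no order is implied.
print(ready_at_start(unchained))  # ['feast apply', 'feast materialize', 'getFeatures']

# With the chain, only the first task can start, and one order remains.
print(ready_at_start(chained))  # ['feast apply']
print(list(TopologicalSorter(chained).static_order()))
# ['feast apply', 'feast materialize', 'getFeatures']
```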
a
Thank you.
```python
dataFrame = FeatureEngineering()
datasent = sendDataToOfflineStore(upstream_tasks=[dataFrame])
contents = task(upstream_tasks=[datasent], command='feast apply')
materialize = task(upstream_tasks=[contents], command='feast materialize 2021-11-08 2021-11-12')
featuresForTraining(upstream_tasks=[materialize])
```
The code above is my flow, and when I register the flow I get the following error:
```
Traceback (most recent call last):
  File "/home/aqib/.local/lib/python3.8/site-packages/prefect/cli/build_register.py", line 134, in load_flows_from_script
    namespace = runpy.run_path(abs_path, run_name="<flow>")
  File "/usr/lib/python3.8/runpy.py", line 265, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "/usr/lib/python3.8/runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/aqib/openaimpPrefect/flows/flow.py", line 51, in <module>
    datasent = sendDataToOfflineStore(upstream_tasks=[dataFrame])
TypeError: sendDataToOfflineStore() got an unexpected keyword argument 'upstream_tasks'
```
Previously, when I did not use `upstream_tasks`, my flow was:
```python
dataFrame = FeatureEngineering()
datasent = sendDataToOfflineStore(dataFrame)
contents = task(command='feast apply')
materialize = task(command='feast materialize 2021-11-08 2021-11-12')
featuresForTraining()
```
a
How do you define `sendDataToOfflineStore`?
a
This function takes a DataFrame as an argument and sends it to Postgres.
a
Did you forget the Prefect task decorator?
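The reasoning behind this question: `upstream_tasks` is handled by the task wrapper rather than passed to the underlying function, so calling an undecorated function with it fails with this kind of `TypeError`. Below is a toy stand-in that assumes nothing about Prefect's internals; `SketchTask`, `sketch_task`, `send_plain` and `send_decorated` are hypothetical names used only for illustration.

```python
class SketchTask:
    """Toy task wrapper (hypothetical, not Prefect's Task class)."""

    def __init__(self, fn):
        self.fn = fn
        self.upstream = []

    def __call__(self, *args, upstream_tasks=None, **kwargs):
        # The wrapper takes upstream_tasks for itself; the function never sees it.
        self.upstream = list(upstream_tasks or [])
        return self.fn(*args, **kwargs)


def sketch_task(fn):
    """Toy decorator standing in for a task decorator."""
    return SketchTask(fn)


def send_plain(dataframe=None):
    return "sent"


@sketch_task
def send_decorated(dataframe=None):
    return "sent"


# Undecorated: Python itself rejects the unknown keyword argument.
try:
    send_plain(upstream_tasks=["dataFrame"])
    error_message = None
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # mentions "unexpected keyword argument 'upstream_tasks'"

# Decorated: the wrapper accepts the keyword and records the dependency.
print(send_decorated(upstream_tasks=["dataFrame"]))  # sent
print(send_decorated.upstream)  # ['dataFrame']
```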
a
I am using `@task(log_stdout=True)`.
a
Right. Then this should not be happening. I suggest that you post the entire flow as well as the tasks. Instead of posting your actual task code, you can replace each task body with a line of dummy code, since the point is to get a reproducible flow.
Oh, wait, I just realised that in your original code you pass `dataFrame` directly. That makes it implicitly upstream, so you don't need to list it in `upstream_tasks`. The change may then have caused the error, because `sendDataToOfflineStore` still has that DataFrame parameter but it is no longer being passed. Maybe that's the issue, but without seeing the code I'm not entirely sure.
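The point about implicit dependencies can be modeled the same way. In this hypothetical `SketchFlow` builder (not Prefect's `Flow`), passing one task's result as an argument records an edge automatically, while `upstream_tasks` adds ordering-only edges for tasks that exchange no data. Listing `dataFrame` in both places therefore adds nothing new.

```python
class Result:
    """Hypothetical placeholder for a task's future output."""

    def __init__(self, name):
        self.name = name


class SketchFlow:
    """Toy flow builder that only records dependency edges."""

    def __init__(self):
        self.edges = set()  # (upstream, downstream) pairs

    def call(self, name, *args, upstream_tasks=()):
        # Data dependency: any Result passed as an argument is upstream.
        for arg in args:
            if isinstance(arg, Result):
                self.edges.add((arg.name, name))
        # Ordering-only dependency declared explicitly.
        for upstream in upstream_tasks:
            self.edges.add((upstream.name, name))
        return Result(name)


def build(explicit_too):
    """Build the thread's first steps, optionally also listing dataFrame explicitly."""
    flow = SketchFlow()
    data_frame = flow.call("FeatureEngineering")
    extra = [data_frame] if explicit_too else []
    datasent = flow.call("sendDataToOfflineStore", data_frame, upstream_tasks=extra)
    flow.call("feast apply", upstream_tasks=[datasent])
    return flow.edges


print(build(explicit_too=False) == build(explicit_too=True))  # True
print(sorted(build(explicit_too=False)))
# [('FeatureEngineering', 'sendDataToOfflineStore'), ('sendDataToOfflineStore', 'feast apply')]
```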
a
Yes, previously I was passing `dataFrame` directly.