Jonah Benton
08/22/2019, 8:02 PM

Jonah Benton
08/23/2019, 12:54 AM

Markus Binsteiner
08/23/2019, 9:26 AM

Markus Binsteiner
08/23/2019, 9:27 AM

Markus Binsteiner
08/23/2019, 9:27 AM

Markus Binsteiner
08/23/2019, 9:29 AM

Markus Binsteiner
08/23/2019, 9:30 AM

Jerry Thomas
08/23/2019, 10:22 AM

import prefect
from prefect import task, Flow

def skipped(item):
    return isinstance(item, prefect.engine.signals.SKIP) or isinstance(item, prefect.engine.result.NoResultType)

@task
def add(x, y):
    try:
        return x / y
    except Exception:
        raise prefect.engine.signals.SKIP

with Flow("filter-skipped") as flow:
    res = add.map([10, 2, 4], [1, 0, 3])
    # res = FilterTask(res)
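A rough sketch of how that commented-out line might be filled in, reusing the add task and the skipped helper defined above, and assuming FilterTask from prefect.tasks.control_flow accepts a filter_func callable (this is an illustration, not the original poster's code):

from prefect.tasks.control_flow import FilterTask

# keep only items that are neither SKIP signals nor "no result" placeholders
filter_skipped = FilterTask(filter_func=lambda item: not skipped(item))

with Flow("filter-skipped") as flow:
    res = add.map([10, 2, 4], [1, 0, 3])
    filtered = filter_skipped(res)  # e.g. drops the 2 / 0 entry that raised SKIP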
Jerry Thomas
08/23/2019, 11:31 AM

import json
import prefect
from prefect import task, Flow
from prefect.utilities.logging import get_logger

def get_data():
    with open("data/data.json", "r") as f:
        input = json.load(f)
    return input

@task
def test(config):
    logger = prefect.context.get("logger")
    logger.info(config)

def main(values):
    with Flow("custom") as flow:
        logger = get_logger("Flow")
        logger.info("flow started")
        logger.info(values)  # this works
        test(values)  # this does not start
    flow.run()

main(get_data())
If I use the flow logger to print the values fetched from a file, it works, but the task does not seem to accept the values when they are fetched from a file. If I convert get_data() into a task and run it in a distributed environment, where would it read from?
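One way to restructure the snippet above is to make get_data a task itself, so the values are resolved at run time and flow into downstream tasks as data dependencies. A minimal sketch, assuming data/data.json is readable from whichever worker ends up executing the task (in a distributed setup that usually means shared or network storage):

import json
import prefect
from prefect import task, Flow

@task
def get_data():
    # runs on whichever worker executes the task, so the path must be
    # reachable there (a local path only works if every worker has the file)
    with open("data/data.json", "r") as f:
        return json.load(f)

@task
def test(config):
    logger = prefect.context.get("logger")
    logger.info(config)

with Flow("custom") as flow:
    values = get_data()   # a task result, not a value read at flow-build time
    test(values)          # creates a data dependency on get_data

flow.run()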
Brad
08/23/2019, 11:34 AM
state_handlers and trigger hooks, but I can’t seem to achieve what I want.

Dmitry Dorofeev
08/24/2019, 9:58 PM

Mikhail Akimov
08/24/2019, 11:26 PM

Akshay Verma
08/25/2019, 4:48 PM
Prefect with the logging of the tasks it is running?

Braun Reyes
08/26/2019, 5:42 AM

Gopal
08/26/2019, 12:56 PM

Michael Adkins
08/26/2019, 3:29 PM

Ke Qiang
08/26/2019, 4:10 PM
prefect in the production environment now? Is there some famous success use case?

An Hoang
08/26/2019, 8:51 PM
dask in this channel and then issues on github? Is there going to be a Dask-Prefect FAQ page to synthesize all the common problems/tips and tricks when using both?

Feliks Krawczyk
08/27/2019, 5:26 AM

Akash
08/28/2019, 10:03 AM

hayssam
08/29/2019, 12:06 AM

@task
def parse_file():
    [...]

@task(skip_on_upstream_skip=False)
def compute_aggregates_from_db():
    [...]

with Flow('ETL ISM') as flow:
    e = parse_file()
    t = process_dataframe(e)
    l = store_dataframe_in_db(t, table_name="test_prefect")
    ifelse(should_refresh_table("test_prefect"), e, Constant("No need"))
    statistics = compute_aggregates_from_db(upstream_tasks=[l])

flow.run()
The flow is expected to output the statistics at every run. Suppose that I want to conditionally run the [e, t, l] part of the flow only if the file has been modified since the last insertion in postgres. Rows in postgres are timestamped, and should_refresh_table indicates whether an update is required or not. Should I:
a. Perform the check in store_dataframe_in_db and raise a prefect.engine.signals.SKIP accordingly => I can avoid the l part of the flow, but e and t are still executed
b. Add a prefect.tasks.control_flow.conditional.ifelse on l: same result
c. Add a prefect.tasks.control_flow.conditional.ifelse on e: all its downstreams are skipped, which is the desired behavior
Is this the approach you would recommend? What was a bit surprising for me is that the condition is applied on the upstream e task, while I (wrongly) tried to condition the downstream l task, expecting that all its upstreams would be skipped as they are not needed.
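For reference, a toy version of option (c), showing the skip-cascading behaviour described above as I understand Prefect 0.x's ifelse; the task names and bodies here are made-up placeholders, not the poster's real ETL code:

from prefect import task, Flow
from prefect.tasks.control_flow import ifelse
from prefect.tasks.core.constants import Constant

@task
def should_refresh():
    return False  # pretend the file has not changed since the last load

@task
def extract():
    return [1, 2, 3]

@task
def load(data):
    print("loading", data)

@task(skip_on_upstream_skip=False)
def aggregate():
    print("aggregating from the database")

with Flow("skip-cascade") as flow:
    e = extract()
    l = load(e)
    # gate the *upstream* task: when should_refresh() is falsey, extract is
    # skipped and the skip cascades down to load
    ifelse(should_refresh(), e, Constant("no refresh needed"))
    # still runs when the [e, l] branch is skipped, thanks to
    # skip_on_upstream_skip=False
    aggregate(upstream_tasks=[l])

flow.run()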
Feliks Krawczyk
08/29/2019, 1:57 AM

from prefect import task, Flow

@task
def create_cluster():
    cluster = create_spark_cluster()
    return cluster

@task
def run_spark_job(cluster):
    submit_job(cluster)

@task
def tear_down_cluster(cluster):
    tear_down(cluster)

with Flow("Spark") as flow:
    # define data dependencies
    cluster = create_cluster()
    submitted = run_spark_job(cluster)
    tear_down_cluster(cluster)

    # wait for the job to finish before tearing down the cluster
    tear_down_cluster.set_upstream(submitted)
Wouldn’t you also need to add submitted.set_upstream(cluster) to ensure this works? Or is there some hidden logic that means when you define a flow it runs the lines in order, so submitted runs only after cluster? If that is the case, how do you get tasks to run in parallel?
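As a side note on that question: in Prefect's functional API, passing one task's output into another (run_spark_job(cluster)) already records the upstream dependency, so an extra set_upstream call shouldn't be needed for that edge, and tasks with no edge between them can run concurrently under a parallel executor. A rough sketch, assuming the Prefect 0.x DaskExecutor import path; the sleepy tasks are purely illustrative:

import time
from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

@task
def slow_a():
    time.sleep(2)
    return "a"

@task
def slow_b():
    time.sleep(2)
    return "b"

@task
def combine(a, b):
    # depends on both slow_a and slow_b through the passed-in arguments
    return a + b

with Flow("parallel-example") as flow:
    a = slow_a()
    b = slow_b()
    c = combine(a, b)

# slow_a and slow_b share no edge, so a parallel executor can run them
# at the same time; combine waits for both results
flow.run(executor=DaskExecutor())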
Feliks Krawczyk
08/29/2019, 1:57 AM

Feliks Krawczyk
08/29/2019, 1:57 AM

emre
08/29/2019, 11:34 AM
if task.cache_for is not None according to the task_runner.py source code. I double checked that all my tasks have a cache_for value of None.
I’ve been away from Prefect for about a month and don’t remember getting this warning at all, and I haven’t changed versions. Has anyone experienced a similar issue?

An Hoang
08/29/2019, 12:48 PM

jazzydag
08/29/2019, 3:42 PM

Kyle Foreman (Convoy)
08/29/2019, 5:16 PM

Gopal
08/30/2019, 3:28 AM

Derek Izuel
08/30/2019, 7:59 PM

Derek Izuel
08/30/2019, 7:59 PM

Chris White
08/30/2019, 8:09 PM
Failed state object, then python doesn’t know what you’re referring to. You’ll need to do:
from prefect.engine.state import Failed
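A minimal sketch of how that import is typically used, for example inside a state handler; the handler itself is illustrative and not from this thread:

from prefect import task
from prefect.engine.state import Failed

def alert_on_failure(task, old_state, new_state):
    # the isinstance check only works once Failed has been imported
    if isinstance(new_state, Failed):
        print(f"{task.name} failed: {new_state.message}")
    return new_state

@task(state_handlers=[alert_on_failure])
def flaky():
    raise ValueError("boom")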
Derek Izuel
08/30/2019, 8:28 PM

Chris White
08/30/2019, 8:29 PM