Daniel Manson
10/20/2021, 1:31 PM
a = task_a()
test_value = task_get_test_value()
with case(test_value, 'test-against-this'):
    b = task_b()
    b.set_upstream(a)
...then task b would fail if a fails, regardless of whether the case block is in the skipped or non-skipped state. However, that doesn't seem to be the way it works - rather, if the case block skips, then it 'hides' whether or not a succeeded.
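For reference, a runnable version of the snippet above (a minimal sketch: the task bodies are placeholders, since the original message only shows the flow wiring):

from prefect import Flow, case, task

@task
def task_a():
    return 1

@task
def task_get_test_value():
    return "test-against-this"

@task
def task_b():
    return 2

with Flow("case-upstream-example") as flow:
    a = task_a()
    test_value = task_get_test_value()
    # b only runs inside the case branch, but is also wired downstream of a
    with case(test_value, "test-against-this"):
        b = task_b()
        b.set_upstream(a)

flow.run()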
Tim Enders
10/20/2021, 1:39 PM
with Flow(
"AWS CUR import",
result=GCSResult(bucket="platformsh-vendor-prefect"),
state_handlers=[flow_failure],
) as flow:
# save_file = prefect.Parameter("Download Files", default=False)
num_periods = prefect.Parameter("num_periods", default=4)
periods = generate_periods(num_periods)
apply_map(run_or_bail, periods)
clear_files()
Kyle McChesney
10/20/2021, 3:14 PM
Regarding upstream_tasks, I have the following example flow:
with Flow(
    'example',
    executor=LocalExecutor(),
) as flow:
    data_csv_url = Parameter('data_csv_url')
    output_url = Parameter('output_url')
    data = generate_data(data_csv_url)
    data_with_stuff = generate_stuff.map(data)
    data_with_stuff_written = write_data(data_with_stuff, output_url)
    data_with_other_stuff = generate_other_stuff.map(
        data_with_stuff,
        upstream_tasks=(data_with_stuff_written,),
    )
I read a CSV file and generate some “data” objects (a list of them) for the given flow run. I then map them across another task, which mutates the data items to add things to them. I want to then write the state of the objects to a file for tracking, and I want to do this BEFORE I make further modifications in `generate_other_stuff`. With this flow def, generate_other_stuff fails without any exception and does not seem to even run the task.
Kyle McChesney
10/20/2021, 3:15 PM
└── 09:09:26 | INFO | Task 'generate_data[0]': Starting task run...
└── 09:09:26 | INFO | Task 'generate_data[0]': Finished task run for task with final state: 'Success'
└── 09:09:26 | INFO | Task 'write_data': Starting task run...
└── 09:09:26 | INFO | Task 'write_data': Finished task run for task with final state: 'Success'
└── 09:09:27 | INFO | Task 'generate_other_stuff': Starting task run...
└── 09:09:27 | INFO | Task 'generate_other_stuff': Finished task run for task with final state: 'Failed'
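One plausible cause, offered as an assumption rather than a confirmed diagnosis: in Prefect 1.x, arguments passed to .map(), including upstream_tasks, are themselves mapped over, so a single upstream task usually needs to be wrapped in unmapped. A minimal sketch against the flow above:

from prefect import unmapped

# Wrap the single upstream task so it is not mapped over alongside
# data_with_stuff; task names follow the snippet above.
data_with_other_stuff = generate_other_stuff.map(
    data_with_stuff,
    upstream_tasks=[unmapped(data_with_stuff_written)],
)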
Marwan Sarieddine
10/20/2021, 4:12 PM
Matthew Seligson
10/20/2021, 5:04 PM
Luis Gallegos
10/20/2021, 6:39 PM
Aqib Fayyaz
10/20/2021, 8:36 PM
Thomas Furmston
10/20/2021, 8:52 PM
Slackbot
10/20/2021, 8:55 PM
Jacolon Walker
10/20/2021, 9:48 PM
I'm seeing "No heartbeat detected from the remote task; marking the run as failed." Any info on what is possibly causing this?
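Not an answer from the thread, but one common mitigation worth noting: Prefect 1.x runs heartbeats in a subprocess by default, and resource-heavy tasks can starve it. A sketch under that assumption, switching heartbeats to thread mode via the run config:

from prefect.run_configs import UniversalRun

# Assumes heartbeat starvation is the cause; the env var comes from
# Prefect 1.x's config schema (cloud.heartbeat_mode).
flow.run_config = UniversalRun(
    env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"},
)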
William Grim
10/21/2021, 5:14 AM
I've been using the prefect CLI tool to do things like check which flows are running and see their logs. If a flow is running and I want to cancel it, how could I do this? There doesn't seem to be a command, and I'm assuming I need to do it with GraphQL. Is there an example of how to do this anyway? I appreciate it in advance.
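A sketch of one way to do this, assuming Prefect 1.x Cloud/Server: the GraphQL API exposes a cancel_flow_run mutation, callable through the Python Client; the flow run ID below is a placeholder.

from prefect import Client

client = Client()

# Cancel a running flow run via the GraphQL API; "<flow-run-id>" stands in
# for the UUID shown in the UI or returned when the run was created.
client.graphql(
    query="""
    mutation($id: UUID!) {
      cancel_flow_run(input: { flow_run_id: $id }) {
        state
      }
    }
    """,
    variables={"id": "<flow-run-id>"},
)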
AJ
10/21/2021, 6:39 AM
Piyush Bassi
10/21/2021, 6:53 AM
Mathias Lanng
10/21/2021, 6:54 AM
Michael Hadorn
10/21/2021, 7:07 AM
Johan Wåhlin
10/21/2021, 7:11 AM
Akarshan Arora
10/21/2021, 9:34 AM
Aqib Fayyaz
10/21/2021, 9:59 AM
Amine Dirhoussi
10/21/2021, 10:10 AM
When running prefect agent docker, I am having a permission error. Thanks for your help.
Andreas Tsangarides
10/21/2021, 10:32 AM
for target_date in target_dates:
    f.run(parameters={"target_date": target_date}, run_on_schedule=False)
I am running the above using a LocalDaskExecutor(scheduler="processes", num_workers=12) locally. The problem is that run time grows...
Took 15.72 sec for 2016-05-08
Took 20.59 sec for 2016-05-09
Took 32.52 sec for 2016-05-10
Took 43.89 sec for 2016-05-11
Took 55.81 sec for 2016-05-12
Took 67.15 sec for 2016-05-13
Took 77.92 sec for 2016-05-14
Took 90.59 sec for 2016-05-15
Took 105.21 sec for 2016-05-16
Took 115.50 sec for 2016-05-17
Would that be a dask issue? Any suggestions for running the backfill locally in a different way are welcome!
(There is no back-aggregation involved, so no reason for later runs to take longer. Volume/size of data is almost identical for each run. And when a backfill completes and I start another one for a different set of dates, the time profile is similar: very fast -> slower.)
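One thing to try, sketched under assumptions rather than as a known fix: run each date in a fresh process, so any state accumulated in the parent interpreter between f.run() calls cannot slow later runs. build_flow() is a hypothetical factory that reconstructs the flow in the child process:

from multiprocessing import Process

def run_one(target_date):
    # Rebuild the flow in the child; build_flow() is hypothetical.
    f = build_flow()
    f.run(parameters={"target_date": target_date}, run_on_schedule=False)

for target_date in target_dates:
    p = Process(target=run_one, args=(target_date,))
    p.start()
    p.join()  # sequential: each date gets a clean interpreter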
Martim Lobao
10/21/2021, 10:44 AM
Daniil Ponizov
10/21/2021, 1:39 PM
Luiz Felipe
10/21/2021, 3:12 PM
@task(checkpoint=True, target="{task_name}.pkl", nout=4)
def split_train_test(dataset):
    array = dataset.values
    X = array[:,0:4]
    y = array[:,4]
    X_train, X_validation, Y_train, Y_validation = train_test_split(X, y, test_size=0.20, random_state=1)
    return X_train, X_validation, Y_train, Y_validation
Using LocalResult on my flow, I noticed that I've saved a split_train_test.pkl, which was what I expected, but also 4 files with Prefect default names (prefect-result-...). Is it possible to define a template for these files too? Or just not save them separately? This is important because, with these names, the cache will never work for them (as they have the current timestamp).
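A possible workaround, sketched rather than confirmed: drop nout and return the four arrays as a single object, so Prefect writes only the templated target file; downstream tasks then unpack the tuple themselves.

from prefect import task
from sklearn.model_selection import train_test_split

@task(checkpoint=True, target="{task_name}.pkl")
def split_train_test(dataset):
    array = dataset.values
    X = array[:, 0:4]
    y = array[:, 4]
    # One return value means one result file at the target, instead of one
    # default-named file per output.
    return train_test_split(X, y, test_size=0.20, random_state=1)

The trade-off is that downstream tasks receive the whole tuple and must index into it.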
Constantino Schillebeeckx
10/21/2021, 4:49 PM
if not args.dry_run:
    project = args.project
    logger.log(SUCCESS, f"Registering flow '{flow.name}' to project '{project}'.")
    logger.debug(f"Serialized flow: {json.dumps(flow.serialize(), indent=4, sort_keys=True)}")
    logger.debug(f"Hashed flow: {flow.serialized_hash()}")
    flow.register(
        project_name=project,
        idempotency_key=flow.serialized_hash(),
        set_schedule_active=set_schedule_active,
    )
I noticed that on back-to-back executions of the above script, for the same flow, to the same project, different hashes were being created. When I look at the serialized flows, I see no difference. To confirm the serialized flows are the same, I've saved the logging output to two json files (see screenshot). The only difference I can spot is that I use indent=4 in my json.dumps, however the codebase does not.
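One way to narrow this down (a debugging sketch, not from the thread): serialize the flow twice in the same process and diff the two dumps to see exactly which key changes; a schedule clock or other timestamp-like field is a common culprit.

import difflib
import json

# `flow` is the flow being registered above.
a = json.dumps(flow.serialize(), indent=4, sort_keys=True).splitlines()
b = json.dumps(flow.serialize(), indent=4, sort_keys=True).splitlines()

# Print only the lines that differ between the two serializations.
for line in difflib.unified_diff(a, b, lineterm=""):
    print(line)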
Margaret Walter
10/21/2021, 5:36 PM
Jacob Bedard
10/21/2021, 6:02 PM
Jacob Bedard
10/21/2021, 6:03 PM
Jacob Bedard
10/21/2021, 6:03 PM
William Jamir
10/21/2021, 6:51 PM