Bishwarup B
09/12/2021, 12:20 PM
`map` to parallelise over the lat-lon pairs, but I have a couple of questions here:
1. I know that if I use a `DaskExecutor` or `LocalDaskExecutor`, the flow is distributed, but is there any limit to applying `map` over such a large collection?
2. Instead of using threads to run the computation, is it possible to make use of async, since most of the tasks are heavily IO-bound?
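On the two questions above, one pattern to consider is to map over batches of lat-lon pairs instead of individual pairs, which keeps the number of mapped task runs (each tracked with its own state) smaller, and to run the IO-bound calls inside each batch concurrently with `asyncio`. This is a plain-Python sketch under stated assumptions, not Prefect's own API: `fetch_point` is a hypothetical stand-in for the real IO call, and `BATCH_SIZE` / `MAX_CONCURRENCY` are illustrative values. `process_batch` is what the body of a mapped task might look like; `asyncio.run` starts a fresh event loop, which is assumed to be fine in an executor worker thread that has no loop already running.

```python
import asyncio
from typing import List, Sequence, Tuple

LatLon = Tuple[float, float]

BATCH_SIZE = 500       # illustrative: pairs handled by one mapped task run
MAX_CONCURRENCY = 50   # illustrative: cap on simultaneous IO calls per batch


def chunk(pairs: Sequence[LatLon], size: int) -> List[List[LatLon]]:
    """Split the full collection into batches; each batch would be one mapped item."""
    return [list(pairs[i:i + size]) for i in range(0, len(pairs), size)]


async def fetch_point(lat: float, lon: float) -> dict:
    """Hypothetical IO-bound call (e.g. an HTTP request), simulated with a short sleep."""
    await asyncio.sleep(0.001)
    return {"lat": lat, "lon": lon}


async def _gather_batch(batch: List[LatLon], limit: int) -> List[dict]:
    semaphore = asyncio.Semaphore(limit)

    async def bounded(lat: float, lon: float) -> dict:
        # The semaphore keeps at most `limit` calls in flight at once.
        async with semaphore:
            return await fetch_point(lat, lon)

    # gather returns results in the same order as the input pairs.
    return await asyncio.gather(*(bounded(lat, lon) for lat, lon in batch))


def process_batch(batch: List[LatLon], limit: int = MAX_CONCURRENCY) -> List[dict]:
    """Synchronous entry point (e.g. a mapped task body) that runs its own event loop."""
    return asyncio.run(_gather_batch(batch, limit))


if __name__ == "__main__":
    pairs = [(float(i), float(-i)) for i in range(1200)]
    batches = chunk(pairs, BATCH_SIZE)
    results = [r for b in batches for r in process_batch(b)]
    print(len(batches), len(results))  # 3 1200
```

Batch size is a trade-off: larger batches mean fewer task runs but coarser retries, since a failed batch reruns all of its pairs.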
What are some considerations I should make here? Thanks!
Oliver Mannion
09/12/2021, 12:28 PM
Nadav Nuni
09/12/2021, 2:32 PM
Jacob Blanco
09/13/2021, 2:57 AM
Prateek Gupta
09/13/2021, 5:39 AM
Prateek Gupta
09/13/2021, 5:39 AM
Prateek Gupta
09/13/2021, 5:41 AM
Prateek Gupta
09/13/2021, 5:43 AM
Anh Nguyen
09/13/2021, 7:38 AM
haf
09/13/2021, 12:23 PM
`nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task. But I can't use `nout` because I don't know the result size, and it is not a good fit for `Tuple` (since it's a list of tuples).
haf
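On the `nout` point above: `nout` (or a `Tuple[...]` annotation) describes how many values a task returns, not how long those values are. One option, assuming the downstream code needs the tuple fields as separate lists, is to keep the list of tuples as a single result and add a small step that splits it into a fixed number of lists. The sketch is plain Python; `unzip_pairs` is a hypothetical helper, and in Prefect 1.x it would presumably be decorated with `@task(nout=2)` so its two outputs can be unpacked inside the flow.

```python
from typing import Any, List, Sequence, Tuple


# Hypothetical helper; in a flow it would be a task declared with nout=2 (assumption).
def unzip_pairs(pairs: Sequence[Tuple[Any, Any]]) -> Tuple[List[Any], List[Any]]:
    """Split a list of 2-tuples of unknown length into exactly two lists.

    The number of outputs (2) is fixed even though the list length is not,
    which is the shape that nout=2 or a Tuple[...] annotation can describe.
    """
    firsts: List[Any] = []
    seconds: List[Any] = []
    for first, second in pairs:
        firsts.append(first)
        seconds.append(second)
    return firsts, seconds


if __name__ == "__main__":
    names, sizes = unzip_pairs([("a.csv", 10), ("b.csv", 20), ("c.csv", 5)])
    print(names, sizes)  # ['a.csv', 'b.csv', 'c.csv'] [10, 20, 5]
```

If the goal is instead to process each tuple individually, mapping a downstream task directly over the list result avoids unpacking altogether.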
09/13/2021, 1:01 PM
Sam Cook
09/13/2021, 2:08 PM
Brian Phillips
09/13/2021, 7:01 PM
`AWS_CREDENTIALS` secret? I'm having trouble setting credentials so that tasks are able to write with `S3Result`. This pattern does not seem to work either:
```python
import prefect
from prefect.tasks.secrets import PrefectSecret

aws_credentials = PrefectSecret('AWS_CREDENTIALS')
with prefect.context(secrets={'AWS_CREDENTIALS': aws_credentials}):
    <task>
```
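A likely reason the pattern above does not help, assuming the snippet sits inside a `with Flow(...)` block: `PrefectSecret('AWS_CREDENTIALS')` there creates a task rather than returning the secret's value, and a `prefect.context(...)` block entered while the flow is being built is no longer active when the flow actually runs. For local runs, Prefect 1.x can read secrets from environment variables named `PREFECT__CONTEXT__SECRETS__<NAME>`, and `AWS_CREDENTIALS` is expected to be a JSON object with `ACCESS_KEY` and `SECRET_ACCESS_KEY` keys. The sketch only builds and sets that variable with placeholder values; whether `S3Result` picks it up in a given deployment (for example, an agent running against Prefect Cloud) is an assumption to check against the Prefect secrets documentation.

```python
import json
import os

# Placeholder values only; never hard-code real keys in source control.
aws_credentials = {
    "ACCESS_KEY": "<your-access-key-id>",
    "SECRET_ACCESS_KEY": "<your-secret-access-key>",
}

# Assumption: Prefect 1.x maps PREFECT__CONTEXT__SECRETS__<NAME> onto
# prefect.context.secrets[<NAME>] for local secrets, reading its config when
# prefect is imported, so the variable must be set before that import.
os.environ["PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"] = json.dumps(aws_credentials)

print(sorted(json.loads(os.environ["PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"])))
# ['ACCESS_KEY', 'SECRET_ACCESS_KEY']
```

In practice the variable would more often be exported in the shell or agent environment than set from Python.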
Ishavpreet Singh
09/13/2021, 7:06 PM
Max Ivanchenko
09/13/2021, 7:14 PM
Owen McMahon
09/13/2021, 7:47 PM
`map_index` in the filename, so it properly writes out each mapped task to an individual result) within a flow running against Prefect Cloud.
I just came across a weird scenario where the flow ran the mapped task all the way through (100 mapped tasks in total), but I noticed afterwards that 7 of them had a status of 'Cached'. This caught my eye, as it should not have loaded any of them from the cache. When I looked more closely at the logs of one of the 'Cached' mapped tasks, it appears to have finished successfully, then restarted ~7 minutes later and loaded from the cache.
All the data still appears to be there as I expected, but the behavior seemed a bit odd. Has anyone else seen this before?
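On the `map_index` templating described above: in Prefect 1.x a result `location` (or a task `target`) can be a template filled from runtime context, so each mapped child gets its own file. The sketch imitates that with plain `str.format`; the template string and context values are hypothetical. One possibly relevant detail, to verify for the Prefect version in use: when a task has a `target` and a file already exists there, Prefect 1.x skips the run and reports the task as Cached, so a child task run that restarted after already writing its file could plausibly end up in that state.

```python
from typing import Dict, List

# Hypothetical template; Prefect 1.x is assumed to fill placeholders like these
# from prefect.context (flow_name, task_name, map_index, ...) at run time.
LOCATION_TEMPLATE = "{flow_name}/{task_name}/{map_index}.prefect"


def render_locations(template: str, base_context: Dict[str, str], n_children: int) -> List[str]:
    """Render one location per mapped child, varying only map_index."""
    return [template.format(**base_context, map_index=i) for i in range(n_children)]


if __name__ == "__main__":
    ctx = {"flow_name": "my-flow", "task_name": "process"}
    locations = render_locations(LOCATION_TEMPLATE, ctx, 100)
    print(locations[0], locations[-1])  # my-flow/process/0.prefect my-flow/process/99.prefect
    print(len(set(locations)))          # 100: every child writes to a distinct file
```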
Thanks!
Ben Muller
09/13/2021, 8:10 PM
Kyle McChesney
09/13/2021, 8:28 PM
`set_dependencies`, but also get its result and pass it along to other flows, i.e. mixing and matching the imperative and functional APIs?
Kyle McChesney
09/13/2021, 8:28 PM
Kyle McChesney
09/13/2021, 8:28 PM
Kyle McChesney
09/13/2021, 8:29 PM
`set_dependencies` on the actual first tasks, with my `startup_task` as an upstream:
```python
data = flow.set_dependencies(generate_data(data_csv_url), upstream_tasks=[startup()])
```
Kyle McChesney
09/13/2021, 8:30 PM
Kyle McChesney
09/13/2021, 8:30 PM
`data =` should be the result of `generate_data`.
Kyle McChesney
09/13/2021, 8:31 PM
Brian Phillips
09/13/2021, 8:41 PM
`Cluster.stop()`
Constantino Schillebeeckx
09/13/2021, 9:09 PM
Constantino Schillebeeckx
09/13/2021, 10:14 PM
`prefect.client.client.Client.get`? For reference, I'm trying to hit the following without success:
```
[2021-01-14 16:00:00.000] ERROR --- 400 Client Error: Bad Request for url: <https://api.prefect.io/flows?name=dbt_test>
Traceback (most recent call last):
  File "deploy/register_flows.py", line 491, in <module>
    main()
  File "deploy/register_flows.py", line 485, in main
    create_proj_and_register_flows(flows, args)
  File "deploy/register_flows.py", line 278, in create_proj_and_register_flows
    register_flow(flow, flow_file, args)
  File "deploy/register_flows.py", line 314, in register_flow
    flow_already_registered(flow.name)
  File "deploy/register_flows.py", line 319, in flow_already_registered
    resp = CLIENT.get('flows', params={"name": flow_name})
  File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 406, in get
    response = self._request(
  File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 710, in _request
    response = self._send_request(
  File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 620, in _send_request
    response.raise_for_status()
  File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/requests/models.py", line 943, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: <https://api.prefect.io/flows?name=dbt_test>
```
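The 400 above is plausibly because the Prefect Cloud API is GraphQL-based rather than a REST resource API, so a GET on `/flows?name=...` is not a request it understands. One alternative is to send a GraphQL query through the client's `graphql` method instead of `get`. The sketch only builds the request body with the standard library; the `flow(where: {name: {_eq: $name}})` query shape follows the Hasura-style filters assumed for Prefect Cloud 1.x, and `build_flow_lookup` / `has_matching_flow` are hypothetical helpers, so verify the query against the API schema before relying on it.

```python
import json
from typing import Any, Dict

# Assumed query shape (Hasura-style filter); check against the Prefect Cloud schema.
FLOW_BY_NAME_QUERY = """
query FlowsByName($name: String!) {
  flow(where: {name: {_eq: $name}}) {
    id
    name
    version
  }
}
"""


def build_flow_lookup(flow_name: str) -> Dict[str, Any]:
    """Hypothetical helper: GraphQL request body for looking up flows by name."""
    return {"query": FLOW_BY_NAME_QUERY, "variables": {"name": flow_name}}


def has_matching_flow(response: Dict[str, Any]) -> bool:
    """Hypothetical helper: interpret a response shaped like {"data": {"flow": [...]}}."""
    return bool(response.get("data", {}).get("flow"))


if __name__ == "__main__":
    payload = build_flow_lookup("dbt_test")
    print(json.dumps(payload["variables"]))  # {"name": "dbt_test"}
    # Assumption: with Prefect 1.x, something like
    # Client().graphql(FLOW_BY_NAME_QUERY, variables={"name": "dbt_test"}) sends this query.
    print(has_matching_flow({"data": {"flow": []}}))  # False
```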
Brad
09/14/2021, 2:55 AM
`Cached` despite not being complete. I wonder if this is the expected result or a bug. Example in thread.
Gaylord Cherencey
09/14/2021, 5:11 AM
`A`, `B` and `C`. I want flows `B` and `C` to be triggered each time flow `A` runs successfully, but I want to define this dependency in flows `B` and `C` because the "parent" will be owned by another team. Is there a way to do this in Prefect, or would we have to go for an eventing solution?
Jacob Blanco
09/14/2021, 7:05 AM
`with case(...)`, not `if case`.
Having a strange issue: when running a flow, it warns that a Parameter was declared but not added to the Flow, and when I try to run the flow with the parameters set, it errors out saying that the parameters are unexpected.
This is the structure of the flow:
```python
from prefect import Flow, Parameter
from prefect.tasks.control_flow import case

with Flow(name="My Flow") as flow:
    if case(Parameter("do_something", default=False), True):
        result_of_thing = run_thing()
    else:
        result_of_thing = run_another_thing()
```
```python
from flow_definition import flow

flow.run(parameters={"do_something": True})
```
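Why `if case(...)` misbehaves, following the `with case(...)` suggestion above: `case` is meant to be used as a context manager, so in an `if` statement Python only checks whether the object is truthy, which an object without a custom `__bool__` always is. The `with` block is never entered, the first branch is always built, the `else` branch never is, and the `Parameter` is never attached to the flow, which is consistent with both the warning and the "unexpected parameter" error. The toy below uses a hypothetical `FakeCase` class, not Prefect's implementation, to show the difference. In Prefect 1.x the intended form is two blocks, `with case(do_something, True): ...` and `with case(do_something, False): ...`, with `merge` from `prefect.tasks.control_flow` available to combine the branch results.

```python
from typing import List

events: List[str] = []


class FakeCase:
    """Toy stand-in for a conditional context manager; NOT Prefect's `case`."""

    def __init__(self, value: object, expected: object) -> None:
        self.value = value
        self.expected = expected

    def __enter__(self) -> "FakeCase":
        # A real framework would register the condition here.
        events.append(f"enter case({self.value!r} == {self.expected!r})")
        return self

    def __exit__(self, *exc) -> bool:
        events.append("exit case")
        return False  # do not swallow exceptions


def build_with_if() -> None:
    # Misuse: `if` only tests truthiness, and the object is always truthy.
    if FakeCase("do_something", True):
        events.append("built run_thing")
    else:
        events.append("built run_another_thing")  # never reached


def build_with_with() -> None:
    # Intended use: one `with` block per branch, so both branches are registered.
    with FakeCase("do_something", True):
        events.append("built run_thing")
    with FakeCase("do_something", False):
        events.append("built run_another_thing")


if __name__ == "__main__":
    build_with_if()
    print(events)  # ['built run_thing']
    events.clear()
    build_with_with()
    print(events)
```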