Prateek Gupta
09/13/2021, 5:39 AM
Prateek Gupta
09/13/2021, 5:39 AM
Prateek Gupta
09/13/2021, 5:41 AM
Prateek Gupta
09/13/2021, 5:43 AM
Anh Nguyen
09/13/2021, 7:38 AM
haf
09/13/2021, 12:23 PM
I'm getting an error telling me to pass nout to the task decorator/constructor, or provide a Tuple return-type annotation to my task, but I can't use nout because I don't know the result size, and it is not a good fit for Tuple either (since the result is a list of tuples).
haf
09/13/2021, 1:01 PM
Sam Cook
09/13/2021, 2:08 PM
Brian Phillips
09/13/2021, 7:01 PM
How do I properly set the AWS_CREDENTIALS secret? I'm having trouble setting credentials so that tasks are able to write with S3Result. This pattern does not seem to work either:
aws_credentials = PrefectSecret('AWS_CREDENTIALS')
with prefect.context(secrets={'AWS_CREDENTIALS': aws_credentials}):
    <task>
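(A hedged sketch of the approach that usually works here in Prefect 1.x: provide AWS_CREDENTIALS as a context secret via config or environment so S3Result can pick it up at runtime; as far as I understand, the prefect.context block above only applies at flow-definition time, not when the flow runs. The bucket name and key values are placeholders.)

# Hedged sketch (Prefect 1.x): make AWS_CREDENTIALS available as a context secret.
# Either export it where the flow runs:
#   export PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "...", "SECRET_ACCESS_KEY": "..."}'
# or put it in ~/.prefect/config.toml:
#   [context.secrets]
#   AWS_CREDENTIALS = '{"ACCESS_KEY": "...", "SECRET_ACCESS_KEY": "..."}'
from prefect import Flow, task
from prefect.engine.results import S3Result

@task(result=S3Result(bucket="my-result-bucket"))  # placeholder bucket
def write_something():
    return {"hello": "world"}

with Flow("s3-result-example") as flow:
    write_something()

(When running against Prefect Cloud, the same AWS_CREDENTIALS secret can instead be stored as a Cloud secret so it doesn't have to live on the agent.)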
Ishavpreet Singh
09/13/2021, 7:06 PM
Max Ivanchenko
09/13/2021, 7:14 PM
Owen McMahon
09/13/2021, 7:47 PM
I'm using a Result on my mapped tasks (with map_index in the filename so it properly writes out each mapped task to an individual result) within a flow running against Prefect Cloud.
I just came across a weird scenario where the Flow did run the Mapped Task fully through (100 Mapped Tasks in total), but I noticed afterwards that 7 of them had a status of 'Cached'. This caught my eye, as it should not have loaded any of them from the Cache. When I looked closer at the logs of one of the 'Cached' Mapped Tasks, it looks like it finished successfully, and then restarted ~7 mins later and loaded from Cache.
It appears that all data is still there as I expected - but behavior seemed a bit odd. Wondering if anyone else has seen this before?
Thanks!
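(For reference, a hedged sketch of the map_index-in-the-filename pattern being described, assuming Prefect 1.x S3Result with a templated location; the bucket name and template are placeholders.)

from prefect import Flow, task
from prefect.engine.results import S3Result

# Placeholder bucket; map_index in the location keeps each mapped child's result separate.
result = S3Result(
    bucket="my-result-bucket",
    location="{flow_name}/{task_name}/{map_index}.prefect",
)

@task(result=result, checkpoint=True)
def process(x):
    return x * 2

with Flow("mapped-results") as flow:
    process.map(list(range(100)))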
Ben Muller
09/13/2021, 8:10 PM
Kyle McChesney
09/13/2021, 8:28 PM
Is there a way to set dependencies via set_dependencies, but also get a task's result and pass it along to other tasks in the flow? I.e. mixing and matching the imperative vs functional APIs?
Kyle McChesney
09/13/2021, 8:28 PM
Kyle McChesney
09/13/2021, 8:28 PM
Kyle McChesney
09/13/2021, 8:29 PM
I'm calling set_dependencies on the actual first tasks, with my startup_task as an upstream:
data = flow.set_dependencies(generate_data(data_csv_url), upstream_tasks=[startup()])
Kyle McChesney
09/13/2021, 8:30 PM
Kyle McChesney
09/13/2021, 8:30 PM
data = should be the result of generate_data
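(A hedged sketch of one way to get both: build the data dependency with the functional API and add the state-only dependency imperatively with set_upstream. generate_data, startup and data_csv_url come from the messages above; downstream_task is a hypothetical consumer. Note that flow.set_dependencies itself returns None, so its return value can't be captured as data.)

from prefect import Flow

with Flow("mixed-api-example") as flow:
    start = startup()                    # task run with no data passed downstream
    data = generate_data(data_csv_url)   # functional call: data is the task whose result we want
    data.set_upstream(start)             # imperative, state-only dependency on startup
    downstream_task(data)                # hypothetical consumer; receives generate_data's result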
Kyle McChesney
09/13/2021, 8:31 PM
Brian Phillips
09/13/2021, 8:41 PM
Cluster.stop()
Constantino Schillebeeckx
09/13/2021, 9:09 PM
Constantino Schillebeeckx
09/13/2021, 10:14 PM
Has anyone had luck passing params to prefect.client.client.Client.get? For reference, I'm trying to hit the following without success:
[2021-01-14 16:00:00.000] ERROR --- 400 Client Error: Bad Request for url: https://api.prefect.io/flows?name=dbt_test
Traceback (most recent call last):
File "deploy/register_flows.py", line 491, in <module>
main()
File "deploy/register_flows.py", line 485, in main
create_proj_and_register_flows(flows, args)
File "deploy/register_flows.py", line 278, in create_proj_and_register_flows
register_flow(flow, flow_file, args)
File "deploy/register_flows.py", line 314, in register_flow
flow_already_registered(flow.name)
File "deploy/register_flows.py", line 319, in flow_already_registered
resp = CLIENT.get('flows', params={"name": flow_name})
File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 406, in get
response = self._request(
File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 710, in _request
response = self._send_request(
File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/prefect/client/client.py", line 620, in _send_request
response.raise_for_status()
File "/Users/constantino.schillebeekx/.pyenv/versions/dwh/lib/python3.8/site-packages/requests/models.py", line 943, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://api.prefect.io/flows?name=dbt_test
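(One likely cause of the 400 is that api.prefect.io is a GraphQL API rather than a REST endpoint, so there is no GET /flows route to pass params to. A hedged sketch of querying flows by name through Client.graphql instead; the query shape follows the Cloud GraphQL schema as I understand it.)

from prefect.client import Client

client = Client()

query = """
query($name: String!) {
  flow(where: {name: {_eq: $name}}) {
    id
    name
    project { name }
  }
}
"""
resp = client.graphql(query, variables={"name": "dbt_test"})
print(resp["data"]["flow"])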
Brad
09/14/2021, 2:55 AM
I have a task whose state is Cached despite not being complete. I wonder if this is the expected result or a bug. Example in thread.
Gaylord Cherencey
09/14/2021, 5:11 AM
I have three flows, A, B and C. I want flows B and C to be triggered each time flow A runs successfully, but I want to define this dependency in flows B and C, because the "parent" will be owned by another team. Is there a way to do this in Prefect, or would we have to go for an eventing solution?
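(For comparison, a hedged sketch of the usual Prefect 1.x flow-of-flows pattern with StartFlowRun. It expresses the A-then-B-and-C dependency, but it lives in a separate orchestrator flow rather than in B and C themselves, so it only partially addresses the ownership concern. Flow and project names are placeholders.)

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

run_a = StartFlowRun(flow_name="A", project_name="my-project", wait=True)
run_b = StartFlowRun(flow_name="B", project_name="my-project", wait=True)
run_c = StartFlowRun(flow_name="C", project_name="my-project", wait=True)

with Flow("orchestrate-a-b-c") as parent:
    a = run_a()
    b = run_b()
    c = run_c()
    b.set_upstream(a)   # B only starts once A's run has finished successfully
    c.set_upstream(a)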
Jacob Blanco
09/14/2021, 7:05 AM
with case(...), not if case.
Having a strange issue where, when running a flow, it warns that a Parameter was declared but not added to the Flow, and when I try to run the flow with the parameters set, it errors out saying that the parameters are unexpected.
This is the structure of the flow:
with Flow(name="My Flow") as flow:
    if case(Parameter("do_something", default=False), True):
        result_of_thing = run_thing()
    else:
        result_of_thing = run_another_thing()

from flow_definition import flow
flow.run(parameters={"do_something": True})
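(A hedged sketch of the same flow rewritten with the with case(...) context manager, which is how Prefect 1.x expects conditional branches to be built; merge picks whichever branch actually ran. run_thing and run_another_thing are the tasks from the snippet above.)

from prefect import Flow, Parameter, case
from prefect.tasks.control_flow import merge

with Flow(name="My Flow") as flow:
    do_something = Parameter("do_something", default=False)

    with case(do_something, True):
        thing = run_thing()
    with case(do_something, False):
        other_thing = run_another_thing()

    # only one branch runs; merge returns that branch's result
    result_of_thing = merge(thing, other_thing)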
Hugo Polloli
09/14/2021, 7:35 AM
I'd like to be able to do something like this:
@task(state_handlers=[my_handler])
def my_task(my_input):
    self.message = f"Input {my_input} wasn't ok"
    assert input_is_ok(my_input)
    self.message = f"Failed to do_some_things for {my_input}"
    res = do_some_things(my_input)
    self.message = f"Failed to do_a_few_more for {my_input}"
    res2 = do_a_few_more(res)
    return res2
I understand I could use signals, using the value property of a signal to store what I want to print, but that would mean wrapping each line where I perform an action in a try/except block and raising the signal inside the except block. I've done that and it gets really cluttered.
I may have missed something obvious and be going in the wrong direction though...
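(A hedged sketch of one way to keep per-step messages without sprinkling try/except everywhere: a small hypothetical helper that turns any exception into a FAIL signal carrying the message. failing_with is not a Prefect API, just an illustration; input_is_ok, do_some_things and do_a_few_more come from the snippet above.)

from contextlib import contextmanager

from prefect import task
from prefect.engine import signals

@contextmanager
def failing_with(message):
    # Hypothetical helper: re-raise any exception as FAIL with a custom message.
    try:
        yield
    except Exception as exc:
        raise signals.FAIL(message=f"{message}: {exc}") from exc

@task
def my_task(my_input):
    with failing_with(f"Input {my_input} wasn't ok"):
        assert input_is_ok(my_input)
    with failing_with(f"Failed to do_some_things for {my_input}"):
        res = do_some_things(my_input)
    with failing_with(f"Failed to do_a_few_more for {my_input}"):
        res2 = do_a_few_more(res)
    return res2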
Issam Assafi
09/14/2021, 7:43 AM
shekhar koirala
09/14/2021, 8:01 AM
@task(max_retries=3, retry_delay=datetime.timedelta(minutes=10))
def fetch_data(count, check):
    data = []
    logger = prefect.context.get("logger")
    logger.info(data)
    return data

def run():
    with Flow("fetch-data") as local_flow:
        count = Parameter("count", default=100)
        check = Parameter("limit", default=10)
        fetch_data(count, check)
    local_flow.register(project_name="test")
    id = local_flow.run()
    print(id)

if __name__ == "__main__":
    run()
Lucas Beck
09/14/2021, 8:20 AM
We have many ETL-like jobs to run, some on schedule and some on demand. We usually break down big computing tasks into smaller parts, each contained in its own docker container. These tasks, when combined, form a computation DAG. This DAG can mostly be parallelized (mapped). We then leverage an Azure Kubernetes Service (AKS) cluster to horizontally scale the compute jobs into hundreds, sometimes thousands, of containers. So far we have been using our own internal tool to orchestrate tasks and manage dependencies, and that is what we are currently investigating replacing with Prefect. Our Prefect setup uses a self-hosted server rather than the cloud solution.
Challenge 1
The first and most important challenge we are facing concerns scaling jobs. I will use the flow we tried to migrate to Prefect as the example here. I am attaching the DAG schematic as a picture. In this flow, each task uses Prefect's RunNamespacedJob to spin up a new job that has exactly one pod, which will perform a given computation. We then use other functions from the Prefect SDK to read the logs after completion and delete the kubernetes jobs. The whole DAG can be run in parallel (mapped) for multiple inputs. Usually the job works fine for 3-5 inputs, but as soon as we try to scale to as little as 30 inputs we start seeing a ton of heartbeat errors or "no pod status" errors (also attaching an image here). When running with our internal tool, we have already scaled up to around 500 inputs in parallel in the same cluster setup. Has anyone else experienced this? By the way, we are using the DaskExecutor for that, with default parameters.
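(A hedged sketch of a non-default DaskExecutor configuration that is often suggested for large mapped flows on Kubernetes: an ephemeral dask_kubernetes.KubeCluster with adaptive scaling, so mapped children are spread over several worker pods rather than a single local Dask cluster. The flow name, image, resource limits and scaling bounds are placeholders.)

from prefect import Flow
from prefect.executors import DaskExecutor
from dask_kubernetes import make_pod_spec

# Placeholder pod spec; the image should match the flow's run image.
pod_spec = make_pod_spec(
    image="imagewithourpackageinstalled:latest",
    memory_limit="4G",
    cpu_limit=1,
)

with Flow("my-mapped-flow") as flow:   # placeholder; attach to the real flow instead
    ...

flow.executor = DaskExecutor(
    cluster_class="dask_kubernetes.KubeCluster",
    cluster_kwargs={"pod_template": pod_spec},
    adapt_kwargs={"minimum": 4, "maximum": 40},
)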
Challenge 2
We currently have a single repo where we want to keep all prefect flows under version control. In order to reuse code, we also made it a package that can be imported around. The structure looks like this:
prefect-flows/
    setup.py
    requirements.txt
    prefectflows/
        flows/
            flow1.py
            flow2.py
            ...
        core/
            kubernetes.py
            ...
The idea is that the actual flows sit under the flows folder and can import boilerplate/utils from the core folder. For instance, kubernetes.py contains all the functions that use Prefect's SDK to interact with Kubernetes. An example function is run_job, which will run a job, read its logs and eventually delete that job.
This all works fine when running locally, but it fails once I register a flow and try to run it through the server. The error is:
Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'prefectflows.core\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
It is important to mention that I have dockerized the package using the prefect base image, and I am telling prefect to run the flow with that image. Something like the following:
Flow(
    flow_name,
    storage=Azure(
        container=container,
        stored_as_script=False,
        connection_string=azure_connection_string,
    ),
    run_config=KubernetesRun(
        image="imagewithourpackageinstalled:latest",
        ...
Maybe I misunderstood where the unpickling is happening, but I would expect that if the flow uses an image containing all the packages that were present when the flow was registered, then it should work.
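(A hedged sketch of one workaround that is often suggested for this kind of ModuleNotFoundError: store the flow as a script instead of a pickle, so the flow is re-imported inside the run image where the prefectflows package is installed. Variable names come from the snippet above, the blob_name is a placeholder, and the exact Azure storage arguments should be checked against the installed Prefect version.)

from prefect.storage import Azure
from prefect.run_configs import KubernetesRun

flow.storage = Azure(
    container=container,
    connection_string=azure_connection_string,
    stored_as_script=True,
    blob_name=f"flows/{flow_name}.py",  # placeholder blob path for the flow script
)
flow.run_config = KubernetesRun(image="imagewithourpackageinstalled:latest")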
I think that summarizes it, but please let us know if you need further description or to share more of the code.
Thanks in advance for your help!