Bruno Grande
08/05/2022, 6:07 PM
Viet Nguyen
08/05/2022, 6:15 PM
Rajvir Jhawar
08/05/2022, 6:17 PM
John Kang
08/05/2022, 7:25 PM
task.fn(function_to_call())
but that doesn't work as I get this error: AttributeError: 'function' object has no attribute 'fn'
`RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use task.fn().`
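Editor's note: a minimal sketch of what the error message points at. The .fn attribute lives on the decorated task object, not on the task decorator itself; names here are illustrative.
from prefect import task

@task
def my_task(x):
    return x + 1

# Inside a flow you would call my_task(1) to create a task run.
# Outside a flow, .fn invokes the plain underlying function:
result = my_task.fn(1)  # returns 2, no flow run required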
Andrew Richards
08/05/2022, 7:37 PM
The retries parameter to the flow itself doesn't appear to work when I deliberately supply a bad shell command.
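Editor's note: for reference, a hedged sketch of how retries are declared at the task level, where they are documented to apply; the shell-command task here is an illustrative stand-in.
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def run_shell_command():
    # stand-in for the real shell invocation
    raise RuntimeError("simulated bad shell command")

@flow
def shell_flow():
    run_shell_command()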
08/05/2022, 7:39 PMflow.storage = S3(
bucket=DEPLOYMENT_BUCKET, stored_as_script=False, add_default_labels=False
)
flow.register(
PROJECT_NAME,
add_default_labels=False,
idempotency_key=flow.serialized_hash(),
)
Bruno Grande
08/05/2022, 8:29 PM
.submit after my selection in the attached screenshot? This comes up in the docs here. I thought you needed to use .submit() in order to obtain a future. Just wanted to check if this is a typo.
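Editor's note: a minimal sketch of the distinction in question, assuming illustrative task names. A direct call returns the task's result, while .submit() returns a PrefectFuture.
from prefect import flow, task

@task
def add_one(x):
    return x + 1

@flow
def my_flow():
    value = add_one(1)          # direct call: blocks and returns the result
    future = add_one.submit(1)  # .submit(): returns a PrefectFuture
    print(future.result())      # resolve the future to its value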
Corris Randall
08/05/2022, 8:41 PM
from typing import Optional
from prefect.utilities.asyncutils import sync_compatible
from prefect.blocks.notifications import NotificationBlock
class MyEmail(NotificationBlock):
    _block_type_name = "My Email"
    _block_type_slug = "my-email"
    _block_schema_capabilities = ["notify"]

    @sync_compatible
    async def notify(self, body: str, subject: Optional[str] = None):
        # print() is not a coroutine, so it must not be awaited
        print(f"In my email notify subject: {subject}\nbody: {body}")
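Editor's note: assuming the class above, a short usage sketch. Because notify is wrapped in @sync_compatible, it can be called synchronously outside an event loop:
email_block = MyEmail()
email_block.notify("hello world", subject="test")  # no await needed here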
Kevin Grismore
08/05/2022, 9:18 PM
- project
├── flows
│   ├── flow1.py
│   └── flow2.py
└── util
    └── util.py
if I do some/dir/project> prefect deployment build flows/flow1.py:flow_func -n my-flow -ib kubernetes-job/my-job -sb gcs/my-bucket -t k8s
everything in src ends up in my bucket as expected, but when I run the flow I get:
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows/flow1.py'
Keith
08/06/2022, 12:59 AM
upstream_tasks parameter that you could pass to tasks so that each task knew to wait for the previous one to run. Through my reading of the documentation it seems like this is not necessary anymore b/c everything should run like it would in Python, so it basically defaults to a sequential executor. Is this the correct logic?
Obviously this story changes a bit when adding in the different Task Runners, but just wanted to confirm that with the default code blocks, tasks run in sequence and won't run the next task until the previous one is complete.
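Editor's note: a minimal sketch of the default behavior being described, with illustrative task names. Direct task calls block until the task run finishes, so tasks run in sequence even under the default task runner:
from prefect import flow, task

@task
def step_one():
    ...

@task
def step_two():
    ...

@flow
def sequential_flow():
    step_one()  # a direct call blocks until the task run completes
    step_two()  # so this only starts after step_one has finished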
Benoit Chabord
08/06/2022, 7:17 AM
Jan Domanski
08/06/2022, 10:24 AM
10:21:24.290 | INFO | prefect.agent - Submitting flow run 'cfc4f262-4f05-4685-882e-364192297107'
10:21:24.474 | INFO | prefect.infrastructure.process - Opening process 'blond-mammoth'...
10:21:24.482 | INFO | prefect.agent - Completed submission of flow run 'cfc4f262-4f05-4685-882e-364192297107'
10:21:27.334 | ERROR | Flow run 'blond-mammoth' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/deployments.py", line 47, in load_flow_from_flow_run
await storage_block.get_directory(from_path=None, local_path=".")
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/filesystems.py", line 373, in get_directory
return await self.filesystem.get_directory(
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/filesystems.py", line 251, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/fsspec/spec.py", line 801, in get
self.get_file(rpath, lpath, **kwargs)
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/fsspec/spec.py", line 769, in get_file
outfile = open(lpath, "wb")
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp3er_ugnvprefect/S3-BUCKET-NAME/alpha/flow.py'
10:21:27.727 | INFO | prefect.infrastructure.process - Process 'blond-mammoth' exited cleanly.
...
$ aws s3 ls s3://S3-BUCKET-NAME/alpha/
2022-08-06 10:20:49 6473 flow.py
2022-08-06 10:20:49 3204 example_flow-manifest.json
Created via
# prefect deployment build ./flow.py:example_flow --name example-flow-alpha --tag alpha --storage-block s3/S3-BUCKET-NAME
# prefect deployment apply example-flow-alpha.yaml
Had mixed luck reading and searching similar posts with this error message.
Rio McMahon
08/06/2022, 11:53 PM
Yardena Meymann
08/07/2022, 7:09 AM
Viet Nguyen
08/07/2022, 1:57 PM
Hafsa Junaid
08/07/2022, 8:48 PM
Rajvir Jhawar
08/08/2022, 2:28 AM
Felix Sonntag
08/08/2022, 7:46 AM
Vadym Dytyniak
08/08/2022, 7:49 AM
jaehoon
08/08/2022, 9:01 AM
Ha Pham
08/08/2022, 9:11 AM
Ha Pham
08/08/2022, 9:44 AM
tag_1, tag_2, and the deployment only has tag_1, it won't be picked up
• If I modify the deployment to have both tag_1, tag_2, it will be picked up
• If I add another tag_3 to the deployment, the deployment is also picked up
Is this the expected behavior?
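Editor's note: the behavior described is consistent with subset matching. A purely illustrative, non-Prefect predicate to state the rule:
def queue_picks_up(queue_tags: set, deployment_tags: set) -> bool:
    # A queue serves a deployment only when every one of the
    # queue's tags is also present on the deployment.
    return queue_tags.issubset(deployment_tags)

assert not queue_picks_up({"tag_1", "tag_2"}, {"tag_1"})
assert queue_picks_up({"tag_1", "tag_2"}, {"tag_1", "tag_2"})
assert queue_picks_up({"tag_1", "tag_2"}, {"tag_1", "tag_2", "tag_3"})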
Iuliia Volkova
08/08/2022, 10:31 AM
Ha Pham
08/08/2022, 10:43 AM
name_a already existed with some modified configs, and now I run deployment apply on the same original deployment file, I will lose all of the configs. Is there any way to avoid this, or what's the best practice when handling deployments?
Raviraj Dixit
08/08/2022, 10:43 AM
Tim Helfensdörfer
08/08/2022, 11:16 AM
@flow
it runs slower by a factor of 2-10x. This is our test setup:
def run_flow():
    calculate_something()

@flow(
    name=FLOW_NAME,
    task_runner=get_default_task_runner(),
    version=get_file_hash(__file__),
    timeout_seconds=get_default_timeout(),
)
def run_prefect_flow():
    global USE_PREFECT_LOGGER
    USE_PREFECT_LOGGER = True
    run_flow()

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--no-prefect":
        # Normal performance
        run_flow()
    else:
        # Bad performance
        run_prefect_flow()
We can't share any code from inside calculate_something - are there any circumstances that you know of where this might happen? What overhead does `@flow` bring into play? Does it analyze HTTP or DB requests for debugging purposes, which might explain the performance degradation?
What I can offer is a pstats profile/graph in a DM, because it also may contain sensitive data.
*currently = as long as we can remember using Prefect 2.
Yury Cheremushkin
08/08/2022, 12:14 PM
.set_upstream() method in 1.0. But it looks like there is no such method anymore. I.e. there are two tasks: loading data into some BigQuery table and then merging it with another table. Obviously these tasks need to run in this exact order. But there is no need to pass any results from the first task to the second, so it wasn't obvious to Prefect 1.0 that there is some kind of order. That's why I used .set_upstream(). What should I do now?
And the third question is about .map() for tasks. So, now I should just use the standard pythonic map? Will it be parallelized in the same manner as it was in Prefect 1.0 with the Concurrent/Parallel task runner?
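Editor's note: a hedged sketch of the Prefect 2 equivalents, with illustrative task names. wait_for replaces .set_upstream() for ordering without data passing, and tasks keep a .map() that submits runs through the task runner (unlike the builtin map):
from prefect import flow, task

@task
def load_to_bigquery():
    ...

@task
def merge_tables():
    ...

@task
def process(item):
    return item * 2

@flow
def etl():
    load = load_to_bigquery.submit()
    merge_tables.submit(wait_for=[load])  # explicit ordering, no data passed
    futures = process.map([1, 2, 3])      # one task run per element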
Chu
08/08/2022, 1:10 PM
c_flow.py which orchestrates b_flow.py and a_flow.py. When we register all three flows together under this folder, will Prefect know the order of registration? (like, do we need to register b_flow and a_flow first, and then register c_flow?)
Oscar Björhn
08/08/2022, 1:52 PM
Felix Sonntag
08/08/2022, 2:11 PM
PREFECT_LOGGING_EXTRA_LOGGERS and it works, but for example it does not cover logs coming from TensorFlow's C++ side, e.g.:
2022-08-08 13:59:27.431966: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-08-08 13:59:27.432037: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
Plus, it's kind of tedious collecting all the libs I want to include (see the sketch after this message).
2. When starting a flow run, many valuable logs and especially errors are only visible on the work queue. So if e.g. I misconfigure a block and the deployment run fails to start, an end user cannot see the error message, since the queue logs are not visible. One would need to check the work queue's Kubernetes pod and manually filter out the error logs themselves; a user might not have access, and with many flow runs this is also hard.
Am I missing something, or how should one work with that properly?
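Editor's note on point 1: a sketch of why the TensorFlow lines are missed, under the assumption that PREFECT_LOGGING_EXTRA_LOGGERS only attaches Prefect's handler to named Python loggers. Output that native code writes straight to stderr never passes through the logging module:
import logging
import sys

# Covered: records emitted through a Python logger named in the setting,
# e.g. PREFECT_LOGGING_EXTRA_LOGGERS=tensorflow
logging.getLogger("tensorflow").warning("visible to Prefect's log handler")

# Not covered: C++ code writing directly to the process's stderr
# bypasses Python's logging module entirely, so no handler sees it.
sys.stderr.write("W tensorflow/stream_executor ... could not load libcudart\n")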