Nikita Samoylov
08/05/2022, 11:05 AM

Milan Valadou
08/05/2022, 12:47 PM

Muddassir Shaikh
08/05/2022, 1:49 PM

Florian Guily
08/05/2022, 1:55 PM

Viet Nguyen
08/05/2022, 2:04 PM

I want to use prefect-email to send some dummy test emails (I know there's a Prefect notifications feature, but I want to test out prefect-email as well). I get this error every time, with a hard-coded password for the dummy test, so it can't be a wrong password 🤔
smtplib.SMTPAuthenticationError: (535, b'5.7.8 Username and Password not accepted. Learn more at\n5.7.8 <https://support.google.com/mail/?p=BadCredentials> d6-20020a170903230600b0016efc27ca98sm3023696plh.169 - gsmtp'
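(Editor's note: the 535 BadCredentials response from Gmail usually isn't a mistyped password. Google rejects plain account passwords over SMTP and expects a 16-character App Password generated in the account's security settings. A minimal stdlib sketch of the same send path, not using prefect-email itself; all addresses and credentials below are hypothetical.)

```python
import smtplib
from email.message import EmailMessage

# Hypothetical values: Gmail requires an App Password here, not the
# regular account password, or it answers 535 BadCredentials.
SMTP_HOST, SMTP_PORT = "smtp.gmail.com", 465
USERNAME = "me@gmail.com"
APP_PASSWORD = "abcdefghijklmnop"  # hypothetical 16-char App Password

msg = EmailMessage()
msg["From"] = USERNAME
msg["To"] = "you@example.com"
msg["Subject"] = "prefect-email test"
msg.set_content("dummy test email")

def send(message: EmailMessage) -> None:
    # Opens a TLS connection and authenticates; bad credentials raise
    # smtplib.SMTPAuthenticationError(535, ...) exactly as in the log above.
    with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT) as server:
        server.login(USERNAME, APP_PASSWORD)
        server.send_message(message)
```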
Thank you

Chu
08/05/2022, 2:17 PM

Evan Curtin
08/05/2022, 2:22 PM

I'm looking for a Result-like implementation for 2.0, but I can't find anything in the docs. The closest thing I am seeing is FileSystems, but I don't see example usage of passing data between tasks using a custom persistence layer.

Tony Yun
08/05/2022, 3:35 PM

I get a file not found error. So I cannot store it in /tmp and process it later in this task?

Keith
08/05/2022, 3:51 PM

I'm using prefect-gcp to do it, but when I combine this with block information it seems like the info is not in the correct format.
gcs_block = GCS.load("gcs-dev")

@flow()
def example_cloud_storage_upload_blob_from_file_flow():
    gcp_credentials = GcpCredentials(service_account_info=gcs_block.service_account_info)
    test_upload_file = "test_upload.txt"
    blob = cloud_storage_upload_blob_from_file(test_upload_file, gcs_block.bucket_path, "test_upload.txt", gcp_credentials)
    return blob
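(Editor's note: one common cause of "info is not in the correct format" is passing the service account info as a JSON string where a dict is expected, or the reverse. A small stdlib sketch of normalizing it before handing it to the credentials object; the helper name is hypothetical and not part of prefect-gcp.)

```python
import json

def normalize_service_account_info(info):
    """Accept a JSON string or an already-parsed dict and always return
    a dict (hypothetical helper, not prefect-gcp API)."""
    if isinstance(info, str):
        return json.loads(info)
    if isinstance(info, dict):
        return info
    raise TypeError(f"unsupported service_account_info type: {type(info)!r}")

# A block may hand back either form depending on how it was saved:
as_dict = normalize_service_account_info('{"type": "service_account"}')
```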
Seth Goodman
08/05/2022, 4:31 PM

@task
def actual_task(arg1, arg2, arg3):
    # does stuff
    ...

task_list = [
    (1, "a", "b"),
    (2, "c", "d"),
    (3, "e", "f"),
    (4, "g", "h"),
]

def task_map(task):
    return actual_task(task[0], task[1], task[2])

with Flow("my_flow") as flow:
    task_results = apply_map(task_map, task_list)
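(Editor's note: for readers unfamiliar with Prefect 1's apply_map, the data flow above is plain function mapping: task_map is applied to each tuple and unpacks it into actual_task's three arguments. A Prefect-free sketch of the same computation; the return value is a stand-in, since the original task body is elided.)

```python
# Stand-in for the real work inside actual_task
def actual_task(arg1, arg2, arg3):
    return (arg1, arg2, arg3)

task_list = [
    (1, "a", "b"),
    (2, "c", "d"),
    (3, "e", "f"),
    (4, "g", "h"),
]

def task_map(task):
    return actual_task(task[0], task[1], task[2])

# apply_map(task_map, task_list) builds one task per element, like:
task_results = [task_map(t) for t in task_list]
```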
Bruno Grande
08/05/2022, 6:07 PM

Viet Nguyen
08/05/2022, 6:15 PM

Rajvir Jhawar
08/05/2022, 6:17 PM

John Kang
08/05/2022, 7:25 PM

I tried task.fn(function_to_call()), but that doesn't work, as I get this error: AttributeError: 'function' object has no attribute 'fn'
`RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use task.fn().`
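(Editor's note: the AttributeError comes from calling .fn on the wrong object. The .fn attribute lives on the Task object that @task returns, not on a plain function, so the call should be function_to_call.fn(...) rather than task.fn(function_to_call()). A minimal sketch of the mechanism, not Prefect's real internals.)

```python
class Task:
    """Toy stand-in for what a @task decorator returns."""
    def __init__(self, fn):
        self.fn = fn  # the undecorated function

    def __call__(self, *args, **kwargs):
        raise RuntimeError(
            "Tasks cannot be run outside of a flow. To call the "
            "underlying task function outside of a flow use task.fn()"
        )

def task(fn):
    return Task(fn)

@task
def add(x, y):
    return x + y

result = add.fn(1, 2)  # bypasses the flow requirement; result == 3
```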
Andrew Richards
08/05/2022, 7:37 PM

The retries parameter on the flow itself doesn't appear to work when I deliberately supply a bad shell command.

Javier Ochoa
08/05/2022, 7:39 PM

flow.storage = S3(
    bucket=DEPLOYMENT_BUCKET, stored_as_script=False, add_default_labels=False
)
flow.register(
    PROJECT_NAME,
    add_default_labels=False,
    idempotency_key=flow.serialized_hash(),
)
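(Editor's note: for context, serialized_hash() is what makes the idempotency_key useful: re-registering an unchanged flow produces the same key, so the registration can be skipped. A stdlib sketch of that hash-based idempotency idea; illustration only, not Prefect's internals.)

```python
import hashlib
import json

def serialized_hash(payload: dict) -> str:
    # Stable hash of a serialized payload: identical flows -> identical keys.
    blob = json.dumps(payload, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()

_registry = {}

def register(name: str, payload: dict) -> str:
    key = serialized_hash(payload)
    if _registry.get(name) == key:
        return "skipped"      # same idempotency key: nothing to do
    _registry[name] = key
    return "registered"       # new or changed flow
```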
Bruno Grande
08/05/2022, 8:29 PM

Shouldn't there be a .submit after my selection in the attached screenshot? This comes up in the docs here. I thought you needed to use .submit() in order to obtain a future. Just wanted to check if this is a typo.

Corris Randall
08/05/2022, 8:41 PM

from typing import Optional

from prefect.utilities.asyncutils import sync_compatible
from prefect.blocks.notifications import NotificationBlock

class MyEmail(NotificationBlock):
    _block_type_name = "My Email"
    _block_type_slug = "my-email"
    _block_schema_capabilities = ["notify"]

    @sync_compatible
    async def notify(self, body: str, subject: Optional[str] = None):
        # print() is not awaitable, so it must not be awaited
        print(f"In my email notify subject: {subject}\nbody: {body}")
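(Editor's note: the @sync_compatible decorator in that snippet is what lets notify be called from both sync and async code. A rough, simplified sketch of the idea; this is not Prefect's actual implementation.)

```python
import asyncio
import functools

def sync_compatible(fn):
    """If there is no running event loop, drive the coroutine to completion;
    otherwise hand the coroutine back for the caller to await."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        coro = fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coro)  # sync caller: run it now
        return coro                   # async caller: they await it
    return wrapper

@sync_compatible
async def notify(body: str, subject: str = None) -> str:
    return f"subject={subject} body={body}"

message = notify("hello", subject="test")  # plain sync call works
```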
Kevin Grismore
08/05/2022, 9:18 PM

- project
  └── flows
      └── flow1.py
      └── flow2.py
  └── util
      └── util.py
if I do some/dir/project> prefect deployment build flows/flow1.py:flow_func -n my-flow -ib kubernetes-job/my-job -sb gcs/my-bucket -t k8s
everything in src ends up in my bucket as expected, but when I run the flow I get:
FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/flows/flow1.py'
Keith
08/06/2022, 12:59 AM

There used to be an upstream_tasks parameter that you could pass to tasks so that each task knew to wait for the previous one to run. From my reading of the documentation it seems like this is not necessary anymore, because everything should run like it would in Python, so it basically defaults to a sequential executor. Is this the correct logic?
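(Editor's note: that understanding matches how sequential execution works in plain Python, which a short Prefect-free sketch makes concrete: each call blocks until it returns, so ordering follows normal control flow with no explicit upstream_tasks wiring.)

```python
order = []

def extract():
    order.append("extract")
    return [1, 2, 3]

def transform(rows):
    order.append("transform")
    return [r * 10 for r in rows]

def load(rows):
    order.append("load")
    return len(rows)

# Each call finishes before the next starts, purely by control flow.
count = load(transform(extract()))
```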
Obviously this story changes a bit when adding in the different Task Runners, but I just wanted to confirm that, with the default code, tasks run in sequence and won't run the next task until the previous one is complete.

Benoit Chabord
08/06/2022, 7:17 AM

Jan Domanski
08/06/2022, 10:24 AM

10:21:24.290 | INFO | prefect.agent - Submitting flow run 'cfc4f262-4f05-4685-882e-364192297107'
10:21:24.474 | INFO | prefect.infrastructure.process - Opening process 'blond-mammoth'...
10:21:24.482 | INFO | prefect.agent - Completed submission of flow run 'cfc4f262-4f05-4685-882e-364192297107'
10:21:27.334 | ERROR | Flow run 'blond-mammoth' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/deployments.py", line 47, in load_flow_from_flow_run
await storage_block.get_directory(from_path=None, local_path=".")
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/filesystems.py", line 373, in get_directory
return await self.filesystem.get_directory(
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/prefect/filesystems.py", line 251, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/fsspec/spec.py", line 801, in get
self.get_file(rpath, lpath, **kwargs)
File "/opt/micromamba/envs/main/lib/python3.8/site-packages/fsspec/spec.py", line 769, in get_file
outfile = open(lpath, "wb")
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp3er_ugnvprefect/S3-BUCKET-NAME/alpha/flow.py'
10:21:27.727 | INFO | prefect.infrastructure.process - Process 'blond-mammoth' exited cleanly.
...
$ aws s3 ls s3://S3-BUCKET-NAME/alpha/
2022-08-06 10:20:49 6473 flow.py
2022-08-06 10:20:49 3204 example_flow-manifest.json
Created via
# prefect deployment build ./flow.py:example_flow --name example-flow-alpha --tag alpha --storage-block s3/S3-BUCKET-NAME
# prefect deployment apply example-flow-alpha.yaml
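(Editor's note: the traceback bottoms out in fsspec opening the local target with open(lpath, "wb"), which raises FileNotFoundError when the parent directory of the temp path does not exist yet. A stdlib sketch of that failure mode and the defensive fix; the helper is hypothetical, not a prefect patch.)

```python
import os
import tempfile

def safe_open_wb(lpath: str):
    # open(lpath, "wb") fails with FileNotFoundError if the parent
    # directory is missing; creating parents first avoids that.
    os.makedirs(os.path.dirname(lpath), exist_ok=True)
    return open(lpath, "wb")

# Demo against a throwaway directory mirroring the path in the log:
demo_path = os.path.join(tempfile.mkdtemp(), "S3-BUCKET-NAME", "alpha", "flow.py")
with safe_open_wb(demo_path) as f:
    f.write(b"# flow contents")
created = os.path.exists(demo_path)
```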
I've had mixed luck reading and searching similar posts with this error message.

Rio McMahon
08/06/2022, 11:53 PM

Yardena Meymann
08/07/2022, 7:09 AM

Viet Nguyen
08/07/2022, 1:57 PM

Hafsa Junaid
08/07/2022, 8:48 PM

Rajvir Jhawar
08/08/2022, 2:28 AM

Felix Sonntag
08/08/2022, 7:46 AM

Vadym Dytyniak
08/08/2022, 7:49 AM

jaehoon
08/08/2022, 9:01 AM

jaehoon
08/08/2022, 9:01 AM

Anna Geller
08/08/2022, 9:21 AM

Anton L.
08/08/2022, 3:50 PM

Anna Geller
08/08/2022, 7:42 PM

Anton L.
08/08/2022, 9:14 PM

Anna Geller
08/09/2022, 10:29 AM

Ben Hammond
08/31/2022, 2:24 AM

Anna Geller
08/31/2022, 5:01 AM

Ben Hammond
08/31/2022, 6:28 AM

Anna Geller
08/31/2022, 10:15 AM

Ben Hammond
08/31/2022, 1:01 PM

Anna Geller
08/31/2022, 1:13 PM

> where the flow is baked into the image itself
I'd recommend checking the latest release: https://medium.com/the-prefect-blog/prefect-2-3-0-adds-support-for-flows-defined-in-docker-images-and-github-repositories-79a8797a7371
> would a more helpful path connected to Prefect's intended path forward be to contribute other non-cloud storage blocks (like maybe SFTP or SCP, etc)?
Yes, 100%, you are spot on here -- SFTP is a storage block to which we could write and from which we could read. GitHub and GitLab are not really storage systems; those are version control and engineering collaboration platforms, so SFTP would be a much nicer way of solving the problem for on-prem deployments. I'd definitely love to see a contribution for that if you would want to submit a PR.
Ben Hammond
08/31/2022, 1:27 PM