Ofir
08/26/2022, 3:57 PM
with Flow("Data preprocess") as data_preprocess_flow:
    ...
with Flow("Visualization") as visualization_flow:
    ...
vs:
with Flow("Monolithic data pipeline") as monolithic_flow:
    res = data_preprocess(dataframe)
    res = train(res)
    ...
Ofir
08/26/2022, 3:58 PM
Nathaniel Russell
08/26/2022, 4:35 PM
storage = S3(
    bucket=s3_bucket_name,
    key="flows/banner_canvas_flow.py",
    stored_as_script=True,
    local_script_path="flows/banner_canvas_flow.py",
)
In Prefect 1.0 I could store my task code in an S3 bucket and use that implementation of the tasks whenever I ran a flow with storage=storage. How would I accomplish this in Prefect 2.0?
Philip MacMenamin
08/26/2022, 5:05 PM
context value on a per-run basis?
Matt Delacour
08/26/2022, 5:15 PM
_or statement in GraphQL?
Cannot make it work on my side.
There is nothing about _or or _and in the docs or on GitHub.
Neil Natarajan
08/26/2022, 7:06 PM
RuntimeError: Orion requires sqlite >= 3.24.0 but we found version 3.7.17
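For reference, a minimal sketch (assuming only the standard-library sqlite3 module; the helper names parse_version and meets_requirement are hypothetical, not part of Prefect) that reports which SQLite library the running Python interpreter is linked against and compares it numerically with the 3.24.0 minimum quoted in the error:

```python
import sqlite3
from typing import Tuple

# Minimum SQLite version quoted in the error message above.
REQUIRED = (3, 24, 0)


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn a dotted version such as '3.7.17' into (3, 7, 17)."""
    return tuple(int(part) for part in text.split("."))


def meets_requirement(found: str, required: Tuple[int, ...] = REQUIRED) -> bool:
    """Compare versions numerically; plain string comparison would be wrong."""
    return parse_version(found) >= required


if __name__ == "__main__":
    # sqlite3.sqlite_version is the SQLite library this interpreter uses,
    # which may differ from the sqlite3 command-line tool on the same machine.
    print("linked SQLite:", sqlite3.sqlite_version)
    print("new enough:", meets_requirement(sqlite3.sqlite_version))
```

The check has to run in the same Python environment that runs Orion, since a different interpreter on the same machine may be linked against a different SQLite library.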
How do I go about fixing this error?
Leon Kozlowski
08/26/2022, 7:46 PM
DockerStorage and KubernetesRun - I have been looking through previous threads and it seems I should be using DockerPackager, but it looks like a Deployment doesn't accept the packager argument anymore - is there guidance on migrating from a DockerStorage/KubernetesRun workflow to 2.0?
Kevin Grismore
08/26/2022, 7:56 PM
KeyError: "No class found for dispatch key 'gcs' in registry for type 'Block'."
when trying to run a flow stored on GCS using a GCS storage block?
Ilya Galperin
08/26/2022, 8:30 PM
/usr/local/lib/python3.10/site-packages/prefect/deployments.py:48: UserWarning: Block document has schema checksum sha256:0ec43f8010cee4adbf73aebcc58f1e45986d765c2a224dfc9cd5428f98c516f8 which does not match the schema checksum for class 'S3'. This indicates the schema has changed and this block may not load.
  storage_block = Block._from_block_document(storage_document)
Deleting and re-creating the block and deployment sometimes makes it work again, but only intermittently. The flow code itself does not touch or interact with Prefect block storage. Has anyone experienced this, or does anyone have any idea what might be causing the issue?
Jai P
08/26/2022, 8:37 PM
Seth Coussens
08/26/2022, 8:55 PM
Brandon T. Kowalski
08/26/2022, 9:19 PM
20:53:32.166 | INFO | prefect.agent - Submitting flow run 'adf8ae6e-675d-40dd-8a32-078aecbc248b'
20:53:32.359 | ERROR | prefect.agent - Failed to submit flow run 'adf8ae6e-675d-40dd-8a32-078aecbc248b' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/agent.py", line 200, in submit_run
    await self.task_group.start(submit_flow_run, flow_run, infrastructure)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 807, in start
    return await future
  File "/usr/local/lib/python3.9/site-packages/prefect/infrastructure/submission.py", line 47, in submit_flow_run
    infrastructure = _prepare_infrastructure(flow_run, infrastructure)
  File "/usr/local/lib/python3.9/site-packages/prefect/infrastructure/submission.py", line 35, in _prepare_infrastructure
    "env": {**base_flow_run_environment(flow_run), **infrastructure.env},
AttributeError: 'S3' object has no attribute 'env'
Any pointers?
Chris Gunderson
08/26/2022, 9:36 PM
from prefect.tasks.notifications import SlackTask
class Alert:
    def alert_on_special_failure(task, old_state, new_state):
        if new_state.is_failed():
            if getattr(new_state.result, "flag", False) is True:
                errMsg = '--- LOADER ERROR ---'
                msg = "{}\nTask: `{}` FAILED.\nThe loader process failed: `{}`".format(
                    errMsg, task.name, new_state.result.value)
                SlackTask().run(message = msg, webhook_secret = "SLACK_PREFECT_DEV")
        return new_state
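For context on how such a handler receives its arguments: in Prefect 1.x a state handler has the signature (task, old_state, new_state), and the runner supplies all three when the state changes. The sketch below imitates that callback pattern in plain Python. FakeState, FakeTask, set_state and alert_on_failure are hypothetical stand-ins written for illustration, not Prefect classes or functions.

```python
class FakeState:
    """Hypothetical stand-in for a state object."""

    def __init__(self, name, failed=False):
        self.name = name
        self._failed = failed

    def is_failed(self):
        return self._failed


class FakeTask:
    """Hypothetical stand-in for a task that owns a list of state handlers."""

    def __init__(self, name, state_handlers=()):
        self.name = name
        self.state_handlers = list(state_handlers)

    def set_state(self, old_state, new_state):
        # The runner, not the caller's own code, invokes each handler and
        # passes the task and both states as positional arguments.
        for handler in self.state_handlers:
            new_state = handler(self, old_state, new_state) or new_state
        return new_state


alerts = []


def alert_on_failure(task, old_state, new_state):
    """Record a message when the new state is a failure."""
    if new_state.is_failed():
        alerts.append(f"Task {task.name} FAILED")
    return new_state


if __name__ == "__main__":
    task = FakeTask("send API request", state_handlers=[alert_on_failure])
    task.set_state(FakeState("Running"), FakeState("Failed", failed=True))
    print(alerts)
```

In this pattern the three names are ordinary function parameters filled in at call time, not globals; the handler is only registered by reference in the state_handlers list.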
This was previously called like this:
@task(name = 'send API request',
      max_retries = 3,
      retry_delay = timedelta(minutes = 5),
      state_handlers = [alert_on_special_failure])
def post_request_process(
If we add this alert on special failure object, will we need to pass in the task, old_state, and new_state? Were these variables global?
Hedgar
08/27/2022, 7:41 AM
Ofir
08/27/2022, 11:14 AM
git clone --depth 1 https://github.com/PrefectHQ/prefect.git
cd prefect/examples/tutorial
Ofir
08/27/2022, 11:14 AM
Ofir
08/27/2022, 11:15 AM
ofir@Ofirs-MacBook-Pro-2 prefect % ls examples/
ls: examples/: No such file or directory
ofir@Ofirs-MacBook-Pro-2 prefect %
Ofir
08/27/2022, 11:28 AM
ofir@Ofirs-MacBook-Pro-2 prefect % git checkout 1.x
Branch '1.x' set up to track remote branch '1.x' from 'origin'.
Switched to a new branch '1.x'
ofir@Ofirs-MacBook-Pro-2 prefect % ls examples
conditional.py kafka mapping.py old parameters.py tutorial
ofir@Ofirs-MacBook-Pro-2 prefect % ls examples/tutorial
01_etl.py 03_parameterized_etl_flow.py 05_schedules.py aircraftlib
02_etl_flow.py 04_handle_failures.py 06_parallel_execution.py requirements.txt
ofir@Ofirs-MacBook-Pro-2 prefect %
Ofir
08/27/2022, 11:29 AM
git clone --branch 1.x --depth 1 https://github.com/PrefectHQ/prefect.git
cd prefect/examples/tutorial
Keith Hickey
08/27/2022, 2:15 PM
Venkat Ramakrishnan
08/28/2022, 4:04 AM
Anat Tal Gagnon
08/28/2022, 10:54 AM
Anat Tal Gagnon
08/28/2022, 11:02 AM
Benson Mwangi
08/28/2022, 5:17 PM
ibrahem
08/29/2022, 7:19 AM
Faheem Khan
08/29/2022, 7:50 AM
Andreas Nord
08/29/2022, 8:01 AM
Enrique
08/29/2022, 9:20 AM
Hedgar
08/29/2022, 11:27 AM
to_csv() method. The path was created by pathlib:
file_path = Path("data/fresh-data.csv")
file_path.parent.mkdir(parents=True, exist_ok=True)
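One thing worth checking with a relative path like this is where it actually resolves, since it is interpreted relative to the working directory of the process that runs the code. A minimal sketch, assuming the standard-library csv module in place of the DataFrame to_csv() call and a hypothetical helper named write_rows:

```python
import csv
from pathlib import Path


def write_rows(file_path: Path, rows) -> Path:
    """Create the parent directory, write rows as CSV, return the absolute path."""
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with file_path.open("w", newline="") as handle:
        csv.writer(handle).writerows(rows)
    # resolve() shows where the relative path actually ended up on disk.
    return file_path.resolve()


if __name__ == "__main__":
    written = write_rows(Path("data/fresh-data.csv"), [["a", "b"], [1, 2]])
    print("working directory:", Path.cwd())
    print("file written to:  ", written)
```

Printing the resolved path from inside the flow run would show whether the file was written somewhere other than the directory being inspected.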
Why can't I see the fresh data? Is there an extra step I am missing? I am just trying things out with Prefect 2 on my local system.
Brandon T. Kowalski
08/29/2022, 12:16 PM
Brandon T. Kowalski
08/29/2022, 12:16 PM
Jeff Hale
08/29/2022, 3:32 PM