David Anderson
06/16/2022, 9:02 PM
AirbyteConnectionTask seeing a new error starting ~last week? all of my runs are failing with Error during execution of task: KeyError('schedule'). haven't changed a thing in my prefect flow configuration. thought maybe it was related to an airbyte upgrade (im running the self-hosted version), but im struggling to troubleshoot. any ideas?
Matt Alhonte
06/16/2022, 10:47 PM
Overview tab in the GUI for a given flow always display the actual date instead of "Last Sunday" or whatever?
Matt Alhonte
06/16/2022, 11:02 PM
Ben Muller
06/16/2022, 11:59 PM
prefect run -p flows/harness_racing_victoria/harness_racing_victoria_results.py
error trace in 🧵
Any ideas?
George Shishorin
06/17/2022, 2:05 AM
No module named 'pandas' occurred. Flow B is ok.
So the question is: what is the best practice to register flows with different environments and dependencies?
Hope for your support and thank you!
Marius Haberstock
06/17/2022, 8:20 AM
marque
06/17/2022, 9:09 AM
Michael Maletich
06/17/2022, 10:55 AM
Dung Khuc
06/17/2022, 12:05 PM
TaskA: resultA -> TaskB: resultB -> TaskC: resultC
if task B fails, I want to run:
UndoTaskA (resultA)
if task C fails, I want to run:
UndoTaskB (resultB) -> UndoTaskA (resultA)
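On Dung Khuc's undo chain above: a minimal sketch of the compensation pattern in plain Python, not a Prefect API. The helper name `run_with_undo` and the `(action, undo)` pairing are assumptions made for illustration. Each completed step remembers its result, and a failure triggers the undo functions in reverse order with those results.

```python
from typing import Any, Callable, List, Tuple

# Each step pairs a forward action with the undo that compensates for it.
# (Hypothetical structure for illustration; not part of Prefect.)
Step = Tuple[Callable[[Any], Any], Callable[[Any], None]]


def run_with_undo(steps: List[Step], initial: Any = None) -> Any:
    """Run steps in order, feeding each result to the next step.

    If a step raises, call the undo of every step that already completed,
    most recent first, passing it that step's result, then re-raise.
    """
    completed: List[Tuple[Callable[[Any], None], Any]] = []
    value = initial
    for action, undo in steps:
        try:
            value = action(value)
        except Exception:
            for undo_fn, result in reversed(completed):
                undo_fn(result)
            raise
        completed.append((undo, value))
    return value
```

With three steps A, B, C, a failure in C calls the undo for B with resultB and then the undo for A with resultA; a failure in B calls only the undo for A.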
Michiel Verburg
06/17/2022, 12:53 PM
@task
def for_loop_B(category):
    for item_i in category.items:
        load_item(item_i)
        store_item(item_i)
@flow
def for_loop_A():
    categories = retrieve_all_categories()
    for category_i in categories:
        for_loop_B(category_i)
Doing the above seemed wrong, because (by default at least) tasks or flows would fail just because one internal step failed. Additionally, the processing of the categories is fully independent. So I thought for_loop_B should also be a flow, and load_item and store_item should be tasks, for example. However, I got confused by what the docs mention: “Unlike tasks, subflows will block until completion with all task runners.”
I want the processing of items within a category to happen sequentially, but multiple categories can be processed in parallel. How can I make that happen? Also, can tasks be nested within tasks, for that matter?
Oscar Krantz
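On Michiel Verburg's question above (items sequential within a category, categories in parallel): a rough sketch of that execution shape using only the standard library. It is not Prefect code and does not answer how subflows or task runners behave. The names `process_category`, `process_all`, and `handle_item` are hypothetical. Failures are recorded per item so one bad item does not abort its category.

```python
from concurrent.futures import ThreadPoolExecutor


def process_category(name, items, handle_item):
    """Process one category's items strictly in order.

    Each entry is (item, value, error); a failing item is recorded
    and the loop continues with the next item.
    """
    results = []
    for item in items:
        try:
            results.append((item, handle_item(item), None))
        except Exception as exc:
            results.append((item, None, exc))
    return name, results


def process_all(categories, handle_item, max_workers=4):
    """Run categories concurrently; items inside each stay sequential."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(process_category, name, items, handle_item)
            for name, items in categories.items()
        ]
        return dict(future.result() for future in futures)
```

Here `categories` is assumed to be a dict mapping a category name to its list of items, and `handle_item` stands in for the load and store steps.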
06/17/2022, 1:33 PM
Halvar Trøyel Nerbø
06/17/2022, 2:02 PM
Xavier Babu
06/17/2022, 2:11 PM
Josh Paulin
06/17/2022, 2:42 PM
PREFECT__CLOUD__USE_LOCAL_SECRETS and PREFECT__CONTEXT__SECRETS__GITLAB_ACCESS_TOKEN set on my agent, but not on the job. Trying to run the flow just errors out at
Failed to load and execute flow run: KeyError('The secret GITLAB_ACCESS_TOKEN was not found. Please ensure that it was set correctly in your tenant: <https://docs.prefect.io/orchestration/concepts/secrets.html>')
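On Josh Paulin's message above: the two variables are set on the agent but not on the job, so one possible cause (not confirmed in the thread) is that the process running the flow never sees them. A small standard-library check, run from inside the job's environment, can show which names are absent. The helper name `missing_env_vars` is hypothetical, and it reports only variable names, never their values.

```python
import os

# Variable names taken from the message above.
REQUIRED = (
    "PREFECT__CLOUD__USE_LOCAL_SECRETS",
    "PREFECT__CONTEXT__SECRETS__GITLAB_ACCESS_TOKEN",
)


def missing_env_vars(names=REQUIRED, environ=None):
    """Return the names that are unset or empty in the given environment.

    Defaults to the current process environment (os.environ).
    """
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]
```

Calling `missing_env_vars()` in the job and in the agent's shell and comparing the two lists would show whether the settings are reaching the flow run.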
Florian Guily
06/17/2022, 3:07 PM
Alfred Martinez
06/17/2022, 3:32 PM
Benny Warlick
06/17/2022, 3:41 PM
jack
06/17/2022, 5:23 PM
Christian Nuss
06/17/2022, 5:38 PM
Deepak Pilligundla
06/17/2022, 7:03 PM
Hemabh Kamboj
06/17/2022, 9:15 PM
prefect_flow_run_id and prefect_flow_version and inserting those params in the db
Matt Alhonte
06/17/2022, 10:27 PM
William Jamir
06/18/2022, 7:31 AM
upstream_tasks
Does someone know what I’m doing wrong?
My intention is to execute task_3 only after task_1 and task_2 are finished (since it depends on their output), and to start task_4 only after task_3 is finished.
How can I accomplish that?
(Code and output error in thread)
Daniel
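On William Jamir's question above: the code from the thread is not shown here, so this is only a sketch of the dependency graph the message describes, using the standard library's `graphlib` (Python 3.9+) rather than any Prefect API. The task names come from the message; the `execution_order` helper is hypothetical.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks that must finish before it starts.
dependencies = {
    "task_3": {"task_1", "task_2"},  # task_3 needs both outputs
    "task_4": {"task_3"},            # task_4 starts only after task_3
}


def execution_order(deps):
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
```

Any valid order places task_1 and task_2 before task_3, and task_3 before task_4; the relative order of task_1 and task_2 is not constrained.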
06/19/2022, 3:44 AM
prefecthq/prefect:latest-python3.10 docker image with some additional packages including the snowflake python connector and its dependencies. Flows without snowflake interactions work perfectly when run from Prefect cloud; however, my EL flows which insert data into snowflake tables fail to load and execute, returning ModuleNotFoundError("No module named 'snowflake'").
Haven't been able to find any similar reports among the community so wondering if someone could suggest what I may have done wrong.
With thanks,
Daniel.
Xavier Witdouck
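On Daniel's ModuleNotFoundError above (and the similar 'pandas' report earlier): a small diagnostic sketch, assuming the aim is to confirm which interpreter the flow run uses and whether it can see the packages. It uses only the standard library; `missing_modules` and `environment_report` are hypothetical helper names, and the sketch does not explain why the image and the runtime would differ.

```python
import importlib.util
import sys


def missing_modules(names):
    """Return the module names that cannot be found by this interpreter."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised when a parent package is absent or has no usable spec.
            found = False
        if not found:
            missing.append(name)
    return missing


def environment_report(names):
    """Summarise the interpreter path and which modules are missing."""
    return {"python": sys.executable, "missing": missing_modules(names)}
```

Logging `environment_report(["snowflake", "pandas"])` at the start of a flow run would show whether the run executes in the environment where the extra packages were installed.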
06/19/2022, 4:45 PM
marque
06/20/2022, 3:44 AM
prefect.exceptions.ClientError: [{'path': ['create_project'], 'message': 'Uniqueness violation.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}] when I ran prefect delete project --> prefect create project (using Prefect v1.0, with Prefect Cloud as backend). Can't seem to recreate a project with the same name
Bharadwaj Yadati
06/20/2022, 5:41 AM
Tarek
06/20/2022, 8:23 AM
Michal Zawadzki
06/20/2022, 10:39 AM
dev_queue with the label dev and supporting all flow runners. It's the only work queue I have set up.
2. I ran an agent with prefect agent start dev_queue
3. I created a deployment with the tag `dev`:
name: test_platform_flow_first_deployment
flow_name: Data Platform Demo
flow_location: ./test_platform_flow.py
parameters:
  to_print: "Hello from first deployment!"
tags:
  - dev
I verified in the UI that it has the right tag and flow runner.
However, when I run the deployment, the flow run is never picked up.
One suspicious thing I noticed is that the flow run doesn't inherit the dev tag from the deployment (although I don't know if it's a bug or a feature).
When I remove all labels from the work queue, the flow runs are picked up correctly.
Michal Zawadzki
06/20/2022, 10:52 AM
flow_runner in the deployment? I get `ValueError: Unregistered flow runner 'DockerFlowRunner'` when running prefect deployment create my_deployment.yaml. My deployment looks like this:
name: test_platform_flow_first_deployment
flow_name: Data Platform Demo
flow_location: ./test_platform_flow.py
parameters:
  to_print: "Hello from first deployment!"
tags:
  - dev
flow_runner:
  type: DockerFlowRunner
  config:
    image: viadot:orion
Unfortunately the flow_runner config is not documented anywhere so it's hard for me to say if I'm specifying it incorrectly or it's not supported at all.
Anna Geller
06/20/2022, 10:56 AM
import platform
from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import DockerFlowRunner
...
@flow
def hello_flow():
    hi = say_hi()
    print_platform_info(wait_for=[hi])
DeploymentSpec(name="dev", flow=hello_flow, flow_runner=DockerFlowRunner())
if __name__ == "__main__":
    hello_flow()
flow_runner: DockerFlowRunner
Michal Zawadzki
06/20/2022, 11:05 AM
Anna Geller
06/20/2022, 11:08 AM
from prefect.deployments import DeploymentSpec
from prefect.flows import Flow
from prefect.orion.schemas.schedules import SCHEDULE_TYPES
# from prefect.flow_runners import DockerFlowRunner
from typing import Any, Dict, List
from flows.async_flow import async_flow
from flows.crypto_prices_etl import crypto_prices_etl
from flows.repo_trending_check import repo_trending_check
def set_deployment_spec(
    flow: Flow,
    deployment_name_suffix: str = "dev",
    schedule: SCHEDULE_TYPES = None,
    parameters: Dict[str, Any] = None,
    tags: List[str] = None,
) -> DeploymentSpec:
    deploy_tags = (
        [deployment_name_suffix] if tags is None else [deployment_name_suffix, *tags]
    )
    return DeploymentSpec(
        flow=flow,
        name=f"{flow.name}_{deployment_name_suffix}",
        schedule=schedule,
        tags=deploy_tags,
        parameters=parameters,
        # flow_runner=DockerFlowRunner()
    )
set_deployment_spec(async_flow)
set_deployment_spec(crypto_prices_etl)
set_deployment_spec(repo_trending_check)
set_deployment_spec(
    repo_trending_check,
    deployment_name_suffix="orion_dev",
    parameters=dict(repo="orion"),
)
set_deployment_spec(crypto_prices_etl)
Marvin
06/20/2022, 11:12 AM
Michal Zawadzki
06/20/2022, 11:17 AM
DeploymentSpec inside. set_deployment_spec() although it seems like you then need to check that the user passed "dev" in the "deployment-dev.py" and "prod" in "deployment-prod.py"?
Anna Geller
06/20/2022, 11:30 AM
Michal Zawadzki
06/23/2022, 11:26 PM
httpx.HTTPStatusError: Client error '400 Bad Request' when adding image: "my_image:my_tag" under the config key.
It's interesting because I can get that same flow manually from the agent's environment with
import prefect
c = prefect.client.get_client()
run = await c.read_flow_run("0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc")
print(run.flow_runner.config.get("image"))
without error.
Full Traceback:
23:00:21.814 | ERROR | prefect.engine - Engine execution of flow run '0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc' exited with unexpected exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 985, in <module>
    enter_flow_run_engine_from_subprocess(flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 130, in enter_flow_run_engine_from_subprocess
    return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 646, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 95, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 190, in retrieve_flow_then_begin_flow_run
    flow_run = await client.read_flow_run(flow_run_id)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 1204, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1751, in get
    return await self.request(
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.10/site-packages/prefect/client.py", line 233, in send
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/httpx/_models.py", line 736, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '400 Bad Request' for url '<https://api-beta.prefect.io/api/accounts/1d7a71e3-4d77-4615-b3cf-966c2cedb752/workspaces/9d26098f-f680-43c8-b327-a34ea72f15b2/flow_runs/0e8f9e7d-b42b-4d0b-83cc-5dc89595f2bc>'
For more information check: <https://httpstatuses.com/400>
01:00:22.596 | INFO | prefect.flow_runner.docker - Flow run container 'smooth-serval' has status 'exited'