Ben Muller
11/02/2022, 8:06 AM
Denys Volokh
11/02/2022, 8:17 AM
wonsun
11/02/2022, 8:36 AM
FlowRunTask to StartFlowRun (reference link), among the methods written in that article, to create the same situation. In the scripts attached below, two flows that already run well on their own (flow_waveforms and flow_pipeline) have been created, and parent_flow is meant to execute those two flows sequentially.
# parent_flow.py
from prefect import Flow
from prefect.tasks.prefect.flow_run import StartFlowRun

with Flow('mother_flow') as flow:
    first_job = StartFlowRun(flow_name='flow_waveforms',
                             project_name='InterFlow_Dependencies',
                             wait=True)
    second_job = StartFlowRun(flow_name='flow_pipeline',
                              project_name='InterFlow_Dependencies',
                              wait=True)
    first_job.set_downstream(second_job)

if __name__ == '__main__':
    flow.register("InterFlow_Dependencies")
# flow_waveforms.py
from prefect import Flow, task
from prefect.run_configs import UniversalRun

@task(nout=2)  # nout=2 so the two results can be unpacked below
def task1():
    # ... something ...
    return one, two

@task
def task2():
    # ... something ...
    pass

with Flow('flow_waveforms') as flow:
    var1, var2 = task1()
    finish = task2()

flow.run_config = UniversalRun(env={'PREFECT__CLOUD__HEARTBEAT_MODE': 'thread'})
flow.register('InterFlow_Dependencies')
# flow_pipeline.py
from prefect import Flow, task

@task
def task3():
    # ... something ...
    pass

with Flow('flow_pipelines') as flow:
    task3()

flow.register('InterFlow_Dependencies')
If I execute parent_flow.py in this case, Prefect responds with the following error (image).
Any idea where I went wrong? Or could you tell me if there is an easier way to define dependencies between flows than this? Thanks.
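For comparison, the orchestrator-flow examples for Prefect 1 usually instantiate the StartFlowRun tasks outside the flow block and then call them inside it, wiring the dependency through upstream_tasks. A rough sketch reusing the flow and project names from the scripts above (not the original poster's code):

# sketch of the orchestrator pattern, reusing the names above
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

start_waveforms = StartFlowRun(flow_name='flow_waveforms',
                               project_name='InterFlow_Dependencies',
                               wait=True)
start_pipeline = StartFlowRun(flow_name='flow_pipeline',
                              project_name='InterFlow_Dependencies',
                              wait=True)

with Flow('mother_flow') as flow:
    # calling the task instances inside the flow block adds them to the flow
    waveforms = start_waveforms()
    start_pipeline(upstream_tasks=[waveforms])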
pk13055
11/02/2022, 10:09 AM
orion + agent container setup locally, and everything is working as expected when I try to run the flows manually. I was able to successfully create and apply a deployment as well (from within the cli container). However, when trying to run the deployment I am faced with a File not found error.
Here's the traceback:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "<frozen importlib._bootstrap_external>", line 879, in exec_module
File "<frozen importlib._bootstrap_external>", line 1016, in get_code
File "<frozen importlib._bootstrap_external>", line 1073, in get_data
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmp_zx6qpglprefect/flows/fetch_etfs.py'
My docker-compose is as follows:
version: "3.9"

services:
  db:
    image: timescale/timescaledb:latest-pg14
    volumes:
      - $PWD/data/db:/var/lib/postgresql/data
      - $PWD/config/db:/docker-entrypoint-initdb.d/
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready" ]
      interval: 10s
      timeout: 5s
      retries: 5
    environment:
      - POSTGRES_DB=$POSTGRES_DB
      - POSTGRES_USER=$POSTGRES_USER
      - POSTGRES_PASSWORD=$POSTGRES_PASSWORD
    networks:
      - db_network

  orion:
    image: prefecthq/prefect:2.6.3-python3.10
    restart: always
    volumes:
      - $PWD/prefect:/root/.prefect
    entrypoint: [ "prefect", "orion", "start" ]
    environment:
      - PREFECT_ORION_API_HOST=0.0.0.0
      - PREFECT_ORION_DATABASE_CONNECTION_URL=$PREFECT_DB_URL
    ports:
      - 4200:4200
    depends_on:
      - db
    networks:
      - prefect_network
      - db_network

  agent:
    build: ./prefect
    restart: always
    entrypoint: [ "prefect", "agent", "start", "-q", "main_queue" ]
    environment:
      - PREFECT_API_URL=http://orion:4200/api
    networks:
      - prefect_network
      - db_network

  cli:
    build: ./prefect
    entrypoint: "bash"
    working_dir: "/root/flows"
    volumes:
      - "$PWD/prefect:/root"
    environment:
      - PREFECT_API_URL=http://orion:4200/api
    networks:
      - prefect_network
      - db_network

networks:
  db_network:
  web_network:
  prefect_network:
To create and apply the deployment(s):
> docker compose run cli
$ prefect deployment create ./flows/fetch_etf.py:flow_name --name flow_name
$ prefect deployment apply deployment.yaml
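For reference, the same deployment can also be built and applied from Python in 2.6 instead of the CLI, which makes the storage explicit. A rough sketch; the flow import path, function name, and basepath below are assumptions, not taken from the repo:

# deploy.py — sketch only; import path and names are assumptions
from prefect.deployments import Deployment
from prefect.filesystems import LocalFileSystem

from flows.fetch_etfs import fetch_etfs  # hypothetical flow function

# the basepath must also be reachable from the agent container, otherwise the
# agent cannot find the flow file when it picks up the run
storage = LocalFileSystem(basepath="/root/flows")

deployment = Deployment.build_from_flow(
    flow=fetch_etfs,
    name="fetch_etfs",
    work_queue_name="main_queue",
    storage=storage,
)
deployment.apply()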
Anna Geller
11/02/2022, 4:13 PM
John Kang
11/02/2022, 5:18 PM
Jiri Klein
11/02/2022, 5:52 PM
Conor Casey
11/02/2022, 6:25 PM
Amir
11/02/2022, 6:45 PM
Jon Young
11/02/2022, 7:42 PM
Jon Ruhnke
11/02/2022, 8:46 PM
Ben Muller
11/02/2022, 8:52 PM
Ben Muller
11/02/2022, 9:22 PM
Bradley McLaughlin
11/02/2022, 10:02 PM
David Prince
11/02/2022, 10:43 PM
prefect.exceptions.ClientError: [{'path': ['flow_run', 0, 'id'], 'message': 'Cannot return null for non-nullable field flow_run.id.', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
As far as I understand, this is a flow_run record without a flow. I need some assistance with removing this corrupted metadata, as our data pipelines are all majorly delayed.
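In case it helps narrow things down, the GraphQL API can be queried for flow runs whose flow reference is missing. A sketch using the 1.x client; the where-clause is a guess at the Hasura schema, and it only reads data, it does not delete anything:

# sketch: list flow runs with no associated flow via the GraphQL API
from prefect import Client

client = Client()
result = client.graphql(
    """
    query {
      flow_run(where: {flow_id: {_is_null: true}}) {
        id
        name
      }
    }
    """
)
print(result)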
Stephen Lloyd
11/03/2022, 4:01 AM
merlin
11/03/2022, 4:06 AM
tmp_path from pytest. The confusing part was that the flow code was found via a relative path, but when the task wrote its output to a text file on a relative path, the output files landed in an ephemeral system temporary folder. Naturally this wouldn't be an issue writing to S3, but for development I like to work in a local environment and write files relative to my project root.
I don't think this is caused by the poetry environment, because the agent is running in the same environment; the difference in behavior is between deployed flows and flows called from the shell:
# flow.py
from pathlib import Path

import pandas as pd
from prefect import flow, task, get_run_logger

@task
def write_output(dataframe, filepath):
    logger = get_run_logger()
    dataframe.to_csv(filepath)
    logger.debug(f"current working directory: {Path().cwd()}")

@flow
def write_dataframe():
    df = pd.DataFrame({"a": [1, 2, 3]})  # placeholder data
    write_output(df, 'data/csv/datafile.csv')
# shell invocation
poetry run python src/flow.py
# prefect logs:
| DEBUG | Task run... - current working directory: /Users/merlin/prefect-repo
# deployed flow invocation
poetry shell
prefect deployment run write_dataframe/depl-write
# prefect logs:
| DEBUG | Task run... - current working directory: /private/var/folders/gh/...4r3zq.../T/tmpXookj0s08prefect
My goal here is for my task to write to the same local file path where my project folder is stored.
Is there a standard way to specify the working directory for deployed flow code in a local environment? Maybe this is managed by the config defaults (I couldn't find a likely ENV variable). My next attempt will be to specify an absolute path for the write location, but this is not ideal.
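For what it's worth, one workaround along the lines of that "absolute path" idea (a sketch, not a documented Prefect setting; the PROJECT_DATA_ROOT variable name is made up) is to build the write location from an absolute root taken from the environment, so it no longer depends on the run's working directory:

# sketch: resolve outputs against an absolute root instead of the cwd
import os
from pathlib import Path

# the fallback reuses the repo path from the shell-run log above
DATA_ROOT = Path(os.environ.get("PROJECT_DATA_ROOT", "/Users/merlin/prefect-repo/data"))
output_path = DATA_ROOT / "csv" / "datafile.csv"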
iKeepo w
11/03/2022, 4:55 AM
Tim-Oliver
11/03/2022, 7:24 AM
.tif file? I tried to implement a custom Serializer which just passes the image through, and then wanted to use a custom WritableFileSystem as the storage option that takes care of writing/reading the image as .tif. So far I have not managed to, and would be grateful for some advice. I am trying this in Prefect 2.x. In Prefect 1.x I managed to do something like this by implementing a custom LocalResult.
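A rough sketch of a pass-through TIFF serializer, assuming the 2.x Serializer base class works like the built-in JSON/pickle serializers (a type discriminator plus dumps/loads returning bytes) and that tifffile is available:

# sketch: serialize image arrays to TIFF bytes for result persistence
import io
from typing import Literal

import tifffile
from prefect.serializers import Serializer


class TiffSerializer(Serializer):
    """Turn a numpy image array into TIFF bytes and back."""

    type: Literal["tiff"] = "tiff"

    def dumps(self, obj) -> bytes:
        buffer = io.BytesIO()
        tifffile.imwrite(buffer, obj)
        return buffer.getvalue()

    def loads(self, blob: bytes):
        return tifffile.imread(io.BytesIO(blob))

It could then be passed to a task as result_serializer (with persist_result=True and a result_storage block), though I'm not certain the persisted key would end up with a .tif extension.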
José Duarte
11/03/2022, 10:07 AM
prefect.context.get("running_with_backend")
José Duarte
11/03/2022, 10:13 AM
ContextModel.copy are broken
https://docs.prefect.io/api-ref/prefect/context/?h=context#prefect.context.ContextModel.copy
Tim-Oliver
11/03/2022, 11:40 AM
José Duarte
11/03/2022, 12:39 PM
David Elliott
11/03/2022, 2:17 PM
KubernetesJob
infra? I'm finding that the agent creates the job + pod just fine (and the flow + pod run through to completion), but after X seconds (per that timeout parameter) the agent logs Job 'xxxxx': Job did not complete. per this, even though the job is midway through running. It doesn't seem to have any negative effect on the flow; it's just telling me the job didn't complete even when the job is very much still running. Feels like something's not quite right, just wanting to understand what the intention is…
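For context, a sketch of how that timeout is usually set on the KubernetesJob infrastructure block; the namespace, block name, and 600-second value here are illustrative assumptions:

# sketch: raise the job watch timeout on a KubernetesJob infrastructure block
from prefect.infrastructure import KubernetesJob

k8s_job = KubernetesJob(
    namespace="prefect",                        # assumption
    image="prefecthq/prefect:2.6.3-python3.10",
    job_watch_timeout_seconds=600,              # let the watch outlive longer flow runs
)
k8s_job.save("k8s-infra", overwrite=True)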
Tim-Oliver
11/03/2022, 3:34 PM
Jai P
11/03/2022, 4:47 PM
async, etc.)?
Javier Ochoa
11/03/2022, 7:29 PM
Brian Phillips
11/03/2022, 9:08 PM
prefect.tasks.prefect.create_flow_run are being canceled without any additional info in the logs. Is this expected? Has anyone else encountered similar behavior?
Madison Schott
11/03/2022, 9:13 PM
schedule_type signify? Do you only use this if you want the sync to be scheduled outside of a scheduled flow?
Madison Schott
11/03/2022, 9:15 PM
Madison Schott
11/03/2022, 9:15 PM
Michael Adkins
11/03/2022, 9:15 PM