Matthias
07/17/2020, 2:15 PM
from prefect import task, Flow

@task(checkpoint=False)
def extract_things():
    bar = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    foo = ['a', 'b', 'c']
    data = {
        'bar': bar,
        'foo': foo
    }
    return data

@task(checkpoint=False)
def do_things(data):
    for x in data:
        print(x)
    return None

with Flow(
    "Do it",
) as flow:
    things = extract_things()
    do_things(things['foo'])
    do_things(things['bar'])
james.lamb
07/17/2020, 4:25 PM
c00l-flow could exist in the same tenant, as long as they are in different projects.
thanks!
Matt Allen
07/17/2020, 4:46 PM
Ben Fogelson
07/17/2020, 6:07 PM
prefect for something like this. The idea is that currently Flow objects have two major responsibilities: (1) building/maintaining a DAG and (2) supporting execution of that DAG. I propose splitting that functionality into two classes: Flow, which would support DAG execution, and Composite (or some other name), which builds and maintains a DAG. Crucially, Composite instances could be nested (i.e. Composite.add_task would accept either a Task or another Composite instance, and similarly for replace, add_edge, etc). To run the DAG, Flow would traverse the nested graph and build up a flattened DAG that just contains Task nodes, and then execute as usual.
Richard Hughes
07/17/2020, 7:02 PM
Michael Reeves
07/17/2020, 8:09 PM
itay livni
07/17/2020, 8:27 PM
Zach
07/17/2020, 8:57 PM
Tyler Wanner
07/17/2020, 9:29 PM
Tyler Wanner
07/17/2020, 9:36 PM
Tyler Wanner
07/17/2020, 9:39 PM
Matt Wong-Kemp
07/17/2020, 9:45 PM
Alfie
07/18/2020, 8:20 AM
Alfie
07/18/2020, 9:57 AM
Marwan Sarieddine
07/19/2020, 7:36 PM
export PREFECT__LOGGING__LEVEL="ERROR"
but if I set the environment variable from within the script, Prefect doesn't pick up on it:
import os
os.environ["PREFECT__LOGGING__LEVEL"] = "ERROR"
....
flow.run()
What am I missing here? (Is there a way to set the variable programmatically from within the script?)
karteekaddanki
07/20/2020, 7:43 AM
A result subclass that matches the storage backend of your prefect.config.flows.storage setting will automatically be applied to all tasks, if available; notably this is not yet supported for Docker Storage
Does it mean I have to individually configure the results of tasks in a Docker-stored flow (without any global defaults), or does it mean that results are currently unsupported for tasks that run in a Docker environment?
Sven Teresniak
07/20/2020, 9:22 AM
Local storage to the flow, pointing to a path in the NFS. Now the questions:
1. Is it a good idea (or necessary?) to have the flows and results accessible in the same mountpoint (local filesystem location, e.g. NFS-persisted ~/.prefect/flows) for Agent and Dask worker? Or is it sufficient to persist the flow/result storage (make it durable between restarts), so that the agent does not need access to the pickled flow results?
2. What are the pros and cons of stored_as_script=True for a Local storage? When do I want to set this? Also, I do not understand the path parameter for local storage. I set directory to a path on my NFS and now I see the pickled flows and results. What is path?
Michael Ludwig
07/20/2020, 11:44 AM
class ECSFargateAgent:
    def __init__(self, config: PrefectAgentConfig):
        self._env = config.env
        self._agent = FargateAgent(
            labels=[f"reco-{self._env}"],
            enable_task_revisions=True,
            launch_type="FARGATE",
            taskRoleArn=config.task_role_arn,
            executionRoleArn=config.execution_role_arn,
            cluster=config.ecs_cluster_arn,
            networkConfiguration={
                "awsvpcConfiguration": {
                    "assignPublicIp": "ENABLED",
                    "subnets": config.subnets,
                    "securityGroups": [config.security_group],
                }
            },
            cpu="1024",  # 1 vCPU
            memory="3072",  # 3 GB
            containerDefinitions=[
                {
                    "logConfiguration": {
                        "logDriver": "awslogs",
                        "options": {
                            "awslogs-group": config.log_group,
                            "awslogs-region": "eu-west-1",
                            "awslogs-stream-prefix": "flows",
                        },
                    },
                }
            ],
        )

    def run(self):
        """Start the agent"""
        self._agent.start()
e.g. we switched memory from 16GB to 3GB but the agent still fires off flows with 16GB. Only deleting the old task definitions manually solves this for us. Has anybody seen something similar or has a solution?
Sven Teresniak
07/20/2020, 12:14 PM
# prefect run server -n flow2 -l
Flow Run: http://localhost:8080/flow-run/e82df5c1-d197-46f1-9bb1-f18f9335e0f8
TIMESTAMP LEVEL MESSAGE
2020-07-20T12:09:56.284609+00:00 INFO Submitted for execution: PID: 835
2020-07-20T12:09:57.237871+00:00 INFO Beginning Flow run for 'flow2'
2020-07-20T12:09:57.273447+00:00 INFO Starting flow run.
2020-07-20T12:09:57.273836+00:00 DEBUG Flow 'flow2': Handling state change from Scheduled to Running
2020-07-20T12:09:57.581024+00:00 INFO Task 'task2': Starting task run...
2020-07-20T12:09:57.724552+00:00 INFO hello, task2
2020-07-20T12:09:57.835204+00:00 INFO Task 'task2': finished task run for task with final state: 'Success'
2020-07-20T12:09:57.942237+00:00 INFO Task 'flow1-runner': Starting task run...
2020-07-20T12:09:58.145036+00:00 ERROR Unexpected error: HTTPError('400 Client Error: Bad Request for url: http://localhost:4200/graphql')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 819, in get_task_run_state
    value = timeout_handler(
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 188, in timeout_handler
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 445, in method
    return run_method(self, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 119, in run
    flow_run_id = client.create_flow_run(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 877, in create_flow_run
    res = self.graphql(create_mutation, variables=dict(input=inputs))
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 213, in graphql
    result = self.post(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 172, in post
    response = self._request(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 328, in _request
    response = self._send_request(
  File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 265, in _send_request
    response.raise_for_status()
  File "/usr/local/lib/python3.8/site-packages/requests/models.py", line 941, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: http://localhost:4200/graphql
2020-07-20T12:09:58.275483+00:00 INFO Task 'flow1-runner': finished task run for task with final state: 'Failed'
2020-07-20T12:09:58.302057+00:00 INFO Flow run FAILED: some reference tasks failed.
2020-07-20T12:09:58.302667+00:00 DEBUG Flow 'flow2': Handling state change from Running to Failed
During this flow run the log/stdout of Apollo (which is listening to localhost:4200) prints
Sven Teresniak
07/20/2020, 12:14 PM
flow1:
#!/usr/bin/env python
# coding: utf-8
import prefect
from prefect import task, Flow
from prefect.environments.storage.local import Local

@task
def task1():
    prefect.context.get("logger").info("hello, task1")

with Flow("flow1", storage=Local(directory="/flows/.prefect/flows")) as flow:
    task1()

if __name__ == "__main__":
    flow.register()
And here is flow2:
#!/usr/bin/env python
# coding: utf-8
import prefect
from prefect import task, Flow
from prefect.tasks.prefect import FlowRunTask
from prefect.environments.storage.local import Local

@task
def task2():
    prefect.context.get("logger").info("hello, task2")

with Flow("flow2", storage=Local(directory="/flows/.prefect/flows")) as flow:
    flow1_run = FlowRunTask(name="flow1-runner", flow_name="flow1", wait=True)
    task2.set_downstream(flow1_run)

if __name__ == "__main__":
    flow.register()
When I now run flow2 I get an error:
Sven Teresniak
07/20/2020, 12:14 PM
2020-07-20T12:09:58.139Z {"message":"Variable \"$input\" got invalid value { flow_id: \"80de1b39-fed7-47ab-a5a5-fc22ac6f87d2\", idempotency_key: \"e82df5c1-d197-46f1-9bb1-f18f9335e0f8\" }; Field \"idempotency_key\" is not defined by type create_flow_run_input.","locations":[{"line":1,"column":10}],"extensions":{"code":"INTERNAL_SERVER_ERROR"}}
Is this a bug?
Every component of my Prefect cluster (v0.12.4) is running in K8s, but it's a static setup where every component is one container and everything is together in one pod (so everything can communicate using localhost). I have a Dask worker pod running, and all other flows (those not calling other flows) are working so far.
karteekaddanki
07/20/2020, 2:42 PM
from prefect.client import Client
import pandas as pd

if __name__ == "__main__":
    dates = pd.bdate_range(pd.Timestamp("20200601"), pd.Timestamp("20200630"))
    c = Client()
    for d in reversed(dates):
        c.create_flow_run(
            "e04181f3-86b3-4f6f-87cf-ed7c24cb95ec",
            parameters=dict(date=d.strftime("%Y-%m-%d")),
            context=dict(name="John Doe"),
            run_name=f"test_backfill_{d.strftime('%Y%m%d')}")
But some of the flow runs are stuck with
Queued due to concurrency limits. The local process will attempt to run the flow for the next 10 minutes, after which time it will be made available to other agents.
I am on the dev version of the cloud backend and understand that there are limits. I have the following questions:
1. Am I hitting this error because of the dev cloud version?
2. Although the message says the flow will be rescheduled within the next 10 minutes, it takes around 30 mins for the rest of the tasks to get scheduled and run.
3. Is there a better way to create a backfill? I am avoiding FlowRunTask because context is currently not passed to the triggered flow runs.
bruno.corucho
07/20/2020, 2:56 PM
@task
def i_am_too_big():
    """
    multiple
    lines of code
    that should
    go into
    a different method
    """
    """
    some
    sequential (to be parallelized)
    http
    calls
    """
    """
    multiple
    lines of code
    that should
    go into
    a different method
    """
Qwame
07/20/2020, 6:23 PM
karteekaddanki
07/20/2020, 8:52 PM
Alfie
07/21/2020, 2:56 AM
ERROR - agent | Error while deploying flow: ValidationError({'_schema': 'Invalid data type: None'})
What I did was create the workflow with a label and use the GraphQL API to register the flow:
alert_flow.environment = LocalEnvironment(labels=['public'])
create_flow_mutation = """
    mutation($input: create_flow_input!) {
        create_flow(input: $input) {
            id
        }
    }
"""
serialized_flow = flow._flow.serialize(build=False)
self._client.graphql(
    query=create_flow_mutation,
    variables=dict(
        input=dict(
            serialized_flow=serialized_flow,
            set_schedule_active=False
        )
    )
)
I do not get any more logs for debugging. Please help, thanks!
Vikram Iyer
07/21/2020, 3:39 AM
Klemen Strojan
07/21/2020, 5:31 AM
Failed to retrieve task state with error: ClientError([{'path': ['get_or_create_task_run'], 'message': 'Invalid ID', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}])
Is this raised in GraphQL? What are the possible reasons? I am using Cloud + Core 0.12.1
Thomas Hoeck
07/21/2020, 7:33 AM
Unexpected error: ZeroDivisionError('division by zero')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 952, in get_task_run_state
    self.task.run, timeout=self.task.timeout, **raw_inputs
  File "/usr/local/lib/python3.7/site-packages/prefect/utilities/executors.py", line 182, in timeout_handler
    return fn(*args, **kwargs)
  File "<ipython-input-6-39643be0b44f>", line 9, in hello_task
ZeroDivisionError: division by zero
I know it is part of the Hybrid model that code isn't shared, but is there a way to get a normal Python traceback so I can see the failed line?
Sven Teresniak
07/21/2020, 8:22 AM
stored_as_script and path keyword arguments removed? in every storage class? (Local in my case)