Robert Hales
07/15/2021, 9:55 AM
I'm renaming flow runs with the client.set_flow_run_name method, and it successfully updates the UI. However, when I then call create_flow_run, the subtasks use the old auto-generated name. I'm assuming that's because the context is stale?

Robert Hales
07/15/2021, 10:26 AM
Is there a way to have wait_for_flow_run fail if the state of the flow run is failed?

Scarlett King
07/15/2021, 3:34 PM

Eric Mauser
07/16/2021, 7:54 PM

Garret Cook
07/16/2021, 11:13 PM

Garret Cook
07/17/2021, 2:19 AM

Bruno Murino
07/17/2021, 9:17 AM
{
  "type": "http-log",
  "timestamp": "2021-07-17T08:43:42.608+0000",
  "level": "error",
  "detail": {
    "operation": {
      "user_vars": {
        "x-hasura-role": "admin"
      },
      "error": {
        "path": "$.selectionSet.insert_project.args.objects",
        "error": "Uniqueness violation. duplicate key value violates unique constraint \"project_tenant_id_name_key\"",
        "code": "constraint-violation"
      },
      "request_id": "c8aedf52-53fc-4e54-9149-a1d62db270f0",
      "response_size": 193,
      "query": {
        "variables": {
          "insert_objects": [
            {
              "tenant_id": "712db680-3e15-469d-b6e7-eee61a60593d",
              "name": "Health Monitoring",
              "description": null
            }
          ]
        },
        "query": "mutation($insert_objects: [project_insert_input!]!) {\n insert: insert_project(objects: $insert_objects) {\n returning {\n id\n }\n }\n}"
      }
    },
    "http_info": {
      "status": 400,
      "http_version": "HTTP/1.1",
      "url": "/v1alpha1/graphql",
      "ip": "172.17.0.14",
      "method": "POST",
      "content_encoding": null
    }
  }
}
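For anyone hitting the same 400: the insert fails because a project named "Health Monitoring" already exists for that tenant. A hedged sketch of a check-then-create guard, assuming Prefect ~0.15's `Client.graphql` and `Client.create_project`; the `ensure_project` wrapper itself is hypothetical (not Prefect API), and the client is injected so the logic is easy to stub out:

```python
def ensure_project(client, name):
    """Create a Prefect project only if none with this name exists yet.

    `client` is assumed to behave like prefect.client.Client (this wrapper
    is a sketch, not part of Prefect's API). Checking first avoids the
    project_tenant_id_name_key uniqueness violation seen in the log above.
    """
    query = """
    query($name: String!) {
      project(where: { name: { _eq: $name } }) { id }
    }
    """
    existing = client.graphql(query, variables={"name": name})
    if existing["data"]["project"]:
        return None  # project already exists; skip the insert
    return client.create_project(project_name=name)
```

The same effect could also be had server-side with a Hasura `on_conflict` upsert, but a read-before-write keeps the mutation unchanged.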
Son Mai
07/18/2021, 1:50 PM

Garret Cook
07/19/2021, 3:13 AM
Task 'create_flow_run': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/prefect/engine/task_runner.py", line 865, in get_task_run_state
    logger=self.logger,
  File "/usr/local/lib/python3.6/site-packages/prefect/utilities/executors.py", line 327, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/usr/local/lib/python3.6/site-packages/prefect/tasks/prefect/flow_run.py", line 142, in create_flow_run
    run_url = client.get_cloud_url("flow-run", flow_run_id)
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1217, in get_cloud_url
    as_user=(as_user and using_cloud_api and self._api_token is not None)
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 1252, in get_default_tenant_slug
    res = self.graphql(query)
  File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 563, in graphql
    raise ClientError(result["errors"])
prefect.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}]
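The failing query in this traceback is Cloud-only: `get_cloud_url` calls `get_default_tenant_slug`, which queries a `user` field that Prefect Server's GraphQL schema doesn't expose. A hedged sketch of a backend-aware guard; the helper name is made up, `backend` would come from `prefect.config.backend`, and the client is injected to keep the sketch testable:

```python
def flow_run_link(client, backend, flow_run_id):
    """Return a UI link for a flow run without tripping Server's missing
    `user` query. Hypothetical helper, not part of Prefect's API.
    """
    if backend == "cloud":
        # Cloud's API has the user/tenant queries that get_cloud_url relies on.
        return client.get_cloud_url("flow-run", flow_run_id)
    # Prefect Server has no tenant slug; fall back to a relative UI path.
    return "/flow-run/{}".format(flow_run_id)
```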
Garret Cook
07/19/2021, 4:09 PM

Marko Jamedzija
07/20/2021, 3:33 PM
My flow run stays in a Running state even though the underlying k8s RunNamespacedJob task completed successfully. I'm using prefect 0.15.1 and LocalDaskExecutor. This happens almost always for the longer-running tasks of this kind. Any suggestion how to resolve this? Thanks!

Leonardo Rocha
07/20/2021, 5:59 PM

Alex Furrier
07/20/2021, 10:06 PM
flow_a = StartFlowRun(flow_name="flow a", project_name="my project", wait=True)
flow_b = StartFlowRun(flow_name="flow b", project_name="my project", wait=True)

with Flow('flow-of-flows') as flow_of_flows:
    param_a = Parameter('param_a', 'a')
    param_b = Parameter('param_b', 'b')
    flow_a(parameters={"param_a": param_a})
    flow_b(parameters={"param_b": param_b}, upstream_tasks=[flow_a])
In the UI, if I attempt to run this flow with params like {'param_a': 'parametrized_1', 'param_b': 'parametrized_b'}, it will create a task running flow_a and fail. Viewing the initiated flow run for flow_a, it shows the parameters being {'param_a': null}. flow_a fails, and flow_b fails before it even runs, since it's dependent on flow_a and says a reference task failed.
Is there a proper way to pass parameters between parent and child flows?

Hugo Polloli
07/21/2021, 9:42 AM

Daniel Davee
07/21/2021, 6:37 PM

Amogh Kulkarni
07/22/2021, 8:31 AM

YD
07/22/2021, 9:06 PM

Jonas
07/23/2021, 1:44 PM
TypeError: cannot pickle 'weakref' object (running this with flow.run worked fine)
I narrowed it down to a sqlalchemy session object I use to send and retrieve data from a database; that object can't be serialized.
Now I'm a bit clueless as to how I should solve this, because I'm unsure what steps to take next.
from datetime import timedelta
from typing import List
import json

from prefect import task, Flow, case, Parameter
from sqlalchemy import create_engine, exc, orm

engine = create_engine("mysql+mysqldb://{USER}:{PASSWORD}@{HOST}:{PORT}/{NAME}".format(**json.load(f)))
session = orm.Session(bind=engine)  # module-level session: this is the object that fails to pickle

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def commit_to_db(object_list: List):
    session.add_all(object_list)
    try:
        session.commit()
    except exc.SQLAlchemyError as error:
        session.rollback()
        # logger.warning(error)
        raise Exception(error)
    session.close()
Chris Bowen
07/23/2021, 6:18 PM
(.md files in a repo).
@Zach Angell provided me with the GraphQL to start out, and I expanded on it to make a solution that works for me. I'm storing mine in a few functions, but I'll just provide the entire thing as a script here.
This script assumes a few things (I'll edit/update if I think of any more or anyone points any out):
1. The script is running in a directory that contains all of the .md files you want to deploy
2. Your .md files are named the same as your flows (see the active_flows object returned from the GraphQL query)
3. Any \ characters in your .md files need to be escaped with a second \ or the deployment won't work
4. I believe including any " characters in the .md files also blew up the API call, but ' was fine
CODE IN THREAD
I've got this divided up into three functions, but posting it in this format seemed like it might be easier to read.
One thing I did notice is that the Prefect UI doesn't seem to handle code blocks very elegantly. I tried the standard three backticks (`), tildes (~), with and without specifying the language, and four-space indentation.
I attached a couple of pictures that show the input and output. Not a huge deal, but I'm curious why it doesn't "block" like most other platforms I've seen.
Anyways, happy to hear any feedback, suggestions, etc. Also just happy to share it if it helps anyone else at all!

Nathan Walker
07/23/2021, 8:00 PM

Mateusz Pazdzior
07/26/2021, 10:15 AM

Alfie
07/27/2021, 12:03 AM

Eric Mauser
07/27/2021, 8:52 PM
When I run flow.py locally, it returns that both tasks were successful; however, I can see that it didn't actually run the dbt package. Any thoughts here? The dbt package is just in the same folder as flow.py. I'm assuming this all gets packaged up and run on ECS.

Hugo Polloli
07/28/2021, 12:33 PM
I ran prefect agent docker start and then docker ps. I was thinking I'd see a docker agent as a container, but I don't; I only have my prefect server. Did I misunderstand something about the "docker" agent, or is something wrong with my setup?

Jonas
07/28/2021, 2:14 PM
Failed to load and execute Flow's environment: StorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'subpackage1\'")\nThis may be due to a missing Python module in your current environment. Please ensure you have all required flow dependencies installed.')
I'm trying to import package 1 and package 2 in flow A.
I get that the Prefect environment can't find the module, but what is the best way of solving this?
folder structure:
└── myproject
    └── system1
        ├── flow_A.py
        ├── subpackage1
        │   └── package1.py
        ├── subpackage2
        │   └── package2.py
        └── random files (that I can import)

Constantino Schillebeeckx
07/28/2021, 2:28 PM

Eric Mauser
07/28/2021, 4:12 PM

Pedro Martins
07/28/2021, 5:45 PM

Alex Furrier
07/28/2021, 10:13 PM
[2021-07-28 22:06:32+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'My Flow'
[2021-07-28 22:06:32+0000] INFO - prefect.DaskExecutor | Creating a new Dask cluster with `__main__.`...
Creating scheduler pod on cluster. This may take some time.
[2021-07-28 22:06:37+0000] INFO - prefect.DaskExecutor | The Dask dashboard is available at <http://dask-root-78fcc2c9-3.prefect:8787/status>
distributed.deploy.adaptive - INFO - Adaptive scaling started: minimum=1 maximum=2
[2021-07-28 22:07:04+0000] WARNING - prefect.CloudFlowRunner | Flow run is no longer in a running state; the current state is: <Failed: "Error: pods ['prefect-job-054cbaf7-rt2mr'] failed for this job">
The last thing I attempted to change was to have all results for the flow be AzureResults with a connection to blob storage. The environment variables for connecting to our blob storage are available in the docker image the flow runs on, but I didn't see them on the prefect job; they are on the dask workers that spawn. I don't see successful results showing up in our blob storage. Not sure if the two of these are related, but I thought it's possible.

Zhilong Li
07/29/2021, 2:42 AM

Zhilong Li
07/29/2021, 2:42 AM

Kevin Kho
07/29/2021, 2:16 PM