Riley Hun
08/14/2020, 4:31 PM
bral
08/14/2020, 7:08 PM
Jeff Brainerd
08/14/2020, 7:39 PM
Jeremiah
08/14/2020, 8:06 PM
Michael Reeves
08/14/2020, 8:24 PM
Is there a way to override the flow_run_id when calling FLOW_OBJ.run()? I'm currently doing the following:

with prefect.context(flow_run_id="asdf"):
    EMAIL.run(flow_run_id="asdfasdf", name="TEST")

but neither the context nor the kwarg passed to the run function actually changes the flow_run_id when it is output by the logger 😞
Do I need to wrap the flow_run_id in a state variable? If so, what's the best way to do this?
Finally, I'm running this flow over a Dask cluster, so I don't know if my issue here is affected by this issue: https://github.com/PrefectHQ/prefect/issues/2883
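A minimal sketch (not from the thread), assuming Prefect 0.13-era APIs, of one way to check which flow_run_id actually lands in the context at runtime, rather than asserting whether the override works:

import prefect
from prefect import Flow, task

@task
def report_run_id():
    # Inside a task, the runner-populated context is visible; this logs
    # whatever flow_run_id the run actually used.
    logger = prefect.context.get("logger")
    logger.info("flow_run_id in context: %s", prefect.context.get("flow_run_id"))

with Flow("check-run-id") as flow:
    report_run_id()

# Values passed here are merged into the run's context; whether flow_run_id
# specifically survives depends on the runner's own defaults.
with prefect.context(flow_run_id="my-custom-id"):
    flow.run()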
Philip MacMenamin
08/14/2020, 9:15 PM
I created a project named vtk_flow using the web GUI, and am attempting to register a flow:
$ python3 -m vtk_flow.flow
Traceback (most recent call last):
  File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/nih-3d-resource-workflows/vtk_flow/flow.py", line 82, in <module>
    f.register(project_name='vtk_flow')
  File "/prefect-env/lib64/python3.7/site-packages/prefect/core/flow.py", line 1583, in register
    no_url=no_url,
  File "/prefect-env/lib64/python3.7/site-packages/prefect/client/client.py", line 729, in register
    project_name, project_name
ValueError: Project vtk_flow not found. Run `client.create_project("vtk_flow")` to create it.
I thought Client was to talk to Prefect Cloud?
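A minimal sketch (not from the thread) of creating the project the error message asks for, assuming the 0.13-era Client pointed at the server backend:

from prefect import Client

# With `prefect backend server` configured, Client talks to the local
# Prefect Server's GraphQL API rather than Prefect Cloud.
client = Client()
client.create_project(project_name="vtk_flow")

# Once the project exists, flow.register(project_name="vtk_flow") should find it.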
Hannah Amundson
08/14/2020, 10:13 PM
kevin
08/14/2020, 10:17 PM
Is it possible to inject values into prefect.context at runtime?
More concretely, can I do something like this:

with Flow('test') as flow:
    do_stuff()

with prefect.context(foo='bar'):
    flow.run()

and if not, what would be the correct way to inject context after defining a flow?
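A minimal sketch (not from the thread) of the task-side half of that pattern, assuming Prefect 0.13-era behavior where values set with prefect.context around flow.run() are visible during that run:

import prefect
from prefect import Flow, task

@task
def do_stuff():
    # Context values are looked up at runtime, inside the task,
    # not at flow-definition time.
    foo = prefect.context.get("foo", "no value set")
    print(f"foo = {foo}")

with Flow("test") as flow:
    do_stuff()

# Entering the context manager before calling run() makes "foo" visible to
# tasks during this run.
with prefect.context(foo="bar"):
    flow.run()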
Maxwell Dylla
08/14/2020, 10:53 PM
Marwan Sarieddine
08/15/2020, 4:02 AM
tas
08/15/2020, 10:34 AM
Alex Papanicolaou
08/15/2020, 6:02 PM
Ame
08/16/2020, 4:02 AM
John Ramirez
08/16/2020, 7:25 PM
Ame
08/17/2020, 3:27 AM
Alfie
08/17/2020, 5:28 AM
Lewis Bails
08/17/2020, 6:41 AM
Simone Cittadini
08/17/2020, 9:26 AMprefect server create-default-tenant --name default
but from the shell ( inside the graphql container ) I got
Error: No such command 'create-default-tenant'.
prefect version
0.13.1+14.g303356902
EDIT: worked with master ( prefect v 0.13.2 ) great !Adam
Adam
08/17/2020, 10:31 AM
Alex Papanicolaou
08/17/2020, 4:52 PM
with prefect.context(loglevel="ERROR"):
    inner_flow.run()
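The original message is truncated, so the intent is an assumption, but if the goal is to quiet the inner flow's logs, here is a minimal sketch of a different approach using Prefect's logging config rather than a loglevel context key:

import os

# Prefect configures its loggers from config when it is imported, so set the
# level before `import prefect` (or export the variable in the shell that
# launches the flow).
os.environ["PREFECT__LOGGING__LEVEL"] = "ERROR"

import prefect  # noqa: E402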
Mitchell Bregman
08/17/2020, 7:23 PM
class DS242(ParkMobileFlow):
    @property
    def tasks(self):
        """Loads all tasks for data quality tests."""
        # Connect to ParkMobile client
        pm = ParkMobile()
        # Get Snowflake connection
        sf = pm.get("snowflake")
        sf.connect("sqlalchemy")
        # Call all tasks and task definitions
        records = sf.execute("""
            SELECT *
            FROM DEV_DB.DATA_QUALITY_TESTS.QUALITY_TASKS
            WHERE is_active = true;
        """)
        return records

    @property
    def flow(self):
        """Returns the Prefect Flow object."""
        return self.build()

    def build(self):
        """Builds the workflow."""
        tasks = []
        for task in self.tasks:
            tasks.append(
                DataQualityTask(
                    uuid=task.task_uuid,
                    name=task.task_name,
                    description=task.task_description,
                    source_query=task.source_query,
                    target_query=task.target_query,
                )
            )
        with Flow("DS242__data-quality-tests") as flow:
            for task in tasks:
                task()
        return flow
Now I am trying to register this flow to the backend server, and I am getting a serialization error, which is understandable. My question is: can I create a first Task that queries the DB for all task definitions, and then dynamically adds new tasks to the Flow which perform all of the work? I can certainly build a custom Dockerfile that handles deployment of this, but it would be awesome to connect to prefect server.
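A minimal sketch (not from the thread) of the mapping pattern that moves the DB query into a first task, so the registered flow stays static and serializable; the import path, and the idea of calling DataQualityTask.run() inside a mapped task, are assumptions carried over from the snippet above:

from prefect import Flow, task

# ParkMobile and DataQualityTask are assumed importable from the project
# code shown above; the module path here is hypothetical.
from parkmobile_flows import ParkMobile, DataQualityTask

@task
def get_task_definitions():
    # Runs at flow-run time rather than at registration time.
    pm = ParkMobile()
    sf = pm.get("snowflake")
    sf.connect("sqlalchemy")
    return list(sf.execute("""
        SELECT *
        FROM DEV_DB.DATA_QUALITY_TESTS.QUALITY_TASKS
        WHERE is_active = true;
    """))

@task
def run_quality_check(definition):
    # One mapped child run per definition returned by the task above.
    check = DataQualityTask(
        uuid=definition.task_uuid,
        name=definition.task_name,
        description=definition.task_description,
        source_query=definition.source_query,
        target_query=definition.target_query,
    )
    return check.run()

with Flow("DS242__data-quality-tests") as flow:
    definitions = get_task_definitions()
    run_quality_check.map(definitions)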
Julien Allard
08/17/2020, 8:46 PM
bral
08/17/2020, 10:36 PM
Adam
08/18/2020, 9:45 AM
x062Wyhdolq
08/18/2020, 9:53 AM
Jacob Blanco
08/18/2020, 9:57 AM
I'm getting back <class 'prefect.configuration.Config'> and I have no idea why. Did I do something wrong in the config.toml file?
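Without seeing the exact config, here is a minimal sketch of one common way that type shows up; the [my_section] table and key names are made up for illustration:

import prefect

# Assuming a user config.toml along these lines:
#
#   [my_section]
#   my_value = "hello"
#
# nested TOML tables come back as Config objects, while leaf keys come back
# as plain values:
print(type(prefect.config.my_section))           # <class 'prefect.configuration.Config'>
print(type(prefect.config.my_section.my_value))  # <class 'str'>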
x062Wyhdolq
08/18/2020, 9:58 AM
Jonas Hanfland
08/18/2020, 2:03 PM
I have a question about the graph that flow.visualize() generates.
In the generated graph I would like Add new columns to be dependent on Extract customers, while Extract customers should itself depend on Unnest passport data.
In the code, how do I add the Unnest passport data dependency to the existing .set_dependencies() block without causing duplicates? Thx in advance
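A minimal sketch (not from the thread) of expressing both dependencies with Flow.set_dependencies, using stand-in Task objects named after the nodes above; Prefect only adds each task object to a flow once, so reusing it across calls should not create duplicate nodes:

from prefect import Flow, Task

# Stand-ins for the real tasks; names taken from the visualize() output.
extract_customers = Task(name="Extract customers")
add_new_columns = Task(name="Add new columns")
unnest_passport_data = Task(name="Unnest passport data")

flow = Flow("customers")

# Listing an already-added task as upstream in a second set_dependencies()
# call adds only the edge, not another node.
flow.set_dependencies(task=extract_customers, upstream_tasks=[unnest_passport_data])
flow.set_dependencies(task=add_new_columns, upstream_tasks=[extract_customers])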
bral
08/18/2020, 4:10 PM
Philip MacMenamin
08/18/2020, 5:42 PM
mutation {
  create_flow_run(
    input: {
      flow_id: "dcbbfc94-5643-453c-941a-0efa8505df66"
      parameters: {"input_loc": "test"}
    }
  ) {
    id
  }
}
Philip MacMenamin
08/18/2020, 5:42 PM
{
  "files": [
    "fname.txt",
    "fname2.txt"
  ],
  "input_loc": "/test_job_loc/",
  "submission_id": "test_job_id"
}
Jenny
08/18/2020, 6:17 PM
Philip MacMenamin
08/18/2020, 6:18 PM
Jenny
08/18/2020, 6:49 PM
mutation {
  create_flow_run(input: {
      flow_id: "xxxxxxxxx",
      parameters: "{\"a\":2}"
    }
  ) {
    id
  }
}
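For completeness, a minimal sketch (not from the thread) of the same call from Python, assuming the 0.13-era Client.create_flow_run, which lets the parameters stay a plain dict instead of an escaped JSON string:

from prefect import Client

client = Client()

# The Python client serializes the parameters itself, so no manual
# JSON-string escaping is needed here.
flow_run_id = client.create_flow_run(
    flow_id="dcbbfc94-5643-453c-941a-0efa8505df66",
    parameters={
        "files": ["fname.txt", "fname2.txt"],
        "input_loc": "/test_job_loc/",
        "submission_id": "test_job_id",
    },
)
print(flow_run_id)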
Philip MacMenamin
08/18/2020, 6:50 PM