Marvin
09/12/2020, 5:54 PMSandeep Gainda
09/13/2020, 5:35 AMMarvin
09/13/2020, 5:35 AMMichael J Hall
09/13/2020, 2:20 PMMichael J Hall
09/13/2020, 2:20 PMMichael J Hall
09/13/2020, 2:21 PMMichael J Hall
09/13/2020, 2:23 PMMichael J Hall
09/13/2020, 3:03 PMMichael J Hall
09/13/2020, 3:04 PMMichael J Hall
09/13/2020, 4:09 PMMichael J Hall
09/13/2020, 4:19 PMMichael J Hall
09/13/2020, 8:31 PMMichael J Hall
09/13/2020, 8:31 PM[linuxserver02 ~]$ prefect backend cloud
Backend switched to cloud
[linuxserver02 ~]$ export PREFECT__CLOUD__AGENT__AUTH_TOKEN=ABC123ABC123123XYZ
[linuxserver02 ~]$ python3.6
Python 3.6.8 (default, Jun 11 2019, 15:15:01)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import prefect
>>> from prefect import task, Flow
>>>
>>> @task
... def hello_task():
... logger = prefect.context.get("logger")
... <http://logger.info|logger.info>("Hello, Cloud!")
...
>>> flow = Flow("hello-flow", tasks=[hello_task])
>>> flow.register(project_name="Hello, World!")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python3.6/site-packages/prefect/core/flow.py", line 1594, in register
no_url=no_url,
File "/usr/local/lib/python3.6/site-packages/prefect/client/client.py", line 730, in register
project_name, project_name
ValueError: Project Hello, World! not found. Run `client.create_project("Hello, World!")` to create it.
>>>
Michael J Hall
09/13/2020, 8:32 PMsark
09/14/2020, 3:52 AMLocalStorage
it is local to the machine registering the flow, but for LocalEnvironment
it is local to the the machine of the agent?sark
09/14/2020, 3:56 AMHagai Arad
09/14/2020, 8:00 AMJuan
09/14/2020, 8:17 AMRizky Eko Putra
09/14/2020, 9:11 AM/usr/local/lib/python3.7/site-packages/prefect/core/flow.py:1594: UserWarning: No result handler was specified on your Flow. Cloud features such as input caching and resuming task runs from failure may not work properly.
no_url=no_url,
Ian Fridge
09/15/2020, 6:45 PMCharles Leung
09/15/2020, 8:59 PMChandra Manginipalli
09/16/2020, 8:46 PMsebastian.clanzett
09/17/2020, 4:25 PMNils Israel
09/17/2020, 5:09 PMChandra Manginipalli
09/17/2020, 6:12 PMmatt forbes
09/17/2020, 7:24 PMChandra Manginipalli
09/17/2020, 7:46 PMDave
09/18/2020, 10:39 AM"Interval can not be less than one minute when deploying to Prefect Cloud."
I'm running the latest version of Prefect.Vipul
09/18/2020, 11:33 AMwith Flow("MasterFlow") as master_flow:
cobdate = Parameter("cobdate")
staging_area = FlowRunTask(flow_name='staging_area',
parameters=dict(cobdate=cobdate),
project_name="SubFlow")
business_logic_layer = FlowRunTask(flow_name='business_logic_layer',
project_name="SubFlow")
data_mart = FlowRunTask(flow_name='data_mart',
project_name="SubFlow",
wait=True)
staging_area.set_downstream(data_mart)
business_logic_layer.set_downstream(data_mart)
Vipul
09/18/2020, 11:35 AMVipul
09/18/2020, 11:35 AMMatt Wong-Kemp
09/18/2020, 12:59 PMcobdate
, which container the Parameter object, not the value@task
def prepare_parameters(cobdate):
return dict(cobdate=cobdate)
with Flow("MasterFlow") as master_flow:
cobdate = Parameter("cobdate")
staging_parameters = prepare_parameters(cobdate)
staging_area = FlowRunTask(flow_name='staging_area',
parameters=staging_parameters,
project_name="SubFlow")
Vipul
09/18/2020, 1:15 PM-> res = self.graphql(create_mutation, variables=dict(input=inputs))
(Pdb) inputs
{'flow_id': 'bc156d0a-b2dc-4425-85a6-10e2a9a5ef1f', 'parameters': <Task: prepare_parameters>, 'idempotency_key': 'db6be705-45eb-4a99-adea-86c4506227d9'}
I think the JSON error is due to fact that 'parameters' has value of <Task: prepare_parameters> and JSON does not know how to serialiaze it... @nicholasnicholas
09/18/2020, 5:22 PMRun
tab here:cobdate
required or give it a default:
with Flow("MasterFlow") as master_flow:
cobdate = Parameter("cobdate", required=True)
# OR Parameter("cobdate", default="some default")
# Then pass the parameters to the runtime context of FlowRunTask
# instead of the Class instantiation
staging_area = FlowRunTask(flow_name='staging_area',
project_name="SubFlow")(parameters={"cobdate": cobdate})
business_logic_layer = FlowRunTask(flow_name='business_logic_layer',
project_name="SubFlow")
data_mart = FlowRunTask(flow_name='data_mart',
project_name="SubFlow",
wait=True)
staging_area.set_downstream(data_mart)
business_logic_layer.set_downstream(data_mart)
Vipul
09/18/2020, 5:59 PMnicholas
09/18/2020, 6:02 PMVipul
09/27/2020, 10:49 AMstaging_area = FlowRunTask(flow_name='staging_area',
project_name="SubFlow")(parameters={"cobdate": cobdate})
has cause two extra tasks to be added as List and Dict before calling Flow "staging_area", is this expected behaviour? Thanks