Ido Slonimsky
12/13/2021, 10:04 AM
Anna Geller
Ido Slonimsky
12/13/2021, 10:09 AM
@task()
def extract_data(source):
    logger = prefect.context.get("logger")
    logger.info(f"{source=}")
With source being a parameter:
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    source_name = Parameter("source")
    data = extract_data(source_name)
Anna Geller
@task(log_stdout=True)
def extract_data(source):
    print(f"{source=}")
Ido Slonimsky
12/13/2021, 10:14 AM
Anna Geller
source_name = Parameter("source", default="your_default_source")
This way, you can schedule your flow to run with this default value and optionally override it at runtime when needed.
Ido Slonimsky
12/13/2021, 10:17 AM
Ido Slonimsky
12/13/2021, 10:21 AM
Anna Geller
from prefect import task, Flow, Parameter

@task(log_stdout=True)
def hello_world(name):
    print(f"{name=}")

with Flow("mini.example") as flow:
    name = Parameter("name", default="world")
    hw = hello_world(name)

if __name__ == '__main__':
    flow.run()
It should give the output:
[2021-12-13 11:23:57+0100] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
[2021-12-13 11:23:57+0100] INFO - prefect.TaskRunner | name='world'
Ido Slonimsky
12/13/2021, 10:28 AM
[2021-12-13 12:26:16+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'mini.example'
[2021-12-13 12:26:16+0200] INFO - prefect.TaskRunner | Task 'name': Starting task run...
[2021-12-13 12:26:16+0200] INFO - prefect.TaskRunner | Task 'name': Finished task run for task with final state: 'Success'
[2021-12-13 12:26:16+0200] INFO - prefect.TaskRunner | Task 'hello_world': Starting task run...
[2021-12-13 12:26:16+0200] INFO - prefect.TaskRunner | name='world'
[2021-12-13 12:26:16+0200] INFO - prefect.TaskRunner | Task 'hello_world': Finished task run for task with final state: 'Success'
[2021-12-13 12:26:16+0200] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
The flow I was trying to run also works correctly when using flow.run(). The problem occurs when I try to run it on the agents in ECS Fargate.
Anna Geller
@task(log_stdout=True)
def hello_world(name):
    print(f"name={name}")
Ido Slonimsky
12/13/2021, 10:28 AM
Anna Geller
import platform
print(platform.python_version())
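
A possible reason for the two suggestions above (rewriting the f-string and checking the Python version): the `=` specifier in f-strings, as in f"{name=}", was only added in Python 3.8, so it is a syntax error on older interpreters. Whether that was the actual cause on the ECS Fargate image is an assumption; the thread does not confirm it. A minimal sketch of a version-independent alternative follows; `debug_repr` and `supports_equals_specifier` are hypothetical names used only for illustration:

```python
import platform
import sys


def debug_repr(label, value):
    """Build 'label=repr(value)' without the f-string '=' specifier."""
    return f"{label}={value!r}"


# The '=' specifier inside f-strings is available from Python 3.8 onward.
supports_equals_specifier = sys.version_info >= (3, 8)

print(platform.python_version(), supports_equals_specifier)
print(debug_repr("name", "world"))
```

Note that f"{name=}" formats the value with repr(), so the `!r` conversion is needed to reproduce the same output; f"name={name}" would print the string without quotes.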
Marwan Sarieddine
12/13/2021, 3:23 PM
Ido Slonimsky
12/13/2021, 3:24 PM
Marwan Sarieddine
12/13/2021, 3:24 PM