
Ken Farr

06/28/2021, 10:17 PM
Hello! I'm working to port an Argo Workflow to Prefect. All of our Argo WFs are Docker-based, so I'm creating a simple flow that takes two Parameters and computes the Docker command to execute. The problem I'm running into is that, to build the DAG correctly, I'm referencing a global variable before it is defined. This isn't pretty, but it does produce the correct DAG (based on the DAG visualization and execution). I've included a pared-down snippet. What is the correct/preferred way to accomplish a task like this? Essentially, I need to set a Task's arguments based on the execution of a prior Task.
...

cft_container = CreateContainer(image_name="twodub:v0.4")
cft_start = StartContainer()
cft_logs = GetContainerLogs()
cft_status_code = WaitOnContainer()


@task
def set_cft_container_command(cft_name: str, account_id: str):
    logger = prefect.context.get("logger")
    ret = f"python -m twodub cft install --cft-name {cft_name} --account-id {account_id}"
    new_container.command = ret # <--- Referencing new_container before declared
    logger.info(f"command is: '{ret}'")
    return ret


with Flow("install-cft-flow") as flow:
    cft_name = Parameter("CFT Name")
    account_id = Parameter("Account ID")

    scc = set_cft_container_command(cft_name, account_id)

    # I'm creating a new version of this Task here with bind
    # I suspect there is a cleaner way than this, but it wasn't
    # apparent to me in the documentation 
    #
    # If I did not do this, then the container would be created
    # before the set_cft_container_command was called and the
    # correct command argument would not be set
    new_container = cft_container().bind(upstream_tasks=[scc])

    start_container = cft_start(container_id=new_container)

...

Kevin Kho

06/28/2021, 10:21 PM
Hi @Ken Farr, you can do `new_container = CreateContainer(scc)` instead. You can initialize tasks inside the Flow context.
Oh sorry, if it goes in the run method of the task, then you can do `new_container = cft_container(scc)`
Man, sorry, I guess you wanted a new version of the Task. The correct answer might be: `new_container = CreateContainer(image_name="twodub:v0.4")(scc)`

Ken Farr

06/28/2021, 10:25 PM
What I really want is to just set the argument `command` based on the two Parameters being passed in. If you take a peek at `set_cft_container_command`, the only purpose of this task is to build the container command and set it. BUT since the run() on CreateContainer doesn't take a `command`, I have to set it on the Task instead. That creates a weird circular logic that I think could be fixed by expanding the CreateContainer API to accept the `command` at execution instead of at creation.
But I was hoping my inexperience with Prefect was the limiting factor.

Kevin Kho

06/28/2021, 10:28 PM
This is from the Task Library, right? The CreateContainer seems to take a `command` in the run method.

Ken Farr

06/28/2021, 10:29 PM
Yep, that's what I'm following. But CreateContainer runs not when the flow runs but when the flow is created. I need to set the `command` when the flow executes, not when the flow is created. Such is my understanding.
Again, because it's based on two Parameter tasks.

Kevin Kho

06/28/2021, 10:33 PM
The init happens during build time, and the run happens during execution time. You can do:
with Flow("install-cft-flow") as flow:
    cft_name = Parameter("CFT Name")
    account_id = Parameter("Account ID")
    scc = set_cft_container_command(cft_name, account_id)
    CreateContainer()(image_name="twodub:v0.4", command=scc)
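
To make the init-vs-run split concrete, here is a minimal plain-Python sketch. It does not use Prefect at all: `SketchCreateContainer` and `build_command` are hypothetical stand-ins, and the "run-time arguments override init defaults" behavior is an assumption modeled on how the snippet above is used, not a copy of the library's code.

```python
# Plain-Python stand-in for the build-time vs run-time split; NOT Prefect's API.
class SketchCreateContainer:
    """Hypothetical task: __init__ stores defaults, run() accepts overrides."""

    def __init__(self, image_name=None, command=None):
        # Build time: only values known while the flow is being defined.
        self.image_name = image_name
        self.command = command

    def run(self, image_name=None, command=None):
        # Run time: arguments passed here win over the defaults from __init__.
        return {
            "image": image_name or self.image_name,
            "command": command or self.command,
        }


def build_command(cft_name, account_id):
    # Mirrors set_cft_container_command: depends on run-time parameter values.
    return (
        f"python -m twodub cft install --cft-name {cft_name} "
        f"--account-id {account_id}"
    )


task = SketchCreateContainer(image_name="twodub:v0.4")        # build time
result = task.run(command=build_command("demo-cft", "1234"))  # run time
print(result)
```

The point is only that `command` cannot be known when the object is constructed, so it has to travel through `run()`, which is what passing `command=scc` in the flow does.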

Ken Farr

06/28/2021, 10:36 PM
Yes! That was it.
Thank you!

Kevin Kho

06/28/2021, 10:37 PM
No problem! Just a note that not all tasks in the task library have everything available to configure at runtime. This is something on the radar to fix.
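
For a task that only accepts a setting in its init, one possible workaround is to construct the task inside a function that itself runs at execution time and call its `run()` directly. The sketch below is plain Python with hypothetical names (`InitOnlyTask`, `run_with_runtime_command`); in a real flow the wrapper would presumably be a function decorated with `@task`, which is an assumption here and not shown.

```python
class InitOnlyTask:
    """Hypothetical library task whose run() takes no arguments."""

    def __init__(self, command=None):
        # The command can only be supplied at construction.
        self.command = command

    def run(self):
        return f"ran: {self.command}"


def run_with_runtime_command(command):
    # Construct the task at execution time, once the command value exists,
    # then invoke run() immediately.
    return InitOnlyTask(command=command).run()


print(run_with_runtime_command("echo hi"))
```

The trade-off is that the inner task is no longer a separate node in the DAG; it runs as part of the wrapper.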