# ask-community
c
Hi all, I'm working on a simple Flow, but my Tasks keep getting stuck in "Pending". The logs say:
Failed to load and execute Flow's environment: ValueError('Flow is not contained in this Storage')
My setup is on an AWS EC2 machine, and I am running the Flow with a Docker Agent, as shown below.
from prefect import Parameter, Flow, task
from prefect.storage import GitHub
from prefect.run_configs.docker import DockerRun


@task(log_stdout=True)
def test(e):
    return e.get("body")


storage = GitHub(repo=repo,
            path="flows/flow.py",
            access_token_secret="GITHUB_ACCESS_TOKEN")

run_config = DockerRun(labels=["L1", "L2", "L3"],
                       env={"EXTRA_PIP_PACKAGES": "zcrmsdk zohocrm-prefect-tasks"})

with Flow(name="Name", storage=storage, run_config=run_config) as flow:
    # Pipeline parameters
    event = Parameter('event', required=True)
    test(event)
Any idea about what I am doing wrong?
k
Hey @Carlos Paiva, does your Github version match this file?
c
Yes, @Kevin Kho
k
Is this the whole file?
c
It is
k
How did you register this?
c
By running a script, similar to the Flow, that uses the attached file.
from prefect import Parameter, Flow, task
from prefect.storage import GitHub
from prefect.run_configs.docker import DockerRun


@task(log_stdout=True)
def test(e):
    return e.get("body")


storage = GitHub(repo=repo,
            path="flows/flow.py",
            access_token_secret="GITHUB_ACCESS_TOKEN")

run_config = DockerRun(labels=["L1", "L2", "L3"],
                       env={"EXTRA_PIP_PACKAGES": "zcrmsdk zohocrm-prefect-tasks"})

with Flow(name="Name", storage=storage, run_config=run_config) as flow:
    # Pipeline parameters
    event = Parameter('event', required=True)
    test(event)
if __name__ == '__main__':

    api_client = RegisterFlow(
        flow=flow,
        project_id='b34b0ec3-2b15-439b-ac07-5f74a72fa55b',
        api_base_url='http://ip:4200'
    )

    api_client.register_flow()
k
Why are you doing it like this? Can't you use the Prefect library to register? It seems the registration is not completing successfully in some way.
c
How?
k
Using flow.register()?
from prefect import Flow, task

@task
def foo():
    return 1


with Flow("extra-loggers-example") as flow:
    foo()

flow.register("project_name")
c
Thank you, @Kevin Kho. I was using a repository with an example. I guess it's not a good one. It's working now.
k
Nice!
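The thread does not pin down why the custom registration script failed. One plausible reading is that the script sent the flow's metadata without first recording the flow in its storage object, a step that flow.register() is understood to handle when it builds storage in Prefect 1.x. The sketch below is a toy model in plain Python under that assumption. ToyStorage and the helper functions are hypothetical names and are not Prefect's API; they only mimic how a missing storage entry could produce the error message from the logs.

```python
# Toy model only: hypothetical stand-ins, not Prefect's actual classes.
class ToyStorage:
    """Keeps a mapping of flow name -> location in the repository."""

    def __init__(self):
        self.flows = {}

    def add_flow(self, name, location):
        # Registration is expected to record the flow here.
        self.flows[name] = location
        return location

    def get_flow(self, name):
        # A flow can only be loaded if it was recorded at registration time.
        if name not in self.flows:
            raise ValueError("Flow is not contained in this Storage")
        return self.flows[name]


def register_without_storage(storage, name):
    """Hypothetical custom registration that skips add_flow."""
    return {"name": name, "storage_flows": dict(storage.flows)}


def register_with_storage(storage, name, location):
    """Hypothetical registration that records the flow before sending metadata."""
    storage.add_flow(name, location)
    return {"name": name, "storage_flows": dict(storage.flows)}


def load_at_runtime(registered, name):
    """Rebuild storage from the registered metadata and load the flow."""
    storage = ToyStorage()
    storage.flows = dict(registered["storage_flows"])
    return storage.get_flow(name)


if __name__ == "__main__":
    good = register_with_storage(ToyStorage(), "Name", "flows/flow.py")
    print(load_at_runtime(good, "Name"))

    bad = register_without_storage(ToyStorage(), "Name")
    try:
        load_at_runtime(bad, "Name")
    except ValueError as exc:
        print(exc)
```

In this model the run only fails when the registered metadata carries an empty storage mapping, which would match a run sitting in "Pending" with a storage error until the flow is registered again through the library.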