Volker L

over 2 years ago
Hi guys, hope someone can help me with my current problem. I have two virtual machines. One is running the Prefect server, and I am trying to deploy a flow from the other one. I have started the Prefect server successfully:
ubuntu in 🌐 instance-20230102-1221 in ~ via 🅒 prefect took 5m57s 
❯ prefect config set PREFECT_SERVER_API_HOST=0.0.0.0
Set 'PREFECT_SERVER_API_HOST' to '0.0.0.0'.
Updated profile 'default'.

ubuntu in 🌐 instance-20230102-1221 in ~ via 🅒 prefect took 3s 
❯ prefect server  start

 ___ ___ ___ ___ ___ ___ _____ 
| _ \ _ \ __| __| __/ __|_   _| 
|  _/   / _|| _|| _| (__  | |  
|_| |_|_\___|_| |___\___| |_|  

Configure Prefect to communicate with the server with:

    prefect config set PREFECT_API_URL=http://0.0.0.0:4200/api

View the API reference documentation at http://0.0.0.0:4200/docs

Check out the dashboard at http://0.0.0.0:4200
I am able to connect to the Prefect UI in the browser by replacing 0.0.0.0 with the IP address of the VM. On my second VM, I have done the following:
ubuntu in 🌐 instance-20221129-2229 in yfin-db/yfin_db on  main [!?] via 🐍 v3.11.0 via 🅒 main took 6m37s 
❯ prefect config set PREFECT_API_URL=http://xxx.yyy.zzz.xxx:4200/api
Set 'PREFECT_API_URL' to 'http://xxx.yyy.zzz.xxx:4200/api'.
Updated profile 'default'.

ubuntu in 🌐 instance-20221129-2229 in yfin-db/yfin_db on  main [!?] via 🐍 v3.11.0 via 🅒 main took 6s 
❯ prefect deployment apply healthcheck-deployment.yaml 
Successfully loaded 'healthcheck'
Deployment 'healthcheck/healthcheck' successfully created with id '52b59be9-fe11-49d1-acad-3257f3fc758d'.
View Deployment in UI: http://xxx.yyy.zzz.xxx:4200/deployments/deployment/52b59be9-fe11-49d1-acad-3257f3fc758d

To execute flow runs from this deployment, start an agent that pulls work from the 'test' work queue:
$ prefect agent start -q 'test'
As you can see, I have changed the PREFECT_API_URL to point to the Prefect server running on my first VM. Then I applied the deployment, and Prefect tells me that the deployment was successfully created. However, the deployment does not appear in the Prefect UI. Did I miss anything?
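One way to narrow this down might be to ask the server's REST API directly which deployments it holds, bypassing the UI. The following is a minimal sketch using only the Python standard library. It assumes the server exposes a `POST /deployments/filter` route under the API URL and that each returned record has a `name` field; the helper names `build_deployment_filter_request` and `list_deployment_names` are hypothetical, introduced only for illustration.

```python
import json
import urllib.request


def build_deployment_filter_request(api_url: str, limit: int = 10) -> urllib.request.Request:
    """Build (but do not send) a request that lists deployments.

    Assumption: the Prefect server accepts POST <api_url>/deployments/filter
    with a JSON body containing an optional "limit".
    """
    url = api_url.rstrip("/") + "/deployments/filter"
    body = json.dumps({"limit": limit}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def list_deployment_names(api_url: str, timeout: float = 5.0) -> list:
    """Send the request and return the names of the deployments the server reports."""
    request = build_deployment_filter_request(api_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        deployments = json.load(response)
    # Assumption: each deployment record is a JSON object with a "name" key.
    return [deployment["name"] for deployment in deployments]
```

Calling `list_deployment_names("http://xxx.yyy.zzz.xxx:4200/api")` from either VM, with the placeholder replaced by the server's real address, would show whether 'healthcheck' actually reached the server. If it is listed there but missing in the browser, the UI may be querying a different API URL than the one the deployment was applied to.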
Julian Regan

about 1 year ago
Hi all, just exploring Prefect v3 and have a (hopefully) simple question that I'm struggling to find a clear answer to. I've created some very basic tasks:
from prefect import flow, task

@task
def say_hello() -> bool:
    print("Hello, world")
    return True

@task
def say_goodbye() -> str:
    raise ValueError("Goodbye")

@task
def see_whats_up(said_hello: bool):
    if said_hello:
        print("What's up?")
Now, from what I've read in the docs on final state determination, mainly:
• If the flow does not return a value (or returns None), its state is determined by the states of all of the tasks and nested flows within it.
  ◦ If any task run or nested flow run fails, then the final flow run state is marked as FAILED.
  ◦ If any task run is cancelled, then the final flow run state is marked as CANCELLED.
I should be able to write a flow containing some basic concurrency like this:
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    say_hello.submit()
    say_hello.submit()

    say_goodbye.submit()

    see_whats_up.submit(said_hello=said_hello)
However, this throws multiple warnings:
A future was garbage collected before it resolved. Please call .wait() or .result() on futures to ensure they resolve.
and errors with:
Please wait for all submitted tasks to complete before exiting your flow by calling .wait() on the PrefectFuture returned from your .submit() calls.
The simplest way I've found to make this work is to assign and manually return the future from every single task, which feels a bit verbose:
@flow(log_prints=True)
def test():
    said_hello = say_hello.submit()
    said_hello_2 = say_hello.submit()
    said_hello_3 = say_hello.submit()

    goodbye = say_goodbye.submit()

    whats_up = see_whats_up.submit(said_hello=said_hello)

    # all futures must be returned for the flow to execute correctly
    return (
        said_hello,
        said_hello_2,
        said_hello_3,
        goodbye,
        whats_up,
    )
I suppose I could also replace the return statement with an equally long list of future.wait() calls, though if anything that's more confusing.
1. Is there a cleaner way of handling concurrency within a flow?
2. Have I missed a big warning / explanation about this in the docs? 🫠
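For what it's worth, the general pattern I would expect to be less verbose is to collect the futures in a list as they are submitted and wait on the whole list at once. Below is a minimal sketch of that pattern using the standard library's `concurrent.futures` rather than Prefect itself, so it is only an analogy: the plain functions stand in for the tasks above, and `run_all` is a hypothetical name. In a Prefect flow the equivalent would presumably be a loop calling `.wait()` on each collected `PrefectFuture`.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def say_hello() -> bool:
    print("Hello, world")
    return True


def say_goodbye() -> str:
    raise ValueError("Goodbye")


def run_all() -> dict:
    """Submit everything, keep the futures in one list, and wait on the list."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Collecting futures in a list avoids naming each one individually.
        futures = [pool.submit(say_hello) for _ in range(3)]
        futures.append(pool.submit(say_goodbye))
        # wait() blocks until every future has finished (the default behaviour).
        done, not_done = wait(futures)
    # A future that raised keeps its exception; inspect it instead of re-raising.
    failed = [future for future in futures if future.exception() is not None]
    return {"total": len(futures), "unfinished": len(not_done), "failed": len(failed)}
```

The point of the sketch is only that one list plus one wait step replaces the five named variables and the long return tuple; how failures then map onto the final flow state is still up to Prefect's own rules quoted above.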