Raviraja Ganta
05/05/2022, 11:04 AM
main-project
|- src
|  |- __init__.py
|  |- module1.py
|  |- module2.py
|- configs
|  |- config.yaml
|- flows
|  |- sample_flow.py
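A common question with this layout is how a flow in flows/ imports code from src/. A minimal, hypothetical sketch (the helper name and the sys.path approach are illustrative, not from the thread):

```python
# Sketch: letting flows/sample_flow.py import from src/ in the layout above.
# Assumes the flow file sits one level below the project root, i.e.
# main-project/flows/sample_flow.py. Names here are illustrative.
import sys
from pathlib import Path

def project_root(flow_file: str) -> str:
    """Return the project root for a file at <root>/flows/<file>.py."""
    return str(Path(flow_file).resolve().parent.parent)

# Adding the project root to sys.path makes `from src import module1` work
# regardless of where the flow process is launched from:
# sys.path.insert(0, project_root(__file__))
```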
Rasmus Lindqvist
05/05/2022, 1:23 PM
We run `prefect server` for local development and `prefect cloud` for production, which is quite neat in terms of speed. When doing this we set the `--external-postgres` flag in order to use our development database, which works great! However, we are using Hasura for our other backend service, which creates some git pain as Prefect overwrites our local Hasura metadata files. Has anyone run into the same problem?

Ievgenii Martynenko
05/05/2022, 1:43 PM

Adi Gandra
05/05/2022, 2:24 PM

Edmondo Porcu
05/05/2022, 3:57 PM

Geoffrey Keating
05/05/2022, 4:08 PM
`prefect_test_harness` provides an environment for flows to be tested; how might I go about testing a task in Prefect 2.0? It seems like calling `.run()` on tasks has not made it over from 1.0 yet.

Edmondo Porcu
05/05/2022, 5:46 PM
```python
with Flow("mapped-fibonacci") as mapped_flow:
    ms = Parameter("ms")
    fib_nums = compute_large_fibonacci.map(ms)

flow_state = mapped_flow.run(ms=[10, 100, 1000, 1500])
```
Nice, but what about non-array parameters? Let's say I have two parameters, one that's an array, and the other a constant:
```python
with Flow("mapped-fibonacci") as mapped_flow:
    ms = Parameter("ms")
    ms2 = Parameter("ms2")
    fib_nums = compute_large_fibonacci.map(ms, ms2)

flow_state = mapped_flow.run(ms=[10, 100, 1000, 1500])
```
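Prefect 1.0 handles this case with `unmapped`: wrapping an argument in `unmapped()` passes it whole to every mapped call, i.e. `compute_large_fibonacci.map(ms, unmapped(ms2))`. A pure-Python sketch of those semantics (no Prefect required; names are illustrative):

```python
# Pure-Python sketch of Prefect 1.0 mapping semantics: map() fans out over
# each element of a mapped argument, while unmapped() arguments are passed
# through unchanged to every call.
from typing import Any, Callable, List

class unmapped:
    """Marker mirroring prefect.unmapped: keep this argument constant."""
    def __init__(self, value: Any) -> None:
        self.value = value

def map_task(fn: Callable, *args: Any) -> List[Any]:
    mapped = [a for a in args if not isinstance(a, unmapped)]
    length = len(mapped[0])  # all mapped args must have the same length
    results = []
    for i in range(length):
        call_args = [a.value if isinstance(a, unmapped) else a[i] for a in args]
        results.append(fn(*call_args))
    return results

def add(m: int, offset: int) -> int:
    return m + offset
```

For example, `map_task(add, [10, 100, 1000], unmapped(5))` calls `add` three times with the constant `5` each time.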
André Dias
05/05/2022, 5:47 PM
I'm running a Prefect Agent and a prefect server on my local computer, everything residing inside containers. My Prefect Server container(s) are based on the docker-compose file retrieved from `prefect server config > server-docker-compose.yaml`. When spinning up a container for an Agent and running `prefect agent local start --api http://host.docker.internal:4200`, it says that I must be authenticated to communicate with the containerized server living on my local machine.
My first question is: is authentication mandatory in this kind of situation, for a local run?
When I authenticate by creating a new tenant using `prefect server create-tenant -n default` outside the Agent container, I'm able to spin up the Agent container and run everything. The problem is that I wanted to run the `create-tenant` inside the container too, but it's not possible as I'm getting connection refused between containers.

Jonathan Mathews
05/05/2022, 5:57 PM

Patrick Tan
05/05/2022, 6:09 PM

Jonathan Mathews
05/05/2022, 9:14 PM

Leon Kozlowski
05/05/2022, 9:32 PM
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/agent/kubernetes/agent.py", line 413, in heartbeat
    self.manage_jobs()
  File "/usr/local/lib/python3.8/site-packages/prefect/agent/kubernetes/agent.py", line 193, in manage_jobs
    f"Job {job.name!r} is for flow run {flow_run_id!r} "
AttributeError: 'V1Job' object has no attribute 'name'
```
I found a thread discussing this, but I wanted to know if there is another way to alleviate this other than upgrading to Prefect 1.0+. If that is the only solution, are there any gotchas/things I should look out for when upgrading? And will flows as old as 0.14.X still run as expected?

Seth Just
05/05/2022, 10:09 PM

Leon Kozlowski
05/06/2022, 12:18 AM

Alex Shea
05/06/2022, 12:36 AM

Artem Vysotsky
05/06/2022, 1:02 AM

Gustavo Frederico
05/06/2022, 1:39 AM

Gustavo Frederico
05/06/2022, 1:55 AM

Eddie Atkinson
05/06/2022, 2:48 AM
I'm using a `FargateCluster`. It seems like the cluster is being assigned a public IP address. Ideally I wouldn't want that to be the case, as I don't want people snooping on my cluster / submitting jobs. However, when I pass `"fargate_private_ip": True` to `cluster_kwargs`, my cluster fails to start with the error: `Cluster failed to start: Timed out trying to connect to tcp://10.0.1.111:8786 after 30 s`
That makes sense: someone somewhere failed to connect to a local IP address, presumably from outside the subnet. What I don't understand is how I can prevent people from arbitrarily accessing my cluster from the internet whilst allowing all the 'right' traffic through.

Jeff Kehler
05/06/2022, 6:40 AM
A question about the `IntervalClock` scheduler: if I were to pass `interval=timedelta(hours=1)` to schedule a run for every hour, would this be scheduled to occur at the beginning of every hour? Or would it be relative to when I register the flow? E.g. if I register the flow at 1:13pm, would the `IntervalClock` trigger at 13 minutes past the hour every hour? Or would it just kick off at the first minute of every hour?

Thomas Huijskens
05/06/2022, 7:57 AM

Ievgenii Martynenko
05/06/2022, 8:16 AM

Jonathan Mathews
05/06/2022, 8:43 AM

Ievgenii Martynenko
05/06/2022, 12:20 PM
```python
token = EnvVarSecret("TOKEN")
connections = get_connections(token=token)  # this is a task that returns dict(name, connection_string)
start_task_result = start_task(connections.get('some_key'))  # I'd expect to get a value from the dict, not an AttributeError
```
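At flow-build time `connections` is a Task object, not the dict it returns at runtime, so `.get(...)` raises AttributeError on the Task itself. The lookup has to be deferred to runtime, either via indexing (`connections['some_key']`, which Prefect 1.0 tasks support) or via a small extraction task. A pure-Python sketch of the extraction pattern (in a real flow, `pick_connection` would be decorated with `@task`; the name is illustrative):

```python
# Sketch: defer the dict lookup until the upstream task has produced its
# result. In Prefect 1.0 this function would be decorated with @task so it
# runs at flow-run time, after get_connections completes.
def pick_connection(connections: dict, name: str) -> str:
    """Return one connection string from the upstream dict."""
    return connections[name]
```

Inside the flow it would then be `start_task(pick_connection(connections, 'some_key'))`.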
Nikhil Joseph
05/06/2022, 12:54 PM

Jovan Sakovic
05/06/2022, 1:06 PM
The `--watch` flag. However, as we're spinning it all up with docker-compose and we have multiple Prefect projects, I want to run a few of these `prefect register` commands in the container's entrypoint shell script (code example in 🧵).
Is there an easy way to:
• either run the `prefect register` command in the background, so it lets the rest run
• or run `prefect register` specifying the project in the Flow script, so we'd only need to run the command once for all projects

Maria
05/06/2022, 1:30 PM

Henning Holgersen
05/06/2022, 2:05 PM

Alvaro Durán Tovar
05/06/2022, 2:16 PM

Jai P
05/06/2022, 2:25 PM

Jai P
05/06/2022, 2:25 PM
```python
@task
def task_one():
    return 1

@task
def task_two(results):
    logger = get_run_logger()
    logger.info(results)
    return 2

@flow
def test_flow():
    results = {}
    t1_result = task_one()
    results["t1_result"] = t1_result
    t2_result = task_two(results, wait_for=[t1_result])
    results["t2_result"] = t2_result
```
appears to cause the flow to run indefinitely and I'm unclear why...

Anna Geller
05/06/2022, 2:26 PM

Jai P
05/06/2022, 2:27 PM
`{"t1_result": PrefectFuture}` in `task_two` though?

Anna Geller
05/06/2022, 2:29 PM

Jai P
05/06/2022, 2:31 PM
...until `.result()` is called? Since I'm not using `t1_result` in `task_two` in the above example, I wouldn't expect it to be stuck. Fully agree that if I planned to use it, I would have to do something like:
```python
def task_two(results):
    do_something_with(results["t1_result"].result())
```
The requirement is "given a list of prefect tasks, execute each one while passing the results of all previously completed tasks to the next", so given that, an example would be:
```python
@task
def t1(): ...

@task
def t2(): ...

@task
def t3(): ...

@flow
def my_flow():
    tasks = [t1, t2, t3, t4]
    results = {}
    for task in tasks:
        task_result = task(results)
        results[task.name] = task_result
```
(or `task_result = task(**results)`, because prior task results (futures) need to be passed in as args/kwargs)

Michael Adkins
05/06/2022, 2:38 PM

Kevin Kho
05/06/2022, 2:38 PM

Michael Adkins
05/06/2022, 2:38 PM

Jai P
05/06/2022, 2:41 PM

Michael Adkins
05/06/2022, 2:41 PM

Jai P
05/06/2022, 2:46 PM
```python
@flow
def test_flow():
    results = {}
    t1_result = task_one()
    results["t1_result"] = t1_result
    t2_result = task_two(results, wait_for=[t1_result])
    new_results = {**results}
    new_results["t2_result"] = t2_result
```
successfully runs

Michael Adkins
05/06/2022, 2:48 PM
`task_two(results.copy(), …` also works, yeah

Jai P
05/06/2022, 2:53 PM

Michael Adkins
05/06/2022, 2:57 PM
`None` or something. I'd probably just throw an exception if we find that.

Jai P
05/06/2022, 2:58 PM

Michael Adkins
05/06/2022, 2:58 PM

Jai P
05/06/2022, 3:01 PM
`.copy()` is a totally reasonable workaround. It won't affect execution time significantly, because we don't expect the list of tasks to be extremely long.

Michael Adkins
05/06/2022, 3:02 PM
The `SequentialTaskRunner`, if you're not using concurrency.

Jai P
05/06/2022, 3:34 PM
The `dict` approach. I guess if the consequence of that is that each subsequent task relies on the completion of every task before it, it is essentially sequential.
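The pattern the thread converges on can be sketched without Prefect: each task receives a copy of the accumulated results, so later writes to the accumulator (including a task's own future, which is what made the original flow hang) never mutate a dict already handed to a task. Names below are illustrative:

```python
# Sketch of the sequential accumulate-and-pass pattern from the thread:
# each task gets a *copy* of the results gathered so far, mirroring
# task(results.copy()) in the Prefect 2.0 flow above.
from typing import Callable, Dict, List

def run_pipeline(tasks: List[Callable]) -> Dict[str, object]:
    results: Dict[str, object] = {}
    for task in tasks:
        # Copy so the task never sees keys added after it was called.
        results[task.__name__] = task(results.copy())
    return results

def t1(results: dict) -> int:
    return 1

def t2(results: dict) -> int:
    return results["t1"] + 1
```

For example, `run_pipeline([t1, t2])` runs the tasks in order, each seeing only the results of the tasks before it.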