John Ramey
03/24/2022, 3:34 PM
pandas.to_sql. Apologies if this is answered in docs somewhere, but how do I pass my secrets to Prefect Cloud? I have 2 cases. In the first case I can run the script locally and pass the secrets as env vars, but in the second case, I'm hoping to run the job on Prefect Cloud once a week. Any help would be great! Thanks.
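For reference, a minimal Prefect 1.x sketch of one common pattern (DB_PASSWORD, load_to_db, and the flow name are placeholders, not from the thread): locally the secret can come from an env var like PREFECT__CONTEXT__SECRETS__DB_PASSWORD, while on Prefect Cloud a Secret with the same name is set in the UI/API and resolved at runtime.
from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret

@task
def load_to_db(password):
    # build the DB connection here and call pandas.to_sql with it
    ...

with Flow("weekly-load") as flow:
    # Resolved from PREFECT__CONTEXT__SECRETS__DB_PASSWORD locally,
    # or from a Prefect Cloud Secret named DB_PASSWORD when run on Cloud.
    password = PrefectSecret("DB_PASSWORD")
    load_to_db(password)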
Ken Nguyen
03/24/2022, 4:29 PM
version_group_id instead of flow_id in my GQL query? I tried to simply swap out flow_id for version_group_id, but that's giving me the following error
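Not the fix for the error above (which isn't shown in this export), but a rough, untested sketch of how a query keyed on version_group_id might look, assuming the standard flow schema; the field names here are assumptions.
from prefect import Client

client = Client()
# Filter the flow table on version_group_id instead of a specific flow id.
result = client.graphql(
    """
    query {
      flow(where: {version_group_id: {_eq: "<version-group-id>"}}) {
        id
        name
        version
      }
    }
    """
)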
Anders Segerberg
03/24/2022, 4:39 PM
Mathijs Miermans
03/24/2022, 4:41 PM
Is flatten expected to work on a dict?
from prefect import Flow, task, context, flatten

@task
def A():
    return {i: list(range(i)) for i in range(3)}

@task
def B(y):
    logger = context.get("logger")
    logger.info(y)

with Flow('flat map') as f:
    a = A()  # {0: [], 1: [0], 2: [0,1]}
    b = B.map(flatten(a))

if __name__ == "__main__":
    f.run()
I'm getting an unexpected error:
ERROR - prefect.flat map | Unexpected error occured in FlowRunner: KeyError(3)
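Not from the thread, but a small sketch of the pattern flatten is documented to handle, assuming the goal is to map over every element of every value: return an iterable of iterables (e.g. a list of lists) rather than a dict.
from prefect import Flow, task, context, flatten

@task
def A():
    # list of lists instead of a dict of lists
    return [list(range(i)) for i in range(3)]  # [[], [0], [0, 1]]

@task
def B(y):
    context.get("logger").info(y)

with Flow("flat map") as f:
    b = B.map(flatten(A()))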
Diego Oliveira
03/24/2022, 6:32 PM
Kevin Mullins
03/24/2022, 7:47 PM
LocalDaskExecutor with 32 threads. I've verified in logs and the schematic that the Discover task should be ready to go, as all its upstream tasks are complete; however, it still waits around for the unrelated Sync task to complete before starting.
Was hoping someone might have an idea how to figure out what's going on. I've created another example flow that had a bunch of unrelated root/child tasks and verified it seems to behave as I would expect, but I haven't tried to reproduce this with a test flow that uses mapping yet. I'll attach a screenshot that visually shows (hopefully) what I'm talking about.
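For context, a minimal sketch of the executor setup being described (the flow name and task bodies are placeholders); this is just the configuration, not a diagnosis of the scheduling behavior.
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("discover-and-sync") as flow:
    ...  # Discover / Sync tasks and their mapped children go here

# Threaded local Dask scheduler with 32 workers, as described above.
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=32)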
Gabriel Milan
03/24/2022, 8:25 PM
context, but I can't seem to find it. I'm trying to use prefect.tasks.prefect.create_flow_run with the very same labels as the parent flow; maybe there's a shortcut for it.
Matthew Seligson
03/24/2022, 8:36 PM
John Ramey
03/24/2022, 8:54 PM
prefect.context.get("logger"). Does a logger have to be defined within each task? This gets a bit repetitive if I have 5-10 tasks within a flow. Is there a more concise way to define loggers across tasks?
import prefect
from prefect import task

@task
def my_task():
    logger = prefect.context.get("logger")
    logger.info("An info message.")
    logger.warning("A warning message.")
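One possible way to cut the repetition, as a sketch (task_logger is a hypothetical helper, not a Prefect API): the lookup still has to happen inside each task, because prefect.context is only populated at run time, but it can be reduced to a one-liner.
import prefect
from prefect import task

def task_logger():
    # Hypothetical helper: fetch the run logger from context at call time.
    return prefect.context.get("logger")

@task
def my_task():
    task_logger().info("An info message.")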
Lee Briggs
03/24/2022, 9:16 PM
flow.register("pulumi_test", set_schedule_active=False) it sets the flow to only run locally. More in 🧵
Denis Pokotylyuk (External)
03/24/2022, 9:46 PM
query GetSchedules {
  flow_group(where: {id: {_eq: "eb595947-xxxxxx"}}) {
    schedule
  }
}
Jake
03/24/2022, 9:48 PM
Andrew Huang
03/24/2022, 10:33 PM
from prefect.engine.state import Skipped in orion? Maybe Cancelled? https://orion-docs.prefect.io/concepts/states/#state-details
How do I import it?
from prefect.states import Cancelled doesn't seem to work
Suresh R
03/25/2022, 1:12 AM
Chris L.
03/25/2022, 6:22 AM
Future. However, before I continue down this async rabbit hole, I'm wondering if there are any gotchas (or docs or source code) I should look into? In particular, regarding how to use PrefectFutures in mixed async / sync flows and tasks. Moreover, will I be able to use `asyncio`'s synchronisation primitives (https://docs.python.org/3/library/asyncio-sync.html) with PrefectFutures? Thanks in advance! 🙌
R Zo
03/25/2022, 8:09 AM
BBPP
03/25/2022, 1:33 PM
Matthias
03/25/2022, 1:49 PM
Anna Geller
Joe Goldbeck
03/25/2022, 5:16 PM
Madison Schott
03/25/2022, 5:45 PM
Myles Steinhauser
03/25/2022, 5:58 PM
Error during execution of task: ValueError('Received both `flow_id` and `flow_name`. Only one flow identifier can be passed.')
More details in thread.
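For reference, a minimal sketch of what that ValueError is guarding against: create_flow_run takes either flow_id or flow_name (plus project_name), never both. The flow and project names below are placeholders.
from prefect import Flow
from prefect.tasks.prefect import create_flow_run

with Flow("parent") as flow:
    # Identify the child flow by name + project...
    child = create_flow_run(flow_name="child-flow", project_name="my-project")
    # ...or by id, but not both in the same call:
    # child = create_flow_run(flow_id="<flow id>")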
Milton
03/25/2022, 6:25 PM
--name, it will assume the default name Kubernetes. This is okay when you only run one replica, but when you increase the replica number to 2, both will take the default name and in the Prefect UI, the two agents are treated as the same agent. So what is the recommended way to deploy multiple agents for HA purposes in Kubernetes?
Wei Mei
03/25/2022, 7:12 PM
Henry
03/25/2022, 7:24 PM
Patrick Tan
03/25/2022, 7:28 PM
Danny Vilela
03/25/2022, 7:29 PM
ValueError) is it possible to recover that traceback/error from a state handler? Right now I can say “this task failed and will be retrying in X minutes”, but not “this task failed for reason Y and will retry in X minutes”. Or can we pass keyword arguments to a state handler with this signature:
# What I have now.
def notify_on_retry(task: Task, old_state: State, new_state: Retrying) -> State: ...
# Maybe what I want?
def notify_on_retry(task: Task, old_state: State, new_state: Retrying, message: str) -> State: ...
# Alternative?
def notify_on_retry(task: Task, old_state: State, new_state: Retrying, **kwargs) -> State: ...
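Not a confirmed answer, just a sketch of one approach: State objects carry a message and a result, so the handler can at least report whatever the Retrying state was given; whether the original exception is attached to a Retrying state may vary by Prefect version.
from prefect import Task
from prefect.engine.state import Retrying, State

def notify_on_retry(task: Task, old_state: State, new_state: State) -> State:
    if isinstance(new_state, Retrying):
        # Fall back to the state's result (often the caught exception) if no message is set.
        reason = new_state.message or repr(new_state.result)
        print(f"{task.name} failed ({reason}); retrying at {new_state.start_time}")
    return new_state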
Harry Baker
03/25/2022, 8:45 PM
@task()
def flow_run_helper(flow_name, project_name):
    cfr = create_flow_run(flow_name=flow_name, project_name=project_name)
    wfr = wait_for_flow_run(cfr, stream_logs=True, raise_final_state=True)
    return wfr
but it's yelling at me about "ValueError: Could not infer an active Flow context while creating edge". My app does a lot of chaining of flows, so I wanted to streamline this.
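A possible workaround sketch, not an official pattern: because create_flow_run and wait_for_flow_run are themselves Tasks, calling them inside another task tries to build graph edges with no active Flow context; invoking their .run() methods instead avoids that.
from prefect import task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task
def flow_run_helper(flow_name, project_name):
    # Run the underlying task logic directly, without registering new edges.
    flow_run_id = create_flow_run.run(flow_name=flow_name, project_name=project_name)
    return wait_for_flow_run.run(flow_run_id, stream_logs=True, raise_final_state=True)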
Lee Briggs
03/25/2022, 9:27 PM
Dominic Pham
03/26/2022, 12:22 AM