Sean Leakehe
12/13/2021, 10:33 PM
If we set func4 to have an upstream dependency on func1, it executes at the same time as func2, as expected. However, if we make func4 a mapped task with the same upstream dependency on func1, it doesn't actually execute until after func3 has finished. Is there a way to get mapped tasks to run in parallel?
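(Not from the thread, just for context: a minimal sketch of the setup being described, assuming Prefect 1.x and hypothetical bodies for func1 through func4. Mapped children only run concurrently when the flow uses a parallel executor such as LocalDaskExecutor, which is shown here as one possible angle on the question.)
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def func1():
    return [1, 2, 3]

@task
def func2(x):
    return x

@task
def func3(x):
    return x

@task
def func4(item):
    return item * 2

with Flow("parallel-mapping-example") as flow:
    a = func1()
    b = func2(a)
    c = func3(b)
    # mapped task whose only upstream dependency is func1
    d = func4.map(a)

# With the default LocalExecutor, mapped children run one after another;
# a parallel executor lets them run side by side.
flow.executor = LocalDaskExecutor()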
Sean Leakehe
12/13/2021, 10:33 PM
John Muehlhausen
12/14/2021, 12:54 AM
prefect agent local start {...}
Tilak Maddy
12/14/2021, 10:30 AM
Jeremiah Lethoba
12/14/2021, 12:23 PM
Tom Shaffner
12/14/2021, 3:24 PM
Pedro Machado
12/14/2021, 3:42 PM
loguru (https://github.com/Delgan/loguru) is used inside of a task. When I run the flow locally, I see the loguru output, but when it runs remotely (ECS), I don't see the logs in Prefect Cloud. How could I get it to write to the Prefect logs?
Also, how should I set up my ECS flow to get it to write the container logs to CloudWatch?
Thanks!
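(For context, a hedged sketch of one way to get loguru output into the Prefect logs, assuming Prefect 1.x: register the Prefect task logger as a loguru sink so loguru messages are re-emitted through Prefect's logging, which is what ships them to Prefect Cloud. The task name and format below are placeholders.)
import prefect
from prefect import task
from loguru import logger as loguru_logger

@task
def my_task():
    # Grab the Prefect logger for this task run and add it as a loguru sink;
    # each loguru message is then re-emitted through Prefect's logging.
    prefect_logger = prefect.context.get("logger")
    loguru_logger.add(lambda msg: prefect_logger.info(str(msg).strip()), format="{message}")
    loguru_logger.info("this line should now show up in the Prefect logs")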
Leanna Morinishi
12/14/2021, 5:39 PM
query_list is a list of str, each of which is a valid query. Let's say there are 3 queries and each query gives me back 10 rows. This task works fine:
data = execute_test_query.map(query=query_list)
Now I want to transform a concatenated dataframe in its entirety.
@task
def n_rows(df):
    rows = df.shape[0]
    print(f"There are {rows} rows!")
    return rows
I was expecting data_2 = n_rows.map(flatten(data)) to give "There are 30 rows!", but I get key errors. Any idea what I need to do to flatten data?
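(A sketch of one way to get the whole-dataframe behaviour described above, assuming Prefect 1.x and pandas. flatten un-nests a list of lists of mapped results, so for a single concatenated dataframe an unmapped reduce task that receives the full list of mapped results may be closer to what's needed; concat_frames below is a hypothetical helper, not something from the thread.)
import pandas as pd
from prefect import task

@task
def concat_frames(dfs):
    # Reduce step: receives the whole list of dataframes produced by the
    # mapped execute_test_query task and concatenates them into one frame.
    return pd.concat(dfs, ignore_index=True)

# Inside the flow, the wiring would look roughly like:
#   data = execute_test_query.map(query=query_list)
#   combined = concat_frames(data)   # reduce over all mapped results
#   data_2 = n_rows(combined)        # prints "There are 30 rows!"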
Jason Motley
12/14/2021, 6:27 PM
Royzac
12/14/2021, 7:16 PM
David Yang
12/14/2021, 8:23 PM
Kevin Kho
12/14/2021, 8:36 PM
Amanda Wee
12/14/2021, 8:51 PM
Pedro Machado
12/14/2021, 10:22 PM
• An S3Result with a target is configured for this task, so the output of the API lands in S3 in JSON format.
• A pipe in Snowflake will detect and load these files.
My question: is relying on the Prefect results an acceptable way to save data to S3, or should I have an explicit task that handles writing to S3?
In this approach (S3Result), will the data stay in memory until the end of the process, or will the memory be released once the data is written to S3?
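(For reference, a minimal sketch of the result/target configuration being described, assuming Prefect 1.x; the bucket name, target template, and task body are placeholders. With this setup the task's return value is checkpointed to S3 by the result mechanism itself rather than by an explicit write task.)
from prefect import task
from prefect.engine.results import S3Result
from prefect.engine.serializers import JSONSerializer

@task(
    checkpoint=True,
    result=S3Result(bucket="my-data-lake", serializer=JSONSerializer()),  # placeholder bucket
    target="api_extracts/{task_name}/{today}.json",  # placeholder target template
)
def call_api():
    # Whatever this returns is what gets serialized to JSON and written to S3.
    return {"rows": [1, 2, 3]}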
Matt Denno
12/14/2021, 11:40 PM
When running against Prefect Server, can heartbeats be controlled with PREFECT__CLOUD__HEARTBEAT_MODE? Or is there an equivalent env for server, like this:
PREFECT__SERVER__HEARTBEAT_MODE
Thanks,
Matt
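(Not an answer from the thread, but for context: a sketch of how the cloud-namespaced heartbeat setting is commonly attached to a flow through its run config in Prefect 1.x. Whether a separate SERVER-namespaced variable exists is exactly the open question above; the flow name here is a placeholder.)
from prefect import Flow
from prefect.run_configs import UniversalRun

with Flow("example-flow") as flow:
    ...

# Environment variables on the run config are injected into the flow run,
# so heartbeat behaviour can be controlled per flow this way.
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})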
Jacob Blanco
12/15/2021, 5:48 AM
Chun Shen Wong
12/15/2021, 7:48 AM
Chun Shen Wong
12/15/2021, 7:49 AM
haf
12/15/2021, 8:59 AM
Thomas Furmston
12/15/2021, 10:39 AM
Thomas Furmston
12/15/2021, 10:39 AM
MPIJob (from the https://github.com/kubeflow/mpi-operator library) given in the body of the task. Is that roughly correct?
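(For context, a hedged sketch of what launching an MPIJob from inside a Prefect task could look like, assuming the official kubernetes Python client and the kubeflow.org custom resource; the manifest, namespace, and API version are placeholders and nothing here is confirmed by the thread.)
from kubernetes import client, config
from prefect import task

@task
def launch_mpi_job(mpi_job_manifest: dict):
    # Assumes the flow run has in-cluster credentials.
    config.load_incluster_config()
    api = client.CustomObjectsApi()
    # MPIJob is a custom resource registered by the mpi-operator.
    return api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",  # adjust to the mpi-operator version actually installed
        namespace="default",
        plural="mpijobs",
        body=mpi_job_manifest,
    )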
Thomas Furmston
12/15/2021, 10:39 AM
Jelle Vegter
12/15/2021, 1:28 PM
Tom Shaffner
12/15/2021, 1:56 PM
Jason Motley
12/15/2021, 3:25 PM
Tilak Maddy
12/15/2021, 3:29 PM
Kevin Kho
12/15/2021, 3:47 PM
Enda Peng
12/15/2021, 4:13 PM
Adam Roderick
12/15/2021, 4:45 PM
Tao Bian
12/15/2021, 4:56 PM
from prefect.client.client import Client
client = Client()
client.graphql(query)
Anna Geller
12/15/2021, 4:59 PM
import requests

query = """
mutation {
  create_flow_run(input: { version_group_id: "fb919cc4-7f74-4ed7-9f3c-71bdd9be69e8" }) {
    id
  }
}
"""
url = "https://api.prefect.io"
response = requests.post(
    url, json={"query": query}, headers={"authorization": "Bearer XXX"}
)
print(response.status_code)
print(response.text)
Kevin Kho
12/15/2021, 4:59 PM
If you do prefect auth login --key API_KEY, then the Client will pull that API key. You can also pass it into the Client(). You can use client.create_flow_run() instead of client.graphql(). You can also use create_flow_run from our task library, which is much more flexible. Just invoke it by calling .run():
create_flow_run.run(flow_name="name", project_name="project")
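(Putting those options together, a small sketch assuming Prefect 1.x; the API key, flow id, flow name, and project name are placeholders, and the exact keyword arguments accepted by Client.create_flow_run should be checked against the installed version.)
from prefect.client.client import Client
from prefect.tasks.prefect import create_flow_run

# Option 1: the Client directly; it picks up the key stored by `prefect auth login`,
# or the key can be passed explicitly.
client = Client(api_key="YOUR_API_KEY")  # placeholder key
flow_run_id = client.create_flow_run(flow_id="YOUR_FLOW_ID")  # placeholder flow id

# Option 2: reuse the task-library task outside of a flow by calling .run().
flow_run_id = create_flow_run.run(flow_name="name", project_name="project")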
Tao Bian
12/15/2021, 6:33 PM