Martha Edwards
03/08/2022, 6:16 PM
Roger Webb
03/08/2022, 9:42 PM
Moises Vera
03/09/2022, 2:23 AM
• extract_data.py: the flow file (config for the flow)
  ◦ here I import the tasks directory with a simple import tasks
• tasks: at the same level, a directory with my tasks
• When I register this flow it works correctly on a schedule
• I just added a new flow, calculate_something.py
• The tasks for this flow are in the tasks directory too
• Now when I want to run this new flow I get FlowStorageError('An error occurred while unpickling the flow:\n ModuleNotFoundError("No module named \'tasks\'")
What I don't get is... why is it working for the first flow and not for this new one?
Any ideas? I appreciate it
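A likely cause in Prefect 1.x: the default storage pickles the flow at registration time, and the agent has to unpickle it at run time, which requires the tasks module to be importable in that environment; the first flow may simply have been registered or deployed in a way that kept tasks importable. One workaround is script-based storage, which defers all imports to run time. A minimal sketch, assuming Local storage and a hypothetical path:

from prefect import Flow
from prefect.storage import Local

with Flow("calculate-something") as flow:
    ...

# stored_as_script=True skips pickling: the agent re-executes this file at
# run time, so `import tasks` resolves there (path is hypothetical and must
# exist on the agent's machine)
flow.storage = Local(path="/flows/calculate_something.py", stored_as_script=True)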
Suresh R
03/09/2022, 8:00 AM
Toby Rahloff
03/09/2022, 8:21 AM
manual_only trigger. Is it possible to do the same with Orion (in the future)?
Bruno Nunes
03/09/2022, 8:46 AM
Tomer Cagan
03/09/2022, 10:16 AM
Vadym Dytyniak
03/09/2022, 11:28 AM@task(result=S3Result(bucket='bucket-name'))
def checkpoint_data() -> dd.DataFrame:
df = pd.DataFrame({'col_1': ['1', '2', '3'], 'col_2': [1, 2, 3]})
ddf = dd.from_pandas(df, npartitions=1)
return ddf
@task()
def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
raise ValueError("Checkpoint testing...")
Flow:
ddf = checkpoint_data()
accept_checkpointed_data(ddf)
How to be sure that on restart after failure(ValueError I added) accept_checkpointed_data task loads data from S3 and not using cached_inputs?
Thanks
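As I understand Prefect 1.x restart semantics, the two options are not at odds: the downstream task's cached_inputs are Result references whose locations point at what checkpoint_data wrote to S3, so on restart the data is re-read from the bucket rather than from memory. What does need to be true is that checkpointing is enabled in the execution environment (Cloud/Server runs enable it by default). A sketch, reusing the bucket name from the snippet above:

# export PREFECT__FLOWS__CHECKPOINTING=true   # must be on for results to be written

from prefect import Flow
from prefect.engine.results import S3Result

# a flow-level result checkpoints every task's output to S3, not just checkpoint_data's
with Flow("checkpoint-demo", result=S3Result(bucket='bucket-name')) as flow:
    ddf = checkpoint_data()
    accept_checkpointed_data(ddf)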
Martin T
03/09/2022, 12:22 PM
We use PrefectSecret() to load secrets from Prefect Cloud. Is it possible to load secrets from the cloud into prefect.context.secrets during flow start, so all tasks can access them when needed? Most of our task variables are auth-related, and the code/layout is getting overly complex.
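One pattern that can simplify the wiring (a sketch, not the only option): rather than threading PrefectSecret tasks through the flow graph, call Secret(...).get() inside the tasks that need credentials; get() reads prefect.context.secrets first (e.g. for local runs) and falls back to Prefect Cloud. The secret name below is hypothetical.

from prefect import task
from prefect.client import Secret

@task
def call_api():
    # resolved at run time: local context.secrets first, then Prefect Cloud
    token = Secret("API_TOKEN").get()  # hypothetical secret name
    ...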
Prasanth Kothuri
03/09/2022, 1:05 PM
No heartbeat detected from the remote task; marking the run as failed.
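A mitigation that is often suggested for this message, assuming the heartbeat subprocess is being killed (memory pressure on the container is a common culprit): run heartbeats as a thread instead of a subprocess via the flow's run config. A sketch, where flow is the flow object being registered:

from prefect.run_configs import UniversalRun

# thread-mode heartbeats survive situations where the separate heartbeat
# subprocess gets killed by the OS or container runtime
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})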
Michał
03/09/2022, 1:48 PM
Wesley Jin
03/09/2022, 4:31 PM
Is there a way to change the project a create_flow_run call creates a Flow for, depending on environment variables? Example in thread
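A minimal sketch of one way to do this, assuming the choice can be made where the parent flow is built; the environment variable and flow names are hypothetical. Since project_name is an ordinary argument to the create_flow_run task, it can simply come from os.environ:

import os

from prefect import Flow
from prefect.tasks.prefect import create_flow_run

# hypothetical variable; falls back to a dev project when unset
project = os.environ.get("PREFECT_PROJECT", "dev")

with Flow("parent") as flow:
    create_flow_run(flow_name="child-flow", project_name=project)

Note the value is captured when this flow is built/registered; for a run-time decision, compute the project name in an upstream task and pass its output to create_flow_run instead.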
Chris Reuter
03/09/2022, 4:33 PM
Brian Phillips
03/09/2022, 11:05 PM
Brian Phillips
03/09/2022, 11:33 PM
from prefect import unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

child_ids_result = create_flow_run.map(
    flow_name=unmapped(...),
    project_name=unmapped(...),
    parameters=parameters,
    run_name=run_names,
)
wait_for_flow_run_result = wait_for_flow_run.map(
    flow_run_id=child_ids_result,
    stream_states=unmapped(True),
    stream_logs=unmapped(True),
    raise_final_state=unmapped(True),
)
final_task(..., upstream_tasks=[wait_for_flow_run_result])
Kevin Otte
03/10/2022, 5:04 AM
scrapers
  > scraper1
  > scraper2
prefect_flow.py
and scraper1/2 have classes or methods that scrape specific websites. Am I able to leave those defined in those files while referencing them in my prefect_flow.py file? It would look something like this:
import ...

@task
def scrape_site_one():
    s = Scraper()
    return s.scrape_site('www...')

....

with Flow("update historical pricing") as flow:
    scrape_site_one()
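This layout generally works: the flow file can import the scraper classes as long as scrapers is a package on the import path, both at registration and wherever the flow actually executes (with the default pickle-based storage, a missing package produces the same ModuleNotFoundError discussed above). A sketch of prefect_flow.py with hypothetical module and class names:

# prefect_flow.py (a sketch; module/class names are assumptions)
from prefect import task, Flow

from scrapers.scraper1 import Scraper  # requires scrapers/__init__.py

@task
def scrape_site_one():
    return Scraper().scrape_site('www...')

with Flow("update historical pricing") as flow:
    scrape_site_one()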
Samuel Tober
03/10/2022, 8:19 AM
Tomer Cagan
03/10/2022, 8:38 AM
Fredrik Blomgren
03/10/2022, 9:47 AM
Luuk
03/10/2022, 12:28 PM
az acr login --name acrname logs out after a couple of hours and requires me to do it again. I also run the command within the Docker image (for the Docker agent). It's all working, but after a while I get logged out from the ACR and my flow starts to crash.
Error message:
500 Server Error for http+docker://localhost/v1.41/images/create?tag=latest&fromImage=acrname.azurecr.io%2Fimage: Internal Server Error ("Head https://acrname.azurecr.io/v2/image/manifests/latest: unauthorized: authentication required")
Tomer Cagan
03/10/2022, 12:49 PM
FuETL
03/10/2022, 12:59 PM
I'm calling set_flow_run_state with Scheduled (I tried Pending, but that leaves the flow idle and it never executes) because I want to restart all the tasks.
client.set_flow_run_state(
    flow_run_id=flow_run_id,
    state=Scheduled()
)
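Setting the old run back to Scheduled makes an agent pick it up again, but task runs that already finished keep their states on that same run. If the goal is for every task to execute again, one option is to create a brand-new run of the same flow instead of recycling the old one; a sketch, where flow_id (the registered flow's ID, not the old flow_run_id) is assumed to be known:

from prefect.client import Client

client = Client()
# a fresh flow run gets fresh task runs, so nothing is skipped as already Successful
new_run_id = client.create_flow_run(
    flow_id=flow_id,
    run_name="manual-restart",  # hypothetical run name
)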
DL
03/10/2022, 1:16 PM
Brian Phillips
03/10/2022, 2:35 PM
An error occurred (ThrottlingException) when calling the RegisterTaskDefinition operation (reached max retries: 2): Rate exceeded
Constantino Schillebeeckx
03/10/2022, 3:29 PM
import time
from prefect import task

@task
def sleep_5():
    print('sleeping 5')
    time.sleep(5)
    print('done sleeping 5')

@task
def sleep_x(x):
    print(f'sleeping {x}')
    time.sleep(x)
    print(f'done sleeping {x}')

with DHFlow("foo") as flow:
    sleep_5()
    sleep_x.map([7, 8])
When I execute the above, sleep_5 is run first; only after it finishes will sleep_x run.
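That sequencing is expected with the default LocalExecutor, which runs one task at a time. If the goal is for sleep_5 and the mapped sleep_x children to run concurrently, attaching a LocalDaskExecutor is the usual fix. A sketch with a plain Flow (DHFlow is assumed to be a local Flow subclass):

from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("foo") as flow:
    sleep_5()
    sleep_x.map([7, 8])

# threads (or processes) let independent tasks and mapped children overlap
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=4)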
Pedro Machado
03/10/2022, 4:26 PM
No heartbeat detected from the remote task; retrying the run. This will be retry 1 of 2.
However, the last message was written 12 hours ago. It does not look like the flow got retried at all.
Could someone help me figure out what may have happened? I am using ECS to run the flows.
The flow run is fed29ba9-66a6-4f0b-aa69-cf9db908bd58
I went ahead and canceled it, but it does not offer me the option to restart it. How can I restart it and preserve the same flow run ID so that the tasks that succeeded are not executed again? Ideally, I'd rely on the task state, but if tasks need to run again, I am hoping that caching will help avoid redoing the work. The cache key is based on the flow_run_id.
Thanks!
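For reference, a sketch of what a flow_run_id-keyed cache can look like in Prefect 1.x using a templated target (bucket and task names here are hypothetical). One implication worth noting: because the target includes flow_run_id, only a restart of the same run will hit the cache; a brand-new run renders a new location and recomputes everything.

from prefect import task
from prefect.engine.results import S3Result

@task(
    checkpoint=True,
    result=S3Result(bucket="my-bucket"),     # hypothetical bucket
    target="{flow_run_id}/{task_name}.pkl",  # rendered once per flow run
)
def expensive_step():
    ...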
Olivér Atanaszov
03/10/2022, 4:45 PM
I'm running n_workers agents as Kubernetes jobs, similarly to this:
# Agent flow
with Flow("run-agent") as flow_agent:
    ShellTask(agent_command)  # launch agent computing tasks

@task
def run_agent(worker_id):
    run_agent_flow_id = create_flow_run.run(
        "run-agent", run_name=f"run-agent_#{worker_id}"
    )
    return run_agent_flow_id

# Main flow
with Flow("main") as flow:
    n_workers = Parameter("n_workers", default=2)
    worker_ids = get_worker_ids(n_workers)  # [0, 1]
    run_agent.map(worker_id=worker_ids)
When the agents finish their tasks, for some reason the Kubernetes jobs are not terminating. Inside the job's container the agent's process apparently terminates, but I see prefect execute flow-run and prefect heartbeat flow-run -i ... being stuck.
Ling Chen
03/10/2022, 5:18 PM
Sarah Floris
03/10/2022, 6:49 PM