Kelby
05/04/2021, 1:51 PM

Nicholas Chammas
05/04/2021, 4:04 PM

eli
05/04/2021, 5:55 PM

Suraj Mittal
05/04/2021, 7:34 PM

Alex Furrier
05/04/2021, 8:14 PM
feather format and sometimes ran into errors with HDF as well.

Josh Lowe
05/04/2021, 10:19 PM
Message: [{'path': ['set_flow_run_states'], 'message': '[{\'extensions\': {\'path\': \'$\', \'code\': \'data-exception\'}, \'message\': \'invalid input syntax for type uuid: "3fca7934-6af8-45f7-aae4-ea6fec0339c9:FargateAgent:agent:<agent_name>"\'}]', 'extensions': {'code': 'INTERNAL_SERVER_ERROR'}}]
No changes have been made to our platform, and it looks like some flows are able to be run, but then will fail again on subsequent runs. Anyone seen anything like this before?
FargateAgent is deprecated, and we have plans to update and move to the ECS agent - but I haven't seen any updates in a while that might be causing this.

Rajdeep Rao
05/04/2021, 11:18 PM
Is there a way to run /bin/sh -c ./bash-script.sh prefect execute flow-run ? I would like to run this bash script that basically exports all my env vars that are in a file. Since we use dot-env to run our services, my base docker image already has all my env vars; I just need to export them while my tasks spin up.
I tried overriding them with run_task_kwargs in the ECSRun() config and by passing in a custom task_def, but for some reason my tasks don't respect these.
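One thing that might be worth trying (a rough sketch based on assumptions about the setup, and I'm not certain the ECS agent won't overwrite the command with its own overrides): pass the wrapper command through run_task_kwargs, targeting the container Prefect names "flow" in its default ECS task definition.

from prefect.run_configs import ECSRun

# Sketch only: "flow" is the container name in Prefect's default ECS task
# definition; the command mirrors the one quoted above.
run_config = ECSRun(
    run_task_kwargs={
        "overrides": {
            "containerOverrides": [
                {
                    "name": "flow",
                    "command": ["/bin/sh", "-c", "./bash-script.sh prefect execute flow-run"],
                }
            ]
        }
    },
)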
Brian Keating
05/05/2021, 12:13 AM
I have a question about .map. I would like to:
1. start a batch of worker instances
2. download data on each instance
3. run a job on each instance
Obviously, the job for a given instance shouldn't start until the data is downloaded onto that instance. However, I don't want to wait until the data is downloaded onto all of the instances before kicking off jobs.
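A sketch of one way to express that dependency with .map (task names and the instance list are placeholders I made up): each mapped child of run_job depends only on the corresponding child of download_data, so a given instance's job can start as soon as its own download finishes, without waiting for the whole batch (with the DaskExecutor, mapped children run depth-first where possible).

from prefect import Flow, Parameter, task

@task
def start_instance(instance_id):
    # launch the worker instance, then pass the id downstream
    return instance_id

@task
def download_data(instance_id):
    # download data onto this particular instance
    return instance_id

@task
def run_job(instance_id):
    # run the job on this particular instance
    ...

with Flow("per-instance-pipeline") as flow:
    instance_ids = Parameter("instance_ids", default=[1, 2, 3])
    started = start_instance.map(instance_ids)
    downloaded = download_data.map(started)
    run_job.map(downloaded)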
Hui Zheng
05/05/2021, 1:09 AM

Adam Brusselback
05/05/2021, 3:25 AM

Adam Brusselback
05/05/2021, 3:27 AM
import prefect
from prefect import task

@task
def load_context(config):
    prefect.context.setdefault("secrets", {})
    for name in config.options('Default'):
        string_value = config.get('Default', name)
        prefect.context.secrets[name] = string_value
Adam Brusselback
05/05/2021, 3:28 AM

Adam Brusselback
05/05/2021, 3:29 AM

Fabrice Toussaint
05/05/2021, 9:44 AM

Giovanni Giacco
05/05/2021, 9:55 AM
Is it possible to get a Dask client connected to the cluster created by the DaskExecutor, inside a task, in order to execute any workload on that Dask cluster? In our tasks we have to deal with huge pandas dataframes and we'd like to use the same Dask cluster to parallelize our computation. Any tip?
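One approach worth looking at (a sketch, not something from the thread): when a flow runs on a DaskExecutor, each task already executes on a Dask worker, so dask.distributed's worker_client() gives you a client connected to that same cluster from inside the task. The 4-way chunking below is purely illustrative.

import pandas as pd
from distributed import worker_client
from prefect import task

@task
def summarize(df: pd.DataFrame):
    # Split the big dataframe into chunks and fan the work out to the same
    # Dask cluster that the DaskExecutor is already using.
    chunks = [df.iloc[i::4] for i in range(4)]
    with worker_client() as client:
        futures = client.map(lambda chunk: chunk.describe(), chunks)
        return pd.concat(client.gather(futures))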
Robin
05/05/2021, 10:59 AM
adapt_kwargs
project 2: another flow fails before really starting
• with 100 task concurrency
• 2 GB memory requested
• pods are not always properly torn down after flow termination, neither with adapt_kwargs nor with n_workers in KubernetesRun (see the configuration sketch after this message)
Errors in project 2 flows:
• Unexpected error: KeyError('data')
• arrange error
• futures
All flows ran properly a couple of days ago and there were only minor changes.
Commonalities:
• We have one prefect agent running on an EKS cluster with prefect cloud
• both flows share the same prefect agent
• both flows use ECR
Discrepancies:
• both flows have different base images
• the flows are in different projects
Further interesting behavior:
• prefect jobs get created but error out even when no flow is running
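For reference, a minimal sketch of the kind of setup being described (image name, sizes, and worker counts are illustrative assumptions, not taken from the failing flows): a KubernetesRun run config plus a DaskExecutor backed by dask_kubernetes.KubeCluster, scaled either with adapt_kwargs or with a fixed number of workers.

from dask_kubernetes import make_pod_spec
from prefect import Flow
from prefect.executors import DaskExecutor
from prefect.run_configs import KubernetesRun

# Illustrative ECR image reference and sizes only
pod_spec = make_pod_spec(image="<account>.dkr.ecr.<region>.amazonaws.com/my-flow:latest",
                         memory_request="2G")

with Flow(
    "project-2-flow",
    run_config=KubernetesRun(image="<account>.dkr.ecr.<region>.amazonaws.com/my-flow:latest",
                             memory_request="2Gi"),
    executor=DaskExecutor(
        cluster_class="dask_kubernetes.KubeCluster",
        cluster_kwargs={"pod_template": pod_spec},
        # adaptive scaling ...
        adapt_kwargs={"minimum": 1, "maximum": 10},
        # ... or, alternatively, a fixed size:
        # cluster_kwargs={"pod_template": pod_spec, "n_workers": 10},
    ),
) as flow:
    ...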
Robin
05/05/2021, 1:06 PM
battery_id_generator) 🎉
However, the mapped tasks did not persist any results to S3. 🤔
Do I need to specify something for the maps to also persist the results?
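A sketch of what I'd check (assumptions on my side: checkpointing is enabled, which it is by default when running against a backend, and the mapped task actually has a result configured): give the mapped task its own result, or set one on the flow, and the children will write one file per map_index.

from prefect import Flow, task
from prefect.engine.results import S3Result

@task(
    checkpoint=True,
    result=S3Result(bucket="my-bucket",
                    location="{flow_name}/{task_name}/{map_index}.prefect"),
)
def analyze(battery_id):
    return battery_id * 2

# Bucket and flow names are placeholders; the flow-level result acts as a
# fallback for tasks without their own result.
with Flow("mapped-results", result=S3Result(bucket="my-bucket")) as flow:
    analyze.map(battery_id=[1, 2, 3])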
ciaran
05/05/2021, 1:36 PM
I'm running with KubernetesRun and DaskExecutor (KubeCluster) on AKS.
I submit my flow and it runs, I can see the Dask Scheduler pod stand up, but then nothing happens.
For example, the flow image provided shows a simple flow that has been running for 17 (now 20)+ hours 😮
You can find the flow definition here: https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/add-k8s-cluster/flow_test/manual_flow.py
And you can find the agent conf here: https://github.com/pangeo-forge/pangeo-forge-azure-bakery/blob/add-k8s-cluster/prefect_agent_conf.yaml

Robin
05/05/2021, 1:56 PM
Is it possible to pass task map input parameters to the task result location templates?
e.g. "{flow_name}/{task_name}/{system_id}", where system_id is an input parameter to the task run (e.g. task.map(system_id=system_ids, further_input=unmapped(further_input)))?
I would use it instead of {task_run_id}, as the task_run_id might change with every flow run, but the system_id would always be the same 🤔
Or is there a smarter approach to achieving the same?
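As far as I understand, result locations can be templated not only with prefect.context but also with the task's own call-time inputs, so something like the following sketch (bucket and task names are placeholders) should put each mapped child's result under its system_id.

from prefect import Flow, Parameter, task, unmapped
from prefect.engine.results import S3Result

@task(result=S3Result(
    bucket="my-bucket",
    location="{flow_name}/{task_name}/{system_id}.prefect",
))
def process_system(system_id, further_input):
    ...

with Flow("per-system-results") as flow:
    system_ids = Parameter("system_ids", default=[1, 2, 3])
    further_input = Parameter("further_input", default="config")
    process_system.map(system_id=system_ids, further_input=unmapped(further_input))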
Zach Schumacher
05/05/2021, 3:41 PM

Robin
05/05/2021, 3:46 PM
import datetime

from prefect import task
from prefect.engine.cache_validators import all_parameters
from prefect.engine.results import S3Result

version = "0_1_17"

s3_result_battery_id = S3Result(
    bucket="accure-prefect-results",
    location=f"{{flow_name}}/{version}{{task_name}}/{{battery_id}}.prefect",
)

@task(
    result=s3_result_battery_id,
    # max_retries=max_retry,
    # retry_delay=retry_delay,
    # timeout=300,
    log_stdout=True,
    tags=["analysis", "task"],
    cache_validator=all_parameters,
    cache_for=datetime.timedelta(hours=48),
)
However, it seems like the failed task runs aren't rerun either, as the inputs have not changed and therefore the results are read from the cache.
How do I explicitly tell Prefect to rerun the tasks that have previously failed? Or am I missing something?
Bonus:
• Is that actually a good philosophy?
• I thought of introducing the package release version (simply "version" in the code snippet above) into the mix
◦ so that one can control, via the package release version, when to rerun all tasks
◦ e.g. due to some feature enhancements in an entirely new flow run, as opposed to just some bugfixes within the same flow release
• note: package release version != prefect flow version
• In general we are not yet quite sure how to best handle versioning and staging with prefect 🤔
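One possible angle on the bonus question (my suggestion, not something already in the snippet above): expose the release version as a flow Parameter. Since cache_validator=all_parameters compares all flow parameters, bumping the release_version value invalidates every cached state and forces a full rerun, while runs within the same release keep reusing the cache.

import datetime

from prefect import Flow, Parameter, task
from prefect.engine.cache_validators import all_parameters

@task(
    cache_for=datetime.timedelta(hours=48),
    cache_validator=all_parameters,
)
def run_analysis(battery_id):
    ...

with Flow("battery-analysis") as flow:
    # Changing this value (e.g. "0_1_17" -> "0_1_18") invalidates the cache
    # for every task using all_parameters.
    release_version = Parameter("release_version", default="0_1_17")
    run_analysis.map(battery_id=[1, 2, 3])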
Dimosthenis Schizas
05/05/2021, 3:47 PM

James Brink
05/05/2021, 3:59 PM

James Brink
05/05/2021, 4:05 PM
import prefect
from prefect import task, Flow, Parameter
from prefect.engine.signals import RETRY, FAIL, PAUSE
from datetime import datetime, timedelta

@task
def start_collection(timeout_interval, collection_name):
    invoke_outside_service(collection_name)
    timeout_time = datetime.now() + timedelta(seconds=timeout_interval)
    return timeout_time

# can I use code to define max retries? ideally 'max_retries=timeout_interval//600 + 2'
# to retry every 5 minutes until the timeout period ends
@task(max_retries=50, retry_delay=timedelta(seconds=300))
def check_collection_status(timeout_time, collection_name):
    status = poll_outside_service_status(collection_name)
    if status == 'complete':
        return 's3://nitorum-data/' + collection_name + '/' + datetime.now().strftime('%Y-%m-%d') + '.csv'
    elif status == 'failed':
        raise FAIL(message='Collection failed.')
    elif status == 'running':
        if datetime.now() > timeout_time:
            raise PAUSE(message='Collection timed out.')
        else:
            raise RETRY(message='Collection still running.')

@task
def load_data(path):
    data = download_data_from_s3(path)
    return data

@task
def transform_task(data):
    data = transform(data)
    return data

@task
def save_data(data):
    save_to_database(data)

with Flow("ETL with Outside Service Data Collection") as flow:
    timeout_interval = Parameter("timeout_interval", default=3600)
    collection_name = Parameter("collection_name", default="collection_1")
    timeout_time = start_collection(timeout_interval, collection_name)
    path = check_collection_status(timeout_time, collection_name)
    data = load_data(path)
    data = transform_task(data)
    save_data(data)
Adam Brusselback
05/05/2021, 4:25 PM

Adam Brusselback
05/05/2021, 4:27 PM
UserWarning: A Task was passed as an argument to PostgresExecute, you likely want to first initialize PostgresExecute with any static (non-Task) arguments, then call the initialized task with any dynamic (Task) arguments instead. For example:
my_task = PostgresExecute(...) # static (non-Task) args go here
res = my_task(...) # dynamic (Task) args go here
see https://docs.prefect.io/core/concepts/flows.html#apis for more info.
Adam Brusselback
05/05/2021, 4:27 PM
could not translate host name "<Task: get_value>" to address: Unknown host
Adam Brusselback
05/05/2021, 4:29 PM
with Flow("test") as flow:
    client_name = Parameter('client_name', default='client_demo')
    client_context = get_client_context("C:/.secrets/client/", client_name)
    db_exec = PostgresExecute(
        host=get_value(client_context, 'oltp_host'),
        db_name=get_value(client_context, 'oltp_database'),
        user=get_value(client_context, 'oltp_user'),
        query="SELECT do_something()",
    )
    result = db_exec(password=get_value(client_context, 'oltp_pass'))
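One way around the warning above (a sketch of my own workaround, not an official recipe; it assumes client_context can be read like a mapping, so adapt the lookups to however get_value actually works): construct and run PostgresExecute inside a regular task, so by the time it is instantiated the host/db/user values are plain strings rather than Task objects.

from prefect import Flow, Parameter, task
from prefect.tasks.postgres import PostgresExecute

@task
def execute_for_client(client_context):
    # Inside a task, the context values are concrete, so the init-time
    # arguments are no longer Task objects.
    return PostgresExecute(
        host=client_context["oltp_host"],
        db_name=client_context["oltp_database"],
        user=client_context["oltp_user"],
        query="SELECT do_something()",
    ).run(password=client_context["oltp_pass"])

with Flow("test") as flow:
    client_name = Parameter('client_name', default='client_demo')
    client_context = get_client_context("C:/.secrets/client/", client_name)
    result = execute_for_client(client_context)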
bastianwegge
05/05/2021, 5:53 PM
PREFECT__CLOUD__AUTH_TOKEN is a bit too hidden when you're approaching from this side. I had to dig into the Storage Registration Code (Prefect library internals) to find that the Client takes this variable (if it is specified) and uses it for registration purposes on Prefect Cloud. Just a small thing to improve in the API docs, I guess, that might make it a lot easier to get up and running.

Todd Lindstrom
05/05/2021, 5:55 PM