matta
03/27/2021, 2:14 AM
Successful. I've tried setting reference tasks after the Flow definition, I've tried making a task that raises signals.SUCCESS() and is downstream from the two main tasks (and I've tried setting the trigger to be all_finished), but nothing seems to work. The main element of it is two mapped tasks.
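A minimal sketch of that pattern, with hypothetical placeholder tasks standing in for the two mapped tasks (flow.set_reference_tasks and the all_finished trigger are the relevant Prefect 0.x APIs):
from prefect import Flow, task
from prefect.triggers import all_finished

# Placeholder stand-ins for the two mapped tasks described above
@task
def step_one(x):
    return x

@task
def step_two(x):
    return x

@task(trigger=all_finished)  # runs whether the upstream tasks succeed or fail
def mark_success():
    pass

with Flow("always-succeeds") as flow:
    a = step_one.map([1, 2, 3])
    b = step_two.map(a)
    done = mark_success(upstream_tasks=[b])

# The flow's final state is taken only from this reference task
flow.set_reference_tasks([done])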
Ananthapadmanabhan P
03/27/2021, 4:33 PM
PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS='{"ACCESS_KEY": "<my_key_here>", "SECRET_ACCESS_KEY": "<my_secret_key_here>"}' python create_flow.py
And this is how I internally pass it down into the KubernetesRun method:
job_env = {
    "PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS": os.getenv("PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"),
    "PREFECT__BACKEND": "server",
}
flow.run_config = KubernetesRun(env=job_env, image="ananthutest/prefect-test:latest")
But when I do a kubectl describe of the created pod/job in k8s, it shows PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS under Environment in plain text. Is there any way to avoid this?
Trevor Kramer
03/27/2021, 6:59 PM
@task()
def get_s3_location(workspace_bucket):
    return f's3://{workspace_bucket}/{prefect.context.get("flow_run_name")}/'
Is there something like the simple addition that Prefect turns into tasks automatically within the flow?
xyzy
03/27/2021, 8:53 PM
xyzy
03/27/2021, 9:35 PM
Trevor Kramer
03/27/2021, 10:59 PM
Sean Harkins
03/28/2021, 6:52 PM
DaskExecutor. When running this example Flow:
@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"

with Flow(
    "dask-test-flow",
    storage=storage.S3(bucket=storage_bucket),
    run_config=ECSRun(
        image=worker_image,
        labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
        task_definition=definition,
    ),
) as flow:
    hello_result = say_hello()
I am able to view log information (specifically the “Hello, Cloud” info line) directly in Prefect Cloud. But using an identical flow with a DaskExecutor, logging information is sent to my CloudWatch dask-ecs log group but is not available in Prefect Cloud. Is it possible to view all Dask worker logging in Prefect Cloud, and if so, what additional logging config do I need to include?
bral
03/28/2021, 6:52 PM
Jeremy Tee
03/29/2021, 8:56 AM
from prefect import Task, Flow
from prefect.tasks.prefect import StartFlowRun

class ExtractSomeData(Task):
    def run(self):
        return {"param-key": "some-random-piece-of-data"}

extract_some_data = ExtractSomeData(name="extract_some_data")

# assumes you have registered a flow named "example" in a project named "examples"
flow_run = StartFlowRun(flow_name="example", project_name="examples")

with Flow("parent-flow") as flow:
    flow_run.set_upstream(extract_some_data, key="parameters")
but I am wondering what parameters is, and whether the param-key from ExtractSomeData is passed into the example flow?
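For what it's worth, StartFlowRun.run accepts a parameters dict, and set_upstream(..., key="parameters") passes the upstream task's return value in as that argument, so the child flow receives {"param-key": ...} as its parameters. A sketch of a child flow that would consume it (hypothetical, assuming the registered "example" flow defines a matching Parameter):
from prefect import Flow, Parameter

# Hypothetical child flow registered as "example" in project "examples";
# "param-key" arrives as a flow parameter when the parent triggers the run
with Flow("example") as child_flow:
    param = Parameter("param-key")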
Christoph Wiese
03/29/2021, 1:07 PM
Sean Talia
03/29/2021, 2:07 PM
I was looking through the ShellTask source because I was trying to see if there's a way to stream the task output when the log level is INFO. Is it possible? I have a task whose output I'd like to regularly stream to my logs, but I don't want to have the entire flow running in debug mode.
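A minimal sketch, assuming a Prefect 0.14.x release where ShellTask exposes a stream_output flag:
from prefect import Flow
from prefect.tasks.shell import ShellTask

# stream_output=True logs each line of the command's output as it is
# produced, at INFO level, without switching the flow to debug logging
shell = ShellTask(stream_output=True)

with Flow("stream-example") as flow:
    result = shell(command="echo hello; echo world")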
Jeff Williams
03/29/2021, 2:17 PM
Samuel Hinton
03/29/2021, 2:48 PM
@task(timeout=60, nout=2)
def get_date(
    dt,
    ensure_explicit=False,
    max_late=timedelta(hours=12),
):
    # some code
    return None, None
However, when I try to add a get_date(dt, max_late=timedelta(minutes=1)) to my flow, I get TypeError: got an unexpected keyword argument. I assume the task decorator is wrapping the original function and doesn't pass through any kwargs it doesn't recognise.
Is there a subtle keyword I'm missing to pass kwargs through, or should I rewrite these functions of mine to utilise class-based Tasks?
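For comparison, a sketch of the class-based form mentioned above, with the same placeholder body (timeout and nout are Task constructor arguments in Prefect 0.x):
from datetime import timedelta
from prefect import Task

class GetDate(Task):
    def run(self, dt, ensure_explicit=False, max_late=timedelta(hours=12)):
        # some code
        return None, None

# keyword arguments are then bound when calling the task inside a flow,
# e.g. get_date(dt, max_late=timedelta(minutes=1))
get_date = GetDate(timeout=60, nout=2)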
Atalya
03/29/2021, 3:09 PM
Joseph Ellis
03/29/2021, 3:16 PM
Joseph Ellis
03/29/2021, 3:39 PM
Amber Papillon
03/29/2021, 3:40 PM
Sean Talia
03/29/2021, 4:28 PM
xyzy
03/29/2021, 5:22 PM
PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS={"ACCESS_KEY": "myaccesskey", "SECRET_ACCESS_KEY": "mysecretaccesskey"}
Will Milner
03/29/2021, 10:42 PM
--job-template is an option when using start, but not when using install. I'm able to set up a volume on the agent, but I realized that won't actually make the volumes available to my flows when I run them.
Trevor Kramer
03/30/2021, 2:39 AM
Enrique Plata
03/30/2021, 7:54 AM
J. Martins
03/30/2021, 9:04 AM
Gopinath Jaganmohan
03/30/2021, 10:40 AM
xyzz
03/30/2021, 2:44 PM
Adam Lewis
03/30/2021, 3:18 PM
Marwan Sarieddine
03/30/2021, 3:24 PM
With flow.run() and flow.visualize(flow_state=flow_state), is there a way to display the duration of the tasks?
Trevor Kramer
03/30/2021, 6:18 PM
with case(is_rapids(input), True):
    export_result = ExportsTask()(secret_task, 'somekey', stack)
with case(is_rapids(input), False):
    export_result = ExportsTask()(secret_task, 'anotherkey', stack)
MyTask()(input, export_result['training_job_queue'], export_result['training_job_definition'],
         PipelineArtifactsBucketTask()(secret_task, stack))
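In Prefect 0.x, the usual way to combine two case branches is merge from prefect.tasks.control_flow, so downstream tasks see whichever branch actually ran instead of only the last assignment. A sketch reusing the task names from the snippet above (those names are assumed from that snippet):
from prefect.tasks.control_flow import case, merge

with case(is_rapids(input), True):
    rapids_result = ExportsTask()(secret_task, 'somekey', stack)
with case(is_rapids(input), False):
    other_result = ExportsTask()(secret_task, 'anotherkey', stack)

# merge() returns the result of the branch that was not skipped
export_result = merge(rapids_result, other_result)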
Cab Maddux
03/30/2021, 6:36 PM
Sean Harkins
03/30/2021, 7:43 PM
The DaskExecutor is dynamically creating these temporary clusters on a per-flow basis (https://github.com/PrefectHQ/prefect/blob/05cac2372c57a93ea72b05e7c844b1e115c01047/src/prefect/executors/dask.py#L213), so I’m unsure how I can obtain the scheduler’s address (and dashboard link) without some hook here in the Prefect code.
In an ideal world I would like to have the Dask scheduler’s public address and dashboard link reported as part of the Flow logs in the Prefect UI. Any suggestions are greatly appreciated.
Jim Crist-Harif
03/30/2021, 8:18 PM
There's a config option, distributed.dashboard.link (https://docs.dask.org/en/latest/configuration-reference.html#distributed.dashboard.link), which can template that out. If you're fine setting that up properly, then I'd be happy to add a log line with the dashboard link during dask executor startup.
Sean Harkins
03/30/2021, 9:05 PM
I have a question about the host information used here. I believe scheduler.address will report the host from the private subnet range (https://github.com/dask/distributed/pull/3429/files). For example, my scheduler logs show the private ip rather than the public ENI:
distributed.scheduler - INFO - Scheduler at: tcp://10.0.115.40:8786
distributed.scheduler - INFO - dashboard at: :8787
I guess this might be a deeper question on how to report the public ip for the container where the scheduler is running rather than its private ip.
Jim Crist-Harif
03/30/2021, 9:09 PM
Sean Harkins
03/30/2021, 9:12 PM
If I set dask.config.set({"distributed.dashboard.link": "http://{host}:{port}/status"}), won't my resulting dashboard_link be http://10.0.114.40:8786/status?
Jim Crist-Harif
03/30/2021, 9:14 PM
Sean Harkins
03/30/2021, 9:23 PM
Looking at dask-cloudprovider in more detail, it looks like much of this logic is already handled: https://github.com/dask/dask-cloudprovider/blob/main/dask_cloudprovider/aws/ecs.py#L190-L213. But given this, you should be able to use dashboard_link and assume the correct address. Where would this be logged? In the Flow log directly?
Jim Crist-Harif
03/30/2021, 9:42 PM
Sean Harkins
03/30/2021, 9:43 PM
Jim Crist-Harif
04/01/2021, 1:54 AM
Sean Harkins
04/01/2021, 1:56 AM
pangeo-forge for context.
Jim Crist-Harif
04/01/2021, 2:01 AM
Marvin
04/01/2021, 2:02 AM
Jim Crist-Harif
04/01/2021, 2:03 AM