xyzy
03/27/2021, 9:35 PM
Trevor Kramer
03/27/2021, 10:59 PM
Sean Harkins
03/28/2021, 6:52 PM
A question about viewing logs in Prefect Cloud when using a DaskExecutor. When running this example Flow:
import json
import os

import prefect
from prefect import Flow, task, storage
from prefect.run_configs import ECSRun

@task
def say_hello():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud")
    return "hello result"

# storage_bucket, worker_image, and definition are defined elsewhere in my setup.
with Flow(
    "dask-test-flow",
    storage=storage.S3(
        bucket=storage_bucket
    ),
    run_config=ECSRun(
        image=worker_image,
        labels=json.loads(os.environ["PREFECT_AGENT_LABELS"]),
        task_definition=definition,
    ),
) as flow:
    hello_result = say_hello()
I am able to view log information (specifically the "Hello, Cloud" info line) directly in Prefect Cloud. But using an identical flow with a DaskExecutor, logging information is sent to my CloudWatch dask-ecs log group but is not available in Prefect Cloud. Is it possible to view all Dask worker logging in Prefect Cloud and, if so, what additional logging config do I need to include?
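One possible direction (a sketch, not confirmed in the thread): Prefect 0.x workers only forward logs to Cloud when cloud logging is enabled in their own environment, so passing that setting through the executor's cluster kwargs might help. The FargateCluster class, its environment argument, and the PREFECT__LOGGING__LOG_TO_CLOUD variable are all assumptions to verify against your Prefect and dask-cloudprovider versions.

from prefect.executors import DaskExecutor

# Sketch: forward Prefect's cloud-logging flag to the Dask workers.
# The cluster class and its `environment` kwarg are assumptions based on
# the dask-ecs setup described above.
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": worker_image,  # same image the flow run uses
        "environment": {"PREFECT__LOGGING__LOG_TO_CLOUD": "true"},
    },
)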
bral
03/28/2021, 6:52 PM
Jeremy Tee
03/29/2021, 8:56 AM
from prefect import Task, Flow
from prefect.tasks.prefect import StartFlowRun
class ExtractSomeData(Task):
    def run(self):
        return {"param-key": "some-random-piece-of-data"}
extract_some_data = ExtractSomeData(name="extract_some_data")
# assumes you have registered a flow named "example" in a project named "examples"
flow_run = StartFlowRun(flow_name="example", project_name="examples")
with Flow("parent-flow") as flow:
    flow_run.set_upstream(extract_some_data, key="parameters")
but I am wondering what the parameters key refers to, and whether the param-key from ExtractSomeData is passed into the example flow.
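A hedged reading of the snippet above: set_upstream(..., key="parameters") binds the upstream task's result to the parameters argument of StartFlowRun.run, so the returned dict becomes the child flow run's parameters. The functional equivalent, and the Parameter the child flow would need to receive the value, might look like this sketch (names assumed from the snippet):

from prefect import Flow, Parameter

# Parent flow: pass the dict returned by extract_some_data as the
# child flow run's parameters.
with Flow("parent-flow") as flow:
    flow_run(parameters=extract_some_data())

# Child flow "example": a Parameter named after the dict key picks up
# the value "some-random-piece-of-data".
with Flow("example") as child_flow:
    value = Parameter("param-key")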
Christoph Wiese
03/29/2021, 1:07 PM
Sean Talia
03/29/2021, 2:07 PM
I was looking through the ShellTask source because I was trying to see if there's a way to stream the task output when the log level is INFO. Is it possible? I have a task whose output I'd like to regularly stream to my logs, but I don't want to have the entire flow running in debug mode.
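One possibility, hedged: whether stream_output accepts a logging level rather than just a boolean depends on the Prefect 0.x release, so verify against your version.

import logging

from prefect.tasks.shell import ShellTask

# Sketch: stream the command's stdout to the task logger. Passing a
# log level here (instead of True) is an assumption to verify.
shell = ShellTask(stream_output=logging.INFO)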
Jeff Williams
03/29/2021, 2:17 PM
Samuel Hinton
03/29/2021, 2:48 PM
@task(timeout=60, nout=2)
def get_date(
    dt,
    ensure_explicit=False,
    max_late=timedelta(hours=12),
):
    # some code
    return None, None
However, when I try to add a get_date(dt, max_late=timedelta(minutes=1)) to my flow, I get TypeError: got an unexpected keyword argument. I assume the task decorator is wrapping the original function and doesn't pass through any kwargs it doesn't recognise.
Is there a subtle keyword I'm missing to pass kwargs through, or should I rewrite these functions of mine to utilise class-based Tasks?
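For reference, a minimal sketch of the class-based rewrite the question mentions (dt stands in for whatever upstream value the flow supplies; whether this avoids the TypeError is not confirmed in the thread):

from datetime import timedelta

from prefect import Flow, Task

class GetDate(Task):
    def run(self, dt, ensure_explicit=False, max_late=timedelta(hours=12)):
        # some code
        return None, None

# Task-level options move to the constructor; run() keeps the kwargs.
get_date = GetDate(timeout=60, nout=2)

with Flow("example") as flow:
    first, second = get_date(dt, max_late=timedelta(minutes=1))  # dt: placeholder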
Atalya
03/29/2021, 3:09 PM
Joseph Ellis
03/29/2021, 3:16 PM
Joseph Ellis
03/29/2021, 3:39 PM
Amber Papillon
03/29/2021, 3:40 PM
Sean Talia
03/29/2021, 4:28 PM
xyzy
03/29/2021, 5:22 PM
PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS={"ACCESS_KEY": "myaccesskey", "SECRET_ACCESS_KEY": "mysecretaccesskey"}
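A sketch of how a secret set this way is usually read back inside a flow (that Prefect parses the JSON value into a dict is an assumption worth verifying):

from prefect.client import Secret

# Reads the local secret defined by the environment variable above.
creds = Secret("AWS_CREDENTIALS").get()
access_key = creds["ACCESS_KEY"]
secret_access_key = creds["SECRET_ACCESS_KEY"]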
Will Milner
03/29/2021, 10:42 PM
--job-template is an option when using start, but not when using install. I'm able to set up a volume on the agent, but I realized that won't actually make the volumes available to my flows when I run them.
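One hedged workaround (assuming a Kubernetes agent on Prefect 0.x): declare the volumes per flow through the run config's job template rather than on the agent. The path below is a placeholder:

from prefect.run_configs import KubernetesRun

# Sketch: point the flow at a custom Kubernetes job template that
# declares the volumes and volumeMounts the flow containers need.
flow.run_config = KubernetesRun(job_template_path="/path/to/job_template.yaml")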
Trevor Kramer
03/30/2021, 2:39 AM
Enrique Plata
03/30/2021, 7:54 AM
J. Martins
03/30/2021, 9:04 AM
Gopinath Jaganmohan
03/30/2021, 10:40 AM
xyzz
03/30/2021, 2:44 PM
Adam Lewis
03/30/2021, 3:18 PM
Marwan Sarieddine
03/30/2021, 3:24 PM
When using flow.run() and flow.visualize(flow_state=flow_state), is there a way to display the duration of the tasks?
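flow.visualize isn't known to display timings, but for a local flow.run() a state handler can record wall-clock durations per task. A sketch (a workaround, not a built-in API):

import time

from prefect import task

durations = {}

def timing_handler(task_obj, old_state, new_state):
    # Stamp the start when a task enters Running; compute the elapsed
    # time when it leaves Running for a finished state.
    if new_state.is_running():
        durations[task_obj.name] = time.time()
    elif old_state.is_running() and new_state.is_finished():
        durations[task_obj.name] = time.time() - durations[task_obj.name]
    return new_state

@task(state_handlers=[timing_handler])
def my_task():
    ...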
Trevor Kramer
03/30/2021, 6:18 PM
with case(is_rapids(input), True):
    export_result = ExportsTask()(secret_task, 'somekey', stack)
with case(is_rapids(input), False):
    export_result = ExportsTask()(secret_task, 'anotherkey', stack)
MyTask()(
    input,
    export_result['training_job_queue'],
    export_result['training_job_definition'],
    PipelineArtifactsBucketTask()(secret_task, stack),
)
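Note that the second with case block rebinds export_result, so MyTask only ever depends on the False branch's task. The usual pattern (a sketch; merge is Prefect's control-flow helper for exactly this) names each branch separately and merges them:

from prefect.tasks.control_flow import merge

with case(is_rapids(input), True):
    rapids_exports = ExportsTask()(secret_task, 'somekey', stack)
with case(is_rapids(input), False):
    other_exports = ExportsTask()(secret_task, 'anotherkey', stack)
# merge resolves to whichever branch actually ran.
export_result = merge(rapids_exports, other_exports)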
Cab Maddux
03/30/2021, 6:36 PM
Sean Harkins
03/30/2021, 7:43 PM
The DaskExecutor is dynamically creating these temporary clusters https://github.com/PrefectHQ/prefect/blob/05cac2372c57a93ea72b05e7c844b1e115c01047/src/prefect/executors/dask.py#L213 on a per-flow basis, so I'm unsure how I can obtain the scheduler's address (and dashboard link) without some hook here in the Prefect code.
In an ideal world I would like to have the Dask scheduler's public address and dashboard link reported as part of the Flow logs in the Prefect UI. Any suggestions are greatly appreciated.
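One hedged idea: DaskExecutor accepts a callable for cluster_class, so a small factory could log the endpoints before handing the cluster back. FargateCluster and the kwargs are placeholders for the dask-cloudprovider setup above:

import prefect
from dask_cloudprovider.aws import FargateCluster
from prefect.executors import DaskExecutor

def logging_cluster(**kwargs):
    # Create the temporary cluster, then surface its endpoints in the
    # flow-run logs before the executor starts using it.
    cluster = FargateCluster(**kwargs)
    logger = prefect.context.get("logger")
    logger.info("Dask scheduler: %s", cluster.scheduler_address)
    logger.info("Dask dashboard: %s", cluster.dashboard_link)
    return cluster

flow.executor = DaskExecutor(
    cluster_class=logging_cluster,
    cluster_kwargs={"image": worker_image},  # placeholder kwargs
)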
Bose
03/30/2021, 9:45 PM
Rob Fowler
03/31/2021, 3:16 AM
Rob Fowler
03/31/2021, 3:17 AM
xyzy
03/31/2021, 3:44 AM
Is there a way to write this:
chain(
    extract(from_date, to_date, base_currency),
    transform,
    load
)
Instead of this:
extract_result = extract(from_date, to_date, base_currency)
transform_result = transform(extract_result)
load(transform_result)
?
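For context, a hedged sketch: Prefect 0.x's imperative Flow.chain adds ordering edges only and does not pass data between tasks, so the functional version is what actually wires each result into the next task. Chained tasks would still need their inputs bound separately:

from prefect import Flow

# Sketch: ordering-only dependencies via the imperative API; no results
# flow from extract to transform to load this way.
flow = Flow("etl")
flow.chain(extract, transform, load)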