Akash Rai
03/18/2021, 7:47 AM
Hawkar Mahmod
03/18/2021, 12:53 PM
print(state.result[baseline_fractions]._result). All that happens is that my dir provided location produces the directories and then an empty folder. What am I missing here?
Marwan Sarieddine
03/18/2021, 2:53 PM
not_all_skipped trigger, it seems the naming might be a bit unintuitive to me, or I might be missing something. Please see the example in the thread (any help would be appreciated).
Zach Hodowanec
03/18/2021, 3:33 PM
run_config parameters on a Kubernetes Agent for subsequent flows to consume rather than duplicating similar run_configs across various flows. We currently make use of the PREFECT__CLOUD__AGENT__ENV_VARS to pass along storage configurations, but we are not having much success attempting to update the execution environment to use an internal custom image. I have tried passing the IMAGE and the PREFECT__CONTEXT__IMAGE environment variables to my job spec thus far, to no avail.
Chris Jordan
03/18/2021, 4:13 PM
Luis Gallegos
03/18/2021, 4:38 PM
import prefect
import pendulum
# Imports below were missing from the snippet as posted (assumed Prefect 0.14.x paths)
from prefect import Flow, task
from prefect.schedules import CronSchedule

cron_now = pendulum.now()
str_date = cron_now.strftime('%Y%m%d_%H%M%S')
custom_schedule = CronSchedule("0 9 * * 0", start_date=cron_now)

def slack(text):
    data = '{"channel":"XXX","text":"%s: %s"}' % (str_date, text)

@task
def task():
    ## do something
    ## call_slack
    slack("Hello")

with Flow("fact_czenk", schedule=custom_schedule) as flow:
    task = task()

flow.register()
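A side note on the snippet above, offered as a sketch rather than a diagnosis: str_date is computed once when the module is loaded rather than when each message is sent, and the %-formatted JSON breaks if the text contains a double quote. Assuming the intent is a fresh timestamp per message, the payload could be built at call time with json.dumps. The helper name build_slack_payload is hypothetical, and the standard-library datetime stands in for pendulum only to keep the sketch dependency-free.

```python
import json
from datetime import datetime


def build_slack_payload(text, channel="XXX", now=None):
    """Return a JSON string for a Slack message, stamped at call time."""
    # Compute the timestamp when the message is built, not at import time.
    now = now or datetime.now()
    stamp = now.strftime("%Y%m%d_%H%M%S")
    # json.dumps escapes quotes and backslashes that manual formatting would not.
    return json.dumps({"channel": channel, "text": f"{stamp}: {text}"})


# A fixed `now` is passed here only to make the example reproducible.
payload = build_slack_payload('say "Hello"', now=datetime(2021, 3, 18, 16, 38, 0))
print(payload)
```

Inside the task, slack() could call such a helper so that each run stamps its own message.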
Samuel Hinton
03/18/2021, 5:19 PM
dh
03/18/2021, 7:53 PM
flow.register
context: we want to create a dynamically defined flow (e.g. Flow(result=S3Result(location=f'bucket/{args.my_result_key}', ...) for flow reuse. To parse user-passed args, we instantiate the Flow behind __main__. Now we noticed we can’t use the prefect CLI to register because it can’t pass extra user args. Alternatively, we are considering flow.register and wonder if there would be any risks we should be aware of.
[1]: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/register.py#L70
Massoud Mazar
03/19/2021, 1:37 AM
0.14.6 to 0.14.12 and since then I see that, even when idle, CPU usage hovers between 10% and 50%.
docker stats --all --format "table {{.ID}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"
shows the following:
CONTAINER ID   NAME             CPU %   MEM USAGE / LIMIT
c6eda45c6f59   tmp_ui_1         0.00%   4.953MiB / 7.432GiB
2466edd82357   tmp_towel_1      0.00%   52.57MiB / 7.432GiB
e9d0643d134e   tmp_apollo_1     2.69%   62.68MiB / 7.432GiB
ad41dcf8646a   tmp_graphql_1    3.24%   69.5MiB / 7.432GiB
a8f65bcb2efa   tmp_hasura_1     4.62%   153.8MiB / 7.432GiB
bc9742482bd8   tmp_postgres_1   4.64%   27.86MiB / 7.432GiB
Jay Sundaram
03/19/2021, 1:55 AM
prefect backend server
prefect server start
prefect create project etl-project
prefect agent local start --label etl-label
prefect register flow --file simple_flow.py --name a-simple-etl-flow -l etl-label -p etl-project
In the UI, I can click on QUICK RUN and observe the flow execute.
Next, in another simple script named start_flow_run.py:
from prefect.tasks.prefect.flow_run import StartFlowRun
kickoff_task = StartFlowRun(
    project_name='etl-project',
    flow_name='a-simple-etl-flow'
)
which I execute like this:
python start_flow_run.py
But nothing happens.
The agent doesn't detect it; no activity in the UI.
I was expecting my registered flow named 'a-simple-etl-flow' to execute.
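A likely explanation, assuming Prefect 0.14.x: StartFlowRun is a Task class, so constructing it only configures the task and does not contact the backend. Nothing is submitted until its run() method is called, either directly or as part of a flow run. A minimal sketch of the direct call follows; the main wrapper is an addition for illustration, and the import sits inside it only so the sketch can be loaded on a machine without Prefect installed.

```python
def main():
    # Imported here so the module loads even where Prefect 0.14.x is absent.
    from prefect.tasks.prefect.flow_run import StartFlowRun

    kickoff_task = StartFlowRun(
        project_name="etl-project",
        flow_name="a-simple-etl-flow",
    )
    # Constructing the task does nothing by itself; run() asks the backend
    # to create a flow run, which a matching agent can then pick up.
    return kickoff_task.run()
```

Calling main() at the bottom of start_flow_run.py while the server and agent are running should then create the flow run, if the assumption above holds.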
Please advise. Thanks.
Sven Teresniak
03/19/2021, 8:22 AM
Jacopo Tagliabue
03/19/2021, 10:11 AM
from prefect import task, Flow, Parameter
from prefect.tasks.great_expectations import RunGreatExpectationsValidation
# Define checkpoint task
validation_task = RunGreatExpectationsValidation()
with Flow("ge_test") as flow:
    validation_task(checkpoint_name='gitter_checkpoint')
flow.run()
Samuel Hinton
03/19/2021, 11:14 AM
on_failure callback of a flow to send a message to a Slack channel that ideally looks something like “OH MAN THE FLOW FAILED - Click here to see the flow” with a proper link. Does the flow object contain any information I can use to construct a useful URL, specifically the flow_id?
Tim Enders
03/19/2021, 2:40 PM
flatten to gather all of the results back up. When RETRY is raised I am getting the following: TypeError: object of type 'RETRY' has no len()
Tim Enders
03/19/2021, 2:40 PM
Marwan Sarieddine
03/19/2021, 5:13 PM
Zach Hodowanec
03/19/2021, 5:59 PM
PYTHONPATH in my local environment, but I am not having much luck setting a similar ENV variable on the agent or job_spec. Any suggestions or documentation that might help get over this hump?
Repository Structure:
/src
--/flows
----/flow.py
--/tasks
----/task.py
Error Message: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'src.tasks'")
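The error reads like the plain Python import failure that occurs when the directory containing src is not on sys.path in the process that loads the flow. A minimal sketch that reproduces this outside Prefect, assuming the layout above; the temporary directory and the hello function are hypothetical stand-ins, and the sketch does not cover how the agent or job spec passes environment variables.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Recreate the layout from the message in a throwaway directory.
# The `hello` function is a hypothetical stand-in for a real task.
repo_root = Path(tempfile.mkdtemp())
tasks_dir = repo_root / "src" / "tasks"
tasks_dir.mkdir(parents=True)
(tasks_dir / "task.py").write_text("def hello():\n    return 'hello from src.tasks'\n")


def can_import(name):
    """Return True if `name` imports cleanly with the current sys.path."""
    importlib.invalidate_caches()
    try:
        importlib.import_module(name)
        return True
    except ModuleNotFoundError:
        return False


before = can_import("src.tasks.task")  # repo root is not on sys.path yet
sys.path.insert(0, str(repo_root))     # the effect PYTHONPATH=<repo root> has
after = can_import("src.tasks.task")
print(before, after)
```

Whatever mechanism sets the variable, the src package also has to exist inside the image or machine that executes the flow.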
Samuel Hinton
03/19/2021, 6:02 PM
Renzo Becerra
03/19/2021, 6:44 PM
prefect agent ecs start --cluster my-cluster-arn --launch-type EC2
botocore.errorfactory.AccessDeniedException: An error occurred (AccessDeniedException) when calling the RegisterTaskDefinition operation: User: arn:aws:iam::**********:user/********* is not authorized to perform: ecs:RegisterTaskDefinition on resource: *
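The exception names the missing permission directly: the IAM user that starts the agent is not allowed to call ecs:RegisterTaskDefinition. A sketch of a policy statement granting just that action, built in Python only to keep the examples in one language; the statement is an illustrative assumption, not an official Prefect policy, and the agent will probably need further ECS actions that the error does not list.

```python
import json

# Hypothetical minimal statement covering only the action named in the error.
# Other ECS permissions the agent may need are deliberately left out.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["ecs:RegisterTaskDefinition"],
            "Resource": "*",
        }
    ],
}

# Print the document in the JSON form that IAM expects.
print(json.dumps(policy, indent=2))
```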
Tim Enders
03/19/2021, 6:58 PM
Julie Sturgeon
03/19/2021, 7:42 PM
Chris White
03/19/2021, 8:03 PM
https://youtu.be/EwsMecjSYEU
Trevor Kramer
03/20/2021, 11:54 PM
Espen Overbye
03/21/2021, 3:32 PM
Espen Overbye
03/22/2021, 6:51 AM
Vincent Chéry
03/22/2021, 9:46 AM
Chris Bowdon
03/22/2021, 3:53 PM
prefect or server projects, so I wondered if maybe I'm just misunderstanding something about how it's supposed to work. Is anyone aware of this already?
Nathan Walker
03/22/2021, 6:02 PM
Trevor Kramer
03/22/2021, 7:22 PM
standardizer_task = submit_standardizer_job(files, bucket, 'mcule', version)
standardizer_wait_task = AWSClientWait(client='batch', waiter_name='JobComplete', max_retries=2, retry_delay=datetime.timedelta(minutes=1))(waiter_kwargs={'jobs': [standardizer_task]})
Kelly Huang
03/22/2021, 8:11 PM
Kelly Huang
03/22/2021, 8:11 PM
Kyle Moon-Wright
03/22/2021, 8:29 PM
Kelly Huang
03/22/2021, 8:31 PM
Kyle Moon-Wright
03/22/2021, 8:33 PM
Kelly Huang
03/23/2021, 12:09 AM