Chris Jordan
03/18/2021, 4:13 PM
Luis Gallegos
03/18/2021, 4:38 PM
import prefect
import pendulum
from prefect import task, Flow
from prefect.schedules import CronSchedule

cron_now = pendulum.now()
str_date = cron_now.strftime('%Y%m%d_%H%M%S')
custom_schedule = CronSchedule("0 9 * * 0", start_date=cron_now)

def slack(text):
    data = '{"channel":"XXX","text":"%s: %s"}' % (str_date, text)

@task
def my_task():  # renamed from `task` so it doesn't shadow the imported decorator
    ## do something
    ## call slack
    slack("Hello")

with Flow("fact_czenk", schedule=custom_schedule) as flow:
    result = my_task()

flow.register()
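(Editor's note: the slack helper above builds a JSON payload but never sends it. A minimal sketch of actually posting it, assuming a Slack incoming webhook; the URL and channel are placeholders, not from the original message:

import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX"  # hypothetical webhook

def slack(text):
    # Build the payload as above, then POST it to the webhook.
    data = '{"channel":"XXX","text":"%s: %s"}' % (str_date, text)
    requests.post(
        SLACK_WEBHOOK_URL,
        data=data,
        headers={"Content-Type": "application/json"},
    )
)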
Samuel Hinton
03/18/2021, 5:19 PM
dh
03/18/2021, 7:53 PM
flow.register
context: we want to create a dynamically defined flow (e.g. Flow(result=S3Result(location=f'bucket/{args.my_result_key}', ...))
for flow reuse. To parse user-passed args, we instantiate the Flow behind __main__.
Now we noticed we can't use the prefect CLI to register because it can't pass extra user args. Alternatively, we are considering flow.register
and wonder if there would be any risks we should be aware of.
[1]: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/cli/register.py#L70
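(Editor's note: a minimal sketch of the pattern described above: build the flow from user-passed args behind __main__ and register it with flow.register rather than the CLI. The flow name, bucket, and project name are placeholders:

import argparse
from prefect import Flow
from prefect.engine.results import S3Result

def build_flow(result_key):
    # The result location is derived from a user-passed argument.
    return Flow(
        "dynamic-flow",  # hypothetical name
        result=S3Result(bucket="bucket", location=f"{result_key}.prefect"),
    )

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--my-result-key", dest="my_result_key")
    args = parser.parse_args()
    build_flow(args.my_result_key).register(project_name="my-project")  # hypothetical project
)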
Massoud Mazar
03/19/2021, 1:37 AM
0.14.6 to 0.14.12, and since then I see that even when idle, CPU hovers between 10% and 50%.
docker stats --all --format "table {{.ID}}\t{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}"
shows the following:
CONTAINER ID   NAME             CPU %   MEM USAGE / LIMIT
c6eda45c6f59   tmp_ui_1         0.00%   4.953MiB / 7.432GiB
2466edd82357   tmp_towel_1      0.00%   52.57MiB / 7.432GiB
e9d0643d134e   tmp_apollo_1     2.69%   62.68MiB / 7.432GiB
ad41dcf8646a   tmp_graphql_1    3.24%   69.5MiB / 7.432GiB
a8f65bcb2efa   tmp_hasura_1     4.62%   153.8MiB / 7.432GiB
bc9742482bd8   tmp_postgres_1   4.64%   27.86MiB / 7.432GiB
Jay Sundaram
03/19/2021, 1:55 AM
prefect backend server
prefect server start
prefect create project etl-project
prefect agent local start --label etl-label
prefect register flow --file simple_flow.py --name a-simple-etl-flow -l etl-label -p etl-project
In the UI, I can click on QUICK RUN and observe the flow execute.
Next, in another simple script named start_flow_run.py:
from prefect.tasks.prefect.flow_run import StartFlowRun

kickoff_task = StartFlowRun(
    project_name='etl-project',
    flow_name='a-simple-etl-flow'
)
which I execute like this:
python start_flow_run.py
But nothing happens.
The agent doesn't detect it; no activity in the UI.
I was expecting my registered flow named 'a-simple-etl-flow' to execute.
Please advise. Thanks.
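(Editor's note: StartFlowRun is a Task, so instantiating it only configures it; nothing is submitted to the backend. A minimal sketch of triggering the run from a plain script by calling .run() directly, assuming the server API is reachable from that machine:

from prefect.tasks.prefect.flow_run import StartFlowRun

kickoff_task = StartFlowRun(
    project_name='etl-project',
    flow_name='a-simple-etl-flow'
)

if __name__ == "__main__":
    # Calling .run() creates a flow run via the API; the agent then picks it up.
    kickoff_task.run()
)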
Sven Teresniak
03/19/2021, 8:22 AM
Jacopo Tagliabue
03/19/2021, 10:11 AM
from prefect import task, Flow, Parameter
from prefect.tasks.great_expectations import RunGreatExpectationsValidation

# Define checkpoint task
validation_task = RunGreatExpectationsValidation()

with Flow("ge_test") as flow:
    validation_task(checkpoint_name='gitter_checkpoint')

flow.run()
Samuel Hinton
03/19/2021, 11:14 AM
on_failure
callback of a flow to send a message to a Slack channel that ideally looks something like “OH MAN THE FLOW FAILED - Click here to see the flow” with a proper link. Does the flow object contain any information I can use to construct a useful URL, specifically the flow_id?
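(Editor's note: a sketch of one way to do this, assuming the run-time IDs come from prefect.context rather than the flow object; the UI URL pattern below is a placeholder to verify against your own instance:

import prefect
from prefect import Flow

def notify_slack(flow, state):
    # flow_run_id is available in context while the flow is running.
    flow_run_id = prefect.context.get("flow_run_id")
    url = f"https://cloud.prefect.io/flow-run/{flow_run_id}"  # hypothetical URL pattern
    message = f"OH MAN THE FLOW FAILED - Click here to see the flow: {url}"
    # ...post `message` to Slack with the webhook of your choice...

flow = Flow("my-flow", on_failure=notify_slack)
)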
Tim Enders
03/19/2021, 2:40 PM
flatten
to gather all of the results back up. When RETRY
is raised I am getting the following: TypeError: object of type 'RETRY' has no len()
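(Editor's note: a guess at the interaction, with hypothetical helpers: if the RETRY signal is returned instead of raised from a mapped task, the RETRY object itself flows into flatten and breaks len():

from prefect import task
from prefect.engine.signals import RETRY

@task
def fetch_page(page):
    if not page_ready(page):   # hypothetical readiness check
        raise RETRY("page not ready yet")  # raise the signal, don't return it
    return fetch_items(page)   # hypothetical fetch returning a list
)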
Tim Enders
03/19/2021, 2:40 PM
Marwan Sarieddine
03/19/2021, 5:13 PM
Zach Hodowanec
03/19/2021, 5:59 PM
PYTHONPATH
in my local environment, but not having much luck setting a similar ENV variable on the agent or job_spec. Any suggestions or documentation that might help get over this hump?
Repository Structure:
/src
--/flows
----/flow.py
--/tasks
----/task.py
Error Message: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named 'src.tasks'")
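(Editor's note: one possible approach, sketched under the assumption that this is a Kubernetes agent and the flow uses Prefect's run_config API; the mount path is a guess at where the repo lives in the job's image:

from prefect.run_configs import KubernetesRun

# Set PYTHONPATH on the flow-run job so `src.tasks` is importable.
flow.run_config = KubernetesRun(env={"PYTHONPATH": "/opt/src"})  # hypothetical path
)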
Samuel Hinton
03/19/2021, 6:02 PM
Renzo Becerra
03/19/2021, 6:44 PM
prefect agent ecs start --cluster my-cluster-arn --launch-type EC2
botocore.errorfactory.AccessDeniedException: An error occurred (AccessDeniedException) when calling the RegisterTaskDefinition operation: User: arn:aws:iam::**********:user/********* is not authorized to perform: ecs:RegisterTaskDefinition on resource: *
Tim Enders
03/19/2021, 6:58 PM
Julie Sturgeon
03/19/2021, 7:42 PM
Chris White
Trevor Kramer
03/20/2021, 11:54 PM
Espen Overbye
03/21/2021, 3:32 PM
Espen Overbye
03/22/2021, 6:51 AM
Vincent Chéry
03/22/2021, 9:46 AM
Chris Bowdon
03/22/2021, 3:53 PM
prefect
or server
projects, so I wondered if maybe I'm just misunderstanding something about how it's supposed to work. Is anyone aware of this already?
Nathan Walker
03/22/2021, 6:02 PM
Trevor Kramer
03/22/2021, 7:22 PM
import datetime
from prefect.tasks.aws.client_waiter import AWSClientWait

standardizer_task = submit_standardizer_job(files, bucket, 'mcule', version)
standardizer_wait_task = AWSClientWait(
    client='batch',
    waiter_name='JobComplete',
    max_retries=2,
    retry_delay=datetime.timedelta(minutes=1),
)(waiter_kwargs={'jobs': [standardizer_task]})
Kelly Huang
03/22/2021, 8:11 PM
Charles Liu
03/22/2021, 9:53 PM
Alexandru Sicoe
03/22/2021, 11:11 PM
Tsang Yong
03/23/2021, 12:19 AM
from dask_kubernetes import KubeCluster
from prefect.executors import DaskExecutor

cluster = KubeCluster.from_yaml(dask_worker_spec_file_path)
cluster.adapt(minimum=1, maximum=10)
executor = DaskExecutor(cluster.scheduler_address)
state = flow.run(executor=executor)
but when I try to access the state I'm getting this.
Python 3.8.6 (default, Dec 11 2020, 14:38:29)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.21.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: state
Out[1]: <Failed: "Unexpected error: TypeError('Could not serialize object of type Failed.\nTraceback (most recent call last):\n File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 49, in dumps\n result = pickle.dumps(x, **dump_kwargs)\nTypeError: cannot pickle \'_thread.RLock\' object\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n File "/usr/local/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 307, in serialize\n header, frames = dumps(x, context=context) if wants_context else dumps(x)\n File "/usr/local/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 58, in pickle_dumps\n frames[0] = pickle.dumps(\n File "/usr/local/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 60, in dumps\n result = cloudpickle.dumps(x, **dump_kwargs)\n File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps\n cp.dump(obj)\n File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 563, in dump\n return Pickler.dump(self, obj)\nTypeError: cannot pickle \'_thread.RLock\' object\n')">
any idea what I'm doing wrong?
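(Editor's note: the pickling failure usually points at something in a task's return value holding a thread lock. A heavily hedged sketch of the usual workaround, with hypothetical helpers: return plain data from tasks rather than live clients or connections, since results must be pickled to travel back from Dask workers:

from prefect import task

@task
def query_rows():
    client = make_client()          # hypothetical client; holds an RLock internally
    rows = client.fetch_all()       # hypothetical query
    return [dict(r) for r in rows]  # return picklable data, not the client itself
)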
Mahesh
03/23/2021, 1:04 PM
import prefect
from prefect.tasks.snowflake.snowflake import SnowflakeQuery
from prefect import task, Flow

query = """
SHOW DATABASES;
"""

snowflake_def = SnowflakeQuery(
    account="account",
    user="user",
    password="****",
    database="***",
    warehouse="****",
    role="***",
    query=query
)

with Flow("hello-snowflake") as flow:
    snowflake_def()

flow.register(project_name="tutorial")
flow.run()
When I trigger a quick run from the UI, I am facing the issue below:
Unexpected error: TypeError("cannot pickle '_thread.lock' object")
Traceback (most recent call last):
File "/opt/prefect_env/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/opt/prefect_env/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 900, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
File "/opt/prefect_env/lib/python3.8/site-packages/prefect/engine/results/local_result.py", line 116, in write
value = self.serializer.serialize(new.value)
File "/opt/prefect_env/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
return cloudpickle.dumps(value)
File "/opt/prefect_env/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 72, in dumps
cp.dump(obj)
File "/opt/prefect_env/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_thread.lock' object
I have set checkpoint to False.
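(Editor's note: for reference, a sketch of where that flag can go, assuming SnowflakeQuery forwards keyword arguments to the base Task, so the unpicklable result is never written:

snowflake_def = SnowflakeQuery(
    account="account",
    user="user",
    password="****",
    query=query,
    checkpoint=False,  # Task-level kwarg: skip persisting this task's result
)

Checkpointing is also controlled globally by the PREFECT__FLOWS__CHECKPOINTING environment variable in the environment the agent launches flow runs with.)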