Stéphan Taljaard
06/18/2021, 1:16 PM
every_30_min = IntervalClock(interval=timedelta(minutes=30), parameter_defaults={"Start date hours delta": 0.75})
daily_schedule_time = pendulum.time(5, 30)
every_day_0530 = IntervalClock(
start_date=pendulum.today(tz=TIMEZONE).at(daily_schedule_time.hour, daily_schedule_time.minute),
interval=timedelta(days=1),
parameter_defaults={"Start DateTime": "yesterday"},
)
schedule = Schedule(clocks=[every_30_min, every_day_0530])
I want this schedule to filter as follows:
• Pull a large chunk of data once per day at 05:30
• Pull a small chunk of data every 30 minutes (but not on 05:30)
I think I need a no_filter, however it seems that applies to all clocks. It will thus remove both my scheduled runs, leaving only the 30-minute one?
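A minimal sketch of one possible workaround (an assumption, not confirmed in the thread; the TIMEZONE value is hypothetical): shift the 30-minute clock's start_date by 15 minutes so its runs land at :15 and :45 and never coincide with the 05:30 daily run, which avoids needing a not_filter that would apply to both clocks.

from datetime import timedelta
import pendulum
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

TIMEZONE = "UTC"  # hypothetical; use your own timezone

# daily clock at 05:30, as in the original snippet
every_day_0530 = IntervalClock(
    start_date=pendulum.today(tz=TIMEZONE).at(5, 30),
    interval=timedelta(days=1),
    parameter_defaults={"Start DateTime": "yesterday"},
)

# 30-minute clock anchored at :15 past the hour, so it never fires at 05:30
every_30_min = IntervalClock(
    interval=timedelta(minutes=30),
    start_date=pendulum.today(tz=TIMEZONE).at(0, 15),
    parameter_defaults={"Start date hours delta": 0.75},
)

schedule = Schedule(clocks=[every_30_min, every_day_0530])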
Prabin Mehta
06/18/2021, 2:16 PM
Marko Jamedzija
06/18/2021, 3:40 PM
I'm using the RunNamespacedJob task to run k8s jobs. When they fail they don't get deleted, which is fine and described in the docstring: "If job is in the failed status, resources will not be removed from the cluster so that user can check the logs on the cluster." The problem appears the next time I run a job, because it complains that a job with the same name already exists, and I don't want to delete the job manually every time it happens. What would be the ideal way to work around this? Thanks!
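One possible way around the name collision (a sketch under assumptions, not the thread's answer; the job name and spec below are hypothetical): build the Job body in a task and give it a unique name per flow run, e.g. by suffixing part of the flow_run_id, so a leftover failed job never clashes with the next run.

import prefect
from prefect import task, Flow
from prefect.tasks.kubernetes import RunNamespacedJob

@task
def build_job_body():
    # suffix the k8s Job name with part of the flow run id so reruns never collide
    suffix = prefect.context.get("flow_run_id", "local")[:8]
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"my-job-{suffix}"},                # hypothetical base name
        "spec": {"template": {"spec": {"containers": [...]}}},   # fill in your real job spec here
    }

run_job = RunNamespacedJob(namespace="default")

with Flow("k8s-job") as flow:
    run_job(body=build_job_body())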
matta
06/18/2021, 6:15 PM
Berty
06/18/2021, 6:39 PM
client.scatter?
python3.8/site-packages/distributed/worker.py:3373: UserWarning: Large object of size 126.73 MB detected in task graph:
{'task': <Task: blah>, 'state': None, 'ups ... _parent': True}
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers
future = client.submit(func, big_data) # bad
big_future = client.scatter(big_data) # good
future = client.submit(func, big_future) # good
warnings.warn(
Killed
python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 48 leaked semaphore objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
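A hedged sketch of one way to avoid embedding the large object in the task graph when running under Prefect's DaskExecutor (assumptions: the data can be written to shared storage; the paths and task names here are made up): have the producing task persist the data and return only a small reference, and load it inside the consuming task.

from prefect import task, Flow

@task
def build_big_data():
    path = "/tmp/big_data.parquet"   # hypothetical location (use S3/shared storage on a real cluster)
    # ... write the ~127 MB object out here instead of returning it ...
    return path                      # only this small string travels through the task graph

@task
def process(path):
    # ... load the large object from `path` inside the Dask worker ...
    pass

with Flow("big-data") as flow:
    process(build_big_data())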
Leon Kozlowski
06/18/2021, 6:41 PM
Ben Muller
06/18/2021, 8:05 PM
matta
06/18/2021, 8:20 PM
Where would ~/.prefect/config.toml be if I installed with conda?
matta
06/18/2021, 9:13 PM
Andrew Hannigan
06/19/2021, 12:51 AM
Gopinath Jaganmohan
06/19/2021, 2:15 PM
Ayyanar Thangaraj
06/21/2021, 3:24 AM
Ayyanar Thangaraj
06/21/2021, 3:25 AM
Ayyanar Thangaraj
06/21/2021, 3:25 AM
Ayyanar Thangaraj
06/21/2021, 3:26 AM
Noah Holm
06/21/2021, 7:02 AM
I use upstream_tasks=[some_task] for those that just need to run in order. When the last one fails it'll only retry the failed one and not the ones that it depends on. I use S3 storage, therefore S3 results, but I have added checkpoint=False to all my tasks in the flow. When restarting in the Cloud UI I get a message saying that "restarting tasks may require more config, read docs", but I don't see where I'd solve my use case. I would expect that the tasks that have dependent tasks in the flow get rerun since they didn't have any checkpoint of their results.
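A minimal sketch of the pattern that usually makes "Restart" work from the UI (an assumption based on the docs, not a confirmed answer from this thread; the bucket and task names are hypothetical): keep checkpointing and a result (e.g. S3Result) on tasks whose outputs a restarted downstream task must reload, and use checkpoint=False only where no data is passed.

from prefect import task, Flow
from prefect.engine.results import S3Result

@task(result=S3Result(bucket="my-bucket"))  # hypothetical bucket; checkpointed so a restart can reload the output
def extract():
    return [1, 2, 3]

@task
def load(rows):
    print(rows)

@task(checkpoint=False)  # returns nothing, only ordering matters
def cleanup():
    pass

with Flow("restartable") as flow:
    rows = extract()
    loaded = load(rows)
    cleanup(upstream_tasks=[loaded])  # ordering-only dependency, as in the question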
Thomas Hoeck
06/21/2021, 11:00 AM
Omar Sultan
06/21/2021, 11:30 AM
Toni Vlaic
06/21/2021, 11:50 AM
Unexpected error: ClientError([{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}])
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 856, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/executors.py", line 298, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "/usr/local/lib/python3.8/site-packages/prefect/utilities/tasks.py", line 449, in method
return run_method(self, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/tasks/prefect/flow_run.py", line 172, in run
run_link = client.get_cloud_url("flow-run", flow_run_id)
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 895, in get_cloud_url
tenant_slug = self.get_default_tenant_slug(as_user=as_user and using_cloud_api)
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 928, in get_default_tenant_slug
res = self.graphql(query)
File "/usr/local/lib/python3.8/site-packages/prefect/client/client.py", line 318, in graphql
raise ClientError(result["errors"])
prefect.utilities.exceptions.ClientError: [{'path': ['user'], 'message': 'field "user" not found in type: \'query_root\'', 'extensions': {'path': '$.selectionSet.user', 'code': 'validation-failed', 'exception': {'message': 'field "user" not found in type: \'query_root\''}}}]
Any ideas on how to approach troubleshooting this?
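One hedged first check (an assumption, not a confirmed diagnosis): the "user" field only exists in the Prefect Cloud API, so this error typically appears when a Cloud-style query reaches a Prefect Server backend (misconfigured backend or a client/server version mismatch). Confirming which backend and endpoint the failing environment is actually using:

import prefect

print(prefect.config.backend)    # "cloud" or "server"
print(prefect.config.cloud.api)  # the API endpoint the client is hitting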
Shivam Shrey
06/21/2021, 12:38 PM
this.id in the currently running flow.
Gitesh Shinde
06/21/2021, 1:51 PM
How do I get the image for KubernetesRun to be pulled from AWS ECR?
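A minimal sketch (the image URI and secret name are hypothetical): point KubernetesRun at the ECR image and, if the cluster can't pull from ECR natively (e.g. it isn't EKS with the right node IAM role), reference a Kubernetes imagePullSecret that holds the ECR credentials.

from prefect.run_configs import KubernetesRun

# assuming `flow` is your Flow object
flow.run_config = KubernetesRun(
    image="123456789012.dkr.ecr.eu-west-1.amazonaws.com/my-flow:latest",  # hypothetical ECR image URI
    image_pull_secrets=["ecr-registry-secret"],                           # hypothetical k8s secret with ECR creds
)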
ciaran
06/21/2021, 3:01 PM
myth busted
Alexander
06/21/2021, 5:36 PM
Joe
06/21/2021, 5:43 PM
[21 June 2021 1:41pm]: Failed to load and execute Flow's environment: ModuleNotFoundError("No module named '/root/'")
Joseph Loss
06/21/2021, 6:54 PM
Leon Kozlowski
06/21/2021, 7:46 PM
matta
06/21/2021, 10:49 PM
I have flow.register() at the bottom. There's probably a way to have it run flow.py upon uploading a new version?
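One common pattern (an assumption, not from this thread; the project name is hypothetical): guard the registration so flow.py can be imported without side effects, then have CI/CD run the file whenever a new version is uploaded.

# at the bottom of flow.py, instead of a bare flow.register()
if __name__ == "__main__":
    flow.register(project_name="my-project")  # hypothetical project name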
Ben Muller
06/21/2021, 11:53 PM
I'm running prefect kv set in a Docker image. Is the only thing I need to do so that the container is authed with my cloud account ....
export PREFECT__CLOUD__AGENT__AUTH_TOKEN=foobar
I am getting the error:
prefect.utilities.exceptions.ClientError: Malformed response received from Cloud - please ensure that you have an API token properly configured.
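A hedged guess at the issue (not confirmed here): client commands such as prefect kv set read the token from the cloud.auth_token config key rather than the agent token, so inside the container it would need something like the following (placeholder token value):

export PREFECT__CLOUD__AUTH_TOKEN=<user-or-tenant-scoped-token>   # not PREFECT__CLOUD__AGENT__AUTH_TOKEN
prefect kv set my_key my_value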
Sam Gibson
06/22/2021, 4:40 AM
@task()
def a():
    # do stuff...

@task()
def b():
    # do stuff...

@task(requires=[a, b])
def c(a, b):
    # do stuff...
The benefit of such an approach is that your dependencies are defined in the same place you need them (instead of at the bottom of the file in a flow block??). This is especially useful when your dependencies involve making queries to various data sources and you parameterise the queries, e.g.
@task(requires=[
    query_table("trades"),
    query_table("orders")
])
def targets(trades, orders):
    # do stuff...
In the prefect model, from what I gather, for the above example I would instead write something like...
@task
def targets(trades, orders):
    # do stuff...

# maybe lots of other code goes here...

with Flow("targeting") as flow:
    trades = query_table("trades")(**params)
    orders = query_table("orders")(**params)
    targets(trades, orders)
From a code author point of view the prefect model is (subjectively, I admit) awkward, especially as the complexity of your pipelines grows (with tasks depending on tasks, depending on queries, etc).
I've played around with implementing some DSL on top of prefect to achieve something similar to the above, but it feels like I'm really trying to pound a square peg into a round hole.
What I'd like to understand is what patterns or idioms are used for complicated pipelines, especially those spanning multiple modules/files, with multiple flows depending on one another? Are there advantages to declaring the DAG and your logic separately that I'm not understanding? Especially for pipelines that are built where a "start" and "end" time are implicit in every task while operating on time series data.
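Not an official Prefect idiom, just a sketch of one pattern used in larger codebases (module layout, task names, and parameters here are illustrative): keep tasks in their own modules and wrap the functional-API wiring in a small flow-factory function, so the DAG construction lives next to the tasks that need it rather than in one monolithic flow block, and the shared "start"/"end" window is passed in once.

# tasks/queries.py (illustrative module)
from prefect import task

@task
def query_table(table, start=None, end=None):
    ...  # run the parameterised query

# tasks/targets.py (illustrative module)
from prefect import task

@task
def compute_targets(trades, orders):
    ...  # do stuff

# flows/targeting.py (illustrative module)
from prefect import Flow

def build_targeting_flow(**params):
    # params carries the implicit "start"/"end" window for every query
    with Flow("targeting") as flow:
        trades = query_table("trades", **params)
        orders = query_table("orders", **params)
        compute_targets(trades, orders)
    return flow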
Prabin Mehta
06/22/2021, 9:21 AM
File "/usr/local/lib/python3.8/site-packages/prefect/client/secrets.py", line 137, in get
value = secrets[self.name]
ciaran
06/22/2021, 9:46 AM
Prabin Mehta
06/22/2021, 11:03 AM
1. This is the flow where I am using the base prefect image:
with Flow(name="schema-generator", state_handlers=[handler],
          storage=GitHub(repo="prefect-flows",
                         path="flows/schema_generator.py", access_token_secret="GITHUB_ACCESS_TOKEN"),
          run_config=KubernetesRun(image="prefecthq/prefect:0.14.22-python3.8",
                                   env={"EXTRA_PIP_PACKAGES": "msgpack==1.0.0 lz4==3.1.1 numpy==1.18.1 prefect[snowflake]"}),
          executor=DaskExecutor("tcp://dask-scheduler:8786"),
          result=PrefectResult()) as flow_schema_generator:
    schema = Parameter("schema", default=[])
    _create_schema(schema=schema)
2. This is the flow where I am passing the ECR repo which has the prefect image installed with all dependencies. This is giving me the above-mentioned error in the job pods.
with Flow(name="schema-generator", state_handlers=[handler],
          storage=GitHub(repo="prefect-flows",
                         path="flows/schema_generator.py", access_token_secret="GITHUB_ACCESS_TOKEN"),
          run_config=KubernetesRun(image="ecr_link/prefect-aura-image:latest",
                                   image_pull_secrets=["secret"]),
          executor=DaskExecutor("tcp://dask-scheduler:8786"),
          result=PrefectResult()) as flow_schema_generator:
    schema = Parameter("schema", default=[])
    _create_schema(schema=schema)
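A hedged guess at why the secret lookup fails only with the custom image (an assumption, not a confirmed diagnosis): if the custom image's config makes Prefect fall back to local secrets, the GITHUB_ACCESS_TOKEN secret is looked up in the local context and never found. One way to test is to inject it into the job environment via the run config (value shown as a placeholder; in practice mount it from a Kubernetes secret rather than hard-coding it):

run_config=KubernetesRun(
    image="ecr_link/prefect-aura-image:latest",
    image_pull_secrets=["secret"],
    env={"PREFECT__CONTEXT__SECRETS__GITHUB_ACCESS_TOKEN": "<token>"},  # placeholder value
)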
ciaran
06/22/2021, 11:32 AM
Prabin Mehta
06/22/2021, 12:21 PM
ciaran
06/22/2021, 12:27 PM
Prabin Mehta
06/22/2021, 12:33 PM
ciaran
06/22/2021, 12:35 PM