Aakash Indurkhya
01/04/2021, 10:45 PM
Aakash Indurkhya
01/04/2021, 10:58 PM
Aakash Indurkhya
01/04/2021, 10:58 PM
Jeremy Phelps
01/04/2021, 11:03 PM
Rob Fowler
01/05/2021, 12:25 AM
from datetime import timedelta
from time import sleep
from prefect.engine import state
from prefect import task, Task, Flow, context
import prefect
from prefect.engine.executors import LocalDaskExecutor, LocalExecutor
def ignore_timeout_handler(task, old_state, new_state):
    print(f"{prefect.context.parameters} old {old_state} new {new_state}")
    if new_state.is_failed() and isinstance(new_state, state.TimedOut):
        return_state = state.Success(result={"state": "forced ok"})
    else:
        return_state = new_state
    return return_state

@task
def produce_range():
    return range(5, 10)

class SlowTask(Task):
    def run(self, item, sleep_time=9, **kwopts):
        sleep(sleep_time)
        # doing stuff with a host called 'item'
        return item

with Flow("Slow flow") as flow:
    slow_task = SlowTask(timeout=6, max_retries=2, retry_delay=timedelta(seconds=2), state_handlers=[ignore_timeout_handler])
    nrange = produce_range()
    result = slow_task.map(item=nrange,
                           sleep_time=nrange)

# executor = LocalDaskExecutor(scheduler="threads", num_workers=10)
executor = LocalExecutor()
for ii in flow.run(executor=executor).result[result].result:
    print(ii)
Alex Koay
01/05/2021, 3:03 AM
emre
01/05/2021, 1:44 PM
meta_df = SnowflakePandasResultTask(
    db=SNOW_DB,
    checkpoint=True,
    result=LocalResult(dir=".prefect_cache"),
    cache_for=timedelta(days=14),
    cache_key="snow_pandas_out",
)(query=info_query)
This persists files with arbitrary names under .prefect_cache. On every run I get a warning that my cache is not valid anymore. Can anyone point me to where I am doing things wrong?
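A self-contained sketch of one alternative, assuming Prefect 0.14's target-based checkpointing (the task, flow name, and target filename here are made up, not the code above): a templated target gives the persisted file a predictable name, and the task is skipped when that file already exists.

from prefect import Flow, task
from prefect.engine.results import LocalResult

@task(
    checkpoint=True,
    result=LocalResult(dir=".prefect_cache"),
    # templated filename; if this file already exists the task is marked
    # Cached and the stored value is loaded instead of re-running the query
    target="snow_pandas_out-{today}",
)
def fetch_metadata():
    # stand-in for the Snowflake query task in the snippet above
    return {"rows": 3}

with Flow("snowflake-cache-sketch") as flow:
    meta_df = fetch_metadata()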
Chris Jordan
01/05/2021, 3:15 PM
StartFlowRun
? I've got a task
spawn_time_series_import = StartFlowRun(
    project_name="python_imports",
    flow_name="blast_metric_series_flow")
which takes an argument
spawn = spawn_time_series_import(
    parameters=dict(action_id=blast_id))
and a task that returns a list of `blast_id`s with which I want to spawn many flows
blast_id_list = push_records_to_summary_table(imported)
I want to put all this together in something like this, but I'm having trouble finding the right syntax
spawn_time_series_import = StartFlowRun(
    project_name="python_imports",
    flow_name="blast_metric_series_flow")

with Flow("blast_import_flow",
          schedule=daily_schedule,
          state_handlers=[cloud_only_slack_handler]
) as flow:
    [[[stuff]]]
    blast_id_list = push_records_to_summary_table(imported)
    spawn = spawn_time_series_import.map(
        parameters=dict(action_id=blast_id_list))
is there an accepted syntax for this?
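A minimal sketch of one way this is commonly written, assuming Prefect 0.14 (the get_blast_ids and build_params tasks are stand-ins, not from the thread): build one parameters dict per id in a helper task, then map the StartFlowRun task over that list.

from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun

spawn_time_series_import = StartFlowRun(
    project_name="python_imports",
    flow_name="blast_metric_series_flow")

@task
def get_blast_ids():
    # stand-in for push_records_to_summary_table(imported) above
    return [1, 2, 3]

@task
def build_params(blast_ids):
    # one parameters dict per child flow run
    return [dict(action_id=blast_id) for blast_id in blast_ids]

with Flow("blast_import_flow") as flow:
    blast_id_list = get_blast_ids()
    param_dicts = build_params(blast_id_list)
    spawn = spawn_time_series_import.map(parameters=param_dicts)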
Dilip Thiagarajan
01/05/2021, 4:03 PM
flow.serialized_hash()
in 0.14.0?
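For reference, a small sketch of the documented pairing of serialized_hash() with register(), assuming Prefect 0.14 (the project name is made up):

flow.register(
    project_name="my-project",
    # skip creating a new flow version when the serialized flow is unchanged
    idempotency_key=flow.serialized_hash(),
)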
Hui Zheng
01/05/2021, 7:17 PM
jeff n
01/05/2021, 7:21 PM
Hui Zheng
01/05/2021, 9:08 PM
Marwan Sarieddine
01/05/2021, 10:38 PM
Braun Reyes
01/05/2021, 10:57 PM
If neither are set then script will not be uploaded and users should manually place the script file in the desired key location in an S3 bucket.
I could not figure out how to make sure the flow run will use the S3 artifact.
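A minimal sketch of what pointing a flow at a manually uploaded script might look like, assuming Prefect 0.14's S3 storage (bucket and key are made up): with stored_as_script=True, the key is what tells the flow run which S3 object to pull.

from prefect import Flow
from prefect.storage import S3

with Flow("s3-script-example") as flow:
    ...

flow.storage = S3(
    bucket="my-flow-bucket",
    key="flows/s3_script_example.py",  # where the script file was placed manually
    stored_as_script=True,
)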
Verun Rahimtoola
01/06/2021, 5:00 AM
Greg Roche
01/06/2021, 9:52 AM
Unexpected error: KeyError('endpoint_resolver')
We have a daily flow which includes around 25 mapped S3Upload tasks, and approximately once every five flow runs, one single mapped S3Upload task fails with this error. We're not doing anything particularly complicated or novel with the task, and the flow always succeeds when restarted, so I'm curious to know if anyone else has experienced this error and knows of a fix. More details in the thread, and thanks in advance for any help 🙂
Lukas N.
01/06/2021, 4:08 PM
None. Reproducible example in thread. Thanks in advance for any help 🙂
Pedro Martins
01/06/2021, 6:17 PM
custom_confs = {
    "run_config": KubernetesRun(
        image="drtools/prefect:dask-test",
        image_pull_secrets=["regcred"],
    ),
    "storage": S3(bucket="dr-prefect"),
    "executor": DaskExecutor(address="dask-scheduler.dask.svc.cluster.local:8786")
}

with Flow("dask-example", **custom_confs) as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=decs)
    total = list_sum(adds)
Ideally I want to have incs
and decs
running in parallel in different workers, but they are running asynchronously as you can see in the image below.
I tried with 3, 5 and 10 dask-workers, but the tasks don't run in parallel.
What do you suggest?
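A self-contained debugging sketch (not from the thread): running the same mapped pattern with a threaded LocalDaskExecutor is one way to confirm that the maps parallelize once an executor is actually attached to the run.

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def inc(x):
    return x + 1

@task
def dec(x):
    return x - 1

with Flow("dask-example-local") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))

flow.run(executor=LocalDaskExecutor(scheduler="threads", num_workers=8))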
Verun Rahimtoola
01/06/2021, 9:30 PM
Charles Lariviere
01/06/2021, 9:37 PM
prefect agent kubernetes install
command. I’m running into ImportError
when trying to run a flow that uses the SnowflakeQuery
Prefect task.
Failed to load and execute Flow's environment: ImportError('Using `prefect.tasks.snowflake` requires Prefect to be installed with the "snowflake" extra.')
I’m wondering where I should specify the dependencies for this flow? I understand that if I was using the Docker
storage I would define those in python_dependencies
, but I’m not sure where I should define those without using the Docker
storage.
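One hedged sketch of a non-Docker-storage option, assuming Prefect 0.14 and the official prefecthq/prefect image (the flow name here is made up): the image installs anything listed in the EXTRA_PIP_PACKAGES environment variable at container start, so the snowflake extra can be added through the run config.

from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow(
    "snowflake-flow",
    run_config=KubernetesRun(
        env={"EXTRA_PIP_PACKAGES": "prefect[snowflake]"},
    ),
) as flow:
    ...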
Javier Velez
01/07/2021, 5:56 AM
Joël Luijmes
01/07/2021, 10:19 AM
ale
01/07/2021, 2:37 PM
Eric
01/07/2021, 3:46 PM
Charles Lariviere
01/07/2021, 4:20 PM
~/.prefect/config.toml
, a Kubernetes agent, and a Flow configured with Docker
storage.
When I register the flow, it looks like my local dev credentials are packaged in the Docker image for that flow, and the flow runs deployed through Prefect Cloud, running on our Kubernetes agent, do not use the Secrets logged in Prefect Cloud — they instead use my local secrets. The only way I have found for that not to happen is to comment out or delete my local config before registering the flow.
Is that expected? If so, how does one ensure that devs do not accidentally package their local credentials when registering flows?
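A minimal sketch of keeping credentials out of the registered flow, assuming Prefect 0.14 (the secret name is made up): the PrefectSecret task resolves its value at flow run time rather than at registration.

from prefect import Flow, task
from prefect.tasks.secrets import PrefectSecret

@task
def use_credentials(creds):
    ...  # connect to the external system here

with Flow("secret-example") as flow:
    creds = PrefectSecret("MY_CREDENTIALS")  # resolved when the run executes
    use_credentials(creds)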
Brett Naul
01/07/2021, 6:52 PM
manual_only
trigger but that can be achieved at runtime instead. so something like
if condition and not prefect.context["manual_approval"]:
    raise signals.PAUSE()
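A self-contained version of the fragment above, assuming Prefect 0.14 ("manual_approval" is a hypothetical context key supplied at run time):

import prefect
from prefect import task
from prefect.engine import signals

@task
def maybe_pause(condition: bool):
    # pause only when the condition holds and no approval flag was provided
    if condition and not prefect.context.get("manual_approval", False):
        raise signals.PAUSE("waiting for manual approval")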
Jeff Williams
01/07/2021, 7:39 PM
Anish Chhaparwal
01/07/2021, 8:00 PM
from prefect import Flow, Task, Parameter
class PrintTheStatements(Task):
    def task_1(self):
        print(f"first name is {self.firstname} and last name is {self.lastname}")

    def run(self, firstname, lastname):
        self.firstname = firstname
        self.lastname = lastname
        self.task_1()

if __name__ == "__main__":
    with Flow("ClassTask") as flow:
        firstname = Parameter("firstname", default="anish")
        lastname = Parameter("lastname", default="chhaparwal")
        apt = PrintTheStatements()
        result = apt(firstname=firstname, lastname=lastname)
    flow.run()
Verun Rahimtoola
01/07/2021, 10:14 PM
Verun Rahimtoola
01/07/2021, 10:57 PM
Result
subclass to support our own result storage backend?
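A rough sketch of the general shape of a custom result, assuming Prefect 0.14's Result interface (method bodies here are placeholders, not a working backend): subclasses typically implement write, read, and exists against their own storage.

from prefect.engine.result import Result

class MyBackendResult(Result):
    def write(self, value, **kwargs):
        # render the location template, then push the serialized value to it
        new = self.format(**kwargs)
        new.value = value
        # ... serialize new.value and store it at new.location in the backend ...
        return new

    def read(self, location):
        new = self.copy()
        new.location = location
        # ... fetch and deserialize the value stored at `location` ...
        return new

    def exists(self, location, **kwargs):
        # ... return True if the backend already holds this location ...
        return False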
Dylan
01/07/2021, 10:59 PM
Verun Rahimtoola
01/07/2021, 11:00 PM
Dylan
01/07/2021, 11:05 PM