Jon Ruhnke
01/14/2022, 9:35 PM
Greg Adams
01/14/2022, 9:38 PM
Josh
01/14/2022, 11:01 PM
error: <nothing> not callable for the task run when I try to test it out:
from prefect import Flow, Task

class MyTask(Task):
    def run(self):
        # do something
        return True

if __name__ == "__main__":
    my_task = MyTask()
    with Flow("My Flow") as flow:
        my_task()
    flow.run()
Amber Papillon
01/16/2022, 3:04 AM
Philipp Eisen
01/16/2022, 11:54 AM
distributed.worker - WARNING - Compute Failed
Function: orchestrate_task_run
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x7f02f0f56430>, 'task_run': TaskRun(id=UUID('2acf899f-67f5-4717-9665-c91f730f3719'), created=datetime.datetime(2022, 1, 16, 11, 45, 38, 995585, tzinfo=datetime.timezone.utc), updated=datetime.datetime(2022, 1, 16, 11, 45, 39, 19000, tzinfo=datetime.timezone.utc), name='get-product-b7ee3036-0', flow_run_id=UUID('a3e75090-2e7d-42f6-8dda-b00600f70b12'), task_key='b7ee3036fbe1354fe2fbf30215a316c4', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=10, retry_delay_seconds=0.0), tags=[], state_id=UUID('36aa692f-175d-4bff-81ed-e57f2228cdfa'), task_inputs={}, state_type=StateType.PENDING, run_count=0, expected_start_time=datetime.datetime(2022, 1, 16, 11, 45, 38, 988955, tzinfo=datetime.timezone.utc), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta
Exception: "ValueError('Invalid task run: 2acf899f-67f5-4717-9665-c91f730f3719')"
Is there something obvious I’m missing?
Tao Bian
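01/16/2022, 9:39 PM
import datetime
from prefect import Flow, task

@task
def write_timestamp_into_database(timestamp):
    ...

# daily_schedule is defined elsewhere (not shown)
with Flow("sample-flow", daily_schedule) as flow:
    timestamp = str(datetime.datetime.now())
    write_timestamp_into_database(timestamp)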
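In Prefect 1.x, plain Python inside a `with Flow(...)` block runs once, when the flow object is built, so `str(datetime.datetime.now())` there is fixed at build/registration time rather than re-evaluated on each scheduled run; moving the call into a task defers it to run time. The stdlib-only sketch below illustrates that difference with closures standing in for the flow (no Prefect needed); `build_eager` and `build_deferred` are hypothetical names.

```python
import datetime
import time
from typing import Callable


def build_eager() -> Callable[[], str]:
    # Like `timestamp = str(datetime.datetime.now())` written directly in the
    # `with Flow(...)` block: computed once, when the "flow" is built.
    timestamp = str(datetime.datetime.now())
    return lambda: timestamp


def build_deferred() -> Callable[[], str]:
    # Like computing the timestamp inside a task: evaluated on every run.
    return lambda: str(datetime.datetime.now())


if __name__ == "__main__":
    for build in (build_eager, build_deferred):
        run = build()
        first = run()
        time.sleep(0.01)  # two "scheduled runs" a little apart
        second = run()
        print(build.__name__, "same value on both runs:", first == second)
```

In the flow itself, that would mean a small task (a hypothetical `get_timestamp`) that returns the timestamp, with its result passed to `write_timestamp_into_database`.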
Sultan Orazbayev
01/17/2022, 12:18 AM
Noam Gal
01/17/2022, 8:12 AM
prefect.Parameter tasks. The parameters' types are just native str and int.
The flow's logic uses other tasks that consume the parameter tasks.
Those tasks call helper functions that let me reuse code and keep it readable.
If I want a helper function to use one of the parameters (I just need the value, not the task itself), I need to make the helper a Prefect task itself, and when calling it from another task I must call it with .run, since inside a task we are no longer in the context of a flow.
For example:
from prefect import Flow, Parameter, task

def calc_logic_func1() -> int:
    return ...

# calc_logic_func2 and my_reduce_task are defined similarly (omitted here)

@task
def shared_task(id: int, value: int) -> int:
    return ...

@task
def my_task1(id, description):
    val1 = calc_logic_func1()
    # no flow context inside a task, so call the task's logic directly
    return shared_task.run(id, val1)

@task
def my_task2(id, description):
    val2 = calc_logic_func2()
    return shared_task.run(id, val2)

# tasks are defined above so the flow block can reference them
with Flow("my flow") as my_flow:
    id = Parameter("id", required=True)  # int value
    description = Parameter("description", required=True)  # str value
    result1 = my_task1(id, description)
    result2 = my_task2(id, description)
    my_reduce_task(result1, result2)
In the example above I want to use a helper function, shared_task, with the integer id value. But since id is a Prefect Parameter task, shared_task itself must be a task, and when it is called from another task (e.g. my_task1) it must be called with shared_task.run.
Well, this is how I understand this so far.
Is there any other way to use it?
(either not making shared_task a task, or not calling it with .run, since my_task1 is already called from the `my_flow` context)
If this is the right way to use it, are there any other effects on the flow run? (I guess my_task1 will execute shared_task itself on the same agent.)
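For comparison, assuming Prefect 1.x semantics: by the time my_task1 actually runs, its id and description arguments have already been resolved from the Parameter tasks into their plain values (an int and a str), so a helper called inside the task body can be an ordinary Python function, with no @task and no .run. Either way (plain function or shared_task.run), the call happens in the same process as the calling task and is not tracked as a separate task run. The sketch below drops the Prefect decorators so it runs standalone; `shared_logic` and the `calc_logic_func*` bodies are hypothetical.

```python
def calc_logic_func1() -> int:
    return 10  # hypothetical stand-in for the real calculation


def calc_logic_func2() -> int:
    return 20  # hypothetical stand-in for the real calculation


def shared_logic(id: int, value: int) -> int:
    # Plain helper (no @task): it receives the already-resolved int id.
    return id * 100 + value  # hypothetical logic


# In the real flow these keep their @task decorators; when they run, `id`
# is the int value of the "id" Parameter, not the Parameter task itself.
def my_task1(id: int, description: str) -> int:
    return shared_logic(id, calc_logic_func1())


def my_task2(id: int, description: str) -> int:
    return shared_logic(id, calc_logic_func2())


if __name__ == "__main__":
    print(my_task1(7, "demo"), my_task2(7, "demo"))  # 710 720
```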
Thanks!
Florian Kühnlenz
01/17/2022, 9:37 AM
Tom Klein
01/17/2022, 11:49 AM
Marwan Sarieddine
01/17/2022, 1:53 PM
Bruno Murino
01/17/2022, 4:29 PM
Miguel Angel
01/17/2022, 6:00 PM
import dask.dataframe as dd
from dask.distributed import Client
from s3fs import S3FileSystem

s3 = S3FileSystem()
client = Client()

folder_list = [
    "file1",
    "file2",
    "file3",
    "file4",
    "file5",
    "file6",
    "file7",
    "file8",
]
file_list = [f"s3://my-bucket/parquet/{folder}/*.parquet" for folder in folder_list]

# one lazy dask dataframe per folder, then concatenate them on the cluster
dataframe_list = client.map(dd.read_parquet, file_list, gather_statistics=False)
dataframe = client.submit(dd.concat, dataframe_list)
# select the column from the concatenated dataframe before taking the mean
mean_value = client.submit(lambda x: x["some_data_column"].mean(), dataframe)
mean_compute = client.submit(lambda x: x.compute(), mean_value)
print(mean_compute.result())
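The Client.map / Client.submit calls above use dask.distributed's futures interface, which extends the standard library's concurrent.futures model. The sketch below shows the same fan-out (one read per folder) and fan-in (concatenate, then mean) shape with a ThreadPoolExecutor, using in-memory lists as a stand-in for the parquet folders; `FAKE_FILES`, `read_column`, and `mean_of_folders` are hypothetical. One difference: Client.submit accepts futures as arguments and resolves them on the cluster, while the stdlib executor does not, so the sketch collects the per-folder results before combining them.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import mean
from typing import Dict, List

# Hypothetical in-memory stand-ins for the eight parquet folders.
FAKE_FILES: Dict[str, List[float]] = {
    f"file{i}": [float(i), float(i) + 1.0] for i in range(1, 9)
}


def read_column(folder: str) -> List[float]:
    # Stand-in for reading one folder's "some_data_column".
    return FAKE_FILES[folder]


def mean_of_folders(folders: List[str]) -> float:
    with ThreadPoolExecutor() as pool:
        # Fan out: one read per folder, similar in spirit to client.map(...).
        chunks = list(pool.map(read_column, folders))
    # Fan in: concatenate the chunks (like dd.concat), then reduce.
    combined = [value for chunk in chunks for value in chunk]
    return mean(combined)


if __name__ == "__main__":
    print(mean_of_folders([f"file{i}" for i in range(1, 9)]))
```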
Andreas Eisenbarth
01/17/2022, 7:27 PM
create_flow_run with map to create multiple flow runs, each with a different dict of parameters. On one server, all created flow runs receive the same flow_run_id, which means they overwrite each other's logs and we only see one in the Prefect UI.
(Locally I cannot reproduce it: every child flow run gets a different flow run ID. This server runs in Docker, and in that setup create_flow_run was previously working correctly.)
Does anyone have ideas? (Example code attached)
Matt Alhonte
01/17/2022, 7:38 PM
Samay Kapadia
01/17/2022, 10:46 PM
No module named '/Users/sa/'. Why does it want my home directory to be a module? More details in the thread.
Yusuf Khan
01/17/2022, 11:03 PM
Son Nguyen
01/18/2022, 9:30 AM
prefect server start and everything started correctly. But in the UI, when I click on a flow, I'm not redirected to the flow detail page.
It looks like the following Docker image versions introduced a new bug:
prefecthq/apollo core-0.15.12 d8519b0544d0 5 days ago 324MB
prefecthq/server core-0.15.12 d828f40dbf19 5 days ago 403MB
prefecthq/ui core-0.15.12 5edd4fee96ed 3 weeks ago 225MB
because it worked fine with these versions:
prefecthq/ui core-0.15.11 6fac027b4605 4 weeks ago 225MB
prefecthq/server core-0.15.11 f6280189d6a5 6 weeks ago 402MB
prefecthq/apollo core-0.15.11 d1b07b3c9a57 6 weeks ago 324MB
Akharin Sukcharoen
01/18/2022, 9:50 AM
Emma Rizzi
01/18/2022, 9:56 AM
Failed to load and execute Flow's environment: FlowStorageError("An error occurred while unpickling the flow:\n  TypeError('an integer is required (got type bytes)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n  - prefect: (flow built with '0.15.10', currently running with '0.15.12')\n  - python: (flow built with '3.7.11', currently running with '3.9.9')")?
I searched this Slack for insights. I use Prefect Cloud with a Docker agent on a VM, and I upgraded Prefect to 0.15.12 on both the agent and the development machine.
Malthe Karbo
01/18/2022, 11:03 AM
RuntimeError: IOLoop is closed
The flow example and pinned versions are in the thread.
Aaron Pickering
01/18/2022, 11:45 AM
"Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n  NameError("name \'err\' is not defined")')"
The task itself is straightforward; it looks like this:
snowsql_obj = SnowflakeQueriesFromFile(account=SNOWFLAKE_ACCOUNT, user=SNOWFLAKE_USER, password=SNOWFLAKE_PWD, file_path="../../sql/amplitude_raw.sql")
snowsql_obj.run()
Samay Kapadia
01/18/2022, 3:20 PM
Konstantin
01/18/2022, 3:39 PM
Johan Wåhlin
01/18/2022, 3:46 PM
Samay Kapadia
01/18/2022, 3:51 PM
The secret KUBERNETES_API_KEY was not found if I’m running the Prefect agent inside the cluster? According to this doc, it will attempt an in-cluster connection, but my hello-world task keeps failing 😭
Jason Motley
01/18/2022, 3:55 PM
max_retries feature, but for an entire flow? I.e., if the flow fails for some reason, retry it 5 minutes later.
Muddassir Shaikh
01/18/2022, 4:04 PM
from datetime import timedelta
from prefect import task

@task(task_run_name="{task_name_from_tup(details)}", max_retries=3, retry_delay=timedelta(minutes=1))
def processing(details):
    ...  # some code
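As far as I know, Prefect 1.x fills a string task_run_name with Python's str.format, passing the task's inputs as keyword arguments. Replacement fields in str.format support a name plus attribute (`.attr`) and index (`[0]`) lookups, but not function calls, so `"{task_name_from_tup(details)}"` is looked up as a single key and fails. The stdlib-only sketch below reproduces that behavior and contrasts it with computing the name in a callable that receives the same inputs; I believe later Prefect 1.x releases also accept a callable for task_run_name, but check the docs for your version. `render_with_format`, `render_with_callable`, and the body of `task_name_from_tup` are hypothetical.

```python
from typing import Any, Callable, Tuple


def task_name_from_tup(details: Tuple[Any, ...]) -> str:
    # Hypothetical body: name the run after the first element of the tuple.
    return f"processing-{details[0]}"


def render_with_format(template: str, **inputs: Any) -> str:
    # Fills a string template from the task inputs, as str.format does.
    return template.format(**inputs)


def render_with_callable(namer: Callable[..., str], **inputs: Any) -> str:
    # A callable receives the inputs as keyword arguments and can run any code.
    return namer(**inputs)


if __name__ == "__main__":
    details = ("customer-42", 3)
    try:
        render_with_format("{task_name_from_tup(details)}", details=details)
    except KeyError as exc:
        # str.format treats the whole "task_name_from_tup(details)" as a key.
        print("str.format failed with KeyError:", exc)
    print(render_with_format("{details[0]}", details=details))  # index lookup works
    print(render_with_callable(task_name_from_tup, details=details))
```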
Yusuf Khan
01/18/2022, 5:40 PM
Frank Oplinger
01/18/2022, 10:05 PM