Emma Rizzi
01/14/2022, 4:57 PM
Henrietta Salonen
01/14/2022, 5:06 PM
William Jevne
01/14/2022, 5:24 PM
Yusuf Khan
01/14/2022, 5:34 PM
run_in_pwsh = ShellTask(name="Powershell run", shell="pwsh")
This is what I was trying to run. Then, within the flow, I had: run_in_pwsh(command='ls')
The documentation for the shell argument says 'shell to run the command with; defaults to "bash"'. I assumed it would accept any shell, as long as that shell starts correctly from the terminal. What I'm actually trying to do is run an Azure command-line utility called 'azcopy' (which is not part of the generic az CLI). I need to run it on both a Windows machine and a Linux machine. Having separate scripts is fine. Any thoughts on how I could or should do this on Windows?
Jason Motley
01/14/2022, 7:02 PM
Nonetype object has no attribute 'to_sql'
but only in production? I've spot-checked, and my load statement is constructed identically, registration is fine, etc. Local runs go fine as well.
Andrey Tatarinov
01/14/2022, 7:20 PM
Philip MacMenamin
01/14/2022, 8:24 PM
Connecting to Prefect Server at ip/graphql:4200
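For comparison, an endpoint URL normally carries the port right after the host and before the path. The sketch below builds such an address with the standard library only. The helper name graphql_endpoint is hypothetical, and the port 4200 and the /graphql path are taken from the log line above. It illustrates URL layout only and is not Prefect's own connection code.

```python
from urllib.parse import urlsplit, urlunsplit


def graphql_endpoint(host: str, port: int = 4200, scheme: str = "http") -> str:
    """Build an endpoint URL with the port attached to the host, before the path.

    Hypothetical helper for illustration; not part of Prefect.
    """
    netloc = f"{host}:{port}"
    return urlunsplit((scheme, netloc, "/graphql", "", ""))


endpoint = graphql_endpoint("localhost")
parts = urlsplit(endpoint)
print(endpoint)
print(parts.hostname, parts.port, parts.path)
```

If the configured address really has the port after the path, as the log line suggests, moving the port next to the host may be worth trying first.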
Jon Ruhnke
01/14/2022, 9:35 PM
Greg Adams
01/14/2022, 9:38 PM
Josh
01/14/2022, 11:01 PM
error: <nothing> not callable
for the task run when I try to test it out
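from prefect import Flow, Task


class MyTask(Task):
    def run(self):
        # do something
        return True


if __name__ == "__main__":
    my_task = MyTask()
    # Calling the task instance inside the Flow context registers it with the flow
    with Flow("My Flow") as flow:
        my_task()
    flow.run()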
Amber Papillon
01/16/2022, 3:04 AM
Philipp Eisen
01/16/2022, 11:54 AM
distributed.worker - WARNING - Compute Failed
Function: orchestrate_task_run
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x7f02f0f56430>, 'task_run': TaskRun(id=UUID('2acf899f-67f5-4717-9665-c91f730f3719'), created=datetime.datetime(2022, 1, 16, 11, 45, 38, 995585, tzinfo=datetime.timezone.utc), updated=datetime.datetime(2022, 1, 16, 11, 45, 39, 19000, tzinfo=datetime.timezone.utc), name='get-product-b7ee3036-0', flow_run_id=UUID('a3e75090-2e7d-42f6-8dda-b00600f70b12'), task_key='b7ee3036fbe1354fe2fbf30215a316c4', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=10, retry_delay_seconds=0.0), tags=[], state_id=UUID('36aa692f-175d-4bff-81ed-e57f2228cdfa'), task_inputs={}, state_type=StateType.PENDING, run_count=0, expected_start_time=datetime.datetime(2022, 1, 16, 11, 45, 38, 988955, tzinfo=datetime.timezone.utc), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta
Exception: "ValueError('Invalid task run: 2acf899f-67f5-4717-9665-c91f730f3719')"
Is there something obvious I'm missing?
Tao Bian
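01/16/2022, 9:39 PM
import datetime
from prefect import Flow, task

@task
def write_timestamp_into_database(timestamp):
    ...

# daily_schedule is assumed to be defined elsewhere
with Flow("sample-flow", daily_schedule) as flow:
    timestamp = str(datetime.datetime.now())
    write_timestamp_into_database(timestamp)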
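A minimal sketch, assuming the concern is that the timestamp never changes between scheduled runs. Code placed directly inside the with Flow(...) block runs once, when the flow is built, so str(datetime.datetime.now()) is evaluated a single time. Computing the value inside the task body gives each run its own timestamp. The example is plain Python with no Prefect import, so it runs standalone. The now argument and the written list are hypothetical stand-ins for the clock and the database. In a real flow the function would carry the @task decorator.

```python
import datetime
from typing import Callable, List

# Hypothetical stand-in for the database table
written: List[str] = []


def write_timestamp_into_database(
    now: Callable[[], datetime.datetime] = datetime.datetime.now,
) -> str:
    # The timestamp is computed when the function runs, not when the flow is defined
    timestamp = str(now())
    written.append(timestamp)
    return timestamp


# Simulate two scheduled runs with a fake clock that advances by one day
ticks = iter(
    [
        datetime.datetime(2022, 1, 16, 21, 0),
        datetime.datetime(2022, 1, 17, 21, 0),
    ]
)
first = write_timestamp_into_database(lambda: next(ticks))
second = write_timestamp_into_database(lambda: next(ticks))
print(first)
print(second)
```

The two printed values differ because each call asks the clock again. The original layout would have reused the single value captured when the flow was built.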
Sultan Orazbayev
01/17/2022, 12:18 AM
Noam Gal
01/17/2022, 8:12 AM
prefect.Parameter tasks. The parameter types are just native str and int.
The flow's logic uses some other tasks that use the parameter tasks.
Those tasks use helper functions that let me reuse code and keep the code readable.
If I want a helper function to use one of the parameters (I just need the value, not the task itself), I need to make the helper a Prefect task itself, and when calling it from another task I have to call it with .run, since inside a task it isn't in the context of a flow.
For example:
import prefect
from prefect import Parameter, Flow, task

def calc_logic_func1() -> int:
    return ...

@task
def shared_task(id: int, value: int) -> int:
    return ...

@task
def my_task1(id, description):
    val1 = calc_logic_func1()
    return shared_task.run(id, val1)

@task
def my_task2(id, description):
    val2 = calc_logic_func2()
    return shared_task.run(id, val2)

# Tasks are defined above because the flow block calls them when it is built.
# calc_logic_func2 and my_reduce_task are assumed to be defined elsewhere.
with Flow("my flow") as my_flow:
    id = Parameter("id", required=True)  # int value
    description = Parameter("description", required=True)  # str value
    result1 = my_task1(id, description)
    result2 = my_task2(id, description)
    my_reduce_task(result1, result2)
In the example above I want to use a helper function, shared_task, with the integer id value. But since id is a Prefect Parameter task, shared_task itself must be a task, and when it is called from another task (e.g. my_task1) it has to be called with shared_task.run.
Well, this is how I understand it so far.
Is there any other way to do this (not making shared_task a task, OR not calling it with .run, since my_task1 is already called from the my_flow context)?
If this is the right way to use it, are there any other effects on the flow run? (I guess my_task1 will execute shared_task itself on the same agent.)
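One possible alternative, sketched under the assumption that the helper only needs the parameter's value: by the time a task's body runs, its Parameter inputs have already been resolved to plain Python values, so the helper can be an ordinary function with no @task decorator and no .run call. The example is plain Python so that it runs standalone. The names shared_logic and my_task1_body, the item_id argument (renamed from id to avoid shadowing the builtin), and the placeholder arithmetic are all hypothetical, since the real logic is elided in the question.

```python
def calc_logic_func1() -> int:
    # Placeholder value; the real calculation is not shown in the question
    return 10


def shared_logic(item_id: int, value: int) -> int:
    # Ordinary helper: no @task decorator, so it is called like any function
    return item_id + value


def my_task1_body(item_id: int, description: str) -> int:
    # In a real flow this function would be decorated with @task.
    # At run time item_id is already a plain int, not a Parameter object.
    val1 = calc_logic_func1()
    return shared_logic(item_id, val1)


result = my_task1_body(1, "demo")
print(result)
```

Either way, the helper executes inside the calling task's own run, in the same process, and does not show up as a separate task run.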
Thanks!
Florian Kühnlenz
01/17/2022, 9:37 AM
Tom Klein
01/17/2022, 11:49 AM
Marwan Sarieddine
01/17/2022, 1:53 PM
Bruno Murino
01/17/2022, 4:29 PM
Miguel Angel
01/17/2022, 6:00 PM
import dask.dataframe as dd
from dask.distributed import Client
from s3fs import S3FileSystem

s3 = S3FileSystem()
client = Client()
folder_list = [
    "file1",
    "file2",
    "file3",
    "file4",
    "file5",
    "file6",
    "file7",
    "file8",
]
# One glob pattern per folder
file_list = list(
    map(lambda folder: f"s3://my-bucket/parquet/{folder}/*.parquet", folder_list)
)
dataframe_list = client.map(dd.read_parquet, file_list, gather_statistics=False)
dataframe = client.submit(dd.concat, dataframe_list)
# The lambda must index its argument x; the original indexed a bare list literal
mean_value = client.submit(lambda x: x["some_data_column"].mean(), dataframe)
mean_compute = client.submit(lambda x: x.compute(), mean_value)
print(mean_compute.result())
Andreas Eisenbarth
01/17/2022, 7:27 PM
create_flow_run with map to create multiple flows, each with a different dict of parameters. On one server, all created flows receive the same flow_run_id, which means they overwrite each other's logs and we only see one in the Prefect UI.
(Locally I cannot reproduce it, and every child flow has a different flow run ID. This server is running in Docker, and in that setup create_flow_run was working correctly previously.)
Does anyone have ideas? (Example code attached)
Matt Alhonte
01/17/2022, 7:38 PM
Samay Kapadia
01/17/2022, 10:46 PM
No module named '/Users/sa/'. Why does it want my home directory to be a module? More details inside
Yusuf Khan
01/17/2022, 11:03 PM
Son Nguyen
01/18/2022, 9:30 AM
prefect server start
and everything started correctly. But in the UI, when I click into a flow, it's not redirected to the flow detail page.
It looks like the following Docker image versions introduced a new bug:
prefecthq/apollo core-0.15.12 d8519b0544d0 5 days ago 324MB
prefecthq/server core-0.15.12 d828f40dbf19 5 days ago 403MB
prefecthq/ui core-0.15.12 5edd4fee96ed 3 weeks ago 225MB
because it worked fine with this version
prefecthq/ui core-0.15.11 6fac027b4605 4 weeks ago 225MB
prefecthq/server core-0.15.11 f6280189d6a5 6 weeks ago 402MB
prefecthq/apollo core-0.15.11 d1b07b3c9a57 6 weeks ago 324MB
Akharin Sukcharoen
01/18/2022, 9:50 AM
Emma Rizzi
01/18/2022, 9:56 AM
Failed to load and execute Flow's environment: FlowStorageError("An error occurred while unpickling the flow:\n  TypeError('an integer is required (got type bytes)')\nThis may be due to one of the following version mismatches between the flow build and execution environments:\n  - prefect: (flow built with '0.15.10', currently running with '0.15.12')\n  - python: (flow built with '3.7.11', currently running with '3.9.9')")
?
I searched this Slack for insights. I use Prefect Cloud with a Docker agent on a VM, and I upgraded Prefect to 0.15.12 on both the agent and the development machine.
Malthe Karbo
01/18/2022, 11:03 AM
RuntimeError: IOLoop is closed
Flow example in thread, and pinned versions as well
Aaron Pickering
01/18/2022, 11:45 AM
"Failed to load and execute Flow's environment: FlowStorageError('An error occurred while unpickling the flow:\n NameError("name \'err\' is not defined")')"
The task itself is straightforward, it looks like this:
snowsql_obj = SnowflakeQueriesFromFile(account=SNOWFLAKE_ACCOUNT, user=SNOWFLAKE_USER, password=SNOWFLAKE_PWD, file_path="../../sql/amplitude_raw.sql")
snowsql_obj.run()
Samay Kapadia
01/18/2022, 3:20 PM
Samay Kapadia
01/18/2022, 3:20 PM
Kevin Kho
01/18/2022, 3:25 PM
Samay Kapadia
01/18/2022, 3:26 PM
PREFECT__USE_LOCAL_SECRETS=false poetry run python main.py
I'm still getting
ValueError: Local Secret "prod-prefect-flows" was not found.
PREFECT__CLOUD__USE_LOCAL_SECRETS=false
and it worked
Kevin Kho
01/18/2022, 3:39 PM