aaron
06/10/2022, 5:17 PMAndreas Nigg
06/10/2022, 6:17 PMChristopher Haack
06/10/2022, 7:55 PMBoggdan Barrientos
06/10/2022, 8:31 PMParth Patel
06/10/2022, 9:42 PMYehor Anisimov
06/11/2022, 5:41 AM[2022-06-11 08:35:49+0300] ERROR - prefect.CloudTaskRunner | Task 'DbtShellTask': Exception encountered during task execution!
Traceback (most recent call last):
File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\engine\task_runner.py", line 880, in get_task_run_state
value = prefect.utilities.executors.run_task_with_timeout(
File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\utilities\executors.py", line 468, in run_task_with_timeout
return task.run(*args, **kwargs) # type: ignore
File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\utilities\tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\tasks\dbt\dbt.py", line 192, in run
return super(DbtShellTask, self).run(
File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\utilities\tasks.py", line 456, in method
return run_method(self, *args, **kwargs)
File "C:\Users\User\PycharmProjects\demo\venv\lib\site-packages\prefect\tasks\shell.py", line 145, in run
line = raw_line.decode("utf-8").rstrip()
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xad in position 7: invalid start byte
Any ideas what is the issue?Massimiliano Fanciulli
06/12/2022, 10:21 AMVolker L
06/12/2022, 11:06 AMmahmoud elhalwany
06/12/2022, 11:47 AMTraceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/prefect/cli/base.py", line 59, in wrapper
return fn(*args, **kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 120, in wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 67, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/anyio/_core/_eventloop.py", line 56, in run
return asynclib.run(func, *args, **backend_options)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 233, in run
return native_run(wrapper(), debug=debug)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 228, in wrapper
return await func(*args)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/prefect/cli/base.py", line 162, in version
if is_ephemeral:
UnboundLocalError: local variable 'is_ephemeral' referenced before assignment
An exception occurred.
Joshua Greenhalgh
06/12/2022, 1:48 PMDung Khuc
06/13/2022, 8:04 AMJoshua Greenhalgh
06/13/2022, 10:40 AM063bd788-bc0c-46f5-85e4-1092c2b19297
Priyank
06/13/2022, 11:14 AMIzu
06/13/2022, 11:28 AMAnna Geller
06/13/2022, 11:38 AMJelle Vegter
06/13/2022, 1:36 PMOscar Krantz
06/13/2022, 2:29 PMRenuka
06/13/2022, 2:59 PMyu zeng
06/13/2022, 3:20 PMBrian Phillips
06/13/2022, 5:44 PMMary Clair Thompson
06/13/2022, 7:08 PMFlow could not be retrieved from deployment.
Traceback (most recent call last):
File "/tmp/flow-script-calculate-blob-size4cemrmn_.py", line 1, in <module>
ModuleNotFoundError: No module named 'local_module'
Slackbot
06/13/2022, 7:17 PMMitchell Bregman
06/13/2022, 8:01 PMDbtShellTask
; locally works all as expected. when i deploy to prefect cloud, I am getting a dbt: command not found
.. requirements include dbt-redshift
. My flow storage is Docker. when i try to build the Docker image locally, inside the container I also get dbt: command not found
even though it is installed in my python environment packages. Has anyone run into this before?Ken Nguyen
06/13/2022, 11:15 PMa -> b ->c
I want it so that if a
and b
fails, it sends a Slack notifications saying “Flow failed”. But if c
fails, I want it alter the Slack message by adding b’s outputWei Mei
06/14/2022, 12:32 AMMitchell Bregman
06/14/2022, 1:58 AM01:13:51 Running with dbt=1.1.0
01:13:51 Partial parse save file not found. Starting full parse.
01:13:52 Found 9 models, 75 tests, 0 snapshots, 0 analyses, 196 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics
01:13:52
01:13:52 Encountered an error:
Database Error
could not connect to server: No such file or directory
Is the server running locally and accepting
connections on Unix domain socket "/var/run/postgresql/.s.PGSQL.5439"?
Michelle Brochmann
06/14/2022, 3:10 AMResult
object. I am doing operations on spark DataFrames
in Tasks
and would like to pass a DataFrame
from Task
to Task
via the Result
object. If the DataFrame
is too large to fit in memory simply passing a custom serializer won’t work. So I am thinking I could create a subclass of Result
called SparkDataFrameResult
where the read
and write
methods are overridden to use the spark load
and save
methods.
1. Is there any reason this wouldn’t work or wouldn’t be recommended?
2. Are there any best practices for doing something like this?William Jamir
06/14/2022, 9:01 AMSatnam Singh
06/14/2022, 9:22 AMFlorian Guily
06/14/2022, 10:12 AMFlorian Guily
06/14/2022, 10:12 AMAnna Geller
06/14/2022, 11:15 AMKevin Kho
06/14/2022, 1:50 PMFlorian Guily
06/15/2022, 8:44 AMrecords = get_records()
for record in records:
tags = []
record_id = get_record_id_if_exist()
if record_id == None:
for tag in record["tags"]:
tag_id = get_record_tag_if_exist()
if tag_id == None:
tag_id = create_tag()
tags.append(tag_id)
record_id = create_open_data(record['open_data'], tag_list)
version_id = create_version(record_id, record['version'])
Anna Geller
06/15/2022, 12:42 PMKevin Kho
06/15/2022, 12:55 PMFlorian Guily
06/15/2022, 1:07 PMKevin Kho
06/15/2022, 1:10 PMFlorian Guily
06/15/2022, 1:49 PMKevin Kho
06/15/2022, 2:04 PMapply_map
, I just add another input called y
but it doesn’t do anything:
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge
@task
def inc(x,y):
return x + 1
@task
def negate(x,y):
return -x
@task
def is_even(x,y):
return x % 2 == 0
def inc_or_negate(x, y):
cond = is_even(x,y)
with case(cond, True):
res1 = inc(x,y)
with case(cond, False):
res2 = negate(x,y)
return merge(res1, res2)
I believe your goal is like this:
for x in [1,2,3,4]:
for y in ["A","B","C","D"]:
inc_or_negate(x, y)
So we can make another task `cross_product`:
@task(nout=2)
def cross_product(x_list, y_list):
res = []
for x in x_list:
for y in y_list:
res.append((x,y))
res_x = [_[0] for _ in res]
res_y = [_[1] for _ in res]
return res_x, res_y
and then run the Flow:
with Flow("apply-map example") as flow:
x, y = cross_product(range(4), ["A","B","C", "D"])
result = apply_map(inc_or_negate, x, y)
flow.run()
x
and y
already represent all the possible combinationsFlorian Guily
06/15/2022, 2:15 PMKevin Kho
06/15/2022, 2:17 PMFlorian Guily
06/15/2022, 2:17 PM