Lucas Cavalcanti Rodrigues
09/09/2022, 9:17 PM
dump_to_gcs_flow = create_flow_run.map(
flow_name=unmapped(utils_constants.FLOW_DUMP_TO_GCS_NAME.value),
project_name=unmapped(constants.PREFECT_DEFAULT_PROJECT.value),
parameters=tables_to_zip,
labels=unmapped(current_flow_labels),
run_name=unmapped("Dump to GCS"),
)
Except for parameters, all the other arguments are constants, so I use unmapped on them. tables_to_zip is a list of dictionaries containing the parameter values for each table to be zipped. However, this didn't work. I'm currently receiving the error:
prefect.exceptions.ClientError: [{'message': 'parsing UUID failed, expected String, but encountered Array', 'locations': [{'line': 2, 'column': 5}], 'path': ['flow_run'], 'extensions': {'path': '$.selectionSet.flow_run.args.where.id._eq', 'code': 'parse-failed', 'exception': {'message': 'parsing UUID failed, expected String, but encountered Array'}}}]
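The message "expected String, but encountered Array" says a list arrived where a single flow-run UUID was expected. One common cause (a guess, since the failing call isn't shown) is passing the result of create_flow_run.map, which is a list of IDs, into a downstream call such as wait_for_flow_run without mapping it too. A pure-Python sketch of the shape mismatch; the fake_* names are stand-ins, not Prefect APIs:

```python
import uuid

def fake_create_flow_run(parameters: dict) -> str:
    # stand-in: Prefect's create_flow_run returns one flow-run ID (a UUID string)
    return str(uuid.uuid4())

def fake_wait_for_flow_run(flow_run_id: str) -> str:
    # stand-in: the backend parses a single UUID; a list cannot be parsed
    if not isinstance(flow_run_id, str):
        raise TypeError("parsing UUID failed, expected String, but encountered Array")
    return flow_run_id

tables_to_zip = [{"table": "users"}, {"table": "orders"}]

# mapping yields a LIST of flow-run IDs, one per parameter dict
ids = [fake_create_flow_run(p) for p in tables_to_zip]

try:
    fake_wait_for_flow_run(ids)  # un-mapped downstream call: fails
except TypeError as exc:
    print(exc)

finished = [fake_wait_for_flow_run(i) for i in ids]  # mapped: one ID at a time
```

If a wait_for_flow_run or get_task_run_result follows the mapped create, mapping it over the list of IDs would be the corresponding fix.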
what am I doing wrong here?

Chern Hong Poh
10/12/2022, 10:08 AM

Khyaati Jindal
10/18/2022, 10:28 AM

Stephen Thibeault
10/27/2022, 8:06 PM

Andreas Nigg
10/28/2022, 1:20 PM
{% if is_incremental() %}
where loaded_at >= coalesce(_dbt_max_partition, '2022-01-01')
{% else %}
But, the problem is, that in my incremental model I do not partition by “loaded_at” but by a different column (due to use-case demands). So _dbt_max_partition would not help here, as it would simply return the maximum partition value of the model (which I can’t use as filter for the source table).
In “native” BigQuery I would simply use a scripting variable as follows
declare max_source_partition timestamp;
set max_source_partition = (select max(loaded_at) as ts from `my_model_table`);
select * from `my_source_table` where loaded_at > max_source_partition
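One sketch of the same filter in a dbt incremental model, without a scripting variable, is a scalar subquery against {{ this }}; the source and column names below are taken from the question, and whether BigQuery actually prunes partitions with a subquery filter depends on the query, so this is a starting point rather than a guaranteed answer:

```sql
select *
from {{ source('my_source', 'my_source_table') }}
{% if is_incremental() %}
  -- filter the source's loaded_at using the model's own max value
  where loaded_at > (select max(loaded_at) from {{ this }})
{% endif %}
```

If BigQuery does not prune with the subquery (worth checking via bytes processed), fetching the max into a Jinja variable with run_query before the select, or an on-run-start hook with DECLARE/SET, are common fallbacks.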
How can one implement such a scenario with dbt? Is there a way to create scripting variables as part of my models? Or do I need to add it as an on-run-start hook? Or are there better strategies to exclude partitions in my source without having the same column as the partition field in my model?

Mark
11/01/2022, 9:00 AM

Boris Tseytlin
11/02/2022, 4:34 PM
get_folder, which will download the whole storage to my local drive. I don't want that.
I see two alternative ways:
• Make an S3 block, use that.
• Make a secret block, in my code set up a minio connection, use that.
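Of the two, the S3-block path is typically the lighter lift, since MinIO speaks the S3 API and Prefect 2's RemoteFileSystem block hands its settings straight to fsspec/s3fs. A sketch; the endpoint, credentials, and bucket name are placeholders:

```python
# Guard the import so the sketch also illustrates the settings shape
# in environments where prefect is not installed.
try:
    from prefect.filesystems import RemoteFileSystem
except ImportError:
    RemoteFileSystem = None

minio_settings = {
    "key": "minio-access-key",       # placeholder access key
    "secret": "minio-secret-key",    # placeholder secret key
    "client_kwargs": {"endpoint_url": "http://localhost:9000"},  # MinIO endpoint
}

if RemoteFileSystem is not None:
    minio_storage = RemoteFileSystem(
        basepath="s3://my-flows",    # example bucket
        settings=minio_settings,
    )
    # Registering it for deployments would contact the Prefect API:
    # minio_storage.save("minio-storage", overwrite=True)
```

The secret-block route works too, but then every flow has to rebuild the MinIO client by hand, while a filesystem block plugs directly into deployments.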
Which one is better?

Jaafar
11/03/2022, 9:30 AM

Boris Tseytlin
11/03/2022, 5:44 PM
pytorch/pytorch.
It fails because prefect is not installed:
17:43:10.947 | INFO | prefect.infrastructure.docker-container - Pulling image 'pytorch/pytorch'...
17:43:12.610 | INFO | prefect.infrastructure.docker-container - Creating Docker container 'prudent-goshawk'...
17:43:12.679 | INFO | prefect.infrastructure.docker-container - Docker container 'prudent-goshawk' has status 'created'
17:43:12.934 | INFO | prefect.agent - Completed submission of flow run '64e545de-27ff-4d51-85d6-cec4d8eb0afe'
17:43:12.949 | INFO | prefect.infrastructure.docker-container - Docker container 'prudent-goshawk' has status 'running'
/opt/conda/bin/python: Error while finding module specification for 'prefect.engine' (ModuleNotFoundError: No module named 'prefect')
Adding prefect to the container block's extra pip packages doesn't help:
{
"EXTRA_PIP_PACKAGES": "prefect"
}
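A likely explanation (an inference, not confirmed from the logs): EXTRA_PIP_PACKAGES is read by the entrypoint script of Prefect's own base images, which installs the listed packages at container start. A stock image like pytorch/pytorch has no such entrypoint, so the variable is simply ignored. Building a small custom image is one fix:

```dockerfile
# Custom image with both torch and prefect baked in; the tag is up to you
FROM pytorch/pytorch
RUN pip install --no-cache-dir prefect
```

Pointing the DockerContainer block's image at the resulting tag should then let python -m prefect.engine start inside the container.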
Stephen Thibeault
11/08/2022, 1:45 PM

Dan Wise
11/08/2022, 3:35 PM

Khyaati Jindal
11/29/2022, 6:39 AM
RUN apt update ; apt upgrade -y
WORKDIR /project_dir
COPY requirements.txt .
RUN pip3 install -r requirements.txt
RUN prefect cloud login -k <key> --workspace <my_workspace_name>
COPY . .
CMD [ "python3", "deployment.py"]
I am deploying this Docker image using GitHub Actions, but the image build fails because, it seems to me, it is expecting an input. I see the following build error:
Step 6/8 : RUN prefect cloud login -k <key> --workspace <workspace>
---> Running in a1f954737f0f
Creating a profile for this Prefect Cloud login. Please specify a profile name: Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 41, in wrapper
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 201, in coroutine_wrapper
return run_async_in_new_loop(async_fn, *args, **kwargs)
An exception occurred.
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 152, in run_async_in_new_loop
return anyio.run(partial(__fn, *args, **kwargs))
File "/usr/local/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/usr/local/lib/python3.10/site-packages/prefect/cli/cloud.py", line 209, in login
cloud_profile_name = app.console.input(
File "/usr/local/lib/python3.10/site-packages/rich/console.py", line 2102, in input
result = input()
EOFError: EOF when reading a line
The command '/bin/sh -c prefect cloud login -k <key> --workspace <workspace> ' returned a non-zero code: 1
make: *** [Makefile:16: docker-build] Error 1
Error: Process completed with exit code 2.
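The traceback shows prefect cloud login prompting for a profile name, which cannot be answered in a non-interactive docker build, hence the EOFError. Setting the Prefect API key and workspace URL as environment variables avoids any prompt; a sketch, where the account and workspace IDs are placeholders for your workspace's values:

```dockerfile
# Pass the key in at build time (or, better, at container runtime with -e,
# so the secret never lands in an image layer) instead of running
# `prefect cloud login` during the build.
ARG PREFECT_API_KEY
ENV PREFECT_API_KEY=${PREFECT_API_KEY}
ENV PREFECT_API_URL="https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>"
```

With those two variables set, the RUN prefect cloud login step can be dropped entirely; the client picks them up automatically.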
Anders Smedegaard Pedersen
12/01/2022, 8:34 AM

Zack
12/01/2022, 3:44 PM

Zack
12/02/2022, 12:52 AM

Jon Martin
12/02/2022, 8:07 PM

Jason Ma
12/02/2022, 10:44 PM

Tomas Moreno
12/08/2022, 8:32 PM

Dang Khoi Vo
12/21/2022, 4:35 PM

Nicolas Gastaldi
01/02/2023, 8:49 PM

Darren Liu
01/25/2023, 3:05 AM

Ishan Anilbhai Koradiya
01/31/2023, 5:33 AM

Yusuf
02/01/2023, 7:27 AM
prefect cloud login
without verifying SSL certs? I'm getting an SSL "unable to find local issuer cert" error. I can resolve it with IT, but it'll take a week and a bit to go through the whole process, and I'm just prototyping some stuff. I checked here:
https://discourse.prefect.io/t/how-to-disable-the-ssl-verification-when-setting-up-a-pr[…]verify-failed-unable-to-get-local-issuer-certificate/597/2
But the solutions didn't seem to work. I'm using Windows, btw.

Yusuf
02/01/2023, 8:13 AM

Haotian Li
02/08/2023, 9:09 AM

Evan Curtin
02/08/2023, 5:14 PM

Stephen
02/09/2023, 1:45 AM

Jacob Bedard
02/13/2023, 10:27 PM
AttributeError: 'coroutine' object has no attribute 'get'
and resolved it?

Ravi
02/21/2023, 11:49 AM

Kelvin DeCosta
02/22/2023, 9:38 AM
For async tasks, either via .submit or .map, I've found that using a SequentialTaskRunner causes the flow to run much faster than using ConcurrentTaskRunner.
I'd like to know more about this behavior.
Any feedback is appreciated!

Peyton Runyan
02/22/2023, 4:58 PM
SequentialTaskRunner currently submits asynchronously, not sequentially, on map. That's going to be changed soon. But I don't know why it would be faster.
submit? I can't reproduce this.

Kelvin DeCosta
02/23/2023, 7:18 AM
Running async tasks with .submit or .map seems to be faster when using a SequentialTaskRunner instead of the default.
import asyncio

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner
@task(
name="Greet",
task_run_name="{name}",
)
async def greet(name: str) -> str:
message = f"Hello, {name}!"
await asyncio.sleep(2)
print(message)
return message
@flow(
name="Multi Greeter",
task_runner=SequentialTaskRunner(),
)
async def multi_greeter():
names = [
"Kelvin",
"Peyton",
]
message_states = await greet.map(name=names, return_state=True)
messages = await asyncio.gather(
*[state.result(fetch=True) for state in message_states if state.is_completed()]
)
@flow(name="Main")
def main():
multi_greeter()
if __name__ == "__main__":
main()
Running this locally, with and without SequentialTaskRunner, produces runs with different durations. Adding more names makes the difference larger.
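Setting the runners aside, the heart of the timing question can be seen with plain asyncio: awaited together, the mapped coroutines overlap their sleeps, so the total duration stays near a single task's sleep. Any extra gap between ConcurrentTaskRunner and SequentialTaskRunner would then come from per-task submission overhead rather than the sleeps themselves (an inference from the snippet above, not a confirmed diagnosis). The 0.2 s sleep stands in for greet's asyncio.sleep(2):

```python
import asyncio
import time

async def greet(name: str) -> str:
    await asyncio.sleep(0.2)  # stands in for real async work
    return f"Hello, {name}!"

async def run_all(names):
    start = time.perf_counter()
    # gather awaits all coroutines concurrently on one event loop
    messages = await asyncio.gather(*(greet(n) for n in names))
    return messages, time.perf_counter() - start

messages, elapsed = asyncio.run(run_all(["Kelvin", "Peyton"]))
print(messages)
print(elapsed < 0.35)   # overlapped: ~0.2 s total, not 0.4 s
```

With more names the gather time stays roughly constant, which matches the observation that adding names widens the gap against a runner that pays overhead per task.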