yusuf yel
03/25/2023, 9:57 PMManoj
03/26/2023, 8:45 AM__init__(__self__, *args, **kwargs)
File "C:\Users\Administrator\anaconda3\envs\prefect_test\Lib\site-packages\prefect\flows.py", line 183, in __init__
self.parameters = parameter_schema(self.fn)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\anaconda3\envs\prefect_test\Lib\site-packages\prefect\utilities\callables.py", line 306, in parameter_schema
model = pydantic.create_model("Parameters", __config__=ModelConfig, **model_fields)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "pydantic\main.py", line 1026, in pydantic.main.create_model
File "pydantic\main.py", line 197, in pydantic.main.ModelMetaclass.__new__
File "pydantic\fields.py", line 506, in pydantic.fields.ModelField.infer
File "pydantic\fields.py", line 436, in pydantic.fields.ModelField.__init__
File "pydantic\fields.py", line 546, in pydantic.fields.ModelField.prepare
File "pydantic\fields.py", line 570, in pydantic.fields.ModelField._set_default_and_type
File "pydantic\fields.py", line 439, in pydantic.fields.ModelField.get_default
File "pydantic\utils.py", line 693, in pydantic.utils.smart_deepcopy
File "C:\Users\Administrator\anaconda3\envs\prefect_test\Lib\copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\anaconda3\envs\prefect_test\Lib\copy.py", line 273, in _reconstruct
y.__setstate__(state)
File "C:\Users\Administrator\anaconda3\envs\prefect_test\Lib\site-packages\prefect_dask\task_runners.py", line 330, in __setstate__
self._client = distributed.get_client()
^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\Administrator\anaconda3\envs\prefect_test\Lib\site-packages\distributed\worker.py", line 2782, in get_client
raise ValueError("No global client found and no address provi
Abhishek Mitra
03/26/2023, 2:48 PMSamuel Hinton
03/27/2023, 2:47 AMYaron Levi
03/27/2023, 4:46 AMTim-Oliver
03/27/2023, 7:46 AMEmma Rizzi
03/27/2023, 9:03 AMDeceivious
03/27/2023, 9:26 AMPREFECT_TASKS_REFRESH_CACHE
has been added. But I cant find it documented in https://docs.prefect.io/concepts/settings/ .
Is the PREFECT_TASKS_REFRESH_CACHE
setting enforced on the prefect server/cloud OR is it only enforced on a single agent where the command is executed?jpuris
03/27/2023, 10:55 AMpython flows/flow.py
and parameters are filled in via
if __name__ == "__main__":
my_flow(
parameter_1=value_1,
parameter_2=value_2,
parameter_3=value_3,
)
I do not want my_flow
run to be registered on prefect cloud due to many different reasons š
How can I achieve this?Samuel Hinton
03/27/2023, 11:38 AMStephen Herron
03/27/2023, 12:35 PMEd Burroughes
03/27/2023, 3:16 PM@task(result=PrefectResult())
def return_results(table_name):
return {"table_name": table_name)
2. Running the task no matter the upstream state using trigger=all_finished:
@task(result=PrefectResult(), trigger=all_finished)
def return_results(table_name):
return {"table_name": table_name)
Can anyone point me in the right direction to the equivalent Prefect 2 features, thank you šjpuris
03/27/2023, 3:21 PM/tmp
where it would basically double the snapshot size compared to venv
it is copied from.
..more info in the threadRik
03/27/2023, 3:25 PMAdam
03/27/2023, 4:33 PMJarvis Stubblefield
03/27/2023, 6:13 PM@flow(name="whatever")
or if I leave it blank. Same error persists. What brought it up as odd is that I have had a similar issue in the past where it complained about duplicate task / flow names. The error is in the thread along with the deployment code thatās being run.Marc Lipoff
03/27/2023, 6:31 PMEric Ma
03/27/2023, 8:52 PMgoogle.auth._default - Authentication failed using Compute Engine authentication due to unavailable metadata server.
but when I run the project outside of this docker container, it calls the prefect secret block perfectly fine
Anonymous caller does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist)., 401
Ryan Brennan
03/28/2023, 4:04 AMon_failure
to access to the Exception class and/or traceback raised by the task?YSF
03/28/2023, 4:36 AMSamuel Hinton
03/28/2023, 6:09 AMLuis Arias
03/28/2023, 8:42 AMTimeout context manager should be used inside a task
works fine in 2.8.6 and below. I'm however not quite sure how to reproduce it outside of my code base.Luis Arias
03/28/2023, 8:42 AMDeepak Kumar
03/28/2023, 9:31 AMapply_deployment
step(prefect 2.8.0
):
TypeError: Type is not JSON serializable: <class name of the object>
Basically my flow decorated function takes this object as an argument. The error suggests that only JSON serialisable things could be used as a param for flows.
I need this object in the flow for further processing. What is the possible workaround for this?Marcos
03/28/2023, 11:51 AMrun_deployment
as suggested and I'm not able to evaluate the flow state. I also tried to wrap the run_deployment
within a task.
@task()
def run_worker(name, flow_run_name, etc....):
flow_run = run_deployment(
name=name,
flow_run_name=flow_run_name,
timeout=timeout,
tags=tags,
poll_interval=poll_interval,
parameters=parameters,
)
if flow_run.is_failed():
raise FailedRun(message=f"Subflow {flow_run_name} failed to complete successfully.")
What's the best way of chaining up subflows and terminate the parent flow if any subflow fails?Matthew Scanlon
03/28/2023, 1:29 PMDeceivious
03/28/2023, 1:45 PMMatt Delacour
03/28/2023, 2:04 PMError getting config status, workload certificates may not be configured: HTTP 404
Ed Burroughes
03/28/2023, 2:46 PMall_finished
but I am getting some unexpected behaviour. For example, if I have multiple upstream tasks like so:
@flow
def flow():
_do_something = do_something()
_do_something2 = do_something2()
_do_something3 = do_something3()
_always_run = always_run(
wait_for=[
allow_failure(_do_something),
allow_failure(_do_something2),
allow_failure(_do_something3)
]
)
return _always_run
Say if any do_something
functions fail, I would like always_run
to always implement. Currently, when I force any upstream task to fail the always_run task doesn't run. Any idea why this is and if there are any work around thanksYSF
03/28/2023, 4:07 PMDEPLOYMENT_NAME = "local-docker-deployment"
#Import your dag definition
from dag import augmentation_pipeline
#Import prefect deployment spec library
from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from prefect.filesystems import LocalFileSystem
docker_container_block = DockerContainer.load("example-prefect")
#fs = LocalFileSystem(basepath='C:\\Users\\ykadmin081021\\Documents\\Projects\\del2\\')
fs = LocalFileSystem.load("test-ysf")
deployment = Deployment.build_from_flow(
flow=augmentation_pipeline,
name=DEPLOYMENT_NAME,
work_queue_name="test",
infrastructure=docker_container_block,
storage=fs
)
if __name__ == "__main__":
deployment.apply()
It fails with this error: FileNotFoundError: [Errno 2] No such file or directory: '/opt/prefect/C:\\Users\\ykadmin081021\\Documents\\Projects\\del2\\'
it seems to be pre-pending my provided path with /opt/prefect/
It creates the docker container successfully it seems, but then it can't access my local code. My understanding is for a docker block you have to provide storage (although is that true even if you're doing a COPY
of everything into the docker container anyways?)
Is this just always going to be incompatible with windows? Do I need to set it up as volume mount at runtime somehow? Does anyone have a working example of what I'm trying to do?