Mathijs Carlu
07/15/2022, 9:00 AM
prefect deployment create file.py
the deployment gets created.
Now, when I modify the flow a little (the flow name stays the same), change the deployment name, and then re-execute the above command, a new deployment is created. This deployment points at the same flow object (flow_id is the same for both). However, the two deployments execute different code, 'different versions of the same flow' if you will, although this 'version number' is not saved anywhere (I think).
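The situation described above can be pictured with a small data model. This is only a sketch under assumptions: FlowRecord and DeploymentRecord are hypothetical names, not Prefect's actual schema, and flow_data here simply stands for the stored location of the flow code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowRecord:
    # Hypothetical stand-in for a registered flow: identified by id and name only.
    id: str
    name: str


@dataclass(frozen=True)
class DeploymentRecord:
    # Hypothetical stand-in for a deployment: it references a flow by id,
    # but carries its own pointer to the code that will be executed.
    name: str
    flow_id: str
    flow_data: str


flow = FlowRecord(id="flow-1", name="my-flow")

# Two deployments created from slightly different versions of the same flow file.
first = DeploymentRecord(name="deployment-a", flow_id=flow.id, flow_data="storage/version-1")
second = DeploymentRecord(name="deployment-b", flow_id=flow.id, flow_data="storage/version-2")

same_flow = first.flow_id == second.flow_id      # both point at the same flow object
same_code = first.flow_data == second.flow_data  # but not at the same code
print(same_flow, same_code)
```

In this model, two runs that list the same flow can still have executed different code, because the code location travels with the deployment.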
This is all because the location of the flow code (flow_data) is saved with the deployment and not with the flow, which seems a little counterintuitive to me. If I see two flow runs in the UI that executed the same flow, I would expect them to have executed the same code.
Stephen Lloyd
07/15/2022, 10:19 AM
haris khan
07/15/2022, 10:24 AM
Rajvir Jhawar
07/15/2022, 10:43 AM
yu zeng
07/15/2022, 12:23 PM
import datetime

from prefect import Flow, task
from prefect.storage import GitHub, GitLab, S3, Webhook
from prefect.backend.artifacts import create_link_artifact
import prefect.engine.cache_validators

# Cache the task result for one hour and never retry
@task(task_run_name="mviz_task_{md5}", max_retries=0, cache_for=datetime.timedelta(hours=1))
def test(md5):
    print('do test', md5)

# Call the same task twice with the same input inside one flow
with Flow("epl") as flow:
    test('123')
    test('123')

flow.run()
Hi, I'm trying to use caching within a single flow run, but I got the output below, which shows that the cache did not work. Does caching not work within the same flow run, or is there a mistake in my code?
[2022-07-15 12:18:00+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'epl'
[2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Starting task run...
[2022-07-15 12:18:00+0000] WARNING - prefect.TaskRunner | Task 'test': Can't use cache because it is now invalid
do test 123
[2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Finished task run for task with final state: 'Cached'
[2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Starting task run...
[2022-07-15 12:18:00+0000] WARNING - prefect.TaskRunner | Task 'test': Can't use cache because it is now invalid
do test 123
[2022-07-15 12:18:00+0000] INFO - prefect.TaskRunner | Task 'test': Finished task run for task with final state: 'Cached'
[2022-07-15 12:18:00+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
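For reference, the behaviour the question expects (a second call with the same input reusing the first result while it is less than an hour old) can be sketched without any library. This is an illustration of the idea only: TimedCache and get_or_compute are hypothetical names, and nothing here reproduces or explains how Prefect implements cache_for.

```python
import datetime


class TimedCache:
    """Keep one result per key and reuse it until it is older than max_age."""

    def __init__(self, max_age):
        self.max_age = max_age
        self._entries = {}  # key -> (time stored, value)

    def get_or_compute(self, key, compute):
        now = datetime.datetime.now()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self.max_age:
            return entry[1]  # still fresh: reuse the stored value
        value = compute()
        self._entries[key] = (now, value)
        return value


calls = []


def do_test(md5):
    # Record every real execution so reuse can be observed.
    calls.append(md5)
    return md5.upper()


cache = TimedCache(max_age=datetime.timedelta(hours=1))
first = cache.get_or_compute(("test", "abc"), lambda: do_test("abc"))
second = cache.get_or_compute(("test", "abc"), lambda: do_test("abc"))
print(first, second, len(calls))
```

Here the key combines a task name with its input, so only calls with identical inputs share a result.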
Jehan Abduljabbar
07/15/2022, 1:32 PM
jack
07/15/2022, 2:02 PM
Jason
07/15/2022, 3:04 PM
alex
07/15/2022, 4:15 PM
Josh
07/15/2022, 4:54 PM
Andy Dang
07/15/2022, 5:10 PM
Divya
07/16/2022, 1:30 AM
Michael Reynolds
07/16/2022, 11:24 PM
Michael Reynolds
07/16/2022, 11:33 PM
23:29:26.313 | ERROR | Flow run 'mustard-quetzal' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 521, in orchestrate_flow_run
flow_call
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 120, in run_sync_in_interruptible_worker_thread
cancellable=True,
File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 702, in _run_wrapped_task
await coro
File "/home/myapp/.local/lib/python3.7/site-packages/anyio/to_thread.py", line 32, in run_sync
func, *args, cancellable=cancellable, limiter=limiter
File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 96, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "/home/myapp/.local/lib/python3.7/site-packages/goron/main.py", line 35, in run_pipeline
messages = poll_kafka( conf[ 'kafka' ] ).result()
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/futures.py", line 210, in result
self._result, timeout=timeout, raise_on_failure=raise_on_failure
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 221, in sync
return run_async_from_worker_thread(__async_fn, *args, **kwargs)
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/utilities/asyncio.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/myapp/.local/lib/python3.7/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/myapp/.local/lib/python3.7/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in result
return self.__get_result()
File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/futures.py", line 220, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/orion/schemas/states.py", line 136, in result
raise data
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/task_runners.py", line 314, in _run_and_store_result
self._results[run_key] = await run_fn(**run_kwargs)
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 799, in begin_task_run
client=client,
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/engine.py", line 905, in orchestrate_task_run
result, serializer="cloudpickle"
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/states.py", line 130, in return_value_to_state
return Completed(data=DataDocument.encode(serializer, result))
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/orion/schemas/data.py", line 42, in encode
blob = lookup_serializer(encoding).dumps(data, **kwargs)
File "/home/myapp/.local/lib/python3.7/site-packages/prefect/serializers.py", line 59, in dumps
data_bytes = cloudpickle.dumps(data)
File "/home/myapp/.local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/home/myapp/.local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
TypeError: can't pickle cimpl.Message objects
(venv) michael.reynolds@mac1319 goron %
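The last frames of the traceback show the task's return value being serialized with cloudpickle and failing on cimpl.Message objects. One common way around this kind of error is to copy the needed fields into plain Python data before returning. The sketch below assumes the messages expose the accessor methods topic(), partition(), offset(), key() and value(), as confluent-kafka messages do; message_to_record and FakeMessage are hypothetical names used only for illustration, and the standard pickle module stands in for cloudpickle.

```python
import pickle


def message_to_record(msg):
    """Copy the fields of a consumed message into a plain, picklable dict."""
    return {
        "topic": msg.topic(),
        "partition": msg.partition(),
        "offset": msg.offset(),
        "key": msg.key(),
        "value": msg.value(),
    }


class FakeMessage:
    """Stand-in for a consumed Kafka message; refuses pickling like the real object."""

    def __init__(self, topic, partition, offset, key, value):
        self._fields = (topic, partition, offset, key, value)

    def topic(self):
        return self._fields[0]

    def partition(self):
        return self._fields[1]

    def offset(self):
        return self._fields[2]

    def key(self):
        return self._fields[3]

    def value(self):
        return self._fields[4]

    def __reduce__(self):
        # Simulate an extension type that cannot be pickled.
        raise TypeError("can't pickle FakeMessage objects")


message = FakeMessage("events", 0, 42, b"user-1", b'{"clicked": true}')
record = message_to_record(message)

# The plain dict survives a pickle round trip, unlike the message object itself.
restored = pickle.loads(pickle.dumps(record))
print(restored["topic"], restored["offset"])
```

A task that returns a list of such records instead of the raw messages gives the serializer only built-in types to handle.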
Michael Reynolds
07/16/2022, 11:34 PM
Jelle Vegter
07/17/2022, 12:35 PM
Jack Sundberg
07/17/2022, 1:59 PM
Emil Østergaard
07/17/2022, 9:05 PM
/api/blocks endpoint.
In the 2.0b8 API I see endpoints for block schemas, block_types and block_document, but as far as I can tell none of these create a block?
Through the UI, it is not possible to supply the necessary client_kwargs and config_kwargs.
Is this no longer possible, or am I misunderstanding something?
For reference, I used a request like this:
PAYLOAD=$(cat <<EOF
{
  "name": "minio",
  "block_spec_id": ${FILE_STORAGE_ID},
  "data": {
    "base_path": "s3://prefect-flows",
    "key_type": "hash",
    "options": {
      "use_ssl": false,
      "key": "blablabla",
      "secret": "blablabla",
      "client_kwargs": {"endpoint_url": "http://minio:9000"},
      "config_kwargs": {"signature_version": "s3v4"}
    }
  }
}
EOF
)
BLOCK_ID=$(echo -n "$PAYLOAD" | curl -vs -XPOST -H "Content-Type: application/json" http://localhost:4200/api/blocks/ -d@- | jq -r '.id')
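Hand-written JSON inside a heredoc is easy to get subtly wrong, so the same payload can also be built from a dictionary and serialized. This is a minimal sketch: build_block_payload is a hypothetical helper, the field names are copied from the request above rather than from any current API, the block_spec_id value is a placeholder, and nothing is sent to a server.

```python
import json


def build_block_payload(name, block_spec_id, base_path, endpoint_url, key, secret):
    """Return the request body from the message above as a valid JSON string."""
    payload = {
        "name": name,
        "block_spec_id": block_spec_id,
        "data": {
            "base_path": base_path,
            "key_type": "hash",
            "options": {
                "use_ssl": False,
                "key": key,
                "secret": secret,
                "client_kwargs": {"endpoint_url": endpoint_url},
                "config_kwargs": {"signature_version": "s3v4"},
            },
        },
    }
    return json.dumps(payload)


body = build_block_payload(
    name="minio",
    block_spec_id="FILE_STORAGE_ID",  # placeholder for the real id
    base_path="s3://prefect-flows",
    endpoint_url="http://minio:9000",
    key="blablabla",
    secret="blablabla",
)
print(body)
```

Because json.dumps produces the string, quoting and comma placement are handled by the serializer rather than by hand.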
Stefan
07/17/2022, 9:46 PM
yu zeng
07/18/2022, 5:30 AM
Rajeshwar Agrawal
07/18/2022, 6:34 AM
Beginning Flow run for '….'
which indicates that the flow was retried by Prefect. Any ideas?
It seems that the heartbeat is also not disabled, as we are seeing the log message: No heartbeat detected from the remote task; marking the run as failed.
FYI, we are using the community Prefect Server, Core version 1.2.0+10.gafda99411
J
07/18/2022, 7:19 AM
xyzz
07/18/2022, 9:28 AM
Riccardo Tesselli
07/18/2022, 9:46 AM
Abhishek Mitra
07/18/2022, 10:22 AM
Paul Lucas
07/18/2022, 11:17 AM
DbtShellTask
which isn't very helpful. Is there a way to rename the individual tasks to something more useful?
I've tried using the name and task_run_name arguments, but without any luck.
haris khan
07/18/2022, 12:44 PM
Priyank
07/18/2022, 1:03 PM
Emil Barbuta
07/18/2022, 1:28 PM
wait_for_flow_run fails by throwing TypeError: Object of type FlowRunView is not JSON serializable. Am I passing the wrong object type as input?
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("run-pipeline") as flow:
    # Create a run of flow_a, then wait for that run to finish
    flow_a_id = create_flow_run(flow_name=flow_a.name, ...[SOME INPUT PARAMS])
    flow_a_wait = wait_for_flow_run(flow_a_id)
(I'm using Prefect 1.0.0)
Josh Paulin
07/18/2022, 1:55 PM
Josh Paulin
07/18/2022, 1:55 PM
Kevin Kho
07/18/2022, 2:14 PM
Josh Paulin
07/18/2022, 2:15 PM
Kevin Kho
07/18/2022, 2:21 PM
Josh Paulin
07/18/2022, 2:23 PM