Mohamed Ayoub Chettouh
08/08/2022, 2:47 PM
Andreas Nord
08/08/2022, 3:07 PM
Error during execution of task: ValueError('A secret name must be provided.')
The secret task runs for 18 minutes (?) and its downstream tasks somehow succeed, see image. I am on Prefect 1.2.4.
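A minimal sketch, assuming the failing task is a PrefectSecret that was built without a name; the flow and secret names below are placeholders:

from prefect import Flow
from prefect.tasks.secrets import PrefectSecret

with Flow("secret-example") as flow:
    # Passing the secret name at construction time avoids the
    # "A secret name must be provided." ValueError at runtime.
    api_key = PrefectSecret("MY_API_KEY")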
Lukáš Pravda
08/08/2022, 4:25 PM
List[int] for simplicity here)
@task
def test_task(lst: List[int]):
    log = prefect.context.logger
    val = sum(lst)
    log.info(val)
    return val
What I’d like to do is to run a mapped task over it, but I’m not entirely sure how to invoke it using unmapped (if this is even possible). When I call it like:
with Flow("test flow") as flow:
    p = Parameter("p", default=[[1, 2, 3], [2, 3, 4]])
    test_task.map(p)
it works as expected and test_task is called with:
test_task([1,2,3])
test_task([2,3,4])
I’d like the following calls to be made, but with the use of an unmapped task:
test_task(10, 1, 2)
test_task(20, 1, 2)
test_task(30, 1, 2)
so I tried
with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    test_task.map([p, unmapped(1), unmapped(2)])
this one fails with an exception about int not being iterable, while if I remove the array like this:
with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    test_task.map(p, unmapped(1), unmapped(2))
I’m getting a “too many positional arguments” error.
I don't know how else to express what I need. Another option would be to come up with a task per array size, but that is just a lot. Any advice? Thank you very much for your help!
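A minimal sketch of one way to get those calls in Prefect 1, assuming the task signature is widened to take the two constants as separate arguments:

from prefect import Flow, Parameter, task, unmapped

@task
def test_task(val: int, a: int, b: int) -> int:
    return val + a + b

with Flow("test flow") as flow:
    p = Parameter("p", default=[10, 20, 30])
    # Only p is mapped over; unmapped(1) and unmapped(2) are passed
    # unchanged to every call: (10, 1, 2), (20, 1, 2), (30, 1, 2).
    test_task.map(p, unmapped(1), unmapped(2))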
Rohan Chutke
08/08/2022, 5:26 PM
ERROR prefect-server.ZombieKiller.TaskRun
No heartbeat detected from the remote task; marking the run as failed.
Can anyone help me with a workaround for this? It's trying to run a SQL query but timing out. Thanks
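A sketch of one commonly suggested workaround for heartbeat false positives on long-running tasks in Prefect 1, assuming the flow's run config can be changed; UniversalRun and the env override are assumptions, and the flow name is a placeholder:

from prefect import Flow
from prefect.run_configs import UniversalRun

with Flow("long-sql-flow") as flow:
    ...

# Run heartbeats in a thread instead of a subprocess so a long SQL query
# is less likely to be flagged by the ZombieKiller.
flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})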
Neil Natarajan
08/08/2022, 6:37 PM
Ilya Galperin
08/08/2022, 8:13 PM
prefecthq/prefect:2.0.2-python3.8
and trying to execute the following flow from this tutorial on a Kubernetes cluster pointing to an S3 storage block. When doing so, we receive the following error:
RuntimeError: File system created with scheme 's3' from base path 's3://<mybucket>' could not be created. You are likely missing a Python module required to use the given storage protocol.
It looks like others who have been experiencing this have had to manually run pip install s3fs in their execution environment to get S3 external storage working with the Kubernetes execution environment. Is this the recommended deployment pattern for now? If so, is there a plan to start including these dependencies in the prefect filesystems package? It seems strange that we’d need a custom image for something that is supposed to already be tightly integrated with Prefect 2.0 and is a fundamental requirement for using k8s infrastructure.
import prefect
from prefect import task, flow, get_run_logger
from prefect.filesystems import S3

s3_block = S3.load("aws-s3")

@task
def hello_world():
    logger = get_run_logger()
    text = "hello from orion_flow!"
    logger.info(text)
    return text

@flow(name="orion_flow")
def orion_flow():
    logger = get_run_logger()
    logger.info("Hello from Kubernetes!")
    hw = hello_world()
    return
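A sketch of one way to avoid maintaining a fully custom image, assuming the stock prefecthq/prefect image installs anything listed in EXTRA_PIP_PACKAGES at container start-up and that a KubernetesJob infrastructure block is used; the block name is a placeholder:

from prefect.infrastructure import KubernetesJob

k8s_job = KubernetesJob(
    image="prefecthq/prefect:2.0.2-python3.8",
    # Installed by the image entrypoint before the flow run starts.
    env={"EXTRA_PIP_PACKAGES": "s3fs"},
)
k8s_job.save("k8s-with-s3fs", overwrite=True)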
Xavier Babu
08/08/2022, 8:42 PM
Thuy Tran
08/08/2022, 8:52 PM
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/s3fs/core.py", line 646, in _lsdir
async for i in it:
File "/usr/local/lib/python3.8/site-packages/aiobotocore/paginate.py", line 32, in __anext__
response = await self._make_request(current_kwargs)
File "/usr/local/lib/python3.8/site-packages/aiobotocore/client.py", line 265, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidAccessKeyId) when calling the ListObjectsV2 operation: The AWS Access Key Id you provided does not exist in our records.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/usr/local/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/deployments.py", line 47, in load_flow_from_flow_run
await storage_block.get_directory(from_path=None, local_path=".")
File "/usr/local/lib/python3.8/site-packages/prefect/filesystems.py", line 373, in get_directory
return await self.filesystem.get_directory(
File "/usr/local/lib/python3.8/site-packages/prefect/filesystems.py", line 251, in get_directory
return self.filesystem.get(from_path, local_path, recursive=True)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 111, in wrapper
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 96, in sync
raise return_result
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 53, in _runner
result[0] = await coro
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 536, in _get
rpaths = await self._expand_path(rpath, recursive=recursive)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 736, in _expand_path
out = await self._expand_path([path], recursive, maxdepth)
File "/usr/local/lib/python3.8/site-packages/fsspec/asyn.py", line 754, in _expand_path
rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
File "/usr/local/lib/python3.8/site-packages/s3fs/core.py", line 718, in _find
out = await self._lsdir(path, delimiter="", prefix=prefix)
File "/usr/local/lib/python3.8/site-packages/s3fs/core.py", line 669, in _lsdir
raise translate_boto_error(e)
PermissionError: The AWS Access Key Id you provided does not exist in our records.
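A sketch, assuming the InvalidAccessKeyId comes from the job container falling back to missing or stale ambient AWS credentials; the bucket path, block name, and key values are placeholders:

from prefect.filesystems import S3

s3_block = S3(
    bucket_path="my-bucket/flows",
    aws_access_key_id="AKIA...",        # replace with a valid access key id
    aws_secret_access_key="...",        # replace with the matching secret key
)
s3_block.save("aws-s3", overwrite=True)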
Jeffrey Lam
08/08/2022, 10:29 PM
Marcos
08/08/2022, 10:35 PM
import asyncio
from prefect import task, get_run_logger, flow

@task
async def task1():
    await asyncio.sleep(1)

@task
def task2():
    logger = get_run_logger()
    logger.info("hi")

@flow
async def test_flow():
    await task1.submit()
    task2.submit()
The following exception is raised:
Encountered exception during execution:
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 550, in orchestrate_flow_run
result = await flow_call()
File "/app/src/streamcast/agent/flows/test.py", line 22, in test_flow
task2.submit()
File "/opt/conda/lib/python3.10/site-packages/prefect/tasks.py", line 491, in submit
return enter_task_run_engine(
File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 691, in enter_task_run_engine
return flow_run_context.sync_portal.call(begin_run)
File "/opt/conda/lib/python3.10/site-packages/anyio/from_thread.py", line 283, in call
return cast(T_Retval, self.start_task_soon(func, *args).result())
File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 439, in result
return self.__get_result()
File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/opt/conda/lib/python3.10/site-packages/anyio/from_thread.py", line 219, in _call_func
retval = await retval
File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 783, in create_task_run_then_submit
task_run = await create_task_run(
File "/opt/conda/lib/python3.10/site-packages/prefect/engine.py", line 823, in create_task_run
task_run = await flow_run_context.client.create_task_run(
File "/opt/conda/lib/python3.10/site-packages/prefect/client.py", line 1635, in create_task_run
response = await self._client.post(
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/opt/conda/lib/python3.10/site-packages/prefect/client.py", line 258, in send
await super().send(*args, **kwargs)
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
response = await self._send_handling_redirects(
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
response = await self._send_single_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
response = await transport.handle_async_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
raise exc
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
response = await connection.handle_async_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/connection.py", line 90, in handle_async_request
return await self._connection.handle_async_request(request)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http11.py", line 105, in handle_async_request
raise exc
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http11.py", line 84, in handle_async_request
) = await self._receive_response_headers(**kwargs)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http11.py", line 148, in _receive_response_headers
event = await self._receive_event(timeout=timeout)
File "/opt/conda/lib/python3.10/site-packages/httpcore/_async/http11.py", line 177, in _receive_event
data = await self._network_stream.read(
File "/opt/conda/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 33, in read
return await self._stream.receive(max_bytes=max_bytes)
File "/opt/conda/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1265, in receive
await self._protocol.read_event.wait()
File "/opt/conda/lib/python3.10/asyncio/locks.py", line 211, in wait
fut = self._get_loop().create_future()
File "/opt/conda/lib/python3.10/asyncio/mixins.py", line 30, in _get_loop
raise RuntimeError(f'{self!r} is bound to a different event loop')
RuntimeError: <asyncio.locks.Event object at 0x7fd237ee4310 [unset]> is bound to a different event loop
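A sketch of one possible workaround, assuming the error is triggered by calling .submit() on a synchronous task from inside an async flow; making both tasks async keeps every submission on the same event loop:

import asyncio
from prefect import task, get_run_logger, flow

@task
async def task1():
    await asyncio.sleep(1)

@task
async def task2():
    logger = get_run_logger()
    logger.info("hi")

@flow
async def test_flow():
    await task1.submit()
    await task2.submit()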
Blake Hamm
08/08/2022, 10:44 PM
Ian Andres Etnyre Mercader
08/08/2022, 11:05 PM
PYTHONPATH=/home/alburati/Proyectos/pipeline/src prefect deployment build src/sources/biorxiv/flows/biorxiv_main_flow.py:biorxiv_main_flow -n 'source_flow' -t source_work
Error:
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/home/alburati/anaconda3/envs/nlp_env/lib/python3.7/site-packages/prefect/engine.py", line 246, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/home/alburati/anaconda3/envs/nlp_env/lib/python3.7/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/home/alburati/anaconda3/envs/nlp_env/lib/python3.7/site-packages/prefect/deployments.py", line 325, in load_flow_from_flow_run
await storage_block.get_directory(from_path=None, local_path=".")
File "/home/alburati/anaconda3/envs/nlp_env/lib/python3.7/site-packages/prefect/filesystems.py", line 95, in get_directory
shutil.copytree(from_path, local_path)
File "/home/alburati/anaconda3/envs/nlp_env/lib/python3.7/shutil.py", line 324, in copytree
os.makedirs(dst)
File "/home/alburati/anaconda3/envs/nlp_env/lib/python3.7/os.py", line 223, in makedirs
mkdir(name, mode)
FileExistsError: [Errno 17] File exists: '.'
Any suggestions?
js10
08/09/2022, 2:11 AM
Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.9/dist-packages/prefect/engine.py", line 247, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/usr/local/lib/python3.9/dist-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.9/dist-packages/prefect/deployments.py", line 47, in load_flow_from_flow_run
await storage_block.get_directory(from_path=None, local_path=".")
File "/usr/local/lib/python3.9/dist-packages/prefect/filesystems.py", line 98, in get_directory
shutil.copytree(from_path, local_path, dirs_exist_ok=True)
File "/usr/lib/python3.9/shutil.py", line 568, in copytree
return _copytree(entries=entries, src=src, dst=dst, symlinks=symlinks,
File "/usr/lib/python3.9/shutil.py", line 522, in _copytree
raise Error(errors)
shutil.Error: [('/home/app/app/.prefect/orion.db-shm', './.prefect/orion.db-shm', "[Errno 2] No such file or directory: '/home/app/app/.prefect/orion.db-shm'")]
wonsun
08/09/2022, 4:29 AM
Rio McMahon
08/09/2022, 5:00 AM
Hafsa Junaid
08/09/2022, 6:55 AM
Jamie Blakeman
08/09/2022, 8:58 AM
Anat Tal Gagnon
08/09/2022, 9:13 AM
Gaurav Nagar
08/09/2022, 9:16 AM
Deployment object for creating flows, now it is deprecated. Is there any other way to run flows dynamically with a schedule? I haven't found any way to run flows dynamically through code.
Patrick Tan
08/09/2022, 1:13 PM
Roger Webb
08/09/2022, 1:22 PM
Pedro Machado
08/09/2022, 2:01 PM
Roger Webb
08/09/2022, 2:15 PM
Flow_A = create_flow_run(
    flow_name="Flow A",
    project_name="Project A",
    task_args=dict(name="Flow A (Execution)"),
    scheduled_start_time=pendulum.now().add(minutes=60),
    parameters={"Parameter1": "Flow A Parameter 1"}
)
Flow_A_Flag = wait_for_flow_run(
    Flow_A,
    raise_final_state=True,
    stream_logs=True
)
I would expect the parent flow to kick off the flow run, and then the wait_for to wait for an hour before it succeeds. But it appears that Flow A actually executes immediately, not waiting the hour, so the wait_for succeeds after minutes. Is my misunderstanding with scheduled_start_time or with wait_for?
Jessica Smith
08/09/2022, 2:48 PM
clone_url = "XXXXXXX"
with TemporaryGitRepo(clone_url) as temp_repo:
    flow = extract_flow_from_file(
        file_path=os.path.join(
            temp_repo.temp_dir.name,
            "path/to/flow/fil.py",
        ),
        flow_name="flow_name",
    )
Patrick Tan
08/09/2022, 3:01 PM
datamongus
08/09/2022, 3:26 PM
Sam Garvis
08/09/2022, 3:28 PM
$ prefect profile use development_service_account
⠋ Connecting...
Error authenticating with Prefect Cloud using profile 'development_service_account'
No matter what I do I cannot connect to our Prefect 2.0 Cloud environment from my terminal.
When I try to run a flow locally I get
RuntimeError: Cannot create flow run. Failed to reach API at https://api.prefect.cloud/api/accounts ...
My account_id and workspace_id are correct, I have checked.
I don't understand why I cannot connect.
Tarek
08/09/2022, 4:02 PM
Tony Yun
08/09/2022, 4:08 PM
.prefect/config.toml
file to the runtime environment? For example, I'm writing pytest tests in VSCode; when I run the tests, they always fail because they can't find the secrets specified in that config file. Only running python flow.py from the CLI works fine.
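A sketch of one workaround, assuming Prefect 1's convention of overriding config values through PREFECT__-prefixed environment variables; the secret name and value are placeholders (for example in a conftest.py that runs before prefect is imported):

import os

# Equivalent to [context.secrets] MY_SECRET = "..." in ~/.prefect/config.toml.
os.environ["PREFECT__CONTEXT__SECRETS__MY_SECRET"] = "dummy-value"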
Oscar Björhn
08/09/2022, 4:29 PM