Marco Barbero Mota
06/07/2023, 5:14 PM
@flow in pipeline.py, each time updating settings.py metadata with values from battery_experiments.py.
The issue is that even though the global variables within settings change in each iteration (I print them out), it seems like the task decorator always keeps taking the first iteration's value. I am guessing that this happens because the decorators are evaluated before anything is run, so they keep the values from settings.py from before any update is done. I am not sure how to make the iterative updates to the global variables be reflected in the task decorator input.
Do any of you know if this is possible at all?
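The behaviour described above can be reproduced without Prefect. The sketch below is plain Python, not Prefect code: `labelled`, `step`, `make_step`, and the `settings` stand-in class are all hypothetical names used only for illustration. It shows that decorator arguments are evaluated once, when the decorated function is defined, and that re-applying the decorator at call time is one way to pick up the current value.

```python
# Stand-in for a settings module whose globals change between iterations
# (hypothetical; the real settings.py is not shown in the thread).
class settings:
    run_name = "first"


def labelled(name):
    """Hypothetical decorator factory: stores `name` on the wrapped function."""
    def wrap(fn):
        fn.label = name
        return fn
    return wrap


# The argument is evaluated exactly once, when this definition is executed.
@labelled(name=settings.run_name)
def step():
    return step.label


def make_step(name):
    """Re-apply the decorator at call time so it sees the current value."""
    @labelled(name=name)
    def step_now():
        return step_now.label
    return step_now


seen_static, seen_dynamic = [], []
for value in ["first", "second", "third"]:
    settings.run_name = value                                  # update the global
    seen_static.append(step())                                 # stale value
    seen_dynamic.append(make_step(settings.run_name)())        # fresh value

print(seen_static)
print(seen_dynamic)
```

In Prefect 2 itself, tasks expose a `with_options` method that returns a copy of the task with overridden options, which may be a tidier way to get the same effect; whether it covers the specific decorator argument in question here is an assumption worth checking against the docs for the installed version.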
Thanks a lot!
John Horn
06/07/2023, 5:18 PM
prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.
Given that I should be waiting for other tasks to finish.
Full source code:
from typing import List

from prefect import task, flow, allow_failure
from prefect.task_runners import ConcurrentTaskRunner
from prefect.client.schemas import State


@task
def divide_by_zero(input: int):
    return input / 0


@task
def return_input(input: int):
    return input


@task
def process_futures(futures_list: List[State]):
    for future in futures_list:
        print(future.is_failed())
        if future.is_failed() == False:
            print(future.result())


@flow(task_runner=ConcurrentTaskRunner())
def test_wait_for():
    failed_future = divide_by_zero.submit(1, return_state=True)
    success_future = return_input.submit(1, return_state=True)
    futures_list = [failed_future, success_future]
    print(failed_future.is_failed())
    print(success_future.is_failed())
    process_futures(futures_list, wait_for=[allow_failure(failed_future), allow_failure(success_future)])


if __name__ == '__main__':
    test_wait_for()
Devin
06/07/2023, 6:00 PM
Farhood Etaati
06/07/2023, 6:43 PM
PREFECT_API_URL programmatically in my python code but I can't, is there any example regarding this?
Joish
06/07/2023, 7:15 PM
Sean Conroy
06/07/2023, 7:59 PM
gperrone
06/07/2023, 8:02 PM
Moe
06/07/2023, 11:28 PM
Rikimaru Yamaguchi
06/08/2023, 12:30 AM
Chris
06/08/2023, 7:24 AM
Deceivious
06/08/2023, 8:22 AM
Moty
06/08/2023, 8:37 AM
Moty
06/08/2023, 9:12 AM
Ghislain Picard
06/08/2023, 11:34 AM
Francisco
06/08/2023, 12:05 PM
prefect-gcp? The environment launches the flow in Cloud Run. On the machine where the Prefect agent runs, prefect==2.10.7 and prefect-gcp==0.4.1 are installed. It is a bit difficult to debug since the flow never gets created in Cloud Run; however, after a manual retry it works without problems.
Thanks in advance!...
David Michael Carter
06/08/2023, 1:53 PM
pyenv now. Does anyone have experience with this or documentation that can point me in the correct direction? I tried using commands in the infrastructure block with no luck.
Sean Conroy
06/08/2023, 3:27 PM
Flow run infrastructure exited with non-zero status code -9. with the crash.
Gregory Hunt
06/08/2023, 4:23 PM
Jacob Bedard
06/08/2023, 9:27 PM
def little_function(thing):
    print(thing)

@task(name='big function')
def big_function(list_of_things):
    for thing in list_of_things:
        little_function(thing)
Denys Volokh
06/09/2023, 5:51 AM
class SFTPClient(Base):
    def __init__(self, block_name: str, broker_name: str = None) -> None:
        self.rfs = RemoteFileSystem.load(block_name)

    def load_file_data(self, remote_path: str) -> pd.DataFrame:
        ...

@flow(
    name="Transfer files to S3 bucket",
    retries=20,
    retry_delay_seconds=5,
)
def flowrun(date: str = None):
    context = get_run_context()
    sftclient = SFTPClient(block_name="rjo-sftp", broker_name="RJO")
    data = sftclient.load_file_data(remote_path="/tmp/test.csv")
    if not data:
        sftclient.rfs.filesystem.client.close()
        sftclient = None
        raise Exception("No data")
Daniel Manson
06/09/2023, 10:31 AM
Filip Angelov
06/09/2023, 10:42 AM
jobs_runs_submit_by_id_and_wait_for_completion function from the prefect_databricks package, but I am getting the error below at the end of the flow run although the job in Databricks succeeded. It is peculiar that the Databricks job has only one task, yet the error refers to multiple tasks.
error:
Flow run 'vermilion-manatee' - Created task run 'jobs_runs_get_output-0' for task 'jobs_runs_get_output'
12:37:18.270 | INFO | Flow run 'vermilion-manatee' - Submitted task run 'jobs_runs_get_output-0' for execution.
12:37:19.736 | ERROR | Task run 'jobs_runs_get_output-0' - Encountered exception during execution:
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '400 Bad Request' for url 'https://{removed}.cloud.databricks.com/api/2.1/jobs/runs/get-output?run_id=5681931'
raise httpx.HTTPStatusError(
httpx.HTTPStatusError: A job run with multiple tasks was provided.JSON response: {'error_code': 'INVALID_PARAMETER_VALUE', 'message': 'Retrieving the output of runs with multiple tasks is not supported. Please retrieve the output of each individual task run instead.'}
Does anybody else have this problem using jobs_runs_submit_by_id_and_wait_for_completion?
Oscar Krantz
06/09/2023, 10:49 AM
Sonia Goyal
06/09/2023, 11:04 AM
Deceivious
06/09/2023, 11:34 AM
Denys Volokh
06/09/2023, 1:40 PM
Javier Ebrero
06/09/2023, 1:41 PM
Devin
06/09/2023, 1:48 PM
Idan
06/09/2023, 2:57 PM
RayTaskScheduler, but it could be applied to any, I suppose.
Assume there is only one worker (node, process, or whatever fits the Scheduler class), and the computation graph consists of multiple identical branches (A -> B -> C, x3).
Is there a way to tell Prefect to prefer to go DFS and complete one such branch before attempting to continue to the other branches?
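To make the two orderings concrete, here is a minimal plain-Python sketch of the A -> B -> C x3 graph on a single worker. It does not use any Prefect or Ray API; `step_a`, `step_b`, `step_c`, `run_depth_first`, and `run_breadth_first` are hypothetical names, and the arithmetic inside each step is a placeholder.

```python
from typing import Callable, List, Tuple

# Hypothetical three-step branch: A -> B -> C (placeholder arithmetic).
def step_a(x: int) -> int:
    return x + 1

def step_b(x: int) -> int:
    return x * 2

def step_c(x: int) -> int:
    return x - 3

STEPS: List[Tuple[str, Callable[[int], int]]] = [
    ("A", step_a), ("B", step_b), ("C", step_c),
]

def run_depth_first(inputs: List[int]) -> Tuple[List[str], List[int]]:
    """Finish every step of one branch before starting the next branch."""
    order, results = [], []
    for branch, value in enumerate(inputs):
        for name, fn in STEPS:
            value = fn(value)
            order.append(f"{name}{branch}")
        results.append(value)
    return order, results

def run_breadth_first(inputs: List[int]) -> Tuple[List[str], List[int]]:
    """Run step A for all branches, then B for all, then C for all."""
    order, values = [], list(inputs)
    for name, fn in STEPS:
        for branch in range(len(values)):
            values[branch] = fn(values[branch])
            order.append(f"{name}{branch}")
    return order, values

dfs_order, dfs_results = run_depth_first([1, 2, 3])
bfs_order, bfs_results = run_breadth_first([1, 2, 3])
print(dfs_order)  # ['A0', 'B0', 'C0', 'A1', 'B1', 'C1', 'A2', 'B2', 'C2']
print(bfs_order)  # ['A0', 'A1', 'A2', 'B0', 'B1', 'B2', 'C0', 'C1', 'C2']
```

Both orderings produce the same results; only the sequence in which steps occupy the single worker differs. Whether a given task runner can be steered toward the first ordering without explicit dependencies is the open question here, and this sketch makes no claim about that.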
I’m aware of the wait_for argument; is there anything else one can do?
Giacomo Chiarella
06/09/2023, 3:12 PM