Kelvin DeCosta
10/07/2022, 12:11 PMdataflow-ops
repo and I am planning to create an ECS service for a Prefect Agent that submits ECS Tasks that run the Prefect Deployments.
In the repo, as per my understanding, the agent runs as an ECS based on an ECS Task Definition which requires a Docker Image.
Does this Docker image need the different Python packages required by the flows
?
Or can it be lightweight and only include packages necessary for prefect
?Maren Elise Rønneseth
10/07/2022, 1:25 PMSeth Coussens
10/07/2022, 2:38 PMAnthony Desmier
10/07/2022, 3:40 PMUnknown parameter in input: "registeredBy", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform
This appears to be an api incompatibility on the aws-sdk side where the json returned from the describe call contains fields that are incompatible with the register command. There is a workaround described in the issue, but I'm wondering if anyone has come across this before when using prefect-aws or whether we need to do some additional configuration on our end to account for this? Thanks in advance!Sam Thomas
10/07/2022, 3:45 PMclass Maker():
def __init__(self, val=5):
self._val=val
def make_number(self):
return self._val
@property
def make_number_task(self):
return prefect.Task(self.make_number)
class Adder():
def __init__(self, val=3):
self._val=val
def add_number(self, x):
return x+self._val
@property
def add_number_task(self):
return prefect.Task(self.add_number)
class Calculator():
def __init__(self):
self.maker = Maker()
self.adder = Adder()
def _flow(self):
a = self.maker.make_number_task()
b = self.adder.add_number_task(a)
return b
@property
def flow(self):
return prefect.Flow(self._flow)
This seems to work.
c = Calculator()
c.flow()
<Prefect messages>
8
c.maker._val=10
c.flow()
<Prefect messages>
13
I'm wondering if there's a better way of doing it. Wrapping class methods in prefect.flow or prefect.task doesn't work because it treats "self" like a required argument but the above seems to work.Sean Turner
10/07/2022, 5:35 PMflow
(foo
) is running and a new deployment is created that changes the flow code that foo
was executing.
QUESTION 1: Would foo
continue to execute and ignore the new deployment?
Also, what happens if foo
intends to trigger a sub flow
(bar
), but a new deployment is created that changes the flow code that bar
would execute?
So the timeline is
a. flow
foo
is triggered
b. sub flow
bar
is updated (sub flow id changes from 123
to 456
)
c. foo
gets to the point in execution where bar
is called.
QUESTION 2: Would foo
trigger the newly updated bar
code 456
? Or would foo
ignore the change to sub flow
bar
and trigger 123
as foo
was triggered before bar
was updated?Karan
10/07/2022, 6:18 PMJarvis Stubblefield
10/07/2022, 7:10 PMLuca Schneider
10/07/2022, 7:22 PMPaco Ibañez
10/07/2022, 9:16 PMflow
decorator. thanks!Kyle McChesney
10/07/2022, 10:34 PMexternal_job_id
parameter. So if a flow is running and it is to be associated with some external job record, the id is passed. If the id is passed, updates are made as part of the flow state handler (marking the job done when the flow completes, recording error messages, etc)
The above is working great. The flow transition handler is generic, and simply checks for the existence of the parameter in the context. The “new” use case is basically a flow that is triggered by some outside automation. Part of the flows responsibility is to create the external job record, get its ID, and update it when the flow completes. Meaning I dont have the parameter at the start. Is there anyway to save some kind of information onto the flow, within a task, so that I can check for its existence within the flow state transition function? I’ve tried doing flow.add_task(Parameter('external_job_id', default=$res)
where $res
is the result of a task that creates the job and returns the id (half way though the flow). I also tried setting it in the context directly within the task. No luckHedgar
10/08/2022, 2:01 PMAaron Goebel
10/09/2022, 3:34 AMget_run_logger
. I've seen some references to PREFECT_LOGGING_EXTRA_LOGGERS
, but no legit examples for code that isn't library code. I have code like this at the top of all my utility function modules logging.getLogger("utils")
and have set PREFECT_LOGGING_EXTRA_LOGGERS='utils'
to no avail. Logs don't get streamed up to prefect UIBirkir Björnsson
10/09/2022, 3:28 PMsulfredlee
10/10/2022, 4:52 AMset_dependencies
I am trying to have this example:
@task(name="task_A")
def task_A() -(list, int):
test_output_list = list()
test_output_int = 10
return test_output_list, test_output_int
@task(name="task_B")
def task_B(input_a: list, input_b: int, input_c: float):
pirnt(f"{input_a}, {input_b}, {input_c},"
with Flow(name = "test_flow") as flow:
ret_a_list, ret_a_int = task_A()
task_B(ret_a_list, ret_a_int, 10.05)
I would like to change the implementation for the with Flow() as flow
to
test_flow = Flow(name="test_flow")
test_flow.set_dependencies([task_A, task_B], upstream_task=[task_A], downstream_tasks=[task_B])
I have 2 questions:
- is the set_dependencies
the correct function I should use?
- how to use set_dependencies
correctly to pass the output from task_A
to task_B
?
ThanksImre Kerr
10/10/2022, 8:33 AMRajvir Jhawar
10/10/2022, 1:21 PM"path": "/spec/template/spec/containers/0/env/-"
Which means the path for the volume mount should be:
"path": "/spec/template/spec/containers/0/volumeMounts/-"
When i use that path it doesn't work, any ideas?Mark
10/10/2022, 1:28 PMfrom prefect import flow
from prefect_dbt.cli.credentials import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
@flow
def trigger_dbt_cli_commands_flow():
dbt_cli_profile = DbtCliProfile.load("dbtbase")
trigger_kwargs = dict(
profiles_dir="../dbt/data-dbt",
project_dir="../dbt/data-dbt/dbt-project/",
overwrite_profiles=False,
dbt_cli_profile=dbt_cli_profile,
)
trigger_dbt_cli_command(
"dbt deps",
**trigger_kwargs
)
return result
trigger_dbt_cli_commands_flow()
Jai P
10/10/2022, 2:34 PMprefect-snowflake
library to wrap snowpark APIs?Kun Situ
10/10/2022, 2:41 PMraise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Client error '403 Forbidden' for url
Does anyone know how I can fix it, any help would be much appreciated!Jon Young
10/10/2022, 2:51 PMAttributeError: 'Parameter' object ...
. Why is the flow trying to run when I register it?
the more unclear thing is that when i declare a parameter, it returns a parameter object when i really just want the value that's passed to the flowDavid Cupp
10/10/2022, 4:20 PMsubmit()
method that lets you create task objects and them submit them directly, using wait_for
to specify dependencies.
Is there any similar mechanism for sub flows? I am interested in launching a "subflow" but without directly calling the annotated flow method, because I don't know which method I'm calling until runtime. For example, I would like to do this:
result = make_rpc_call()
newflow = Flow(
name=result.name,
parameters=result.parameters,
).submit(wait_for=[...])
is this possible?David Elliott
10/10/2022, 4:36 PMDavid Cupp
10/10/2022, 5:48 PM1 A = Task(...).submit()
...
5 B = Task(...).submit(wait_for=[A])
...
9 _ = Task(...).submit(wait_for=[B])
If an except is throw after line 1 and before line 9, is there an easy way to ensure that Task "A" does not start?Alexandru Anghel
10/10/2022, 6:06 PMGeorgiana Ogrean
10/10/2022, 6:38 PMConnectionRefusedError: [Errno 111] Connect call failed ('0.0.0.0', 4200)
The Orion UI comes up just fine when I do not try to build a deployment with:
prefect deployment build /app/my_package/flows/my_python_script.py:my_flow_name --name generate_deployment --tag dev --infra docker-container
Any ideas what I could be missing? More details in the thread.Kun Situ
10/10/2022, 8:04 PMLuca Schneider
10/10/2022, 8:16 PMJacqueline Garrahan
10/10/2022, 8:19 PMTaylor Babin
10/10/2022, 9:44 PMTaylor Babin
10/10/2022, 9:44 PMMatt Conger
10/10/2022, 10:28 PMMason Menges
10/10/2022, 10:36 PMTaylor Babin
10/11/2022, 2:15 PM