Paco Ibañez
10/06/2022, 2:45 AM
Vadym Dytyniak
10/06/2022, 7:50 AM
prefect.context.scheduled_start_time. Is there any alternative in Prefect 2? Thanks.
Andreas Tsangarides
10/06/2022, 8:35 AM
Pekka
10/06/2022, 10:37 AM
ImportError: dbt-core needs to be installed to use this task; run `pip install "prefect-dbt[cli]"`
when I definitely have prefect-dbt[cli] installed both for the system and for the project.
The error happens when running prefect_dbt.cli.commands.trigger_dbt_cli_command("dbt debug") -- does this have to do with missing trigger_kwargs -- not the package missing?
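One way to tell a search-path problem apart from a missing package is to compare what the current process can find with what a process with a narrower PATH would find. The following is a minimal standard-library sketch, not part of prefect-dbt; the helper name find_executable and the restricted PATH value are assumptions for illustration.

```python
import os
import shutil


def find_executable(name, path=None):
    """Return the resolved location of `name`, or None if it is not on the search path."""
    return shutil.which(name, path=path)


# What the current process would find (None if dbt is not on this PATH).
visible_here = find_executable("dbt")

# What a process with a narrower PATH would find; this value is an assumed example.
restricted_path = os.pathsep.join(["/usr/bin", "/bin"])
visible_restricted = find_executable("dbt", path=restricted_path)

print("current PATH:   ", visible_here)
print("restricted PATH:", visible_restricted)
```

If the first lookup resolves and the second does not, the executable is installed but lives in a directory that the narrower environment does not search.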
SOLVED: INSTALL PREFECT-DBT[CLI] AS ROOT (the subprocesses don't have the same PATH variable so the which command doesn't find them)
Zach Schuster
10/06/2022, 2:11 PM
rectalogic
10/06/2022, 3:42 PM
David Elliott
10/06/2022, 4:29 PM
logger doesn’t seem to output logs either to terminal or to the cloud UI when using the DaskTaskRunner() - I’m calling get_run_logger() within the task - code attached in 🧵 . Any ideas / am I missing something?
(works fine with sequential / concurrent task runners, just not the Dask one)
Nick DeCraene
10/06/2022, 4:47 PM
None instead of what is hinted at in the method. Does anyone know of a work-around besides ignoring the errors?
YZ
10/06/2022, 5:38 PM
state_handler(task, old_state, new_state)
Is there a way to access the original input parameters for the task? For example, my original task is as below, and I would like to access config in the state_handler
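from typing import Any
from prefect import task

def state_handler(task, old_state, new_state):
    # Question: how can I access the `config` variable I originally passed into `task`?
    return new_state  # a state handler returns the state the task should end up in

@task(state_handlers=[state_handler])
def my_task(config: Any):
    if config.is_prod:
        pass  # do something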
Daniel Burkhardt
10/06/2022, 5:59 PM
Jason Bertman
10/06/2022, 6:40 PM
@flow(
    task_runner=RayTaskRunner(
        address="ray://ray-cluster-kuberay-head-svc:10001"
    ),
)
def main(...):
    ...

if __name__ == "__main__":  # If this flow is called directly, don't use the address
    main(..., task_runner=RayTaskRunner)
V
10/06/2022, 8:01 PM
Zackary Wixom
10/06/2022, 9:49 PM
Zac Hooper
10/07/2022, 4:47 AM
RRuleSchedule(
    rrule="DTSTART:20221007T120000Z\nRRULE:FREQ=WEEKLY;UNTIL=20240330T120000Z\nEXDATE:20221225T120000Z\nEXDATE:20221226T120000Z"
)
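The rrule string above packs a start date, a weekly rule with an end date, and two excluded dates into one literal separated by \n. A small helper can make those pieces easier to read and change. This is a minimal sketch using only the standard library; build_rrule_string and utc_noon are hypothetical names, not part of Prefect, and the sketch only assembles the text without validating it.

```python
from datetime import datetime, timezone

ICAL_FMT = "%Y%m%dT%H%M%SZ"  # iCalendar UTC date-time form, e.g. 20221007T120000Z


def build_rrule_string(start, until, excluded, freq="WEEKLY"):
    """Join DTSTART, RRULE and EXDATE lines with newlines (hypothetical helper).

    All datetimes are expected to be timezone-aware; they are converted to UTC.
    """
    def fmt(dt):
        return dt.astimezone(timezone.utc).strftime(ICAL_FMT)

    lines = [
        f"DTSTART:{fmt(start)}",
        f"RRULE:FREQ={freq};UNTIL={fmt(until)}",
    ]
    lines += [f"EXDATE:{fmt(day)}" for day in excluded]
    return "\n".join(lines)


def utc_noon(year, month, day):
    """Noon UTC on the given date (hypothetical convenience wrapper)."""
    return datetime(year, month, day, 12, tzinfo=timezone.utc)


rrule_text = build_rrule_string(
    start=utc_noon(2022, 10, 7),
    until=utc_noon(2024, 3, 30),
    excluded=[utc_noon(2022, 12, 25), utc_noon(2022, 12, 26)],
)
print(rrule_text)
```

With these inputs the helper reproduces the same four lines as the literal in the message, so the result could be passed as the rrule argument in its place.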
Thomas Opsomer
10/07/2022, 8:12 AM
Failed to load and execute flow run: FlowStorageError('An error occurred while unpickling the flow:\n JSONDecodeError("Expecting \',\' delimiter: line 1141 column 82 (char 62124)")')
(We're using GCS for storage)
Thomas Fredriksen
10/07/2022, 8:37 AM
Clovis
10/07/2022, 8:58 AM
GCloudRun. Great 🎉 !
However, after connecting to my Prefect Cloud environment, no new block is available, and I could not find a way to see the current Cloud UI version. So my question is: how can I retrieve the version from the cloud, and is there a way to update / roll it back manually?
Kelvin DeCosta
10/07/2022, 9:02 AM
Anna Geller
Kelvin DeCosta
10/07/2022, 12:11 PM
dataflow-ops repo and I am planning to create an ECS service for a Prefect Agent that submits ECS Tasks that run the Prefect Deployments.
In the repo, as per my understanding, the agent runs as an ECS service based on an ECS Task Definition, which requires a Docker image.
Does this Docker image need the different Python packages required by the flows?
Or can it be lightweight and only include packages necessary for prefect?
Maren Elise Rønneseth
10/07/2022, 1:25 PM
Seth Coussens
10/07/2022, 2:38 PM
Anthony Desmier
10/07/2022, 3:40 PM
Unknown parameter in input: "registeredBy", must be one of: family, taskRoleArn, executionRoleArn, networkMode, containerDefinitions, volumes, placementConstraints, requiresCompatibilities, cpu, memory, tags, pidMode, ipcMode, proxyConfiguration, inferenceAccelerators, ephemeralStorage, runtimePlatform
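The error message lists every key the register call accepts, so one generic way to make a described task definition acceptable is to filter it against that list before registering. This is a minimal sketch of an allow-list filter, not necessarily the workaround described in the issue and not what prefect-aws does internally; to_register_payload and the sample dictionary are hypothetical.

```python
# Keys accepted by the register call, copied from the error message above.
ALLOWED_KEYS = {
    "family", "taskRoleArn", "executionRoleArn", "networkMode",
    "containerDefinitions", "volumes", "placementConstraints",
    "requiresCompatibilities", "cpu", "memory", "tags", "pidMode",
    "ipcMode", "proxyConfiguration", "inferenceAccelerators",
    "ephemeralStorage", "runtimePlatform",
}


def to_register_payload(described):
    """Keep only the keys the register call accepts (hypothetical helper)."""
    return {key: value for key, value in described.items() if key in ALLOWED_KEYS}


# Hypothetical sample of a described task definition; all values are made up.
described = {
    "family": "prefect-flow",
    "containerDefinitions": [{"name": "flow", "image": "example/image:latest"}],
    "registeredBy": "arn:aws:iam::123456789012:user/example",
    "revision": 3,
}

payload = to_register_payload(described)
print(sorted(payload))
```

An allow-list is used rather than deleting registeredBy alone, so that any other describe-only field is dropped as well.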
This appears to be an API incompatibility on the aws-sdk side, where the JSON returned from the describe call contains fields that are incompatible with the register command. There is a workaround described in the issue, but I'm wondering if anyone has come across this before when using prefect-aws or whether we need to do some additional configuration on our end to account for this? Thanks in advance!
Sam Thomas
10/07/2022, 3:45 PM
import prefect

class Maker():
    def __init__(self, val=5):
        self._val=val

    def make_number(self):
        return self._val

    @property
    def make_number_task(self):
        # Wrap the bound method, so `self` is already supplied
        return prefect.Task(self.make_number)

class Adder():
    def __init__(self, val=3):
        self._val=val

    def add_number(self, x):
        return x+self._val

    @property
    def add_number_task(self):
        return prefect.Task(self.add_number)

class Calculator():
    def __init__(self):
        self.maker = Maker()
        self.adder = Adder()

    def _flow(self):
        a = self.maker.make_number_task()
        b = self.adder.add_number_task(a)
        return b

    @property
    def flow(self):
        return prefect.Flow(self._flow)
This seems to work.
c = Calculator()
c.flow()
<Prefect messages>
8
c.maker._val=10
c.flow()
<Prefect messages>
13
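A plain-Python illustration of why the property approach behaves differently from decorating a method in the class body: accessing a method through an instance yields a bound method whose signature no longer includes self, whereas the function seen inside the class body still has it. This is a minimal standard-library sketch that does not call Prefect; the Adder class here mirrors the one above.

```python
import inspect


class Adder:
    def __init__(self, val=3):
        self._val = val

    def add_number(self, x):
        return x + self._val


# The plain function, as a decorator applied in the class body would see it.
unbound_params = list(inspect.signature(Adder.add_number).parameters)

# The bound method, as the property-based wrappers receive it.
adder = Adder()
bound_params = list(inspect.signature(adder.add_number).parameters)

print(unbound_params)  # ['self', 'x']
print(bound_params)    # ['x']
print(adder.add_number(5))  # 8
```

Any wrapper that inspects the callable's signature will therefore see self as a required parameter in the first case and not in the second.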
I'm wondering if there's a better way of doing it. Wrapping class methods in prefect.flow or prefect.task doesn't work because it treats "self" like a required argument, but the above seems to work.
Sean Turner
10/07/2022, 5:35 PM
flow (foo) is running and a new deployment is created that changes the flow code that foo was executing.
QUESTION 1: Would foo continue to execute and ignore the new deployment?
Also, what happens if foo intends to trigger a sub flow (bar), but a new deployment is created that changes the flow code that bar would execute?
So the timeline is
a. flow foo is triggered
b. sub flow bar is updated (sub flow id changes from 123 to 456)
c. foo gets to the point in execution where bar is called.
QUESTION 2: Would foo trigger the newly updated bar code 456? Or would foo ignore the change to sub flow bar and trigger 123, as foo was triggered before bar was updated?
Karan
10/07/2022, 6:18 PM
Jarvis Stubblefield
10/07/2022, 7:10 PM
Luca Schneider
10/07/2022, 7:22 PM
Paco Ibañez
10/07/2022, 9:16 PM
flow decorator. thanks!
Kyle McChesney
10/07/2022, 10:34 PM
external_job_id parameter. So if a flow is running and it is to be associated with some external job record, the id is passed. If the id is passed, updates are made as part of the flow state handler (marking the job done when the flow completes, recording error messages, etc.)
The above is working great. The flow transition handler is generic, and simply checks for the existence of the parameter in the context. The “new” use case is basically a flow that is triggered by some outside automation. Part of the flow's responsibility is to create the external job record, get its ID, and update it when the flow completes. Meaning I don't have the parameter at the start. Is there any way to save some kind of information onto the flow, within a task, so that I can check for its existence within the flow state transition function? I’ve tried doing flow.add_task(Parameter('external_job_id', default=$res)) where $res is the result of a task that creates the job and returns the id (halfway through the flow). I also tried setting it in the context directly within the task. No luck.