David Elliott
08/17/2022, 5:05 PM

Josh Paulin
08/17/2022, 5:48 PM

Khuyen Tran
08/17/2022, 7:12 PM

emo loic
08/17/2022, 7:47 PM

Tony Yun
08/17/2022, 8:07 PM

Ilya Galperin
08/17/2022, 8:31 PM
Is there a way to pass a custom job template to the kubernetes-job
infrastructure block in Prefect 2.0? In 1.0, we could accomplish this by using the KubernetesRun
module and passing it like so: KubernetesRun(job_template_path="my/local/path.yaml")
It seems like we can mimic the functionality in 2.0 with some hacks: pipe the output from our job template .yaml file, represent it as a one-line JSON string, then force that output into --override job=, but this doesn't seem ideal.
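(A minimal sketch of one possible 2.0 approach, assuming the KubernetesJob infrastructure block's job field accepts a full job manifest dict; the path and block name below are placeholders.)

import yaml
from prefect.infrastructure import KubernetesJob

# Load the local job template into a dict and hand it to the block,
# instead of piping JSON through --override.
with open("my/local/path.yaml") as f:
    job_manifest = yaml.safe_load(f)

k8s_job = KubernetesJob(job=job_manifest)
k8s_job.save("my-k8s-job", overwrite=True)  # placeholder block name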
Ross Teach
08/17/2022, 8:56 PM
I noticed this warning on the Prefect Cloud UI: "This work queue uses a deprecated tag-based approach to matching flow runs; it will continue to work but you can't modify it". Is there any documentation on the recommended approach to use?
Kathryn Klarich
08/17/2022, 9:30 PM

Aaron Goebel
08/18/2022, 12:10 AM
Is it possible to use Parameters
or something analogous as input to a KubernetesRun
to request a specific amount of resources on the fly? I'm working with Fargate and want to give users the flexibility to run flows with differing amounts of compute.
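(A hedged 1.x sketch of varying resources per run by passing a run_config when the flow run is created; the flow/project names and sizes are placeholders.)

from prefect.run_configs import ECSRun
from prefect.tasks.prefect import create_flow_run

# Kick off a run with per-run Fargate resources via a custom run config.
create_flow_run.run(
    flow_name="my-flow",        # placeholder
    project_name="my-project",  # placeholder
    run_config=ECSRun(cpu=1024, memory=2048),  # Fargate CPU units / MiB
)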
Michael Levenson
08/18/2022, 2:47 AM

Michael Levenson
08/18/2022, 3:00 AM

ash
08/18/2022, 6:23 AM

Marcin Grzybowski
08/18/2022, 7:32 AM

Rainer Schülke
08/18/2022, 9:40 AM
My task looks like this:
@task(log_stdout=True)
def etl(
    connector,
    exec_file,
    environment: Optional[str] = None
):
    master_data = MasterDataSet(connector, exec_file)
    master_data.get_dataset(environment)
    master_data.load_dataset()
My flow looks like this:
.... as flow:
    source = SOME_SOURCE
    destination = SOME_DESTINATION
    file_list = ['SOME/PATH/TO/FILE', 'ANOTHER/PATH/TO/FILE']
    master_dataset = etl.map(
        connector=unmapped(source),
        exec_file=file_list
    )
The cloud logs look like this:
Task 'etl_master_data': Starting task run...
Task 'etl_master_data': Finished task run for task with final state: 'Mapped'
Flow run FAILED: some reference tasks failed.
There is no failed reference task.
Does anybody have any idea why this is not working on the cloud but works locally? It is configured like all the other flows, and even the class-based data handling is already implemented for other flows. I really have absolutely no clue why 😄

Ha Pham
08/18/2022, 10:09 AM
I have a flow with a start_date
parameter. In the flow code, the default of this value is set like this:
def sync_circleci(start_date: datetime.date = datetime.datetime.today().date()):
...
and it will come out in the YYYY-MM-DD format. Now when I deploy the flow, it seems that the default value is not recognized by the Prefect UI. When I run this flow "using default param values", the flow fails with this error:
prefect.exceptions.SignatureMismatchError: Function expects parameters ['start_date'] but was provided with parameters []
which means the default value is not recognized (?).
And when I select a date value using the date picker, it looks like the date format is not recognized by my flow code, failing with this error:
- start_date: invalid date format
How should I handle this?
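(One hedged workaround, assuming the goal is "default to today": take the date as an optional string and resolve it inside the flow, so nothing is evaluated at import time.)

import datetime
from typing import Optional

from prefect import flow

@flow
def sync_circleci(start_date: Optional[str] = None):
    # Parse the YYYY-MM-DD string, falling back to today when omitted.
    date = (
        datetime.date.fromisoformat(start_date)
        if start_date
        else datetime.date.today()
    )
    ...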
Sven Aoki
08/18/2022, 10:11 AM

Jamie Blakeman
08/18/2022, 11:07 AM
Is it possible to trigger a flow run with a /api/flow_runs/
request from an external source?
This was something we'd do regularly with the GraphQL API in Prefect 1.0.
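(A sketch of what this might look like against the 2.0 REST API, where runs are usually created from a deployment; the deployment ID and auth details are placeholders.)

curl -X POST "$PREFECT_API_URL/deployments/<deployment-id>/create_flow_run" \
  -H "Authorization: Bearer $PREFECT_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"parameters": {}}'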
Oscar Björhn
08/18/2022, 11:34 AM

Saman
08/18/2022, 12:12 PM

Oscar Björhn
08/18/2022, 1:01 PM

Lucien Fregosi
08/18/2022, 1:55 PM
With a KubernetesJob,
is it possible to specify a node-selector
and tolerations
to be able to assign the worker pod to a specific node pool?
I can't see it in the docs.
Thanks for your help
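(One hedged way, assuming the block's customizations field accepts a JSON 6902 patch against the base job manifest; the node pool and taint names are made up.)

from prefect.infrastructure import KubernetesJob

k8s_job = KubernetesJob(
    customizations=[
        # Pin flow-run pods to a dedicated node pool (placeholder labels).
        {
            "op": "add",
            "path": "/spec/template/spec/nodeSelector",
            "value": {"nodepool": "prefect"},
        },
        # Tolerate the taint on that pool so the pods can schedule there.
        {
            "op": "add",
            "path": "/spec/template/spec/tolerations",
            "value": [{
                "key": "nodepool",
                "operator": "Equal",
                "value": "prefect",
                "effect": "NoSchedule",
            }],
        },
    ],
)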
Clint M
08/18/2022, 2:11 PM
null (local) storage

Tim Enders
08/18/2022, 2:23 PM

Rio McMahon
08/18/2022, 2:27 PM
Is there a way to run agents for multiple work queues at once? (i.e. could I run prefect agent start -q queue1 -q queue2 …
within the script below)
I am bootstrapping agents within a docker container with this python script:
import asyncio
import subprocess
import sys
import time

import prefect

async def bootstrap_agents():
    # get list of work queues (the client is meant to be used as an async context manager)
    async with prefect.get_client() as pc:
        work_queues = await pc.read_work_queues()
    # start one agent subprocess per work queue ID
    for wq in work_queues:
        proc = subprocess.Popen(['prefect', 'agent', 'start', str(wq.id)])
        print(f'Starting agent associated with {wq.name}. PID number is {proc.pid}', file=sys.stdout)

if __name__ == '__main__':
    asyncio.run(bootstrap_agents())
    # keep the container alive so the agent subprocesses keep running
    while True:
        time.sleep(1)
If there is a better deployment pattern I'd love to get insight on that.

Ryan Lattanzi
08/18/2022, 2:49 PM
It seems the default infrastructure is Process
- which means the flow run will be executed in the same container that the agent is running in - am I understanding that correctly?
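(If so, one hedged alternative is choosing a different infrastructure type at build time; a sketch assuming the --infra flag on prefect deployment build, with placeholder names.)

# run flows in a fresh Docker container rather than the agent's own process
prefect deployment build flows/my_flow.py:my_flow -n my-deployment -q my-queue --infra docker-container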
Jared Robbins
08/18/2022, 2:54 PM

Tim-Oliver
08/18/2022, 3:07 PM
I am running flows with a DaskExecutor
with cluster_class="dask_jobqueue.SLURMCluster"
. Everything works fine unless SLURM kills the resources (e.g. due to a time limit). The computation is then naturally terminated, but in Prefect Cloud I still see the flow and task as running. I did an experiment where SLURM kills the resources after 2 min, but the flow is still shown as running for 1h 38min and counting. I thought that the heartbeat option should kick in at some point to flag the flow as Failed
. Does anyone have experience with a similar setup?
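(For anyone comparing notes, a sketch of the setup described, in Prefect 1.x style; the cluster kwargs are placeholders.)

from prefect.executors import DaskExecutor

# Executor that asks SLURM for workers; SLURM reclaims them at the
# walltime limit, which is when the flow gets stuck in Running.
flow.executor = DaskExecutor(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={
        "cores": 4,              # placeholder
        "memory": "16 GB",       # placeholder
        "walltime": "00:02:00",  # SLURM kills the resources at this limit
    },
)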
Charlie Henry
08/18/2022, 3:40 PM
When Prefect pushes my flow's Docker storage image I get InterruptedError: denied: requested access to the resource is denied. If I docker push outside of Prefect, it pushes no problem.
Maybe it's an incorrect registry_url?
flow.storage = Docker(
    registry_url="registry.hub.docker.com",
    image_name="atddocker/atd-microstrategy",
    image_tag=ENV,
)

Clint M
08/18/2022, 3:47 PM
If you run prefect agent start -q misspelledname
it shouldn't auto-create the queue

Sam Garvis
08/18/2022, 3:51 PM

Sam Garvis
08/18/2022, 3:51 PM

Anna Geller
08/18/2022, 3:55 PM