Michael Levenson
08/18/2022, 3:00 AM

ash
08/18/2022, 6:23 AM

Marcin Grzybowski
08/18/2022, 7:32 AM

Rainer Schülke
08/18/2022, 9:40 AM
@task(log_stdout=True)
def etl(
    connector,
    exec_file,
    environment: Optional[str] = None
):
    master_data = MasterDataSet(connector, exec_file)
    master_data.get_dataset(environment)
    master_data.load_dataset()
My flow looks like this:
.... as flow:
    source = SOME_SOURCE
    destination = SOME_DESTINATION
    file_list = ['SOME/PATH/TO/FILE', 'ANOTHER/PATH/TO/FILE']
    master_dataset = etl.map(
        connector=unmapped(source),
        exec_file=file_list
    )
The cloud logs look like this:
Task 'etl_master_data': Starting task run...
Task 'etl_master_data': Finished task run for task with final state: 'Mapped'
Flow run FAILED: some reference tasks failed.
There is no failed reference task.
Does anybody have any idea why this is not working in the cloud but works locally? It is configured like all the other flows, and even the class-based data handling is already in use for other flows. I really have absolutely no clue why 😄

Ha Pham
08/18/2022, 10:09 AM
I have a question about the start_date parameter. In the flow code, the default for this value is set like this:
def sync_circleci(start_date: datetime.date = datetime.datetime.today().date()):
...
and it will come out in the YYYY-MM-DD format. Now when I deploy the flow, it seems that the default value is not recognized by the Prefect UI. When I run this flow "using default param values", the flow fails with this error:
prefect.exceptions.SignatureMismatchError: Function expects parameters ['start_date'] but was provided with parameters []
which means the default value is not recognized (?).
And when I select a date value using the date picker, it looks like the date format is not recognized by my flow code, with this error:
- start_date: invalid date format
How should I handle this?
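One possible workaround (a sketch only, assuming Prefect 2 and that the flow can accept the date as a string; the flow body is a placeholder) is to take the parameter as an optional string and parse or default it inside the flow:

from datetime import date, datetime
from typing import Optional

from prefect import flow

@flow
def sync_circleci(start_date: Optional[str] = None):
    # Parse the string ourselves instead of relying on the UI's date handling;
    # fall back to today's date when no value is provided.
    parsed = (
        datetime.strptime(start_date, "%Y-%m-%d").date()
        if start_date
        else date.today()
    )
    print(f"Syncing CircleCI data from {parsed}")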
Sven Aoki
08/18/2022, 10:11 AM

Jamie Blakeman
08/18/2022, 11:07 AM
Is it possible to make an /api/flow_runs/ request from an external source?
This was something we'd do regularly with the GraphQL API in Prefect 1.0.
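For illustration, a rough sketch of how an external script might hit the Prefect 2 REST API to kick off a run - here via a deployment's create_flow_run endpoint rather than /api/flow_runs/ directly; the URL, API key, and deployment ID are placeholders:

import requests

# Placeholders - adjust to your Prefect Cloud workspace or self-hosted Orion API
PREFECT_API_URL = "https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>"
PREFECT_API_KEY = "<api-key>"
DEPLOYMENT_ID = "<deployment-id>"

response = requests.post(
    f"{PREFECT_API_URL}/deployments/{DEPLOYMENT_ID}/create_flow_run",
    headers={"Authorization": f"Bearer {PREFECT_API_KEY}"},
    json={"parameters": {}},
)
response.raise_for_status()
print(response.json()["id"])  # id of the newly created flow run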
Oscar Björhn
08/18/2022, 11:34 AM

Saman
08/18/2022, 12:12 PM

Oscar Björhn
08/18/2022, 1:01 PM

Lucien Fregosi
08/18/2022, 1:55 PM
With a KubernetesJob, is it possible to specify a node-selector and tolerations so the worker pod can be assigned to a specific node pool? I can't see it in the docs.
Thanks for your help
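Not an authoritative answer, but one possible approach (a sketch assuming Prefect 2's KubernetesJob block; the node pool label and toleration values are made up for illustration) is to patch the generated job manifest through the block's customizations field:

from prefect.infrastructure import KubernetesJob

k8s_job = KubernetesJob(
    customizations=[
        {
            # add a nodeSelector to the pod spec (illustrative label/value)
            "op": "add",
            "path": "/spec/template/spec/nodeSelector",
            "value": {"cloud.google.com/gke-nodepool": "my-node-pool"},
        },
        {
            # add tolerations matching the node pool's taint (illustrative)
            "op": "add",
            "path": "/spec/template/spec/tolerations",
            "value": [
                {
                    "key": "dedicated",
                    "operator": "Equal",
                    "value": "prefect",
                    "effect": "NoSchedule",
                }
            ],
        },
    ],
)
k8s_job.save("k8s-job-with-node-pool", overwrite=True)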
Clint M
08/18/2022, 2:11 PM
null (local) storage

Tim Enders
08/18/2022, 2:23 PM

Rio McMahon
08/18/2022, 2:27 PM
(prefect agent start -q queue1 -q queue2 … within the script below)
I am bootstrapping agents within a Docker container with this Python script:
import prefect
import asyncio
import subprocess
import time
import sys

async def bootstrap_agents():
    # get list of work queues
    pc = prefect.get_client()
    work_queues = await pc.read_work_queues()
    # create agents by ID: one agent subprocess per work queue
    for wq in work_queues:
        proc = subprocess.Popen(['prefect', 'agent', 'start', str(wq.id)])
        print(f'Starting agent associated with {wq.name}. PID number is {proc.pid}', file=sys.stdout)

if __name__ == '__main__':
    asyncio.run(bootstrap_agents())
    # keep the container's main process alive so the agent subprocesses keep running
    while True:
        time.sleep(1)
If there is a better deployment pattern, I'd love to get insight on that.

Ryan Lattanzi
08/18/2022, 2:49 PM
So with Process infrastructure, the flow run will be executed in the same container that the agent is running in - am I understanding that correctly?

Jared Robbins
08/18/2022, 2:54 PM

Tim-Oliver
08/18/2022, 3:07 PM
I am using a DaskExecutor with cluster_class="dask_jobqueue.SLURMCluster". Everything works fine, unless SLURM kills the resources (e.g. due to a time limit). Then the computation is naturally terminated, but in Prefect Cloud I still see the flow and task running. I did an experiment where SLURM kills the resources after 2 min, but the flow is still shown as running for 1h 38min and counting. I thought that the heartbeat option should kick in at some point to flag the flow as Failed. Has anyone had experience with a similar setup?
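For reference, a minimal sketch of a Prefect 1.0 setup like the one described (the SLURM cluster_kwargs values are illustrative assumptions, not the poster's actual configuration):

from prefect import Flow
from prefect.executors import DaskExecutor

# Illustrative resource settings only; adjust to the real SLURM allocation.
executor = DaskExecutor(
    cluster_class="dask_jobqueue.SLURMCluster",
    cluster_kwargs={"cores": 4, "memory": "8GB", "walltime": "00:02:00"},
)

with Flow("slurm-flow", executor=executor) as flow:
    ...  # tasks that run on the SLURM-backed Dask cluster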
Charlie Henry
08/18/2022, 3:40 PM
I'm getting InterruptedError: denied: requested access to the resource is denied. If I docker push outside of Prefect, it'll push no problem.
Maybe it's an incorrect registry_url?
flow.storage = Docker(
    registry_url="registry.hub.docker.com",
    image_name="atddocker/atd-microstrategy",
    image_tag=ENV,
)
Clint M
08/18/2022, 3:47 PM
If someone runs prefect agent start -q misspelledname, it shouldn't auto-create the queue.

Sam Garvis
08/18/2022, 3:51 PM

Mohamed Ayoub Chettouh
08/18/2022, 4:41 PM

Vishy ganesh
08/18/2022, 5:40 PM

Jimmy Le
08/18/2022, 5:42 PM
I was passing --work-queue but the CLI says no such option exists. Removing the --work-queue flag resolved the issue.

Shanhui Bono
08/18/2022, 6:21 PM

Jared Robbins
08/18/2022, 8:19 PM

Tim Enders
08/18/2022, 8:20 PM
Is there a flatten call in Prefect 2.0? I have a list of lists after mapping tasks, and I used to be able to use flatten to coalesce that into a single large list.
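As far as I know there is no direct flatten equivalent in 2.0; one possible workaround (a sketch with made-up task names) is to resolve the mapped futures and flatten them in plain Python before passing them to the next task:

from prefect import flow, task

@task
def produce_chunk(n: int) -> list:
    return list(range(n))

@task
def consume(items: list) -> int:
    return len(items)

@flow
def my_flow():
    chunks = produce_chunk.map([2, 3, 4])
    # resolve each mapped future and flatten the lists-of-lists in plain Python
    flat = [item for chunk in chunks for item in chunk.result()]
    consume(flat)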
Mars
08/18/2022, 8:46 PM
I wanted to debug my requests-oauthlib connection, and now all I need to do is:
1. Copy logging.yaml
2. Set my logger:
requests_oauthlib:
  level: DEBUG
  handlers: [console]
  propagate: no
3. Set PREFECT_LOGGING_SETTINGS_PATH=logging.yaml in my .env file.
And it works!

Mars
08/18/2022, 9:16 PM
Is there an equivalent of create_markdown_artifact in Prefect 2.0? I don't see markdown or artifacts mentioned in the migration guide.

Josh
08/18/2022, 9:56 PM
I map a task over my databases with my_task.map(databases). But I don't want to overload my databases, so I want to set concurrency limits at the task level to make sure there are only ever N < limit tasks running against a given database at one time. I know you can set task tags when I'm creating the task object, but can I do so as the flow is executing against mapped tasks?
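One possibility (a sketch, not a confirmed answer; the task and flow names are made up) is to apply a per-database tag at call time with the tags context manager, then create a concurrency limit for each such tag, e.g. prefect concurrency-limit create db-analytics 3:

from prefect import flow, task, tags

@task
def my_task(database: str) -> None:
    print(f"running against {database}")

@flow
def sync_all(databases: list):
    for db in databases:
        # tag applied at call time; a concurrency limit created for the same
        # tag caps how many of these task runs execute concurrently
        with tags(f"db-{db}"):
            my_task.submit(db)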
Blake Hamm
08/18/2022, 10:48 PM
I have a question about the Deployment class. Previously, I defined a KubernetesJob infrastructure block in a different Python script and used the .save() method to instantiate a block in the UI. It looks like this was removed in 2.1... How can I do this now?
Moreover, I'm trying to understand what I pass in for the infrastructure parameter of the Deployment class. With the storage parameter, I just load the storage block that I previously defined. Can I do the same with the infrastructure block? If so, how do I load an existing KubernetesJob block? Or is the new workflow to just pass the KubernetesJob dictionary directly into your Deployment class? If that's the case, how do I tie this to a work queue?
In general, I'm a big fan of the KubernetesJob block. I don't care whether it's an actual block or just a Python dictionary I pass into the deployment. Either way, it's been extremely helpful for managing compute depending on the deployment. I would love to streamline it with CI/CD. I know that's in the works, and I'm eager to see best practices for this and implement them.
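For what it's worth, a rough sketch of how this might look in 2.1 (assuming Deployment.build_from_flow, an S3 storage block, and placeholder block, flow, and queue names; the KubernetesJob block is loaded by name, the same pattern as storage):

from prefect.deployments import Deployment
from prefect.filesystems import S3
from prefect.infrastructure import KubernetesJob

from my_project.flows import my_flow  # hypothetical flow import

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-k8s",
    # load previously saved blocks by name, same pattern for storage and infrastructure
    storage=S3.load("my-storage-block"),
    infrastructure=KubernetesJob.load("my-k8s-job-block"),
    work_queue_name="kubernetes",
)

if __name__ == "__main__":
    deployment.apply()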