matta
07/31/2021, 2:58 AM

Michael Warnock
07/31/2021, 3:45 PM

Irvin Tang
08/02/2021, 12:24 AM
pyproject.toml has a git dependency pointing to that test branch. after building the image with this new version of the library and registering the flow with that image, running the flow doesn't display any of the additional logs
i have another project where i'm doing the same thing. the flows in that project do log the additional messages. i'm not sure if this might be the way i configured prefect or how i'm registering the flows. has anyone encountered this problem or something similar?
also please let me know if i was unclear about anything. thank you!

Reece Hart
08/02/2021, 1:03 AM
seqtk (args) | fastp (args) | sentieon (args) | samtools (args) > out.tmp
In our pipeline, most steps are wrapped in a script, which is what the Makefile calls. This step starts with approximately 100GB of data and dumps 150GB of data. Given the data volume, I would be reluctant to write intermediate files in lieu of the pipes.
Given the nature of the workflow -- all command-line tools with file-based data -- I think adopting Prefect would amount to making most of our scripts into Prefect's ShellTasks. I wonder whether this is really worth the effort.
The main drivers for choosing a workflow tool are pipeline versioning, job scheduling and tracking, and orchestrating infrastructure scale-up/down.
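A minimal sketch (assuming Prefect 1.x and its `prefect.tasks.shell.ShellTask`) of what wrapping one Makefile step might look like while keeping the streaming pipes intact; the command mirrors the elided `(args)` above, and all names here are hypothetical:

```python
def pipeline_command(out: str) -> str:
    """Compose the piped command line for one step (tool args elided,
    as in the original Makefile)."""
    return f"seqtk (args) | fastp (args) | sentieon (args) | samtools (args) > {out}"

def build_flow():
    # Deferred import: Prefect 1.x is only needed when actually building
    # the flow, so the pure helper above stays inspectable without it.
    from prefect import Flow
    from prefect.tasks.shell import ShellTask

    step = ShellTask(name="align-and-sort")
    with Flow("sample-pipeline") as flow:
        step(command=pipeline_command("out.tmp"))
    return flow
```

Keeping the whole pipeline inside one ShellTask preserves the inter-process pipes, so no 150GB intermediates hit disk; Prefect then only handles the scheduling, tracking, and versioning drivers mentioned above.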
Thanks for any guidance.

Omar Sultan
08/02/2021, 8:23 AM

Samuel Tober
08/02/2021, 9:33 AM
markets = Parameter('markets', default=['stockholm', 'oslo'])
And then run a function:
total_df = load_data.map(
    city=markets,
    date_from=date_from
)
where I pass a single value parameter, date_from, and my list parameter markets.
Everything runs without error; however, the function is only run for the first value in the markets list. What I want is to run it for each value in markets, using the same value of date_from for each element. How can I achieve this?

Jai Deo
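One likely fix for the mapping question above (an assumption based on Prefect 1.x mapping semantics, with a stand-in loader): wrap the scalar in `unmapped` so `.map` iterates only over `markets`:

```python
def expected_pairs(markets, date_from):
    # The pairing wanted above: one call per market, same date_from each time.
    return [(m, date_from) for m in markets]

def build_flow():
    # Deferred import; assumes Prefect 1.x.
    from prefect import Flow, Parameter, task, unmapped

    @task
    def load_data(city, date_from):
        return f"{city}:{date_from}"  # stand-in for the real loader

    with Flow("markets") as flow:
        markets = Parameter("markets", default=["stockholm", "oslo"])
        date_from = Parameter("date_from", default="2021-01-01")
        # unmapped() pins the scalar; without it, .map treats date_from
        # as a second iterable and zips the two arguments together.
        load_data.map(city=markets, date_from=unmapped(date_from))
    return flow
```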
08/02/2021, 9:51 AM

aman gupta
08/02/2021, 12:07 PM

Robert Hales
08/02/2021, 1:59 PM
max_retries on a task at flow run time?
08/02/2021, 3:18 PM
Artifact (just a link in our case) in a custom Slack notification. The State Handler gets a Flow and State. I'm having trouble finding the Artifact somewhere in the Flow objects. I guess I should try and find a flow_run object of some sort?

Mehdi Nazari
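On the Artifact question above: artifacts are created from inside task runs (e.g. `prefect.artifacts.create_link`), so they don't hang off the `Flow` or `State` objects a state handler receives. One hedged sketch, assuming Prefect 1.x, is to pull `flow_run_id` from `prefect.context` inside the handler and look the run's artifacts up via the API from there; the message formatting below is hypothetical:

```python
def build_message(flow_name, run_id):
    # Pure helper so the formatting is testable without Prefect installed.
    return f"Flow `{flow_name}` finished (flow_run_id={run_id})"

def notify_slack(flow, old_state, new_state):
    # Prefect 1.x state-handler signature: (obj, old_state, new_state).
    if new_state.is_finished():
        import prefect  # deferred: context only exists inside a run
        run_id = prefect.context.get("flow_run_id", "unknown")
        msg = build_message(flow.name, run_id)
        # POST `msg` to a Slack webhook here (requests.post call elided);
        # with the run id you can also query the API for that run's artifacts.
    return new_state
```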
08/02/2021, 5:12 PM
Task<name> instance instead.
08/02/2021, 6:23 PM
.my-package
├── __init__.py
├── _config.yml
├── flows
│   ├── __init__.py
│   ├── flow1.py
│   ├── flow2.py
│   └── flow3.py
└── utils.py
So I can expose my flows via storage object like this:
from prefect.storage import Module
storage = Module("my-package.flows")
Each of the flows that belong to my-package has the following structure:
from prefect import Flow, Parameter, task
from prefect.utilities import logging

def core_function(**kwargs) -> Flow:
    with Flow("flow1") as flow:
        ...  # process flow
    return flow

flow = core_function()
Seems legit to me, since I haven't spotted any downside. Do you have any recommended pattern or advice?

Krapi Shah
08/02/2021, 7:54 PM

Kyle McChesney
08/02/2021, 8:01 PM
@task(result=S3Result('bucket', location='example.out'))
def example():
    return [1, 2, 3]
Is it just a pickle file that, when loaded, recreates the list [1, 2, 3]?
How does it work for more complicated returns, for example a task that returns a tuple or a pandas DataFrame?

Madison Schott
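On the result question above: by default a Result is written with Prefect's PickleSerializer (cloudpickle), so a list, a tuple, or a DataFrame is stored and restored the same way -- one pickled blob per task result. A hedged sketch, assuming the Prefect 1.x serializer API, including a CSV-writing alternative for DataFrames:

```python
def roundtrip(value):
    # What the default serializer does, in miniature: cloudpickle in
    # Prefect, plain pickle here -- lists, tuples, DataFrames all round-trip.
    import pickle
    return pickle.loads(pickle.dumps(value))

def df_result():
    # Deferred import; assumes Prefect 1.x results/serializers.
    from prefect.engine.results import S3Result
    from prefect.engine.serializers import PandasSerializer

    # Swap the default pickle for a serializer that writes a real CSV.
    return S3Result(
        bucket="bucket",
        location="example.csv",
        serializer=PandasSerializer(file_type="csv"),
    )
```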
08/02/2021, 8:05 PM

Philip MacMenamin
08/02/2021, 8:45 PM

Harry Baker
08/02/2021, 9:36 PM

Mehdi Nazari
08/02/2021, 9:53 PM

Leon Kozlowski
08/02/2021, 10:18 PM
Parameter?
Something like:
choice_param = Parameter(
    name="choice_param",
    choices=["some", "choice", "params"],
    default="some"
)
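As far as I know, Prefect 1.x's Parameter has no `choices` kwarg, so the snippet above wouldn't validate anything. A sketch of one workaround -- validating the value in a task at flow-run time (names hypothetical):

```python
def validate_choice(value, choices):
    # Pure validation helper; usable inside any task.
    if value not in choices:
        raise ValueError(f"{value!r} is not one of {list(choices)}")
    return value

def build_flow():
    # Deferred import; assumes Prefect 1.x, where validation has to
    # happen downstream of the Parameter rather than on it.
    from prefect import Flow, Parameter, task

    check = task(validate_choice)
    with Flow("choice-check") as flow:
        choice_param = Parameter("choice_param", default="some")
        checked = check(choice_param, ["some", "choice", "params"])
    return flow
```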
Sumit Kumar Rai
08/03/2021, 4:02 AM

Scarlett King
08/03/2021, 9:56 AM
snapshot_date = Parameter('snapshot_date', default=dt.datetime.now().strftime('%Y-%m-%d'))
run = apply_map(full_flow, params=params, snapshot_date=unmapped(snapshot_date))
And inside full_flow
def full_flow(params, snapshot_date):
    # ..
    snapshot_date = dt.datetime.strptime(snapshot_date, '%Y-%m-%d')
    print(f'{snapshot_date:%Y%m%d}')
    # ..
It keeps giving me an error because the Parameter object is passed instead of a string. How can I access the parameter value only?

Samuel Kohlleffel
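On the strptime question above: inside the flow block (and inside a function handed to `apply_map`), `snapshot_date` is still a Parameter/Task object; the concrete string only exists inside a running task. A hedged sketch, assuming Prefect 1.x, that moves the parsing into a task:

```python
def parse_snapshot_date(s):
    # Runs inside a task, where the parameter's concrete value arrives.
    import datetime as dt
    return dt.datetime.strptime(s, "%Y-%m-%d")

def full_flow_body(params, snapshot_date_task):
    # Sketch of the apply_map body: defer strptime to a task instead of
    # calling it on the Parameter object directly.
    from prefect import task
    parse = task(parse_snapshot_date)
    parsed = parse(snapshot_date_task)
    # ... downstream tasks consume `parsed` ...
    return parsed
```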
08/03/2021, 2:48 PM
flow.serialized_hash() to only register the modified flows. However, flow.serialized_hash() is not returning a consistent hash value for flows that have not been modified. Why would this be the case?
For context, I'm testing by registering the flows locally with flow.register()
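Two hedged notes on the hash question above, assuming Prefect 1.x: `serialized_hash()` is only stable if the serialized flow is byte-for-byte stable between builds, so anything computed at build time (for example a Parameter default of `datetime.now()`, as in the message above) changes it on every run; and the usual registration pattern passes the hash as an idempotency key so the backend skips unchanged flows. Sketch:

```python
def hash_payload(payload: bytes) -> str:
    # Same bytes in, same hash out: a serialized-flow hash only drifts
    # when the serialized payload itself drifts between builds.
    import hashlib
    return hashlib.sha256(payload).hexdigest()

def register_if_changed(flow, project_name):
    # Deferred/assumed Prefect 1.x API: with serialized_hash() as the
    # idempotency key, re-registering an unchanged flow is a no-op.
    flow.register(
        project_name=project_name,
        idempotency_key=flow.serialized_hash(),
    )
```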
Kyle McChesney
08/03/2021, 4:05 PM
.map and include the mapped value in either the command or the helper_script. For example:
@task
def files():
    return [
        '/opt/file1.txt',
        '/opt/file2.txt',
        '/opt/file3.txt',
        '/opt/file4.txt',
    ]

rm_task = ShellTask(
    command='rm $file',
)

with Flow('shell') as flow:
    files_to_delete = files()
    rm_task.map(files_to_delete, helper_script='file="{mapped_value}"')
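As far as I know, ShellTask doesn't interpolate placeholders like `{mapped_value}` into `helper_script`, so the snippet above would pass the literal string. Since `command` is a run-time argument, one workaround (a sketch, assuming Prefect 1.x; names hypothetical) is to build the full command per file in an upstream task and map over the commands:

```python
def rm_command(path: str) -> str:
    # Build the full command per file; shlex.quote guards odd filenames.
    import shlex
    return f"rm {shlex.quote(path)}"

def build_flow():
    # Deferred import; assumes Prefect 1.x ShellTask, whose `command`
    # is a run-time argument and can therefore be mapped directly.
    from prefect import Flow, task
    from prefect.tasks.shell import ShellTask

    @task
    def files():
        return ["/opt/file1.txt", "/opt/file2.txt"]

    @task
    def commands(paths):
        return [rm_command(p) for p in paths]

    rm_task = ShellTask()

    with Flow("shell") as flow:
        rm_task.map(command=commands(files()))
    return flow
```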
Mehdi Nazari
08/03/2021, 4:41 PM

Miguel Angel
08/03/2021, 4:45 PM

Philip MacMenamin
08/03/2021, 4:51 PM

Constantino Schillebeeckx
08/03/2021, 5:22 PM
.py file? we've split out commonly shared functionality between flows into e.g. a utils.py file which is referenced in the flow. given all the storage documentation, this design doesn't seem to fit the intended use of storage.
08/03/2021, 6:32 PM
prefect-job-xxxxx would create 4 ephemeral dask workers (named something like dask-root-xxxx)
• Now the behaviour I'm seeing is:
◦ K8s agent creates the prefect-job-xxx
◦ In the prefect-job logs, it gives me _prefect.DaskExecutor | Creating a new Dask cluster with __main__.make_cluster. Creating scheduler pod on cluster. This may take some time._
◦ there are then 5x dask-root-xxx pods created, where 1 of them is a dask scheduler, i.e. the scheduler no longer sits within the prefect-job-xx. Just wanted to check if this was expected/intended behaviour - I couldn't see any reference to it in the prefect release notes
• In addition (and this is more a side note: I think the prefect k8s RBAC docs need updating) - I've had to add 2 more rulesets to my k8s RBAC to make it work - see these docs for what's now required. Here is specifically what's changed vs the prefect docs
Thanks!

Billy McMonagle
08/03/2021, 8:24 PM
prefect create project "My Project"
... is it safe to run this command multiple times?

Jeff Baatz
08/03/2021, 9:00 PM