Steve Pamer
07/30/2021, 7:43 PM
Michael Warnock
07/30/2021, 8:50 PM
Is there a lag between config.toml and secrets added to it becoming available when doing local testing? That appears to be the case: a missing-local-secret error went away seemingly magically, just as I was preparing to ask about it. Alternatively, I had added the secret to Cloud first, so maybe there was a (longer) lag before it found it there, and it's still not using my local secrets?
Also, SlackTask doesn't work when supplying 'message' at runtime (or directly to run); I get TypeError: run() got multiple values for argument 'message'
- something to do with that defaults_from_attrs magic?
Fina Silva-Santisteban
07/30/2021, 9:02 PM
File 1:
def do_something_1(params_1):
    # (does something)
File 2:
from path_to_file.file_1 import do_something_1
class SomethingTask(Task):
    def run(self, params_1):
        do_something_1(params_1)
        do_something_2
File 3:
import unittest
from unittest.mock import patch
from path_to_file.file_2 import SomethingTask
@patch('path_to_file.file_1.do_something_1')
class TestSomethingTask(unittest.TestCase):
    def test_1(self, mock_do_something_1):
        task = SomethingTask()
        task.run()
        self.assertTrue(mock_do_something_1.called)
Instead of mocking do_something_1(), it actually calls do_something_1(), and strangely the assertion returns False.
Kyle McChesney
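A likely cause of the mocking behavior described above: file_2 does `from path_to_file.file_1 import do_something_1`, which copies the function into file_2's namespace, so patching `path_to_file.file_1.do_something_1` rebinds the name in file_1 only, and SomethingTask.run still calls file_2's copy. Patching the name where it is looked up (`@patch('path_to_file.file_2.do_something_1')`) should fix it. A self-contained reproduction with two synthetic modules standing in for the real files:

```python
import sys
import types
from unittest.mock import patch

# Stand-in for file_1, which defines the function.
file_1 = types.ModuleType("file_1")
file_1.do_something_1 = lambda params: "real result"
sys.modules["file_1"] = file_1

# Stand-in for file_2 doing `from file_1 import do_something_1`:
# the name is *copied* into file_2's own namespace.
file_2 = types.ModuleType("file_2")
file_2.do_something_1 = file_1.do_something_1
file_2.run = lambda params: file_2.do_something_1(params)
sys.modules["file_2"] = file_2

# Patching where the function is defined does not touch file_2's copy...
with patch("file_1.do_something_1") as mock_fn:
    file_2.run(1)
    assert not mock_fn.called  # the real function ran

# ...patching where it is looked up intercepts the call:
with patch("file_2.do_something_1") as mock_fn:
    file_2.run(1)
    assert mock_fn.called
```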
07/30/2021, 9:35 PM
An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
I am running a flow via an ECS agent. It worked just fine until I specified a custom value for image when submitting the job via the UI. The image I specified was an ECR image.
matta
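For context on that InvalidParameterException: ECS only accepts RunTask with launch type FARGATE when the task definition itself was registered as Fargate-compatible. If supplying a custom image caused a fresh task definition to be registered, it is worth checking that the new definition still carries the Fargate fields. A sketch of the relevant fields (names per the ECS RegisterTaskDefinition API; family, container name, and image URI are placeholders):

```json
{
  "family": "prefect-flow",
  "requiresCompatibilities": ["FARGATE"],
  "networkMode": "awsvpc",
  "cpu": "1024",
  "memory": "2048",
  "containerDefinitions": [
    { "name": "flow", "image": "<account>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>" }
  ]
}
```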
07/31/2021, 2:58 AM
Michael Warnock
07/31/2021, 3:45 PM
Irvin Tang
08/02/2021, 12:24 AM
i have a pyproject.toml with a git dependency pointing to that test branch. after building the image with this new version of the library and registering the flow with that image, running the flow doesn't display any of the additional logs
i have another project where i’m doing the same thing. the flows in that project do log the additional messages. i’m not sure if this might be the way i configured prefect/how i’m registering the flows. has anyone ever encountered this problem or have encountered something similar?
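One thing worth checking for the missing-logs question above, assuming Prefect 1.x and that the additional log lines come from the library's own loggers rather than from task code: log records from loggers outside Prefect are only forwarded to the backend when the logger is listed as an extra logger. A config sketch ("my_library" is a placeholder for the real package name):

```shell
# Assumption: the library logs via logging.getLogger("my_library...").
# Set this in the environment of the flow's container / agent.
export PREFECT__LOGGING__EXTRA_LOGGERS='["my_library"]'
```

If one project shows the logs and the other does not, comparing this setting between the two environments is a cheap first test.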
also please let me know if i was unclear about anything. thank you!
Reece Hart
08/02/2021, 1:03 AM
seqtk (args) | fastp (args) | sentieon (args) | samtools (args) > out.tmp
In our pipeline, most steps are wrapped in a script, which is what the Makefile calls. This step starts with apx 100GB of data and dumps a 150GB of data. Given the data volume, I would be reluctant to write intermediate files in lieu of the pipes.
Given the nature of the workflow -- all command-line tools with file-based data -- I think adopting Prefect would amount to turning most of our scripts into Prefect ShellTasks. I wonder whether this is really worth the effort.
The main drivers for choosing a workflow tool are to help with pipeline versioning, to schedule and track jobs, and to orchestrate infrastructure scale-up/down.
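On the pipes specifically: ShellTask (Prefect 1.x) hands its command string to a shell, so an entire pipeline step like the one above can remain a single task with no intermediate files; the real code would be something like `ShellTask()(command=...)` inside a Flow. A stdlib sketch of that behavior, with a tiny pipe standing in for the seqtk | fastp | sentieon | samtools chain:

```python
import subprocess

# Stand-in for "seqtk ... | fastp ... | sentieon ... | samtools ... > out.tmp"
command = "printf 'b\\na\\n' | sort | head -n 1"

# ShellTask does essentially this: run the whole string through a shell,
# so pipes and redirection survive intact.
result = subprocess.run(["bash", "-c", command], capture_output=True, text=True)
assert result.stdout == "a\n"
```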
Thanks for any guidance.
Omar Sultan
08/02/2021, 8:23 AM
Samuel Tober
08/02/2021, 9:33 AM
markets = Parameter('markets', default=['stockholm', 'oslo'])
And then run a function:
total_df = load_data.map(
    city=markets,
    date_from=date_from
)
where I pass a single value parameter, date_from, and my list parameter markets.
Everything runs without error; however, the function is only run for the first value in the markets list. What I want is to run it for each value in markets, using the same value of date_from for each element. How can I achieve this?
Jai Deo
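For the mapping question above: in Prefect 1.x every argument passed to .map() is iterated over unless it is wrapped in unmapped(), so date_from should be marked explicitly. The fix, shown for reference (not run here), is:

    from prefect import unmapped
    total_df = load_data.map(city=markets, date_from=unmapped(date_from))

A stdlib sketch of the semantics, with hypothetical stand-ins for map/unmapped:

```python
class Unmapped:
    """Marker: pass this value whole to every mapped call."""
    def __init__(self, value):
        self.value = value

def map_over(fn, **kwargs):
    # Iterate the mapped arguments; broadcast the Unmapped ones.
    mapped = {k: v for k, v in kwargs.items() if not isinstance(v, Unmapped)}
    constant = {k: v.value for k, v in kwargs.items() if isinstance(v, Unmapped)}
    length = len(next(iter(mapped.values())))
    return [fn(**{k: v[i] for k, v in mapped.items()}, **constant)
            for i in range(length)]

runs = map_over(lambda city, date_from: f"{city}:{date_from}",
                city=["stockholm", "oslo"],
                date_from=Unmapped("2021-08-01"))
assert runs == ["stockholm:2021-08-01", "oslo:2021-08-01"]
```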
08/02/2021, 9:51 AM
aman gupta
08/02/2021, 12:07 PM
Robert Hales
08/02/2021, 1:59 PM
Is there a way to set max_retries on a task at flow run time?
Bouke Krom
08/02/2021, 3:18 PM
We'd like to include an Artifact (just a link in our case) in a custom Slack notification. The state handler gets a Flow and a State. I'm having trouble finding the Artifact somewhere in the Flow objects. I guess I should try and find a flow_run object of some sort?
Mehdi Nazari
08/02/2021, 5:12 PM
Task<name> instance instead.
Miguel Angel
08/02/2021, 6:23 PM
my-package
├── __init__.py
├── _config.yml
├── flows
│   ├── __init__.py
│   ├── flow1.py
│   ├── flow2.py
│   └── flow3.py
└── utils.py
So I can expose my flows via storage object like this:
from prefect.storage import Module
storage = Module("my-package.flows")
Each of the flows that belong to my-package has the following structure:
from prefect import Flow, Parameter, task
from prefect.utilities import logging

def core_function(**kwargs) -> Flow:
    # process flow
    return flow

flow = core_function()
Seems legit to me, since I haven't spotted any downside; do you have any recommended pattern or advice?
Krapi Shah
08/02/2021, 7:54 PM
Kyle McChesney
08/02/2021, 8:01 PM
@task(result=S3Result('bucket', location='example.out'))
def example():
    return [1, 2, 3]
Is it just a pickle file that, when loaded, recreates the list [1, 2, 3]? How does it work for more complicated returns, for example a task that returns a tuple or a pandas DataFrame?
Madison Schott
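For what it's worth on the question above: in Prefect 1.x a Result pairs a storage location with a Serializer, and the default PickleSerializer pickles (via cloudpickle) whatever the task returns, so a tuple or a DataFrame round-trips the same way a list does. A stdlib sketch of the write/read cycle (stdlib pickle standing in for cloudpickle):

```python
import pickle

# What S3Result writes is (roughly) the pickled bytes of the return value;
# reading the result unpickles them, whatever the type.
for value in [[1, 2, 3], ("a", 1), {"col": [1.0, 2.0]}]:
    blob = pickle.dumps(value)           # uploaded to s3://bucket/example.out
    assert pickle.loads(blob) == value   # recreated on read
```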
08/02/2021, 8:05 PM
Philip MacMenamin
08/02/2021, 8:45 PM
Harry Baker
08/02/2021, 9:36 PM
Mehdi Nazari
08/02/2021, 9:53 PM
Leon Kozlowski
08/02/2021, 10:18 PM
Is there any way to pass choices to a Parameter?
Something like:
choice_param = Parameter(
    name="choice_param",
    choices=["some", "choice", "params"],
    default="some"
)
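As far as I know, Prefect 1.x Parameters have no choices argument (the snippet above is wishful); a common workaround is a small validation task right after the Parameter that fails the run on an unexpected value. The Prefect wiring would be a @task wrapping the function below; the validation logic itself, runnable as-is:

```python
CHOICES = ("some", "choice", "params")

def validate_choice(value, choices=CHOICES):
    # In a flow this would be a @task so a bad value fails the run early.
    if value not in choices:
        raise ValueError(f"expected one of {choices}, got {value!r}")
    return value

assert validate_choice("some") == "some"

rejected = False
try:
    validate_choice("bogus")
except ValueError:
    rejected = True
assert rejected
```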
Sumit Kumar Rai
08/03/2021, 4:02 AM
Scarlett King
08/03/2021, 9:56 AM
snapshot_date = Parameter('snapshot_date', default=dt.datetime.now().strftime('%Y-%m-%d'))
run = apply_map(full_flow, params=params, snapshot_date=unmapped(snapshot_date))
And inside full_flow:
def full_flow(params, snapshot_date):
    # ..
    snapshot_date = dt.datetime.strptime(snapshot_date, '%Y-%m-%d')
    print(f'{snapshot_date:%Y%m%d}')
    # ..
It keeps giving me an error because the Parameter object is passed instead of a string. How can I access the parameter value only?
Samuel Kohlleffel
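On the Parameter question above: inside `with Flow(...)` (and inside the function apply_map calls), snapshot_date is the Parameter task object, not a string; parameter values only exist at run time, so strptime at build time sees the Parameter itself. The usual fix in Prefect 1.x is to move the parsing into a @task, which receives the resolved value. The parsing logic itself, runnable as-is (in the flow it would be decorated with @task):

```python
import datetime as dt

def parse_snapshot_date(snapshot_date: str) -> dt.datetime:
    # Runs at flow *run* time, where snapshot_date is a real string.
    return dt.datetime.strptime(snapshot_date, "%Y-%m-%d")

assert f"{parse_snapshot_date('2021-08-03'):%Y%m%d}" == "20210803"
```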
08/03/2021, 2:48 PM
I'm using flow.serialized_hash() to only register the modified flows. However, flow.serialized_hash() is not returning a consistent hash value for flows that have not been modified. Why would this be the case?
For context, I'm testing by registering the flows locally with flow.register().
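A common source of an unstable flow.serialized_hash() is some non-deterministic field in the serialized flow, for example a schedule anchored at datetime.now() or storage metadata that embeds a timestamp or path differing between runs. A toy illustration (my own sketch, not Prefect's implementation) of why a hash over serialized metadata is only as stable as the metadata itself:

```python
import hashlib
import json

def metadata_hash(metadata: dict) -> str:
    # Canonical JSON first, so dict ordering can never change the digest.
    blob = json.dumps(metadata, sort_keys=True).encode()
    return hashlib.sha256(blob).hexdigest()

meta = {"name": "etl", "tasks": ["extract", "load"]}
# Same metadata, same hash, regardless of key order:
assert metadata_hash(meta) == metadata_hash({"tasks": ["extract", "load"], "name": "etl"})

# Any field that changes between serializations changes the digest:
assert metadata_hash({**meta, "start": "2021-08-03T14:48:00"}) != metadata_hash(meta)
```

Diffing two consecutive flow.serialize() outputs should pinpoint which field is changing.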
Kyle McChesney
08/03/2021, 4:05 PM
Is there a way to call a ShellTask with .map and include the mapped value in either the command or the helper_script? For example:
@task
def files():
    return [
        '/opt/file1.txt',
        '/opt/file2.txt',
        '/opt/file3.txt',
        '/opt/file4.txt',
    ]
rm_task = ShellTask(
    command='rm $file',
)
with Flow('shell') as flow:
    files_to_delete = files()
    rm_task.map(files_to_delete, helper_script='file="{mapped_value}"')
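One pattern that should work for the snippet above, since ShellTask (Prefect 1.x) also accepts command at run time: build one command string per file in an upstream task and map over command, instead of templating a {mapped_value} placeholder. The Prefect wiring, for reference (not run here):

    @task
    def rm_commands(files):
        return [f"rm {f}" for f in files]

    rm_task.map(command=rm_commands(files_to_delete))

The command construction itself, runnable as-is:

```python
# One shell command per file; each becomes one mapped ShellTask run.
files = ['/opt/file1.txt', '/opt/file2.txt', '/opt/file3.txt', '/opt/file4.txt']
commands = [f"rm {path}" for path in files]
assert commands[0] == "rm /opt/file1.txt"
```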
Mehdi Nazari
08/03/2021, 4:41 PM
Miguel Angel
08/03/2021, 4:45 PM
Philip MacMenamin
08/03/2021, 4:51 PM
Philip MacMenamin
08/03/2021, 4:51 PM
Michael Adkins
08/03/2021, 4:52 PM
Kevin Kho
08/03/2021, 4:53 PM
Philip MacMenamin
08/03/2021, 4:55 PM
Kevin Kho
08/03/2021, 4:56 PM
Philip MacMenamin
08/03/2021, 4:56 PM
Kevin Kho
08/03/2021, 5:00 PM
The registration check is based on flow.serialized_hash, so if you add code to the task, it doesn't affect the serialized hash, because Prefect only stores the metadata and not the actual code. You can force registration though with:
prefect register --force ...
Philip MacMenamin
08/03/2021, 5:06 PM
--force did the trick, thanks!