Jeremy Phelps
09/27/2021, 8:00 PM

Hugo Slepicka
09/27/2021, 11:09 PM
import random
from prefect import task, Flow
class Generator:
    def __init__(self, data):
        self.particle = data

@task
def run_generator():
    return Generator(random.random())

@task
def compute(particle):
    print(particle**2)

with Flow('flow1') as flow:
    gen = run_generator()
    compute(gen.particle)
When I try to create the flow I get the following error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/var/folders/7g/2_wk8ddx7h5_4r12flp9031m0000gn/T/ipykernel_3073/2988601640.py in <module>
1 with Flow('flow1') as flow:
2 gen = run_generator()
----> 3 val = compute(gen.particle)
AttributeError: 'FunctionTask' object has no attribute 'particle'
Is there any chance I can have the evaluation of gen.particle delayed until the flow is executed?
I know that changing compute to receive the object and handling the data access there works, but I can't always change the function, and it would be bad in my case to create many wrappers for users of my library.
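One possible workaround, shown as a minimal sketch rather than an official recipe: wrap the attribute access in its own task (the get_attr helper below is hypothetical), so the lookup is deferred to flow-run time and neither Generator nor compute needs to be modified.

import random
from prefect import task, Flow

class Generator:
    def __init__(self, data):
        self.particle = data

@task
def run_generator():
    return Generator(random.random())

@task
def compute(particle):
    print(particle ** 2)

@task
def get_attr(obj, name):
    # hypothetical helper: the getattr call only runs at flow-run time,
    # once run_generator has actually produced a Generator instance
    return getattr(obj, name)

with Flow('flow1') as flow:
    gen = run_generator()
    compute(get_attr(gen, 'particle'))

Users of the library would then never need to touch compute itself.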
Ben Muller
09/28/2021, 1:08 AM
Map a function that adds tasks to a flow elementwise across one or more tasks. Arguments that should not be mapped over should be wrapped with prefect.unmapped.
but I feel like an example is needed
trying to do something like this:
users = ["ben", "kevin"]
CONSTANT_DATA = "something"
with Flow():
    apply_map(notify, CONSTANT_DATA)
How do I hold this constant?
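A minimal sketch based on the docstring quoted above, with notify standing in for the real task: map over users and wrap the constant in prefect.unmapped so every call receives it whole.

from prefect import Flow, task, apply_map, unmapped

@task
def notify(user, data):
    print(f"notifying {user} with {data}")

def notify_user(user, data):
    # apply_map expects a function that adds tasks to the flow
    notify(user, data)

users = ["ben", "kevin"]
CONSTANT_DATA = "something"

with Flow("notify-flow") as flow:
    apply_map(notify_user, users, unmapped(CONSTANT_DATA))

If notify is a single task rather than a group of tasks, notify.map(users, unmapped(CONSTANT_DATA)) achieves the same thing without apply_map.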
Ryan Sattler
09/28/2021, 5:05 AM
latest
)

jake lee
09/28/2021, 6:04 AM

Son Mai
09/28/2021, 8:34 AM

Sridhar
09/28/2021, 9:33 AM
from prefect.agent.ecs.agent import ECSAgent
AGENT = ECSAgent(cluster="prefect-fargate-cluster", labels=['ecs', 'dev'])
AGENT.start()
flow.py code:
import prefect
from prefect.storage import Docker
from prefect.run_configs import ECSRun
from prefect import task, Flow, Parameter
STORAGE = Docker(registry_url='037961805145.dkr.ecr.ap-southeast-2.amazonaws.com/',
                 image_name='prefect-etl-flow',
                 image_tag='latest',
                 dockerfile='Dockerfile.txt')
RUN_CONFIG = ECSRun(run_task_kwargs={'cluster': 'prefect-fargate-cluster'},
                    execution_role_arn='arn:aws:iam::037961805145:role/prefect-ecs',
                    labels=['ecs', 'dev'])
@task
def testfunc():
print("Hello prefect!")
with Flow('prefect-etl-flow', storage=STORAGE, run_config=RUN_CONFIG) as flow:
    testfunc()
flow.register('emmi-etl')
Really appreciate the help. Thanks in advance.

Haseeb Ahmad
09/28/2021, 11:33 AM
with Flow("impact partners", schedule=schedule) as flow:
    # setting default date to today - 1 to get the records from previous day
    default_date = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    start_date = Parameter('start_date', default=default_date)
    end_date = Parameter('end_date', default=default_date)
    records = get_impact_records(start_date, end_date)
    impact_partners = rename_columns(records)
    upload(data=impact_partners)
if __name__ == "__main__":
    main(flow)
I think the default_date gets defined at registration (compile) time rather than run time, which is why it is static and the job runs for 2021-09-23 every day. What is the best way to make default_date dynamic?
Thanks
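A minimal sketch of one common workaround, assuming the surrounding tasks stay as they are: leave the Parameter default as None and resolve the actual date inside a task, so the fallback is computed on every flow run rather than once at registration time.

from datetime import datetime, timedelta
from prefect import task, Flow, Parameter

@task
def resolve_date(value):
    # runs at flow-run time: only falls back to "yesterday" when no value is passed
    if value is None:
        return (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    return value

with Flow("impact partners") as flow:  # schedule omitted in this sketch
    start_date = resolve_date(Parameter('start_date', default=None))
    end_date = resolve_date(Parameter('end_date', default=None))
    # get_impact_records, rename_columns and upload are wired up exactly as before

Another option, if the flow runs on a schedule, is to derive the date inside a task from prefect.context.get('scheduled_start_time').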
Matthew Seligson
09/28/2021, 12:24 PM

Alejandro A
09/28/2021, 12:24 PM

Alejandro A
09/28/2021, 12:26 PM

Sanil Khurana
09/28/2021, 1:08 PM
Failed to load and execute Flow's environment: UnpicklingError("invalid load key, '{'.")
I have configured the roles to have complete S3 and ECS access. I have checked the file on S3 as well and it gets uploaded fine: {"flow": "gASVPgk....aFtdlHViLg==", "versions": {"cloudpickle": "2.0.0", "prefect": "0.15.6", "python": "3.7.10"}}. I think the only place it is messing up is that the Fargate instance is not able to pick up the task properly.
Any idea what I may be missing? Really appreciate the help. Thanks in advance.

Alejandro A
09/28/2021, 1:48 PM

Kevin Weiler
09/28/2021, 1:53 PM
LocalDaskExecutor with threads. Is there a max parallelization parameter of some kind that I’m unaware of?
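For reference, a minimal sketch assuming Prefect 1.x: LocalDaskExecutor takes a num_workers argument that caps thread-based parallelism; when it is left unset, dask typically falls back to the local CPU count, which can look like a hidden maximum.

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def work(i):
    return i * i

with Flow("parallel-example") as flow:  # illustrative flow, not the real one
    work.map(list(range(100)))

# num_workers bounds how many tasks run concurrently with the threaded scheduler
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=16)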
Samuel Hinton
09/28/2021, 4:12 PM

Constantino Schillebeeckx
09/28/2021, 5:54 PM
mount_glue_catalog.run(schema_name=schema_name, glue_db=glue_db_name, upstream_tasks=[create_catalog])
where create_catalog is the return from another task. When I execute this I'm getting:
TypeError: mount_glue_catalog() got an unexpected keyword argument 'upstream_tasks'
When I call this from a flow context, the invocation of mount_glue_catalog doesn't have any issues. Does the interface to running this task somehow change based on my calling it with run() vs from within a flow context?
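A minimal sketch of the distinction, assuming Prefect 1.x semantics: upstream_tasks is a keyword of Task.__call__, which only applies while a flow is being built, whereas .run() invokes the decorated Python function directly and therefore only accepts that function's own arguments.

from prefect import task, Flow

@task
def create_catalog():
    return "catalog"

@task
def mount_glue_catalog(schema_name, glue_db):
    print(f"mounting {glue_db}.{schema_name}")

with Flow("glue-example") as flow:
    cat = create_catalog()
    # inside the flow context the call builds the DAG, so upstream_tasks is accepted
    mount_glue_catalog("myschema", "mydb", upstream_tasks=[cat])

# .run() calls the underlying Python function directly; passing upstream_tasks here
# raises TypeError because the function signature has no such parameter
mount_glue_catalog.run(schema_name="myschema", glue_db="mydb")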
Jeremy Phelps
09/28/2021, 6:28 PM
Our tasks are declared with task(result=OurGCSResult(bucket='our-bucket')), where OurGCSResult is a copied-and-modified version of the GCSResult class found in Prefect. The difference is that OurGCSResult is compatible with the old version of the Google Cloud Storage library that our code uses. We would not be able to use the standard GCSResult without first rewriting a significant portion of our code.
This class works fine for tasks that are not mapped, but something goes wrong for mapped tasks.
I dug around using the GraphQL client and noticed that all task runs with a non-negative map_index seem to fail to have any storage information:
{
"id": "eba36675-c15a-43ea-ad5b-2540936477b5",
"map_index": 0,
"name": null,
"serialized_state": {
"type": "Success",
"_result": {
"type": "Result",
"location": null, // WAT?!
"__version__": "0.14.22+9.g61192a3ee"
},
"context": {
"tags": []
},
"message": "Task run succeeded.",
"__version__": "0.14.22+9.g61192a3ee",
"cached_inputs": {}
}
}
As far as I can tell from the logs, no errors are happening, and the only possible result of running the task in question is a possibly-empty array being returned, or an exception being thrown (which would be logged).

Tony Yun
09/28/2021, 7:06 PM
ValueError: Local Secret "CRITEO_SECRET_KEEPS" was not found.
But running locally is totally fine:
>>> prefect.client.Secret('CRITEO_SECRET_KEEPS').get()
'1stxxxxg'
Are there any possible reasons? I clearly defined the secret in the local config file.
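For context, a minimal sketch of how local secrets resolve in Prefect 1.x (the dummy value is only illustrative): with local secrets in use, Secret.get() looks in prefect.context.secrets, which is populated from the [context.secrets] section of ~/.prefect/config.toml on the machine where the flow actually executes, so a secret that only exists in a laptop's config file is invisible inside a remote run environment.

import prefect
from prefect.client import Secret

# simulate what a populated local config provides at runtime
with prefect.context(secrets={"CRITEO_SECRET_KEEPS": "dummy-value"}):
    print(Secret("CRITEO_SECRET_KEEPS").get())  # resolves from the injected context

# outside that context (e.g. a container without the config file), the same call
# raises: ValueError: Local Secret "CRITEO_SECRET_KEEPS" was not found.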
Abhas P
09/29/2021, 12:46 AM
# flows/sample_flow.py
@task
def load():
    ...
I try to test the flow composition like this:
from flows.sample_flow import load
from flows import sample_flow as flow_file
def test_flow_composition():
    load_task = load
    flow_tasks = flow_file.flow.tasks
    assert flow_file.flow.terminal_tasks() == set([load_task])
    # assert load_task in flow_tasks - same thing happens here; load_task is clearly an element of the flow_tasks set, but it asserts false
I get this error, which is a little weird (I suspect object signatures here):
> assert flow_file.flow.terminal_tasks() == set([load_task])
E assert {<Task: load>} == {<Task: load>}
E Extra items in the left set:
E <Task: load>
E Extra items in the right set:
E <Task: load>
E Full diff:
E {<Task: load>}
Can you help me with this?
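A minimal sketch of one way to make the assertion robust, assuming Prefect 1.x behaviour where calling a task inside a flow context registers a copy of it (so identity-based set comparison can fail even though both objects print as <Task: load>): compare task names instead of task objects.

from flows import sample_flow as flow_file

def test_flow_composition():
    # the task inside the flow is a copy of the imported `load`, so compare by name
    terminal_names = {t.name for t in flow_file.flow.terminal_tasks()}
    assert terminal_names == {"load"}

    all_names = {t.name for t in flow_file.flow.tasks}
    assert "load" in all_names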
Antti Tupamäki
09/29/2021, 7:27 AM
client = docker.from_env()
host_config = client.api.create_host_config(binds=['/psql/data/:/data'])
print(host_config)
container_id = CreateContainer(command="cat /data/test",
host_config=host_config)
ended up doing
container_id = CreateContainer(command="cat /data/test", host_config={'binds': ['/psql/data/:/data']}
which works I think that part of documentation needs to update.Ben Collier
09/29/2021, 7:51 AM

Varun Joshi
09/29/2021, 8:34 AM

Hugo Polloli
09/29/2021, 8:40 AM

Bruno Murino
09/29/2021, 9:47 AM

Chris L.
09/29/2021, 9:48 AM
job_template.yaml
Is the flow download process:
1. Run on the Kubernetes job spun up by the Agent (and hence takes the environment variables as specified in job_template.yaml); OR
2. Run on the agent, which then passes the flow metadata to the spun up job?
Will
09/29/2021, 10:57 AM

Ed Morris
09/29/2021, 11:41 AM
prefect register method, during building of the custom docker image on the local machine. I can build the image using the underlying docker command, but only if I use the flag --network=host in the build call. I have been successful in doing this before, but I am now using a newly installed VM for the submission, and so I am left wondering if there are default settings required for Docker.
Is there a flag or setting that needs to be added somewhere, either to Docker or to Prefect, that enables the container to access the external network and internet during build time?
Koby Kilimnik
09/29/2021, 12:05 PM

Barbara Abi Khoriati
09/29/2021, 12:12 PM
GOOGLE_APPLICATION_CREDENTIALS in the environment variables of my ECSRun configuration. But, with that said, should I create an image with a volume that contains my json key file? And then I would use the path of this volume as GOOGLE_APPLICATION_CREDENTIALS, right? I wonder what's the best practice in this case. Thanks in advance!
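For what it's worth, a minimal sketch of passing the variable through the run config (the key path is illustrative, and whether the JSON key is baked into the image or mounted is a separate decision): ECSRun accepts an env mapping that is applied to the flow's container at run time.

from prefect.run_configs import ECSRun

RUN_CONFIG = ECSRun(
    # hypothetical path inside the flow image/volume holding the service-account key
    env={"GOOGLE_APPLICATION_CREDENTIALS": "/opt/prefect/gcp-key.json"},
    labels=["ecs", "dev"],
)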
Alain Prasquier
09/29/2021, 12:17 PM