Aqib Fayyaz
11/16/2021, 8:25 PM

Max Kolasinski
11/16/2021, 9:07 PM

Vamsi Reddy
11/16/2021, 9:26 PM
We'd like to have bucketname/dev and bucketname/prod and have our dev and prod flows stored accordingly. I am using storage=S3(bucket="bucketname") but would like to have something like storage=S3(bucket="bucketname/dev"). Any idea how to do this?
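
A minimal sketch of one option, assuming Prefect 0.x S3 storage: the bucket argument only takes a bucket name, but the key argument can place each flow under a dev/ or prod/ prefix (the environment switch and key name below are placeholders):

from prefect.storage import S3

environment = "dev"  # hypothetical switch between "dev" and "prod"
storage = S3(
    bucket="bucketname",
    key=f"{environment}/my-flow.prefect",  # stored at s3://bucketname/dev/my-flow.prefect
)
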
Alexander Chen
11/16/2021, 9:42 PM

Carlo
11/16/2021, 9:46 PM

Gabriel Milan
11/16/2021, 10:08 PM

Jason Motley
11/16/2021, 11:21 PM
The mysqlclient Anaconda package won't run on my version of Python (3.8.8), and when I created a second conda environment on 3.7 I was told it had a compatibility error with Prefect. Are there alternative ways to connect to a MySQL db using secrets & SSL args? The specific error is: Error during execution of task: ModuleNotFoundError("No module named 'MySQLdb'")
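
A hedged alternative that sidesteps the compiled mysqlclient/MySQLdb dependency: PyMySQL is pure Python, takes SSL arguments directly, and the password can come from a Prefect Secret. The host, secret name, and CA path below are placeholders.

import pymysql
from prefect.client import Secret

connection = pymysql.connect(
    host="mysql.example.com",                 # placeholder host
    user="reader",
    password=Secret("MYSQL_PASSWORD").get(),  # assumes a secret with this name exists
    database="mydb",
    ssl={"ca": "/path/to/ca.pem"},            # SSL args passed straight to the driver
)
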
Billy McMonagle
11/17/2021, 12:09 AM

Sridhar
11/17/2021, 12:38 AM
# specify a base image
FROM python:3.8-slim
# copy all folder contents to the image
COPY . .
# install all dependencies
RUN apt-get update && apt-get -y install libpq-dev gcc && pip install psycopg2
RUN pip install -r requirements.txt
My understanding is COPY . .
should copy all the files required to run the flow into the image. But I'm getting an error saying no module found (image attached).
Also, here's my STORAGE and RUN_CONFIG:
STORAGE = Docker(registry_url='aws_id.dkr.ecr.region.amazonaws.com/',
                 image_name='name',
                 image_tag='tag',
                 dockerfile='Dockerfile')
RUN_CONFIG = ECSRun(run_task_kwargs={'cluster': 'cluster-name'},
                    execution_role_arn='arn:aws:iam::aws_id:role/role',
                    labels=['dev-modelling', 'flow-test'])
Am I missing something?? Really appreciate the help. Thanks in advance!!
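
One thing to check, as a hedged sketch rather than a confirmed fix: with COPY . . and no WORKDIR, the files land in the image's default working directory and may not be on the import path when the flow runs. Setting an explicit WORKDIR in the Dockerfile (e.g. /opt/flow, a placeholder) and pointing PYTHONPATH at it through the storage's env_vars keeps local modules importable:

from prefect.storage import Docker
from prefect.run_configs import ECSRun

STORAGE = Docker(
    registry_url='aws_id.dkr.ecr.region.amazonaws.com/',
    image_name='name',
    image_tag='tag',
    dockerfile='Dockerfile',
    # assumes the Dockerfile sets `WORKDIR /opt/flow` before `COPY . .`
    env_vars={"PYTHONPATH": "/opt/flow"},
)
RUN_CONFIG = ECSRun(
    run_task_kwargs={'cluster': 'cluster-name'},
    execution_role_arn='arn:aws:iam::aws_id:role/role',
    labels=['dev-modelling', 'flow-test'],
)
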
Florian Boucault
11/17/2021, 9:29 AM

Asmita
11/17/2021, 11:44 AM

Fina Silva-Santisteban
11/17/2021, 2:17 PM
flow.set_dependencies(
    upstream_tasks=[A],
    task=B,
    keyword_tasks=dict(item=A)
)
Do I need to explicitly set a Task as an upstream_task
when I already have it as a keyword_task
? I’ve read that one is to set state dependencies and the other is to set data dependencies. Since a data dependency is implicitly a state dependency, having task B
as keyword_task should be enough, right?
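
A small check along those lines, assuming Prefect 0.x: registering A only through keyword_tasks already creates an edge from A to B, so the explicit upstream_tasks entry is redundant (it just repeats the same state dependency).

from prefect import Flow, Task

class A(Task):
    def run(self):
        return [1, 2, 3]

class B(Task):
    def run(self, item):
        return len(item)

a, b = A(), B()
flow = Flow("deps")
flow.set_dependencies(task=b, keyword_tasks=dict(item=a))

# a single Edge(a -> b, key='item'): a data dependency that is also a state dependency
print(flow.edges)
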
Andreas Tsangarides
11/17/2021, 3:58 PM
task_1 -> task_2 -> [task_a, task_b]  # i.e. task_a and task_b are independent of each other but share the same dependencies
If we define the tasks' tags as such:
task_1: [a, b]
task_2: [a, b]
task_a: [a]
task_b: [b]
Is there anything like:
prefect run --tags=a
I am really asking for functionalities I was using with kedro
here :p
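
There doesn't appear to be a task-tag filter on prefect run in 0.x; a hedged workaround is boolean Parameters plus case, so a given run can skip one of the independent branches (task names mirror the example above):

from prefect import Flow, Parameter, task
from prefect.tasks.control_flow import case

@task
def task_1():
    return "raw"

@task
def task_2(data):
    return data.upper()

@task
def task_a(data):
    print("a:", data)

@task
def task_b(data):
    print("b:", data)

with Flow("tag-like-branches") as flow:
    run_a = Parameter("run_a", default=True)
    run_b = Parameter("run_b", default=True)
    shared = task_2(task_1())
    with case(run_a, True):
        task_a(shared)
    with case(run_b, True):
        task_b(shared)

# flow.run(parameters={"run_b": False}) then approximates "run only the 'a' tasks"
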
Vamsi Reddy
11/17/2021, 4:44 PM

Leon Kozlowski
11/17/2021, 4:44 PM

Ryan Brideau
11/17/2021, 5:11 PM
When I call the .run() method in a loop, though, only the first one runs and it seems to block before the others run. Is there a way around this?
Jacob Warwick
11/17/2021, 5:35 PM

Frank Oplinger
11/17/2021, 7:02 PM
def fargate_cluster(n_workers=4):
"""Start a fargate cluster using the same image as the flow run"""
return FargateCluster(n_workers=n_workers, image=prefect.context.image)
class LeoFlow(PrefectFlow):
def generate_flow(self):
with Flow(name=self.name, storage=S3(bucket="raptormaps-prefect-flows")) as flow:
...
flow.executor = DaskExecutor(
cluster_class=fargate_cluster,
cluster_kwargs={"n_workers": 4}
)
return flow
In the dockerfile for the image that I’m specifying in the ECSRun, I have included the following line to install dask-cloudprovider:
RUN pip install dask-cloudprovider[aws]
However, when I execute the flow, I am hitting the following error:
Unexpected error: AttributeError("module 'aiobotocore' has no attribute 'get_session'",)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/prefect/engine/flow_runner.py", line 442, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.6/contextlib.py", line 81, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.6/site-packages/prefect/executors/dask.py", line 238, in start
with self.cluster_class(**self.cluster_kwargs) as cluster:
File "/rprefect/leo_flow.py", line 58, in fargate_cluster
return FargateCluster(n_workers=n_workers, image=prefect.context.image)
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 1361, in __init__
super().__init__(fargate_scheduler=True, fargate_workers=True, **kwargs)
File "/usr/local/lib/python3.6/site-packages/dask_cloudprovider/aws/ecs.py", line 726, in __init__
self.session = aiobotocore.get_session()
AttributeError: module 'aiobotocore' has no attribute 'get_session'
Is there a specific version of dask_cloudprovider that Prefect requires?
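
The traceback looks like the aiobotocore 2.0 breaking change rather than a Prefect pin: aiobotocore >= 2.0 removed the top-level get_session() that older dask-cloudprovider releases call, so pinning aiobotocore below 2.0 (or upgrading dask-cloudprovider to a release that imports it from aiobotocore.session) in the image usually resolves it. A quick, purely diagnostic check of which API the installed version exposes:

import aiobotocore

print(aiobotocore.__version__)
try:
    session = aiobotocore.get_session()           # aiobotocore < 2.0, expected by older dask-cloudprovider
except AttributeError:
    from aiobotocore.session import get_session   # aiobotocore >= 2.0
    session = get_session()
print(type(session))
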
Tom Tom
11/17/2021, 7:15 PM
Unexpected error: TypeError("write() got multiple values for argument 'self'")
Traceback (most recent call last):
File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "c:\users\tom\anaconda3\envs\part1_end_to_end_ml_model\lib\site-packages\prefect\engine\task_runner.py", line 926, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
TypeError: write() got multiple values for argument 'self'
It fails when executing this function:
def load_data_task(self):
    logger = prefect.context.get("logger")
    logger.info(os.getcwd())
    # ... do something ...
    return self.load_data()
According to this issue (https://github.com/PrefectHQ/prefect/issues/3034), it fails because the write() function also receives a "self" input. But I can't remove the "self" argument from load_data_task because it needs to call load_data.
If I run the flow on a local machine without a server and agent, it works.
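
A hedged sketch of the usual workaround from that issue: don't hand the bound method itself to Prefect as the task; wrap it in a module-level function (or a small Task subclass) so self never enters the task's call signature. Class and method names below mirror the snippet above and are otherwise placeholders.

from prefect import Flow, task

class Loader:
    def load_data(self):
        return "data"

loader = Loader()  # construct the instance outside the task

@task
def load_data_task():
    # the task signature has no `self`, so Result.write() only sees the return value
    return loader.load_data()

with Flow("load") as flow:
    load_data_task()
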
Arfa
11/17/2021, 9:12 PM

Martim Lobao
11/17/2021, 9:44 PM

Abhas P
11/17/2021, 10:50 PM

Tom Shaffner
11/17/2021, 11:23 PM

Jason Motley
11/18/2021, 12:22 AM
- cloudpickle: (flow built with '1.6.0', currently running with '2.0.0')
- prefect: (flow built with '0.15.6', currently running with '0.15.9')
- python: (flow built with '3.8.8', currently running with '3.7.11')
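
That warning means the flow was registered and is being executed in different environments (cloudpickle 1.6.0 / Python 3.8.8 at build time vs 2.0.0 / 3.7.11 at run time). A small diagnostic snippet to run in both places when aligning them:

import sys
import cloudpickle
import prefect

# run this in the environment that registers the flow and in the one that executes it
print("python:", ".".join(map(str, sys.version_info[:3])))
print("cloudpickle:", cloudpickle.__version__)
print("prefect:", prefect.__version__)
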
Jacob Blanco
11/18/2021, 9:13 AM
config.toml?

Ramtin
11/18/2021, 9:42 AM

Gabriel Milan
11/18/2021, 10:22 AM

agniva
11/18/2021, 12:07 PM
failed to get destination image "xxx": image with reference xxx was found but does not match the specified platform: wanted linux/amd64, actual: linux/arm64/v8
I have build args as: build_kwargs={"buildargs": {"PYTHON_VERSION": "3.8.12"}, "platform": ["linux/amd64"]},
can anybody please help me 🙏?
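
A hedged guess at the cause: docker-py's build API expects platform as a single string rather than a list, so ["linux/amd64"] may not take effect and the arm64 host default wins. A sketch of the storage with the value as a plain string (image and Dockerfile names are placeholders):

from prefect.storage import Docker

storage = Docker(
    image_name="my-flow",        # placeholder
    dockerfile="Dockerfile",     # placeholder; assumes the Dockerfile declares ARG PYTHON_VERSION
    build_kwargs={
        "buildargs": {"PYTHON_VERSION": "3.8.12"},
        "platform": "linux/amd64",   # a single string, not a list
    },
)
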
Nacho Rodriguez
11/18/2021, 12:24 PM

Piyush Bassi
11/18/2021, 1:06 PM

Piyush Bassi
11/18/2021, 1:06 PM

Anna Geller
11/18/2021, 1:12 PM