Greg Kennedy
04/13/2022, 12:08 AM
Carlos Cueto
04/13/2022, 2:09 AM
My logger prints <Task: fetch_data> instead of the data. This is what I'm trying to do:
from prefect import task, Flow
from prefect.run_configs import LocalRun

@task
def fetch_data():
    return {"data": "random data"}

with Flow('Get-Data') as flow:
    flow.run_config = LocalRun()
    data = fetch_data()
    logger.info(data)  # logs the Task object, not the returned dict

flow.run()
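A minimal sketch of one way to get the actual value into the logs, assuming the goal is simply to log what fetch_data returns: in Prefect 1.x, calling a task inside a with Flow(...) block only builds the graph and hands back a Task object (hence the <Task: fetch_data> output), so the logging has to happen inside a task at runtime.

import prefect
from prefect import task, Flow
from prefect.run_configs import LocalRun

@task
def fetch_data():
    return {"data": "random data"}

@task
def log_data(data):
    # the runtime logger lives in prefect.context while a task is executing
    prefect.context.get("logger").info(data)

with Flow('Get-Data') as flow:
    flow.run_config = LocalRun()
    data = fetch_data()
    log_data(data)

flow.run()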
Bihag Kashikar
04/13/2022, 5:40 AM
Leanna Morinishi
04/13/2022, 6:27 AM
Jacob Blanco
04/13/2022, 7:01 AM
Trung Đô Trần
04/13/2022, 8:24 AM
Olivér Atanaszov
04/13/2022, 9:19 AM
Stephen Lloyd
04/13/2022, 11:04 AM
with Flow(PREFECT_FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    creds = get_credentials()
    conn = get_connection(creds)
    tables = get_tables()
    save_data = load_to_s3.map(tables, conn=unmapped(conn))
    conn.set_upstream(creds)
    save_data.set_upstream(tables)
    save_data.set_upstream(conn)
It’s failing on get_connection, and the relevant code is:
from prefect import task
from prefect.tasks.secrets import PrefectSecret

@task
def get_credentials():
    return PrefectSecret(PREFECT_FLOW_NAME).run()

@task
def get_connection(creds):
    return connectors.get_redshift_connector(creds)

# from another file...
import redshift_connector

def get_redshift_connector(creds: dict) -> object:
    conn = redshift_connector.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['user'],
        password=creds['password']
    )
    return conn
When I move to running in ECS, it fails with the following trace…
Unexpected error: TypeError("cannot pickle 'Struct' object")
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 930, in get_task_run_state
    result = self.result.write(value, **formatting_kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/results/s3_result.py", line 71, in write
    binary_data = new.serializer.serialize(new.value)
  File "/usr/local/lib/python3.8/site-packages/prefect/engine/serializers.py", line 73, in serialize
    return cloudpickle.dumps(value)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'Struct' object
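A minimal sketch of two possible fixes, assuming the error comes from Prefect checkpointing the task's return value to S3 (as the s3_result.write frame in the trace suggests): a live Redshift connection can't be pickled, so either turn off checkpointing for the task that returns it, or open the connection inside the task that uses it. The helper names below mirror the code above.

from prefect import task

# Option 1: keep returning the connection, but skip result checkpointing for this task
@task(checkpoint=False)
def get_connection(creds):
    return connectors.get_redshift_connector(creds)

# Option 2: never pass the connection between tasks -- open it inside the mapped task,
# so nothing unpicklable crosses a task boundary
@task
def load_to_s3(table, creds):
    conn = connectors.get_redshift_connector(creds)
    try:
        ...  # query the table and write the results to S3
    finally:
        conn.close()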
Malthe Karbo
04/13/2022, 12:21 PM
14:17:55.129 | ERROR | Flow run 'arrogant-yak' - Crash detected! Execution was interrupted by an unexpected exception.
followed by prefect.exceptions.Abort: This run has already terminated.
This happens regardless of task success/failure. Again, when running without cloud this works fine.
Stephen Herron
04/13/2022, 12:40 PM
Regarding the invokeLambda task: I think we need to pass it some extra config through boto_kwargs. Has anyone come across needing to do this before?
Tom Klein
04/13/2022, 1:39 PM
RunNamespacedJob as a failure (within the job itself)? Or is it impossible because the command is async?
Matthew Seligson
04/13/2022, 2:07 PM
Tom Klein
04/13/2022, 3:31 PM
Regarding the RunNamespacedJob example (from https://github.com/anna-geller/packaging-prefect-flows/blob/master/flows_task_library/s3_kubernetes_run_RunNamespacedJob_and_get_logs.py) --- we implemented it and got it to work, but it seems that it’s now failing on:
VALIDATIONFAIL signal raised: VALIDATIONFAIL('More than one dummy pod')
because there seem to be many pod “residues” of previous runs:
['prefect-agent-7745fb9694-6fwk4', 'prefect-job-47d072a8-4pbsf', 'seg-pred-test-cm54l', 'seg-pred-test-doron', 'seg-pred-test-l2j5l', 'seg-pred-test-zvwld']
So wouldn’t k8s keep the pods around given that we set “delete_job_after_completion” = False? And even if the job is deleted successfully, wouldn’t it keep the pods around? Or are the pods supposed to be deleted automatically if the job is deleted…?
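For reference, whether the pods disappear depends on how the Job is deleted: with delete_job_after_completion=False nothing is cleaned up, and even an explicit delete only cascades to the pods when the propagation policy says so. A minimal sketch with the kubernetes Python client, where the job name and namespace are assumptions inferred from the pod names above:

from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() when running outside the cluster
batch = client.BatchV1Api()

# "Background"/"Foreground" propagation deletes the Job's pods along with it;
# "Orphan" leaves them behind, which looks like the residue listed above.
batch.delete_namespaced_job(
    name="seg-pred-test",   # assumed job name, inferred from the pod names
    namespace="default",    # assumed namespace
    body=client.V1DeleteOptions(propagation_policy="Background"),
)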
David Yang
04/13/2022, 3:49 PM
Chris Reuter
04/13/2022, 4:00 PM
Kevin Mullins
04/13/2022, 5:02 PM
I’m planning to use AzureResult to store task results for my flows. I’ve already got separate storage accounts per environment (dev, qa, prod). I’m curious whether it would be recommended to use separate containers for different Prefect projects and/or flows, or if it is ok to just store all the results in the same blob container for the environment.
My hesitation about further separation is that AzureResult appears to require the container to already exist, so I would need to orchestrate something to create containers for each Prefect project. Not a big deal, but just trying to get a feel for good practices.
Any thoughts appreciated.
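A minimal sketch of the per-project approach, assuming the container is created out of band (since, as noted above, AzureResult appears to require it to exist already); the container name and connection string here are placeholders:

from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient
from prefect import Flow
from prefect.engine.results import AzureResult

# One-off container creation per project, e.g. from the deployment pipeline,
# since AzureResult appears not to create the container itself
service = BlobServiceClient.from_connection_string("<connection-string>")
try:
    service.create_container("my-project-results")  # placeholder container name
except ResourceExistsError:
    pass

with Flow("my-flow", result=AzureResult(container="my-project-results")) as flow:
    ...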
Matt Alhonte
04/13/2022, 6:39 PM
Chris Reuter
04/13/2022, 6:45 PM
Zach Munro
04/13/2022, 7:03 PM
Zach Munro
04/13/2022, 7:03 PM
Zach Munro
04/13/2022, 7:13 PM
Rajan Subramanian
04/13/2022, 10:15 PM
I'm getting a "temporary failure in name resolution" error. I'm not even sure whom to ask this, but since I was using Prefect I figured someone here knows about this.
I raised it here, https://stackoverflow.com/questions/71864208/unable-to-connect-to-redis-elasticcache-from-fargate, curious if someone had any suggestions? My Fargate profile has the same 4 subnets as my ElastiCache cluster; they also have the same security group.
Apoorva Desai
04/14/2022, 12:20 AM
Alexander Butler
04/14/2022, 1:57 AM
- name: elt-salesforce
  flow_location: ./salesforce_flows.py
  flow_name: elt-salesforce
  tags:
    - salesforce
    - core
  parameters:
    destination: "gcp"
  schedule:
    interval: 3600
Assuming interval is seconds? Can I specify another grain? Can schedule take a dict? If it takes cron, does that take a dict?
Honestly, schedule is the primary question point. Everything else is straightforward enough.
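For what it's worth, a minimal sketch of how the same fields look in Prefect 2.0's own Python API at the time (DeploymentSpec in the 2.0 beta), assuming this YAML manifest maps onto it: there, interval is a timedelta rather than a bare number of seconds (so any grain works), and cron is a separate schedule type rather than a dict key.

from datetime import timedelta
from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import CronSchedule, IntervalSchedule

DeploymentSpec(
    name="elt-salesforce",
    flow_location="./salesforce_flows.py",
    flow_name="elt-salesforce",
    tags=["salesforce", "core"],
    parameters={"destination": "gcp"},
    # any timedelta grain works; the cron form would be CronSchedule(cron="0 * * * *")
    schedule=IntervalSchedule(interval=timedelta(seconds=3600)),
)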
Salim Doost
04/14/2022, 2:53 AM
404 Client Error for http+docker://localhost/v1.41/containers/create?name=quantum-squid: Not Found ("No such image: <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com/datascience-prefect:<image-tag-name>")
However, we’re able to confirm that the image with this tag exists on ECR.
Updating an existing flow by overriding an existing image-tag leads to the following error:
KeyError: 'Task slug <task-name> is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.
- Did you change the flow without re-registering it?
- Did you register the flow without updating it in your storage location (if applicable)?'
Again, we’re able to confirm in AWS ECR that the image got pushed and updated successfully. Our deployment job didn’t throw any error messages either.
Any idea what we can do to resolve this issue?
Carlos Cueto
04/14/2022, 4:27 AM
I'm having an issue with a LocalRun flow's working_dir parameter. Whenever I specify the following:
flow.run_config = LocalRun(working_dir='C:/scripts/GetADUsers', labels=["SVRBIPTH01"])
When I register the flow (I'm using Prefect 1.2.0 on macOS, Python 3.10) I get the following working_dir in the Prefect Cloud UI:
/Users/carloscueto/Documents/Python_Scripts/Prefect-Flows/PowerShell/GetADUsers/C:/scripts/GetADUsers
It seems to be prepending the path I register the script from (on the local machine) to the working_dir string I specified in the run_config.
Has anybody encountered this before? Everything works fine when I register the flow from a Windows computer.
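A minimal sketch of the likely mechanism (an assumption, not confirmed against Prefect's source): on macOS a Windows drive path is not recognised as absolute, so resolving it against the registration directory yields exactly the concatenated path shown above.

import os

path = "C:/scripts/GetADUsers"
print(os.path.isabs(path))    # True on Windows, False on macOS/Linux
print(os.path.abspath(path))  # on macOS: "<registration cwd>/C:/scripts/GetADUsers"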
Alexander Butler
04/14/2022, 6:13 AM
prefect orion start
prefect deployment create ...
prefect work-queue create -t etl -- etl-queue
HERE IS THE GAP -- the response to the above command is something like UUID('...'), which is useless when setting something up from the CLI without sed/awk.
prefect agent start 'no simple headless way to derive id...'
The less appealing part afterwards is that prefect work-queue ls renders a table which is pretty in a CLI but useless again to simply get an ID.
Has anyone set up Prefect 2.0 to self-deploy in an image along with all their code? The ephemeral nature makes this very advantageous, but there seems to be a tiny unconsidered gap.
I am pretty sure a more reliable, consistent way to get the work queue ID is all that's needed basically, but if I am totally missing it just lmk.
I am a big fan of the package for the record, but now it's crunch time: production use attempts 🙂
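A hedged workaround sketch for the gap described above, using only what the CLI already prints (no extra flags assumed): run the same commands from Python and pull the UUID out of the output.

import re
import subprocess

# Same command as above, but captured instead of read by eye
out = subprocess.run(
    ["prefect", "work-queue", "create", "-t", "etl", "--", "etl-queue"],
    capture_output=True, text=True, check=True,
).stdout

# The response contains something like UUID('...'); extract the bare ID
queue_id = re.search(
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", out
).group(0)

subprocess.run(["prefect", "agent", "start", queue_id], check=True)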
Jacob Blanco
04/14/2022, 6:32 AM
Stéphanie Cérulli
04/14/2022, 6:43 AM
Stéphanie Cérulli
04/14/2022, 6:44 AM
2022-04-14T06:42:11.384990+00:00 - - urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='api.prefect.io', port=443): Max retries exceeded with url: / (Caused by ReadTimeoutError("HTTPSConnectionPool(host='api.prefect.io', port=443): Read timed out. (read timeout=15)"))