David Kuda
12/04/2020, 9:58 PM
prefect register flow -f flows/my_flow.py -p MyProject
This results in this error:
ValueError: Flow could not be deserialized successfully. Error was: ValidationError({'storage': {'ref': ['Field may not be null.']}})
Best wishes from Berlin!
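A minimal sketch of one likely fix, assuming the ValidationError means the flow was serialized without any storage attached (flow name and path below are illustrative):

from prefect import Flow
from prefect.environments.storage import Local  # moved to prefect.storage in newer releases

with Flow("my-flow") as flow:
    ...

# storage must be set (non-null) before registration for the serialized flow to validate
flow.storage = Local(path="flows/my_flow.py", stored_as_script=True)
flow.register(project_name="MyProject")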
Riley Hun
12/05/2020, 9:35 PM
When I run
prefect agent kubernetes start
from outside the cluster, within my own local machine's CLI, the error goes away and the flow runs successfully. Some insight on this would be much appreciated. I can confirm that the Prefect version of the deployed agent is correct too.
Here's the error I'm getting from the built-in agent that comes with the Helm chart deployment:
[2020-12-05 10:30:03+0000] ERROR - prefect.CloudTaskRunner | Failed to set task state with error: ConnectionError(MaxRetryError("HTTPConnectionPool(host='prefect-server-gke-apollo.default', port=4200): Max retries exceeded with url: /graphql//graphql (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f5a560d3690>: Failed to establish a new connection: [Errno -2] Name or service not known'))"))
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/urllib3/connection.py", line 157, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/opt/conda/lib/python3.7/site-packages/urllib3/util/connection.py", line 61, in create_connection
    for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
  File "/opt/conda/lib/python3.7/socket.py", line 752, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known
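A sketch of one thing worth checking, assuming a Prefect Server backend (hostname illustrative): the doubled /graphql//graphql in the URL suggests the configured endpoint already ends in /graphql, and the flow-run pods must also be able to resolve the Apollo host, e.g. via the fully qualified in-cluster service name:

# no trailing /graphql on the endpoint; use a name the pods can resolve
prefect agent kubernetes start --api http://prefect-server-gke-apollo.default.svc.cluster.local:4200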
Von Christian Pizarro
12/07/2020, 4:05 AM
Kostas Chalikias
12/07/2020, 10:26 AM
Adam
12/07/2020, 2:28 PMFailed to set task state with error: HTTPError('413 Client Error: Request Entity Too Large for url: <https://api.prefect.io/graphql>')
— what's the best way to debug this?
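A sketch of one common cause, assuming the 413 comes from a large payload (e.g. a big serialized task result) sent while setting task state; writing results to external storage keeps the state payload small (bucket name is hypothetical):

from prefect import task
from prefect.engine.results import S3Result

@task(result=S3Result(bucket="my-results-bucket"))  # hypothetical bucket
def big_task():
    ...  # the large return value is persisted to S3 rather than sent to the API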
Charles Lariviere
12/07/2020, 3:52 PM
I ran
prefect agent fargate start -t $PREFECT_TOKEN
locally, which appeared to work (it successfully started and was waiting for flows), but:
1. It never received any flows triggered from Prefect Cloud
2. This seems to be tied to my local terminal session (i.e. if I close my terminal, the agent shuts down)
3. I can’t see where this is actually deployed — I was expecting a new cluster to be created in AWS ECS, but that’s not the case
Am I misunderstanding how this is supposed to work? If so, how can we go about deploying an always-running agent that's ready to deploy new Fargate tasks whenever a flow is scheduled? Or is this not how Prefect is supposed to work either?
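A sketch of the usual pattern, assuming the agent is meant to run forever: the agent is just a long-lived polling process that launches a Fargate task per flow run (it never creates a cluster), so it dies with the terminal unless run as a service, e.g. a restart-always container (image tag and AWS credential wiring are illustrative):

# run the agent itself as a detached, auto-restarting container;
# in practice this same container could run as an ECS service
docker run -d --restart=always \
  -e PREFECT__CLOUD__AGENT__AUTH_TOKEN="$PREFECT_TOKEN" \
  prefecthq/prefect:0.13.19 \
  prefect agent fargate start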
Brian Mesick
12/07/2020, 4:27 PM
Sean Talia
12/07/2020, 5:42 PM
I'm trying to understand what the intended use of a Docker Task is; it seems like if you want to string together a flow that pulls an arbitrary set of Docker images and runs them as containers in some sequence, the canonical way to do this is to make each instance of pulling + running a container its own Flow (that leverages Docker storage), and then to orchestrate a flow of flows. Given this, in what circumstance would you actually want to use a Docker Task? On the face of it, if you wanted to use one, it seems like you'd have to use some form of storage other than Docker for the task's parent flow to avoid a Docker-in-Docker situation; and I think not being able to use Docker Storage also implies that you wouldn't be able to use a Docker Agent for executing that flow + task? Does that sound right, or am I confusing the intended relationship between agent <-> flow <-> task?
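A minimal sketch of the scenario the question describes, assuming Prefect's built-in Docker tasks and a parent flow on non-Docker storage (image and command are illustrative):

from prefect import Flow
from prefect.tasks.docker import PullImage, CreateContainer, StartContainer, WaitOnContainer

# e.g. Local storage + Local agent on a host with a Docker daemon,
# which avoids the Docker-in-Docker concern raised above
with Flow("docker-tasks") as flow:
    pulled = PullImage()(repository="alpine", tag="3.12")
    container = CreateContainer()(image_name="alpine:3.12", command="echo hello", upstream_tasks=[pulled])
    started = StartContainer()(container_id=container)
    status = WaitOnContainer()(container_id=container, upstream_tasks=[started])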
Richard Hughes
12/07/2020, 6:01 PM
liren zhang
12/07/2020, 7:16 PM
Gary Levin
12/07/2020, 9:44 PM
Gary Levin
12/07/2020, 9:54 PM
with Flow('Arithmetic') as flow:
    x, y = Parameter('x'), Parameter('y')
    operations = {
        '+': x + y,
        '-': x - y,
        '*': x * y,
        '/': x / y
    }
resulted in operations containing
{'+': <Task: Add>, '-': <Task: Sub>, '*': <Task: Mul>, '/': <Task: Div>}
That is not what Python would normally produce for the dict: "x + y" is coerced to <Task: Add>, so "+" must be overridden in the Flow context.
Where will I find the description of what magic is going on here? For instance, which operators are overridden?
Gary Levin
12/07/2020, 10:01 PM
+ is defined as something like this in the Flow Context:
lambda(x: Parameter, y: Parameter) Task("Add", lambda()(x.value + y.value))
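A sketch of what is going on (not Prefect's literal source): prefect.core.task.Task overrides Python's arithmetic magic methods (__add__, __sub__, __mul__, __truediv__, and friends), and each returns a new operator task from prefect.tasks.core.operators bound into the active flow context, so x + y builds roughly this:

from prefect import Flow, Parameter
from prefect.tasks.core.operators import Add

with Flow("Arithmetic-explicit") as flow:
    x, y = Parameter("x"), Parameter("y")
    total = Add()(x, y)  # the task that x + y creates and wires up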
Pedro Machado
12/08/2020, 3:58 AM
Pedro Machado
12/08/2020, 5:46 AM
simone
12/08/2020, 12:41 PM
I am running Prefect on a cluster with HTCondor as workload manager, using a DaskExecutor connected to a cluster started with dask-jobqueue.
I am mapping a function over a large number of images (20000). I started by running a subset of the data (5000) with a predefined number of workers, and the function is mapped and run correctly using all the available workers.
If the cluster is started in adaptive mode using dask-jobqueue, Prefect is able to increase the number of processes run, as expected in adaptive mode and as monitored in the workload manager; however, the size of the Dask cluster doesn't change and the function isn't mapped, not even to the minimum number of workers predefined for the cluster. Interestingly, HTCondor allocates new running processes, but they seem to be independent of the Dask cluster. It seems that the dask-jobqueue-initialised cluster cannot communicate with the processes started by Prefect. After a few minutes the processes started by Prefect die, printing this error:
distributed.scheduler - ERROR - Workers don't have promised key: ['<tcp://192.168.0.2:40093>']
# the tcp address changes
Any help will be greatly appreciated! Thanks!
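A minimal sketch of one thing to try, assuming dask-jobqueue's HTCondorCluster (resource numbers are illustrative): let the executor own the adaptive cluster, so the scheduler that scales and the scheduler the mapped tasks run on are one and the same:

from dask_jobqueue import HTCondorCluster
from prefect.engine.executors import DaskExecutor  # prefect.executors in newer releases

executor = DaskExecutor(
    cluster_class=HTCondorCluster,
    cluster_kwargs={"cores": 1, "memory": "4GB", "disk": "4GB"},
    adapt_kwargs={"minimum": 4, "maximum": 40},  # adaptive scaling bounds
)
flow.run(executor=executor)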
Mark McDonald
12/08/2020, 3:16 PM
liren zhang
12/08/2020, 3:35 PM
Zach
12/08/2020, 6:29 PM
Zach
12/08/2020, 6:39 PM
Sergey Gerasimov
12/08/2020, 6:58 PM
Emil B
12/08/2020, 7:39 PM
DJ Erraballi
12/08/2020, 10:07 PM
with SRMFlow(
    'TriggeredPatientLoaderFlow',
    feature_flag='patient_upsert_events_enabled'
) as trigger_patient_loader_flow:
    client_abbrev: str = Parameter("client_abbrev", default="")
    event_source_id: int = Parameter("event_source_id", default=None)
    lock_acquired: bool = acquire_event_source_lock(client_abbrev, event_source_id)
    linked: bool = link_patient_events(client_abbrev, event_source_id, lock_acquired)
    aggregated_patients: List[PersonInputProperties] = aggregate_patients(client_abbrev, linked, event_source_id)
    bulk_activity_metadata: dict = get_bulk_activity_metadata(client_abbrev, aggregated_patients)
    loaded: bool = load_aggregated_patients(client_abbrev, aggregated_patients, bulk_activity_metadata)
    marked_complete: bool = mark_event_source_as_completed(client_abbrev, event_source_id, loaded, lock_acquired)
    lock_released: bool = release_event_source_lock(client_abbrev, event_source_id, marked_complete, lock_acquired)
jack
12/09/2020, 12:00 AM
Eric
12/09/2020, 1:28 AM
mutation {
  update_flow(
    _set: {name: "After Modified"},
    where: {_and: {project_id: {_eq: "ea8106f2-1642-4e54-955a-3af1d7c7465e"}, name: {_eq: "Ori Flow Name"}}}
  ) {
    affected_rows
  }
}
It returned success with affected_rows=5 (5 different versions), but when I click 'start now' to create a new flow run, it shows:
Last State Message
[9 Dec 2020 9:26am]: Failed to load and execute Flow's environment: KeyError('After Modified')
Is there any step I missed? I just want to modify the flow name after I registered the flow. Thank you 🙏
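A sketch of the likely mismatch, assuming the registered flow's storage keys flows by name: the mutation renames the flow only in the database, while the stored flow object is still filed under the old name, hence KeyError('After Modified') at load time. Renaming in code and re-registering keeps the two in sync (project name illustrative):

flow.name = "After Modified"
flow.register(project_name="my-project")  # re-upload so storage and the API agree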
Hamed Sheykhlou
12/09/2020, 8:48 AM
import requests
from prefect import Flow, task
from prefect.engine.results import SecretResult


@task(name='extract_task')
def extract():
    res = requests.get('https://jsonplaceholder.typicode.com/todos/1')
    res_txt = res.text
    print(res_txt)
    return res_txt


@task()
def load():
    """Print the data to indicate it was received"""
    print(SECRET_RESULT.location)
    res_txt = SECRET_RESULT.read('secret1')
    print("Here's your data: {}".format(res_txt))


with Flow("Callback-Example") as flow:
    SECRET_RESULT = SecretResult(extract, location='somewhere.json')
    e = extract()
    l = load()
    l.set_upstream(e)

flow.run()
but when I run it, I get an error from inside the Prefect source:
  File "/home/hamed/PycharmProjects/etlprefectcloud/venv/lib/python3.8/site-packages/prefect/engine/results/secret_result.py", line 38, in read
    new.value = self.secret_task.run(name=location)
TypeError: extract() got an unexpected keyword argument 'name'
It's in prefect/engine/results/secret_result.py, line 38: new.value = self.secret_task.run(name=location). If I delete the name argument, it works.
Am I using SecretResult the wrong way?
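A sketch of the likely answer, judging from that line of Prefect source: SecretResult appears to expect a Secret task, whose run() accepts a name argument, rather than an arbitrary task like extract (secret name is illustrative):

from prefect.tasks.secrets import PrefectSecret
from prefect.engine.results import SecretResult

secret_task = PrefectSecret("MY_SECRET")  # run(name=...) is valid for Secret tasks
result = SecretResult(secret_task)        # reading re-runs the secret task by name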
Sébastien
12/09/2020, 11:46 AM
I tried
task2.set_upstream(task1)
to manually create a dependency, but that gives me:
/usr/local/lib/python3.7/dist-packages/prefect/core/task.py:596: UserWarning: You are making a copy of a task that has dependencies on or to other tasks in the active flow context. The copy will not retain those dependencies.
What's the right way to say (in functional API) that task2 can only run after task1 was completed, without messing up the flow?
The entire flow:
schedule = CronSchedule("0 * * * *", start_date=datetime.now())

with Flow("Q", schedule=schedule) as flow:
    task2.set_upstream(task1)
    task1()
    some_result = task2()
    task3(some_result)
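A sketch of the usual functional-API fix: create the dependency between the bound calls inside the flow (via upstream_tasks at call time) rather than between the original task objects, which is what triggers the copy warning:

with Flow("Q", schedule=schedule) as flow:
    t1 = task1()
    some_result = task2(upstream_tasks=[t1])  # task2 waits for task1's run
    task3(some_result)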
Julio Venegas
12/09/2020, 1:15 PM
prefect agent kubernetes start
would start a Kubernetes agent in the environment where Prefect is installed, but
prefect agent install kubernetes
installs the agent in the Kubernetes cluster, right?
Having diagrams for these interactions would make the docs more intuitive to read.
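A sketch of one nuance, as documented for Prefect 0.x: prefect agent install kubernetes only generates a Kubernetes manifest; nothing is deployed until that manifest is applied to the cluster:

# generate the agent deployment manifest and apply it
prefect agent install kubernetes -t $PREFECT_TOKEN | kubectl apply -f -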
Marc Lipoff
12/09/2020, 2:07 PM
s3_client = boto3.client('s3')
with requests.request("GET", url, stream=True) as r:
    r.raise_for_status()
    for chunk in r:
        s3_client.put_object(Body=chunk,
                             Bucket='xxx',
                             Key='xxx')
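A sketch of a likely fix, assuming the goal is a single S3 object per download: put_object writes a complete object on every call, so the loop above overwrites the same key chunk by chunk and keeps only the last chunk; streaming the body through boto3's managed upload writes one object (url, bucket, and key are placeholders as in the original):

import boto3
import requests

s3_client = boto3.client("s3")
with requests.get(url, stream=True) as r:
    r.raise_for_status()
    r.raw.decode_content = True  # transparently decompress if the server gzips
    s3_client.upload_fileobj(r.raw, Bucket="xxx", Key="xxx")  # multipart under the hood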
Dotan Asselmann
12/09/2020, 2:09 PM